Broose overlay module. More...
#include <Broose.h>
Public Member Functions | |
| Broose () | |
| ~Broose () | |
| virtual void | initializeOverlay (int stage) |
| Initializes derived-class-attributes. | |
| virtual void | finishOverlay () |
| collects statistical data in derived class | |
| virtual bool | isSiblingFor (const NodeHandle &node, const OverlayKey &key, int numSiblings, bool *err) |
| Query if a node is among the siblings for a given key. | |
| virtual void | joinOverlay () |
| Join the overlay with a given nodeID in thisNode.key. | |
| virtual void | recordOverlaySentStats (BaseOverlayMessage *msg) |
| Collect overlay specific sent messages statistics. | |
| virtual bool | handleRpcCall (BaseCallMessage *msg) |
| virtual void | handleTimerEvent (cMessage *msg) |
| void | updateTooltip () |
| updates information shown in tk-environment | |
Protected Member Functions | |
| void | handleJoinTimerExpired (cMessage *msg) |
| handles an expired join timer | |
| void | handleBucketTimerExpired (cMessage *msg) |
| handles an expired bucket refresh timer | |
| int | getRoutingDistance (const OverlayKey &key, const OverlayKey &node, int dist) |
| calculates the de Bruijn distance between a key and a nodeId | |
| bool | routingAdd (const NodeHandle &node, bool isAlive, simtime_t rtt=MAXTIME) |
| Adds a node to the routing table. | |
| void | changeState (int state) |
| changes the node's state | |
| NodeVector * | findNode (const OverlayKey &key, int numRedundantNodes, int numSiblings, BaseOverlayMessage *msg) |
| Implements the find node call. | |
| int | getMaxNumSiblings () |
| Query the maximum number of siblings (nodes close to a key) that are maintained by this overlay protocol. | |
| int | getMaxNumRedundantNodes () |
| Query the maximum number of redundant next hop nodes that are returned by findNode(). | |
| void | displayBucketState () |
| debug function which outputs the content of the node's buckets | |
| void | handleRpcResponse (BaseResponseMessage *msg, const RpcState &rpcState, simtime_t rtt) |
| void | handleRpcTimeout (const RpcState &rpcState) |
| void | handleFindNodeTimeout (FindNodeCall *findNode, const TransportAddress &dest, const OverlayKey &destKey) |
| This method is called if a Find Node Call timeout has been reached. | |
| void | handleBucketRequestRpc (BucketCall *msg) |
| handles a received Bucket request | |
| void | handleBucketResponseRpc (BucketResponse *msg, const RpcState &rpcState) |
| handles a received Bucket response | |
| void | handleBucketTimeout (BucketCall *msg) |
| handles a received Bucket timeout | |
| void | routingTimeout (const BrooseHandle &handle) |
| virtual void | pingResponse (PingResponse *pingResponse, cPolymorphic *context, int rpcId, simtime_t rtt) |
| virtual void | pingTimeout (PingCall *pingCall, const TransportAddress &dest, cPolymorphic *context, int rpcId) |
| void | setLastSeen (const NodeHandle &node) |
| updates the timestamp of a node in all buckets | |
| void | addNode (const NodeHandle &node) |
| adds a node to all buckets | |
| void | resetFailedResponses (const NodeHandle &node) |
| resets the counter of failed responses | |
| void | setRTT (const NodeHandle &node, simtime_t rtt) |
| sets the rtt to a node in all buckets | |
Protected Attributes | |
| int | chooseLookup |
| decides which kind of lookup (right/left shifting) is used | |
| simtime_t | joinDelay |
| time interval between two join tries | |
| int | receivedJoinResponse |
| number of received join response messages | |
| int | receivedBBucketLookup |
| number of received lookup responses for the B bucket | |
| int | numberBBucketLookup |
| maximal number of lookup responses for the B bucket | |
| int | receivedLBucketLookup |
| number of received lookup responses for the L bucket | |
| int | numberLBucketLookup |
| maximal number of lookup responses for the L bucket | |
| int | shiftingBits |
| number of bits shifted in/out each step | |
| int | powShiftingBits |
| 2^{variable shiftingBits} | |
| uint32_t | bucketSize |
| maximal number of bucket entries | |
| uint32_t | rBucketSize |
| maximal number of entries in the r buckets | |
| int | keyLength |
| length of the node and data IDs | |
| simtime_t | refreshTime |
| idle time after which a node is pinged | |
| uint32_t | userDist |
| how many hops are added to the estimated hop count | |
| int | numberRetries |
| number of retries in case of timeout | |
| int | bucketRetries |
| number of bucket retries for a successful join | |
| bool | stab1 |
| bool | stab2 |
| int | bucketCount |
| number of Bucket messages | |
| int | bucketBytesSent |
| length of all Bucket messages | |
| int | numFailedPackets |
| number of packets which couldn't be routed correctly | |
| BrooseBucket * | lBucket |
| BrooseBucket * | bBucket |
| BrooseBucket ** | rBucket |
| std::vector< BrooseBucket * > | bucketVector |
| vector of all Broose buckets | |
| cMessage * | join_timer |
| cMessage * | bucket_timer |
| timer to reconstruct all buckets | |
| TransportAddress | bootstrapNode |
| node handle holding the bootstrap node | |
Friends | |
| class | BrooseBucket |
Broose overlay module.
Implementation of the Broose KBR overlay as described in "Broose: A Practical Distributed Hashtable Based on the De-Bruijn Topology" by Anh-Tuan Gai and Laurent Viennot
Definition at line 52 of file Broose.h.
| Broose::Broose | ( | ) |
Definition at line 51 of file Broose.cc.
{
    // Start with every pointer member cleared; the real objects are
    // looked up / created later in initializeOverlay().
    bBucket = NULL;
    lBucket = NULL;
    rBucket = NULL;

    bucket_timer = NULL;
    join_timer = NULL;
}
| Broose::~Broose | ( | ) |
Definition at line 59 of file Broose.cc.
{
    // delete timers
    cancelAndDelete(join_timer);
    cancelAndDelete(bucket_timer);

    // Free the pointer array allocated with new[] in initializeOverlay().
    // Only the array itself is released: the buckets it points to are
    // submodules obtained via getSubmodule() and are not deleted here.
    // rBucket is NULL if initializeOverlay() never ran, which delete[]
    // accepts.
    delete[] rBucket;
    rBucket = NULL;
}
| void Broose::addNode | ( | const NodeHandle & | node | ) | [protected] |
adds a node to all buckets
| node | node handle which should be added |
Definition at line 1116 of file Broose.cc.
Referenced by findNode().
{
    // Hand the node to each bucket registered in bucketVector.
    typedef std::vector<BrooseBucket*>::iterator BucketIter;
    for (BucketIter it = bucketVector.begin(); it != bucketVector.end(); ++it) {
        (*it)->add(node);
    }
}
| void Broose::changeState | ( | int | state | ) | [protected] |
changes the node's state
| state | the state to which a node is changing |
Definition at line 145 of file Broose.cc.
Referenced by handleBucketResponseRpc(), handleBucketTimeout(), handleJoinTimerExpired(), and joinOverlay().
{
    // Switches the node into the requested state (`toState`) and performs
    // the entry actions of that state:
    //   INIT  - pick a bootstrap node, (re)start the join timer, reset buckets
    //   RSET  - ask every node known from the R buckets for its L bucket
    //   BSET  - ask every node in the B bucket for its L bucket
    //   READY - add this node to all buckets and start the refresh timer
    // The overlay is flagged ready only while the member `state` is READY.
    switch (toState) {
    case INIT: {
        state = INIT;

        // find a new bootstrap node and enroll to the bootstrap list
        bootstrapNode = bootstrapList->getBootstrapNode();

        // fire the join timer right away
        cancelEvent(join_timer);
        scheduleAt(simTime(), join_timer);

        // initialize respectively clear the buckets
        for (int i = 0; i < powShiftingBits; i++) {
            rBucket[i]->initializeBucket(shiftingBits, i, rBucketSize, this);
        }
        lBucket->initializeBucket(-shiftingBits, 0, powShiftingBits*rBucketSize,
                                  this);
        bBucket->initializeBucket(0, 0, 7*bucketSize, this, true);

        // if we have restarted the join protocol reset parameters
        receivedBBucketLookup = 0;
        receivedLBucketLookup = 0;
        receivedJoinResponse = 0;

        getParentModule()->getParentModule()->bubble("Enter INIT state.");
        updateTooltip();
        break;
    }
    case RSET: {
        state = RSET;

        // gather the entries of all R buckets in one temporary bucket
        BrooseBucket* tmpBucket = new BrooseBucket();
        tmpBucket->initializeBucket(0, 0, powShiftingBits*rBucketSize, this);

        for (int i = 0; i < powShiftingBits; i++) {
            int size = rBucket[i]->getSize();
            for (int j = 0; j < size; j++) {
                tmpBucket->add(rBucket[i]->get(j));
            }
        }

        // Send one L-bucket request per gathered node. Each call is handed
        // to sendUdpRpcCall() directly; no side array of call pointers is
        // kept (the former heap array was never freed).
        for (uint32_t i = 0; i < tmpBucket->getSize(); i++) {
            BucketCall* bCall = new BucketCall("LBucketCall");
            bCall->setBucketType(LEFT);
            bCall->setProState(PRSET);
            bCall->setBitLength(BUCKETCALL_L(bCall));
            sendUdpRpcCall(tmpBucket->get(i), bCall, NULL,
                           10);
        }

        // half of the calls must return for a state change
        numberBBucketLookup = ceil((double)tmpBucket->getSize() / 2);

        delete tmpBucket;

        getParentModule()->getParentModule()->bubble("Enter RSET state.");
        break;
    }
    case BSET: {
        state = BSET;

        // half of the calls must return for a state change
        numberLBucketLookup = ceil((double)bBucket->getSize() / 2);

        // send messages to all entries of the B Bucket
        int size2 = bBucket->getSize();
        for (int i = 0; i < size2; i++) {
            BucketCall* bCall2 = new BucketCall("LBucketCall");
            bCall2->setBucketType(LEFT);
            bCall2->setProState(PBSET);
            bCall2->setBitLength(BUCKETCALL_L(bCall2));
            sendUdpRpcCall(bBucket->get(i), bCall2, NULL,
                           10);
        }

        getParentModule()->getParentModule()->bubble("Enter BSET state.");
        break;
    }
    case READY: {
        state = READY;

        // fill the bucket also with this node
        for (size_t i = 0; i < bucketVector.size(); i++) {
            bucketVector[i]->add(thisNode);
        }

        // to disable the ping protocol a pingDelay or
        // refreshTime of zero was given
        if (refreshTime != 0) {
            cancelEvent(bucket_timer);
            scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
        }

        getParentModule()->getParentModule()->bubble("Enter READY state.");
        updateTooltip();
        break;
    }
    }

    setOverlayReady(state == READY);
}
| void Broose::displayBucketState | ( | ) | [protected] |
debug function which outputs the content of the node's buckets
Definition at line 805 of file Broose.cc.
{
    // Debug helper: writes a header naming this node, then the content of
    // every R bucket, the L bucket and the B bucket to EV.
    EV << "[Broose::displayBucketState() @ " << thisNode.getIp()
       << " (" << thisNode.getKey().toString(16) << ")]" << endl;

    int idx = 0;
    while (idx < powShiftingBits) {
        EV << " Content of rBucket[" << idx << "]: ";
        rBucket[idx]->output();
        ++idx;
    }

    EV << " Content of lBucket: ";
    lBucket->output();

    EV << " Content of bBucket: ";
    bBucket->output();

    EV << endl;
}
| NodeVector * Broose::findNode | ( | const OverlayKey & | key, | |
| int | numRedundantNodes, | |||
| int | numSiblings, | |||
| BaseOverlayMessage * | msg | |||
| ) | [protected, virtual] |
Implements the find node call.
This method simply returns the closest nodes known in the corresponding routing topology. If the node is a sibling for this key (isSiblingFor(key) = true), this method returns all numSiblings siblings, with the closest neighbor to the key first.
| key | The lookup key. | |
| numRedundantNodes | Maximum number of next hop nodes to return. | |
| numSiblings | number of siblings to return | |
| msg | A pointer to the BaseRouteMessage or FindNodeCall message of this lookup. |
Reimplemented from BaseOverlay.
Definition at line 574 of file Broose.cc.
{
    // Returns the next-hop candidates (or, if this node is a sibling for
    // `key`, the closest known nodes) for a lookup of `key`.
    //
    // Per-lookup routing state (route key, remaining step count, shifting
    // direction, last hop) travels with the lookup message in an attached
    // "findNodeExt" object; it is created here on the first hop.
    //
    // The caller owns the returned NodeVector.

    // no routing possible in these states: answer with an empty vector
    if ((state == INIT) || (state == RSET) || (state == FAILED))
        return new NodeVector();

    BrooseFindNodeExtMessage *findNodeExt = NULL;
    bool err = false; // out-parameter of isSiblingFor(); not evaluated here
    bool isSibling = isSiblingFor(thisNode, key, numSiblings, &err);
    int resultSize;

    if (numSiblings < 0) {
        // exhaustive iterative doesn't care about siblings
        resultSize = numRedundantNodes;
    } else {
        resultSize = isSibling ? (numSiblings ? numSiblings : 1)
                               : numRedundantNodes;
    }
    assert(numSiblings || numRedundantNodes);
    NodeVector* result = new NodeVector(resultSize);

    if (isSibling) {
        // return the closest nodes
        // sort with XOR distance to key
        KeyDistanceComparator<KeyXorMetric>* comp =
            new KeyDistanceComparator<KeyXorMetric>(key);

        result->setComparator(comp);
        bBucket->fillVector(result);
        result->add(thisNode);

        delete comp;
        return result;
    }

    if (msg != NULL) {
        if (!msg->hasObject("findNodeExt")) {
            // first hop of this lookup: create the routing state
            findNodeExt = new BrooseFindNodeExtMessage("findNodeExt");

            OverlayKey routeKey = thisNode.getKey();

            // estimate distance
            int dist = max(rBucket[0]->longestPrefix(),
                           rBucket[1]->longestPrefix()) + 1 + userDist;

            // round up to a multiple of shiftingBits
            if ((dist % shiftingBits) != 0)
                dist += (shiftingBits - (dist % shiftingBits));

            // clamp to the largest multiple of shiftingBits <= keyLength
            if (dist > keyLength) {
                if ((keyLength % shiftingBits) == 0) {
                    dist = keyLength;
                } else {
                    dist = (keyLength - keyLength % shiftingBits);
                }
            }

            // alternate between left and right shifting lookups
            if ((chooseLookup++) % 2 == 0) {
                // init left shifting lookup
                findNodeExt->setRightShifting(false);

                // NOTE(review): `prefix` is an int while dist may be as
                // large as keyLength; presumably dist stays small in
                // practice - confirm that the shift cannot overflow.
                int prefix = 0;
                for (int i = 0; i < dist; i++) {
                    prefix += thisNode.getKey().getBit(thisNode.getKey().getLength() - i - 1) << (dist - i - 1);
                }

                OverlayKey pre(prefix);
                routeKey = key >> dist;
                // parentheses spell out the grouping the expression
                // already had (binary '-' binds tighter than '<<')
                routeKey += (pre << (key.getLength() - dist));

                // a negative step marks a left shifting lookup
                dist = -dist;
            } else {
                // init right shifting lookup
                findNodeExt->setRightShifting(true);
            }

            // add contact for next Hop
            findNodeExt->setLastNode(thisNode);
            findNodeExt->setRouteKey(routeKey);
            findNodeExt->setStep(dist);
            findNodeExt->setBitLength(BROOSEFINDNODEEXTMESSAGE_L);

            msg->addObject( findNodeExt );
        }

        findNodeExt = (BrooseFindNodeExtMessage*) msg->getObject("findNodeExt");
    }

    // Without a lookup message there is no routing state to work with.
    // Previously this path dereferenced a NULL pointer; report the problem
    // explicitly instead.
    if (findNodeExt == NULL) {
        delete result;
        error("Broose::findNode(): lookup for a non-sibling key "
              "requires a message carrying the findNodeExt object!");
        // only reached if error() returns, which it is not expected to do
        return new NodeVector();
    }

    // update buckets with last hop
    addNode(findNodeExt->getLastNode());
    setLastSeen(findNodeExt->getLastNode());

    // replace last hop contact information with
    // this hop contact information
    findNodeExt->setLastNode(thisNode);

    // brother lookup
    if (findNodeExt->getStep() == 0) {
        // return the closest nodes sorted by XOR distance to key
        KeyDistanceComparator<KeyXorMetric>* comp =
            new KeyDistanceComparator<KeyXorMetric>(key);

        result->setComparator(comp);
        bBucket->fillVector(result);
        result->add(thisNode);

        delete comp;
        return result;
    }

    if (findNodeExt->getRightShifting() == false) {
        // Left Shifting Lookup

        // can't handle left shifting lookup in BSET-State
        if (state == BSET)
            return result;

        // calculate routing key
        findNodeExt->setRouteKey((findNodeExt->getRouteKey()) << shiftingBits);
        findNodeExt->setStep(findNodeExt->getStep() + shiftingBits);

        KeyDistanceComparator<KeyXorMetric>* comp = NULL;
        comp = new KeyDistanceComparator<KeyXorMetric>(
                findNodeExt->getRouteKey());

        result->setComparator(comp);
        lBucket->fillVector(result);
        result->add(thisNode);

        delete comp;
    } else {
        // Right Shifting Lookup

        // calculate routing key
        int prefix = 0;
        int dist = findNodeExt->getStep();
        OverlayKey routeKey = findNodeExt->getRouteKey() >> shiftingBits;

        // the next shiftingBits bits of the key select the R bucket
        for (int i = 0; i < shiftingBits; i++)
            prefix += ((int)key.getBit(key.getLength() - dist + i) << i);

        OverlayKey pre(prefix);
        routeKey += (pre << (routeKey.getLength()-shiftingBits));

        findNodeExt->setRouteKey(routeKey);
        findNodeExt->setStep(dist - shiftingBits);

        KeyDistanceComparator<KeyXorMetric>* comp = NULL;
        comp = new KeyDistanceComparator<KeyXorMetric>(routeKey);

        result->setComparator(comp);
        rBucket[prefix]->fillVector(result);
        result->add(thisNode);

        delete comp;
    }

    // If this node itself is the best candidate, perform the next routing
    // step locally; the state stored in findNodeExt has already advanced.
    if ((*result)[0] == thisNode) {
        delete result;
        return (findNode(key, numRedundantNodes, numSiblings, msg));
    } else
        return result;
}
| void Broose::finishOverlay | ( | ) | [virtual] |
collects statistical data in derived class
Reimplemented from BaseOverlay.
Definition at line 772 of file Broose.cc.
{
    // Report per-node statistics, normalised by the measured lifetime.
    // Nodes measured for less than GlobalStatistics::MIN_MEASURED are
    // skipped entirely.
    simtime_t lifetime = globalStatistics->calcMeasuredLifetime(creationTime);

    if (lifetime < GlobalStatistics::MIN_MEASURED) {
        return;
    }

    globalStatistics->addStdDev("Broose: Number of non-routable packets/s",
                                numFailedPackets / lifetime);
    globalStatistics->addStdDev("Broose: Sent BUCKET Messages/s",
                                bucketCount / lifetime);
    globalStatistics->addStdDev("Broose: Sent BUCKET Byte/s",
                                bucketBytesSent / lifetime);
    globalStatistics->addStdDev("Broose: Bucket retries at join",
                                bucketRetries);
}
| int Broose::getMaxNumRedundantNodes | ( | ) | [protected, virtual] |
Query the maximum number of redundant next hop nodes that are returned by findNode().
Reimplemented from BaseOverlay.
Definition at line 351 of file Broose.cc.
{
    // The bucketSize parameter is the upper bound for redundant next hops.
    const int maxRedundant = bucketSize;
    return maxRedundant;
}
| int Broose::getMaxNumSiblings | ( | ) | [protected, virtual] |
Query the maximum number of siblings (nodes close to a key) that are maintained by this overlay protocol.
Reimplemented from BaseOverlay.
Definition at line 346 of file Broose.cc.
Referenced by isSiblingFor().
{
    // The bucketSize parameter is the upper bound for the sibling count.
    const int maxSiblings = bucketSize;
    return maxSiblings;
}
| int Broose::getRoutingDistance | ( | const OverlayKey & | key, | |
| const OverlayKey & | node, | |||
| int | dist | |||
| ) | [protected] |
calculates the de Bruijn distance between a key and a nodeId
| key | the overlay key | |
| node | the nodeId | |
| dist | the estimated maximum distance based on the number of nodes in the system |
Definition at line 356 of file Broose.cc.
{
    // Searches for the smallest shift i in [0, |dist|) for which key and
    // node overlap in at least |dist| - i leading bits after shifting one
    // of them left by i bits.
    //   returns  i  when shifting the key matches  (right shifting lookup)
    //   returns -i  when shifting the node matches (left shifting lookup)
    // If no shift matches, the full distance is returned and the direction
    // alternates between calls via chooseLookup.
    //
    // A signed loop index replaces the non-standard `uint` typedef, so
    // `-i` is an ordinary signed negation.
    const int maxDist = abs(dist);

    for (int i = 0; i < maxDist; i++) {
        // number of leading bits that still have to match; always >= 1
        const uint32_t required = maxDist - i;

        if (node.sharedPrefixLength(key << i) >= required) {
            return i; // right shifting
        }
        if (key.sharedPrefixLength(node << i) >= required) {
            return -i; // left shifting
        }
    }

    if (((chooseLookup++) % 2) == 0) {
        return -dist;
    } else {
        return dist;
    }
}
| void Broose::handleBucketRequestRpc | ( | BucketCall * | msg | ) | [protected] |
handles a received Bucket request
| msg | the message to process |
Definition at line 962 of file Broose.cc.
Referenced by handleRpcCall().
{
    // Answers a bucket request with a copy of the requested bucket's
    // entries: the L bucket for LEFT requests, the B bucket for BROTHER
    // requests. Any other bucket type is reported through error().
    if (msg->getBucketType() == LEFT) {
        // TODO: dependent on the churn scenarios this may give better
        // or worse results
        const bool refuseInBset = stab1 && (state == BSET);
        if (refuseInBset) {
            // can't handle LBucketRequest in BSET-State
            delete msg;
            return;
        }

        // return L-Bucket
        BucketResponse* reply = new BucketResponse("LBucketResponse");
        const int entries = lBucket->getSize();
        reply->setNodesArraySize(entries);

        for (int pos = 0; pos < entries; ++pos) {
            reply->setNodes(pos, lBucket->get(pos));
        }

        reply->setBitLength(BUCKETRESPONSE_L(reply));

        // only add, if the originator is already in the BSET state
        // in which the node already is able to do right shifting lookups
        // TODO: this leads to lower lookup success rates in some scenarios
        // but helps to prevent deadlock situations with high churn rates
        if (stab2 || (msg->getProState() == PBSET)) {
            routingAdd(msg->getSrcNode(), true);
        }

        sendRpcResponse(msg, reply);
        return;
    }

    if (msg->getBucketType() == BROTHER) {
        // return B-Bucket
        BucketResponse* reply = new BucketResponse("BBucketResponse");
        const int entries = bBucket->getSize();
        reply->setNodesArraySize(entries);

        for (int pos = 0; pos < entries; ++pos) {
            reply->setNodes(pos, bBucket->get(pos));
        }

        reply->setBitLength(BUCKETRESPONSE_L(reply));

        sendRpcResponse(msg, reply);
        return;
    }

    error("Broose::handleBucketRequestRpc() - Wrong Bucket Type!");
}
| void Broose::handleBucketResponseRpc | ( | BucketResponse * | msg, | |
| const RpcState & | rpcState | |||
| ) | [protected] |
handles a received Bucket response
| msg | the message to process | |
| rpcState | the state object for the received RPC |
Definition at line 1009 of file Broose.cc.
Referenced by handleRpcResponse().
{
    // Processes a bucket response: every node listed in the response is
    // offered to the routing table (not marked alive), then the join state
    // machine is advanced depending on which call this response answers:
    //   BROTHER response in INIT -> RSET after powShiftingBits responses
    //   LEFT response in RSET    -> BSET after numberBBucketLookup responses
    //   LEFT response in BSET    -> READY after numberLBucketLookup responses
    // Responses whose call was sent in a different protocol state than the
    // current one are not counted.
    BucketCall* call = check_and_cast<BucketCall*>(rpcState.getCallMsg());

    for (uint32_t i = 0; i < msg->getNodesArraySize(); i++) {
        routingAdd(msg->getNodes(i), false);
    }

    if (call->getBucketType() == LEFT) {
        switch (state) {
        case RSET:
            if (call->getProState() == PRSET) {
                receivedBBucketLookup++;

                if (receivedBBucketLookup == numberBBucketLookup)
                    changeState(BSET);
            }
            break;
        case BSET:
            if (call->getProState() == PBSET) {
                receivedLBucketLookup++;

                if (receivedLBucketLookup == numberLBucketLookup)
                    changeState(READY);
            }
            break;
        default:
            break;
        }
    } else if (call->getBucketType() == BROTHER) {
        switch(state) {
        case INIT:
            if (call->getProState() == PINIT) {
                receivedJoinResponse++;

                if (receivedJoinResponse == powShiftingBits)
                    changeState(RSET);
            }
            break; // explicit: previously fell through into default
        default:
            break;
        }
    } else
        // the message used to name handleBucketRequestRpc() by mistake
        error("Broose::handleBucketResponseRpc() - Wrong Bucket Type!");
}
| void Broose::handleBucketTimeout | ( | BucketCall * | msg | ) | [protected] |
handles a received Bucket timeout
| msg | the message to process |
Definition at line 1055 of file Broose.cc.
Referenced by handleRpcTimeout().
{
    // A bucket call that times out while the node is READY is ignored.
    // In every other state the join is restarted from INIT and the retry
    // is counted.
    if (state != READY) {
        ++bucketRetries;
        changeState(INIT);
    }
}
| void Broose::handleBucketTimerExpired | ( | cMessage * | msg | ) | [protected] |
handles an expired bucket refresh timer
| msg | the bucket refresh self-message |
Definition at line 318 of file Broose.cc.
Referenced by handleTimerEvent().
{
    // Periodic refresh: collect every bucket entry that was last seen more
    // than refreshTime ago or whose stored RTT equals -1, ping each
    // collected node, then re-arm the timer for refreshTime / 2.
    BrooseBucket* pingCandidates = new BrooseBucket();
    pingCandidates->initializeBucket(0, 0,
                                     (2*powShiftingBits*rBucketSize + 7*bucketSize),
                                     this);

    typedef std::vector<BrooseBucket*>::iterator BucketIter;
    for (BucketIter it = bucketVector.begin(); it != bucketVector.end(); ++it) {
        BrooseBucket* bucket = *it;

        for (uint32_t pos = 0; pos < bucket->getSize(); ++pos) {
            const bool notSeenRecently =
                (simTime() - bucket->getLastSeen(bucket->get(pos))) > refreshTime;

            if (notSeenRecently || bucket->getRTT(bucket->get(pos)) == -1) {
                pingCandidates->add(BrooseHandle(bucket->get(pos)));
            }
        }
    }

    for (uint32_t pos = 0; pos < pingCandidates->getSize(); ++pos) {
        pingNode(pingCandidates->get(pos));
    }

    delete pingCandidates;

    scheduleAt(simTime() + (refreshTime / 2.0), bucket_timer);
}
| void Broose::handleFindNodeTimeout | ( | FindNodeCall * | findNode, | |
| const TransportAddress & | dest, | |||
| const OverlayKey & | destKey | |||
| ) | [protected] |
This method is called if a Find Node Call timeout has been reached.
| findNode | The original FindNodeCall | |
| dest | the destination node | |
| destKey | the destination OverlayKey |
Definition at line 1083 of file Broose.cc.
Referenced by handleRpcTimeout().
{
    // Treat the unanswered destination as a failed routing table entry.
    // The reference dynamic_cast throws std::bad_cast if `dest` is not
    // actually a NodeHandle.
    const NodeHandle& unresponsive = dynamic_cast<const NodeHandle&>(dest);
    routingTimeout(unresponsive);
}
| void Broose::handleJoinTimerExpired | ( | cMessage * | msg | ) | [protected] |
handles an expired join timer
| msg | the timer self-message |
Definition at line 268 of file Broose.cc.
Referenced by handleTimerEvent().
{
    // Starts the join: for every possible prefix of shiftingBits bits, a
    // BROTHER bucket call is routed via the bootstrap node to the key
    // "prefix + (own key >> shiftingBits)". Without a bootstrap node this
    // node goes straight to READY. Nothing happens if already READY.
    if (state == READY)
        return;

    if (!bootstrapNode.isUnspecified()) {
        // do lookups for key >> shiftingBits for each prefix
        OverlayKey newKey = thisNode.getKey() >> shiftingBits;

        for (int i = 0; i < powShiftingBits; i++) {
            // place prefix i in the topmost shiftingBits bits
            OverlayKey add(i);
            add = add << (keyLength - shiftingBits);
            add += newKey;

            // Each call is passed on immediately, so a plain local pointer
            // is enough; the former variable-length array of pointers is
            // not standard C++ and was not needed.
            BucketCall* bCall = new BucketCall("BBucketCall");
            bCall->setBucketType(BROTHER);
            bCall->setBucketIndex(i);
            bCall->setProState(PINIT);
            bCall->setBitLength(BUCKETCALL_L(bCall));

            // restart join protocol if one call times out
            // otherwise the node might be isolated
            sendRouteRpcCall(OVERLAY_COMP, bootstrapNode, add,
                             bCall);
        }
        //createLookup()->lookup(getThisNode().getKey() + 1, 0, 0, 0,
        //                       new BrooseLookupListener(this));
    } else {
        // if the bootstrap node is unspecified we are the only node in the network
        // so we can skip the "normal" join protocol
        changeState(READY);
    }
}
| bool Broose::handleRpcCall | ( | BaseCallMessage * | msg | ) | [virtual] |
Definition at line 873 of file Broose.cc.
{
// Dispatches an incoming RPC call depending on the node's state.
// In BSET/READY: Bucket calls are delegated to handleBucketRequestRpc();
// for Ping and FindNode calls the caller is added to the routing table and
// false is returned.
// In all other states: Ping and FindNode calls are deleted unanswered and
// true is returned.
// NOTE(review): returning false presumably leaves the actual answer to the
// base class - confirm against BaseOverlay/BaseRpc.
if (state == BSET || state == READY) {
// delegate messages
RPC_SWITCH_START(msg)
RPC_DELEGATE(Bucket, handleBucketRequestRpc);
RPC_ON_CALL(Ping) {
// add pinging node to all buckets and update lastSeen of node
routingAdd(msg->getSrcNode(), true);
return false;
break;
}
RPC_ON_CALL(FindNode) {
// add the node that sent the FindNode call to all buckets and update
// lastSeen of node
routingAdd(msg->getSrcNode(), true);
return false;
break;
}
RPC_SWITCH_END()
return RPC_HANDLED;
} else {
RPC_SWITCH_START(msg)
// don't answer PING and FIND_NODE calls, if the node can't route yet
RPC_ON_CALL(Ping) {
delete msg;
return true;
break;
}
RPC_ON_CALL(FindNode) {
delete msg;
return true;
break;
}
RPC_SWITCH_END()
return RPC_HANDLED;
}
}
| void Broose::handleRpcResponse | ( | BaseResponseMessage * | msg, | |
| const RpcState & | rpcState, | |||
| simtime_t | rtt | |||
| ) | [protected] |
Definition at line 911 of file Broose.cc.
{
// Handles an RPC response: the responder is always added to the routing
// table as alive together with the measured rtt; afterwards the response
// is processed by type.
// add sender to all buckets and update lastSeen of node
routingAdd(msg->getSrcNode(), true, rtt);
RPC_SWITCH_START(msg)
RPC_ON_RESPONSE( Bucket ) {
// bucket responses drive the join protocol
handleBucketResponseRpc(_BucketResponse, rpcState);
EV << "[Broose::handleRpcResponse() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Bucket RPC Response received: id=" << rpcState.getId() << "\n"
<< " msg=" << *_BucketResponse << " rtt=" << rtt
<< endl;
break;
}
RPC_ON_RESPONSE(FindNode)
{
// add inactive nodes
// (nodes reported by the responder are added with isAlive == false)
for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
routingAdd(_FindNodeResponse->getClosestNodes(i), false);
break;
}
RPC_SWITCH_END( )
}
| void Broose::handleRpcTimeout | ( | const RpcState & | rpcState | ) | [protected] |
Definition at line 938 of file Broose.cc.
{
// Handles a timed-out RPC call by type: FindNode timeouts are passed to
// handleFindNodeTimeout(), Bucket timeouts to handleBucketTimeout().
// Each case logs the timed-out call to EV after handling it.
RPC_SWITCH_START(rpcState.getCallMsg())
RPC_ON_CALL(FindNode) {
handleFindNodeTimeout(_FindNodeCall, rpcState.getDest(), rpcState.getDestKey());
EV << "[Broose::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Find Node RPC Call timed out: id=" << rpcState.getId() << "\n"
<< " msg=" << *_FindNodeCall
<< endl;
break;
}
RPC_ON_CALL(Bucket) {
handleBucketTimeout(_BucketCall);
EV << "[Broose::handleRpcTimeout() @ " << thisNode.getIp()
<< " (" << thisNode.getKey().toString(16) << ")]\n"
<< " Bucket RPC Call timed out: id=" << rpcState.getId() << "\n"
<< " msg=" << *_BucketCall
<< endl;
break;
}
RPC_SWITCH_END()
}
| void Broose::handleTimerEvent | ( | cMessage * | msg | ) | [virtual] |
Definition at line 258 of file Broose.cc.
{
    // Route the self-message to the handler of the timer it belongs to;
    // any other message is reported through error().
    if (msg == join_timer) {
        handleJoinTimerExpired(msg);
        return;
    }

    if (msg == bucket_timer) {
        handleBucketTimerExpired(msg);
        return;
    }

    error("Broose::handleTimerEvent - no other timer currently in use!");
}
| void Broose::initializeOverlay | ( | int | stage | ) | [virtual] |
Initializes derived-class-attributes.
Initializes derived-class-attributes, called by BaseOverlay::initialize(). By default this method is called once. If more stages are needed one can overload numInitStages() and add more stages.
| stage | the init stage |
Reimplemented from BaseOverlay.
Definition at line 66 of file Broose.cc.
{
    // One-time setup, performed only in stage MIN_STAGE_OVERLAY: reads the
    // module parameters, resets counters, resolves the bucket submodules
    // and creates the two timers.

    // because of IPAddressResolver, we need to wait until interfaces
    // are registered, address auto-assignment takes place etc.
    if (stage != MIN_STAGE_OVERLAY) {
        return;
    }

    // Broose provides KBR services
    kbr = true;

    // fetch some parameters
    bucketSize = par("bucketSize");     // = k
    rBucketSize = par("rBucketSize");   // = k'
    joinDelay = par("joinDelay");
    shiftingBits = par("brooseShiftingBits");
    userDist = par("userDist");
    refreshTime = par("refreshTime");
    numberRetries = par("numberRetries");
    stab1 = par("stab1");
    stab2 = par("stab2");

    // values derived from the parameters
    powShiftingBits = 1 << shiftingBits;
    keyLength = OverlayKey::getLength();

    // statistics
    bucketCount = 0;
    bucketBytesSent = 0;
    numFailedPackets = 0;
    bucketRetries = 0;

    // join protocol bookkeeping
    chooseLookup = 0;
    receivedJoinResponse = 0;
    receivedBBucketLookup = 0;
    numberBBucketLookup = 0;
    receivedLBucketLookup = 0;
    numberLBucketLookup = 0;

    // add some watches
    WATCH(receivedJoinResponse);
    WATCH(receivedBBucketLookup);
    WATCH(numberBBucketLookup);
    WATCH(receivedLBucketLookup);
    WATCH(numberLBucketLookup);
    WATCH(state);

    // get module pointers for all buckets; bucketVector ends up holding
    // the R buckets first, then the L bucket, then the B bucket
    rBucket = new BrooseBucket*[powShiftingBits];

    for (int idx = 0; idx < powShiftingBits; ++idx) {
        BrooseBucket* bucket = check_and_cast<BrooseBucket*>
                               (getParentModule()->getSubmodule("rBucket",idx));
        rBucket[idx] = bucket;
        bucketVector.push_back(bucket);
    }

    lBucket = check_and_cast<BrooseBucket*>
              (getParentModule()->getSubmodule("lBucket"));
    bucketVector.push_back(lBucket);

    bBucket = check_and_cast<BrooseBucket*>
              (getParentModule()->getSubmodule("bBucket"));
    bucketVector.push_back(bBucket);

    // create join and bucket timer
    join_timer = new cMessage("join_timer");
    bucket_timer = new cMessage("bucket_timer");
}
| bool Broose::isSiblingFor | ( | const NodeHandle & | node, | |
| const OverlayKey & | key, | |||
| int | numSiblings, | |||
| bool * | err | |||
| ) | [virtual] |
Query if a node is among the siblings for a given key.
Query if a node is among the siblings for a given key. This means, that the nodeId of this node is among the closest numSiblings nodes to the key and that by a local findNode() call all other siblings to this key can be retrieved.
| node | the NodeHandle | |
| key | destination key | |
| numSiblings | The node knows all numSiblings nodes close to this key | |
| err | return false if the range could not be determined |
Reimplemented from BaseOverlay.
Definition at line 823 of file Broose.cc.
Referenced by findNode().
{
    // Decides whether `node` (only thisNode is supported) is a sibling for
    // `key`.
    //   numSiblings == -1 : use getMaxNumSiblings()
    //   numSiblings ==  0 : sibling only if node's key equals `key`
    //   otherwise         : answered by the B bucket's key range, which is
    //                       only possible in state READY
    // *err is set to true when the question cannot be answered (node not
    // READY) and to false whenever a definite answer is returned.

    // TODO: node != thisNode doesn't work yet
    if (key.isUnspecified())
        error("Broose::isSiblingFor(): key is unspecified!");

    if (node != thisNode)
        error("Broose::isSiblingsFor(): "
              "node != thisNode is not implemented!");

    if (numSiblings > getMaxNumSiblings()) {
        opp_error("Broose::isSiblingFor(): numSiblings too big!");
    }
    // set default number of siblings to consider
    if (numSiblings == -1) numSiblings = getMaxNumSiblings();

    if (numSiblings == 0) {
        *err = false;
        return (node.getKey() == key);
    }

    if (state != READY) {
        *err = true;
        return false;
    }

    // Previously *err was left untouched on this path, so callers passing
    // an uninitialized flag read garbage after a successful query.
    *err = false;

    // TODO: handle numSibling parameter
    return bBucket->keyInRange(key);
}
| void Broose::joinOverlay | ( | ) | [virtual] |
Join the overlay with a given nodeID in thisNode.key.
Join the overlay with a given nodeID in thisNode.key. This method may be called by an application to join the overlay with a specific nodeID. It is also called if the node's IP address changes.
Reimplemented from BaseOverlay.
Definition at line 133 of file Broose.cc.
{
    // Entering INIT also selects the bootstrap node, so it must be
    // inspected only after this call.
    changeState(INIT);

    // if the bootstrap node is unspecified we are the only node in the network
    // so we can skip the "normal" join protocol
    const bool noBootstrapNode = bootstrapNode.isUnspecified();
    if (noBootstrapNode) {
        changeState(READY);
    }
}
| void Broose::pingResponse | ( | PingResponse * | pingResponse, | |
| cPolymorphic * | context, | |||
| int | rpcId, | |||
| simtime_t | rtt | |||
| ) | [protected, virtual] |
Definition at line 1065 of file Broose.cc.
{
    // A ping was answered: record the responder as alive, together with
    // the measured round-trip time.
    const bool alive = true;
    routingAdd(pingResponse->getSrcNode(), alive, rtt);
}
| void Broose::pingTimeout | ( | PingCall * | pingCall, | |
| const TransportAddress & | dest, | |||
| cPolymorphic * | context, | |||
| int | rpcId | |||
| ) | [protected, virtual] |
Definition at line 1090 of file Broose.cc.
{
    // A ping was not answered: count it against the destination.
    // The reference dynamic_cast throws std::bad_cast if `dest` is not
    // actually a NodeHandle.
    const NodeHandle& silentNode = dynamic_cast<const NodeHandle&>(dest);
    routingTimeout(silentNode);
}
| void Broose::recordOverlaySentStats | ( | BaseOverlayMessage * | msg | ) | [virtual] |
Collect overlay specific sent messages statistics.
This method is called from BaseOverlay::sendMessageToUDP() for every overlay message that is sent by a node. Use this to collect statistical data for overlay protocol specific message types.
| msg | The overlay message to be sent to the UDP layer |
Reimplemented from BaseOverlay.
Definition at line 785 of file Broose.cc.
{
    // Counts sent Bucket calls/responses and their size.
    // First unwrap encapsulated packets until an APPDATA message or the
    // innermost packet is reached.
    BaseOverlayMessage* inner = msg;
    for (;;) {
        if (inner->getType() == APPDATA)
            break;
        if (inner->getEncapsulatedPacket() == NULL)
            break;
        inner = static_cast<BaseOverlayMessage*>(inner->getEncapsulatedPacket());
    }

    // only RPC messages are of interest here
    if (inner->getType() == RPC) {
        const bool isBucketMessage =
            (dynamic_cast<BucketCall*>(inner) != NULL) ||
            (dynamic_cast<BucketResponse*>(inner) != NULL);

        if (isBucketMessage) {
            // the byte count is taken from the outermost message
            RECORD_STATS(bucketCount++; bucketBytesSent +=
                             msg->getByteLength());
        }
    }
}
| void Broose::resetFailedResponses | ( | const NodeHandle & | node | ) | [protected] |
resets the counter of failed responses
| node | node handle of the responding node |
Definition at line 1124 of file Broose.cc.
{
    // Forward the reset to each bucket registered in bucketVector.
    typedef std::vector<BrooseBucket*>::iterator BucketIter;
    for (BucketIter it = bucketVector.begin(); it != bucketVector.end(); ++it) {
        (*it)->resetFailedResponses(node);
    }
}
| bool Broose::routingAdd | ( | const NodeHandle & | node, | |
| bool | isAlive, | |||
| simtime_t | rtt = MAXTIME | |||
| ) | [protected] |
Adds a node to the routing table.
| node | NodeHandle to add | |
| isAlive | true, if it is known that the node is alive | |
| rtt | measured round-trip-time to node |
Definition at line 1097 of file Broose.cc.
Referenced by handleBucketRequestRpc(), handleBucketResponseRpc(), handleRpcCall(), handleRpcResponse(), and pingResponse().
{
    // Offer the node to every bucket (all buckets are always tried) and
    // report whether at least one add() call returned true.
    bool anyAdded = false;

    typedef std::vector<BrooseBucket*>::iterator BucketIter;
    for (BucketIter it = bucketVector.begin(); it != bucketVector.end(); ++it) {
        if ((*it)->add(node, isAlive, rtt)) {
            anyAdded = true;
        }
    }

    return anyAdded;
}
| void Broose::routingTimeout | ( | const BrooseHandle & | handle | ) | [protected] |
Definition at line 1071 of file Broose.cc.
Referenced by handleFindNodeTimeout(), and pingTimeout().
{
    // For every bucket: bump the failed-response counter of `handle`, or
    // remove the entry once that counter equals numberRetries.
    // NOTE(review): the comparison is an exact equality; a counter that
    // ever exceeded numberRetries would not trigger removal -- confirm
    // that this cannot happen.
    for (size_t idx = 0; idx < bucketVector.size(); ++idx) {
        BrooseBucket* const bucket = bucketVector[idx];
        if (bucket->getFailedResponses(handle) != numberRetries) {
            bucket->increaseFailedResponses(handle);
        } else {
            bucket->remove(handle);
        }
    }
    // TODO: if we lose the last node (apart from ourselves) from the
    // B bucket, we should call join() to rejoin the network
}
| void Broose::setLastSeen | ( | const NodeHandle & | node | ) | [protected] |
updates the timestamp of a node in all buckets
| node | node handle which should be updated |
Definition at line 1109 of file Broose.cc.
Referenced by findNode().
{
    // Stamp `node` with the current simulation time in every bucket.
    // simTime() is queried anew for each bucket.
    size_t idx = 0;
    while (idx < bucketVector.size()) {
        bucketVector[idx]->setLastSeen(node, simTime());
        ++idx;
    }
}
| void Broose::setRTT | ( | const NodeHandle & | node, | |
| simtime_t | rtt | |||
| ) | [protected] |
sets the RTT of a node in all buckets
| node | node handle for which an RTT is added/updated | |
| rtt | round-trip time to the node |
Definition at line 1131 of file Broose.cc.
{
    // Pass the round-trip time for `node` on to every bucket.
    for (size_t idx = 0; idx < bucketVector.size(); ++idx) {
        BrooseBucket* const bucket = bucketVector[idx];
        bucket->setRTT(node, rtt);
    }
}
| void Broose::updateTooltip | ( | ) |
updates information shown in tk-environment
Definition at line 856 of file Broose.cc.
Referenced by changeState().
{
    // Nothing to do unless the simulation runs with a GUI.
    if (!ev.isGUI()) {
        return;
    }

    // Tooltip text: our IP address and our key, separated by a space.
    std::stringstream label;
    label << thisNode.getIp() << " " << thisNode.getKey();
    const std::string tooltip = label.str();

    // Apply the same tooltip ("tt" tag, argument 0) to the grandparent
    // module, the parent module and finally this module itself.
    getParentModule()->getParentModule()->getDisplayString().
        setTagArg("tt", 0, tooltip.c_str());
    getParentModule()->getDisplayString().
        setTagArg("tt", 0, tooltip.c_str());
    getDisplayString().setTagArg("tt", 0, tooltip.c_str());
}
friend class BrooseBucket [friend] |
Definition at line 273 of file Broose.h.
Referenced by changeState(), and handleBucketTimerExpired().
BrooseBucket * Broose::bBucket [protected] |
Definition at line 114 of file Broose.h.
Referenced by Broose(), changeState(), displayBucketState(), findNode(), handleBucketRequestRpc(), initializeOverlay(), and isSiblingFor().
TransportAddress Broose::bootstrapNode [protected] |
node handle holding the bootstrap node
Definition at line 124 of file Broose.h.
Referenced by changeState(), handleJoinTimerExpired(), and joinOverlay().
cMessage* Broose::bucket_timer [protected] |
timer to reconstruct all buckets
Definition at line 121 of file Broose.h.
Referenced by Broose(), changeState(), handleBucketTimerExpired(), handleTimerEvent(), initializeOverlay(), and ~Broose().
int Broose::bucketBytesSent [protected] |
total byte length of all Bucket messages sent
Definition at line 110 of file Broose.h.
Referenced by finishOverlay(), initializeOverlay(), and recordOverlaySentStats().
int Broose::bucketCount [protected] |
number of Bucket messages sent
Definition at line 109 of file Broose.h.
Referenced by finishOverlay(), initializeOverlay(), and recordOverlaySentStats().
int Broose::bucketRetries [protected] |
number of bucket retries for a successful join
Definition at line 104 of file Broose.h.
Referenced by finishOverlay(), handleBucketTimeout(), and initializeOverlay().
uint32_t Broose::bucketSize [protected] |
maximal number of bucket entries
Definition at line 98 of file Broose.h.
Referenced by changeState(), getMaxNumRedundantNodes(), getMaxNumSiblings(), handleBucketTimerExpired(), and initializeOverlay().
std::vector<BrooseBucket*> Broose::bucketVector [protected] |
vector of all Broose buckets
Definition at line 117 of file Broose.h.
Referenced by addNode(), changeState(), handleBucketTimerExpired(), initializeOverlay(), resetFailedResponses(), routingAdd(), routingTimeout(), setLastSeen(), and setRTT().
int Broose::chooseLookup [protected] |
decides which kind of lookup (right/left shifting) is used
Definition at line 89 of file Broose.h.
Referenced by findNode(), getRoutingDistance(), and initializeOverlay().
cMessage* Broose::join_timer [protected] |
Definition at line 120 of file Broose.h.
Referenced by Broose(), changeState(), handleTimerEvent(), initializeOverlay(), and ~Broose().
simtime_t Broose::joinDelay [protected] |
time interval between two join tries
Definition at line 90 of file Broose.h.
Referenced by initializeOverlay().
int Broose::keyLength [protected] |
length of the node and data IDs
Definition at line 100 of file Broose.h.
Referenced by findNode(), handleJoinTimerExpired(), and initializeOverlay().
BrooseBucket* Broose::lBucket [protected] |
Definition at line 114 of file Broose.h.
Referenced by Broose(), changeState(), displayBucketState(), findNode(), handleBucketRequestRpc(), and initializeOverlay().
int Broose::numberBBucketLookup [protected] |
maximal number of lookup responses for the B bucket
Definition at line 93 of file Broose.h.
Referenced by changeState(), handleBucketResponseRpc(), and initializeOverlay().
int Broose::numberLBucketLookup [protected] |
maximal number of lookup responses for the L bucket
Definition at line 95 of file Broose.h.
Referenced by changeState(), handleBucketResponseRpc(), and initializeOverlay().
int Broose::numberRetries [protected] |
number of retries in case of timeout
Definition at line 103 of file Broose.h.
Referenced by initializeOverlay(), and routingTimeout().
int Broose::numFailedPackets [protected] |
number of packets which couldn't be routed correctly
Definition at line 111 of file Broose.h.
Referenced by finishOverlay(), and initializeOverlay().
int Broose::powShiftingBits [protected] |
2^shiftingBits (two to the power of shiftingBits)
Definition at line 97 of file Broose.h.
Referenced by changeState(), displayBucketState(), handleBucketResponseRpc(), handleBucketTimerExpired(), handleJoinTimerExpired(), and initializeOverlay().
BrooseBucket** Broose::rBucket [protected] |
Definition at line 115 of file Broose.h.
Referenced by Broose(), changeState(), displayBucketState(), findNode(), and initializeOverlay().
uint32_t Broose::rBucketSize [protected] |
maximal number of entries in the r buckets
Definition at line 99 of file Broose.h.
Referenced by changeState(), handleBucketTimerExpired(), and initializeOverlay().
int Broose::receivedBBucketLookup [protected] |
number of received lookup responses for the B bucket
Definition at line 92 of file Broose.h.
Referenced by changeState(), handleBucketResponseRpc(), and initializeOverlay().
int Broose::receivedJoinResponse [protected] |
number of received join response messages
Definition at line 91 of file Broose.h.
Referenced by changeState(), handleBucketResponseRpc(), and initializeOverlay().
int Broose::receivedLBucketLookup [protected] |
number of received lookup responses for the L bucket
Definition at line 94 of file Broose.h.
Referenced by changeState(), handleBucketResponseRpc(), and initializeOverlay().
simtime_t Broose::refreshTime [protected] |
idle time after which a node is pinged
Definition at line 101 of file Broose.h.
Referenced by changeState(), handleBucketTimerExpired(), and initializeOverlay().
int Broose::shiftingBits [protected] |
number of bits shifted in/out each step
Definition at line 96 of file Broose.h.
Referenced by changeState(), handleJoinTimerExpired(), and initializeOverlay().
bool Broose::stab1 [protected] |
Definition at line 105 of file Broose.h.
Referenced by handleBucketRequestRpc(), and initializeOverlay().
bool Broose::stab2 [protected] |
Definition at line 106 of file Broose.h.
Referenced by handleBucketRequestRpc(), and initializeOverlay().
uint32_t Broose::userDist [protected] |
how many hops are added to the estimated hop count
Definition at line 102 of file Broose.h.
Referenced by findNode(), and initializeOverlay().
1.7.1