OverSim
BasePastry.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2012 Institute of Telematics, Karlsruhe Institute of Technology (KIT)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <sstream>
25 #include <stdint.h>
26 #include <assert.h>
27 
28 #include <IPAddressResolver.h>
29 #include <IPvXAddress.h>
30 #include <IInterfaceTable.h>
31 #include <IPv4InterfaceData.h>
32 #include <RpcMacros.h>
33 #include <InitStages.h>
34 #include <NeighborCache.h>
35 #include <GlobalStatistics.h>
36 #include <BootstrapList.h>
37 
38 #include "BasePastry.h"
39 
40 
42 {
43  // purge Queue for processing multiple STATE messages:
44  while (! stateCacheQueue.empty()) {
45  delete stateCacheQueue.front().msg;
46  delete stateCacheQueue.front().prox;
47  stateCacheQueue.pop();
48  }
49 
50  // delete cached state message:
51  delete stateCache.msg;
52  stateCache.msg = NULL;
53  delete stateCache.prox;
54  stateCache.prox = NULL;
55 }
56 
58 {
59  bitsPerDigit = par("bitsPerDigit");
60  numberOfLeaves = par("numberOfLeaves");
61  numberOfNeighbors = par("numberOfNeighbors");
62  joinTimeoutAmount = par("joinTimeout");
63  repairTimeout = par("repairTimeout");
64  enableNewLeafs = par("enableNewLeafs");
65  optimizeLookup = par("optimizeLookup");
66  useRegularNextHop = par("useRegularNextHop");
67  alwaysSendUpdate = par("alwaysSendUpdate");
68  proximityNeighborSelection = par("proximityNeighborSelection");
69 
70  if (!neighborCache->isEnabled()) {
71  throw cRuntimeError("NeighborCache is disabled, which is mandatory "
72  "for Pastry/Bamboo. Activate it by setting "
73  "\"**.neighborCache.enableNeighborCache "
74  "= true\" in your omnetpp.ini!");
75  }
76 
77  if (numberOfLeaves % 2) {
78  EV << "[BasePastry::baseInit() @ " << thisNode.getIp()
79  << " (" << thisNode.getKey().toString(16) << ")]\n"
80  << " Warning: numberOfLeaves must be even - adding 1."
81  << endl;
83  }
84 
85  routingTable = check_and_cast<PastryRoutingTable*>
86  (getParentModule()->getSubmodule("pastryRoutingTable"));
87  leafSet = check_and_cast<PastryLeafSet*>
88  (getParentModule()->getSubmodule("pastryLeafSet"));
89  neighborhoodSet = check_and_cast<PastryNeighborhoodSet*>
90  (getParentModule()->getSubmodule("pastryNeighborhoodSet"));
91 
92  stateCache.msg = NULL;
93  stateCache.prox = NULL;
94 
95  // initialize statistics
96  joins = 0;
97  joinTries = 0;
98  joinPartial = 0;
99  joinSeen = 0;
100  joinReceived = 0;
101  joinSent = 0;
102  stateSent = 0;
103  stateReceived = 0;
104  repairReqSent = 0;
105  repairReqReceived = 0;
106  stateReqSent = 0;
107  stateReqReceived = 0;
108 
109  joinBytesSeen = 0;
110  joinBytesReceived = 0;
111  joinBytesSent = 0;
112  stateBytesSent = 0;
113  stateBytesReceived = 0;
114  repairReqBytesSent = 0;
116  stateReqBytesSent = 0;
118 
119  totalLookups = 0;
120  responsibleLookups = 0;
122  closerNodeLookups = 0;
124 
125  leafsetReqSent = 0;
127  leafsetReqReceived = 0;
129  leafsetSent = 0;
130  leafsetBytesSent = 0;
131  leafsetReceived = 0;
133 
142 
143  WATCH(joins);
144  WATCH(joinTries);
145  WATCH(joinSeen);
146  WATCH(joinBytesSeen);
147  WATCH(joinReceived);
148  WATCH(joinBytesReceived);
149  WATCH(joinSent);
150  WATCH(joinBytesSent);
151  WATCH(stateSent);
152  WATCH(stateBytesSent);
153  WATCH(stateReceived);
154  WATCH(stateBytesReceived);
155  WATCH(repairReqSent);
156  WATCH(repairReqBytesSent);
157  WATCH(repairReqReceived);
158  WATCH(repairReqBytesReceived);
159  WATCH(stateReqSent);
160  WATCH(stateReqBytesSent);
161  WATCH(stateReqReceived);
162  WATCH(stateReqBytesReceived);
163  WATCH(lastStateChange);
164 
165  WATCH(leafsetReqSent);
166  WATCH(leafsetReqBytesSent);
167  WATCH(leafsetReqReceived);
169  WATCH(leafsetSent);
170  WATCH(leafsetBytesSent);
171  WATCH(leafsetReceived);
172  WATCH(leafsetBytesReceived);
173 
174  WATCH(routingTableRowReqSent);
178  WATCH(routingTableRowSent);
182 
183  WATCH_PTR(stateCache.msg);
184 }
185 
186 
188 {
189  switch (toState) {
190  case INIT:
191  state = INIT;
192 
193  if (!thisNode.getKey().isUnspecified())
195 
196  cancelAllRpcs();
197  purgeVectors();
198 
200 
203  repairTimeout, thisNode, this);
205  thisNode);
206 
207  updateTooltip();
208  lastStateChange = simTime();
209 
210  getParentModule()->getParentModule()->bubble("entering INIT state");
211 
212  break;
213 
214  case JOIN:
215  state = JOIN;
216 
217  // bootstrapNode must be obtained before calling this method,
218  // for example by calling changeState(INIT)
219 
221  // no existing pastry network -> first node of a new one
223  return;
224  }
225 
226  updateTooltip();
227  getParentModule()->getParentModule()->bubble("entering JOIN state");
228 
230 
231  break;
232 
233  case READY:
234  assert(state != READY);
235  state = READY;
236 
237  // if we are the first node in the network,
238  // there's nothing else to do
241  setOverlayReady(true);
242  return;
243  }
244 
245  getParentModule()->getParentModule()->bubble("entering READY state");
246  updateTooltip();
247  RECORD_STATS(joins++);
248 
249  break;
250 
251  default: // discovery
252  break;
253  }
255 }
256 
257 
259 {
260  if (! enableNewLeafs) return;
261 
263  if (msg) {
264  send(msg, "appOut");
265  EV << "[BasePastry::newLeafs() @ " << thisNode.getIp()
266  << " (" << thisNode.getKey().toString(16) << ")]\n"
267  << " newLeafs() called."
268  << endl;
269  }
270 }
271 
272 
273 void BasePastry::proxCallback(const TransportAddress& node, int rpcId,
274  cPolymorphic *contextPointer, Prox prox)
275 {
276  Enter_Method("proxCallback()");
277 
278  EV << "[BasePastry::proxCallback() @ " << thisNode.getIp()
279  << " (" << thisNode.getKey().toString(16) << ")]\n"
280  << " Pong (or timeout) received (from "
281  << node.getIp() << ")"
282  << endl;
283 
284  double rtt = ((prox == Prox::PROX_TIMEOUT) ? PASTRY_PROX_INFINITE
285  : prox.proximity);
286 
287  // merge single pinged nodes (bamboo global tuning)
288  if (rpcId == PING_SINGLE_NODE) {
289  routingTable->mergeNode((const NodeHandle&)node,
291  rtt : SimTime::getMaxTime());
292  delete contextPointer;
293  return;
294  }
295 
296  if (contextPointer != NULL && stateCache.msg && stateCache.prox) {
297  PingContext* pingContext = check_and_cast<PingContext*>(contextPointer);
298 
299  if (pingContext->nonce != stateCache.nonce) {
300  delete contextPointer;
301  return;
302  }
303  // handle failed node
304  if (rtt == PASTRY_PROX_INFINITE && state == READY) {
305  handleFailedNode(node); // TODO
306  updateTooltip();
307 
308  // this could initiate a re-join, exit the handler in that
309  // case because all local data was erased:
310  if (state != READY) {
311  delete contextPointer;
312  return;
313  }
314  }
315  switch (pingContext->stateObject) {
316  case ROUTINGTABLE:
317  assert(stateCache.prox->pr_rt.size() > pingContext->index);
318  *(stateCache.prox->pr_rt.begin() + pingContext->index) = rtt;
319  break;
320 
321  case LEAFSET:
322  assert(stateCache.prox->pr_ls.size() > pingContext->index);
323  *(stateCache.prox->pr_ls.begin() + pingContext->index) = rtt;
324  break;
325 
326  case NEIGHBORHOODSET:
327  assert(stateCache.prox->pr_ns.size() > pingContext->index);
328  *(stateCache.prox->pr_ns.begin() + pingContext->index) = rtt;
329  break;
330 
331  default:
332  throw cRuntimeError("wrong state object type!");
333  }
334  checkProxCache();
335  }
336 
337  assert(stateCacheQueue.size() < 50);
338  delete contextPointer;
339 }
340 
341 
343 {
344  uint32_t rt_size = stateMsg->getRoutingTableArraySize();
345  uint32_t ls_size = stateMsg->getLeafSetArraySize();
346  uint32_t ns_size = stateMsg->getNeighborhoodSetArraySize();
347 
348  for (uint32_t i = 0; i < rt_size + ls_size + ns_size; i++) {
349  const NodeHandle* node;
350  if (i < rt_size) {
351  node = &(stateMsg->getRoutingTable(i));
352  }
353  else if (i < (rt_size + ls_size) ) {
354  node = &(stateMsg->getLeafSet(i - rt_size));
355  }
356  else {
357  node = &(stateMsg->getNeighborhoodSet(i - rt_size - ls_size));
358  }
359  if ((node->isUnspecified()) || (*node == thisNode)) {
360  continue;
361  }
362 
364  PING_RECEIVED_STATE, this, NULL);
365  }
366 }
367 
368 
370 {
371  EV << "[BasePastry::pingNodes() @ " << thisNode.getIp()
372  << " (" << thisNode.getKey().toString(16) << ")]" << endl;
373 
374  if (stateCache.msg == NULL) throw cRuntimeError("no state msg");
375 
376  assert(stateCache.prox == NULL);
378 
379  uint32_t rt_size = stateCache.msg->getRoutingTableArraySize();
380  stateCache.prox->pr_rt.resize(rt_size, PASTRY_PROX_UNDEF);
381 
382  uint32_t ls_size = stateCache.msg->getLeafSetArraySize();
383  stateCache.prox->pr_ls.resize(ls_size, PASTRY_PROX_UNDEF);
384 
385  uint32_t ns_size = stateCache.msg->getNeighborhoodSetArraySize();
386  stateCache.prox->pr_ns.resize(ns_size, PASTRY_PROX_UNDEF);
387 
388  // set prox state
389  for (uint32_t i = 0; i < rt_size + ls_size + ns_size; i++) {
390  const NodeHandle* node;
391  std::vector<simtime_t>::iterator proxPos;
392  PingContext* pingContext = NULL;
393  StateObject stateObject;
394  uint32_t index;
395  if (stateCache.msg == NULL) break;
396  if (i < rt_size) {
397  node = &(stateCache.msg->getRoutingTable(i));
398  proxPos = stateCache.prox->pr_rt.begin() + i;
399  stateObject = ROUTINGTABLE;
400  index = i;
401  } else if ( i < (rt_size + ls_size) ) {
402  node = &(stateCache.msg->getLeafSet(i - rt_size));
403  proxPos = stateCache.prox->pr_ls.begin() + (i - rt_size);
404  stateObject = LEAFSET;
405  index = i - rt_size;
406  } else {
407  node = &(stateCache.msg->getNeighborhoodSet(i - rt_size - ls_size));
408  proxPos = stateCache.prox->pr_ns.begin() + (i - rt_size - ls_size);
409  stateObject = NEIGHBORHOODSET;
410  index = i - rt_size - ls_size;
411  }
412  // proximity is undefined for unspecified nodes:
413  if (!node->isUnspecified()) {
414  pingContext = new PingContext(stateObject, index,
415  stateCache.nonce);
416 
419  this, pingContext);
420  if (prox == Prox::PROX_SELF) {
421  *proxPos = 0;
422  } else if ((prox == Prox::PROX_TIMEOUT) ||
423  (prox == Prox::PROX_UNKNOWN)) {
424  *proxPos = PASTRY_PROX_INFINITE;
425  } else if (prox == Prox::PROX_WAITING) {
426  *proxPos = PASTRY_PROX_PENDING;
427  } else {
428  *proxPos = prox.proximity;
429  }
430  } else {
431  throw cRuntimeError("Undefined node in STATE message!");
432  }
433  }
434  checkProxCache();
435 }
436 
437 
439 {
440  uint32_t rt_size = stateMsg->getRoutingTableArraySize();
441  aliveTable.pr_rt.clear();
442  aliveTable.pr_rt.resize(rt_size, 1);
443 
444  uint32_t ls_size = stateMsg->getLeafSetArraySize();
445  aliveTable.pr_ls.clear();
446  aliveTable.pr_ls.resize(ls_size, 1);
447 
448  uint32_t ns_size = stateMsg->getNeighborhoodSetArraySize();
449  aliveTable.pr_ns.clear();
450  aliveTable.pr_ns.resize(ns_size, 1);
451 
452  for (uint32_t i = 0; i < rt_size + ls_size + ns_size; i++) {
453  const TransportAddress* node;
454  std::vector<simtime_t>::iterator tblPos;
455  if (i < rt_size) {
456  node = &(stateMsg->getRoutingTable(i));
457  tblPos = aliveTable.pr_rt.begin() + i;
458  } else if ( i < (rt_size + ls_size) ) {
459  node = &(stateMsg->getLeafSet(i - rt_size));
460  tblPos = aliveTable.pr_ls.begin() + (i - rt_size);
461  } else {
462  node = &(stateMsg->getNeighborhoodSet(i - rt_size - ls_size));
463  tblPos = aliveTable.pr_ns.begin() + (i - rt_size - ls_size);
464  }
465  if (node->isUnspecified()) {
466  *tblPos = PASTRY_PROX_UNDEF;
467  } else if (neighborCache->getProx(*node,
470  *tblPos = PASTRY_PROX_INFINITE;
471  }
472  }
473 }
474 
475 
477 {
478  if (state != READY) {
479  EV << "[BasePastry::handleRpcCall() @ " << thisNode.getIp()
480  << " (" << thisNode.getKey().toString(16) << ")]\n"
481  << " Received RPC call and state != READY"
482  << endl;
483  return false;
484  }
485 
486  // delegate messages
487  RPC_SWITCH_START( msg )
488  // RPC_DELEGATE( <messageName>[Call|Response], <methodToCall> )
489  RPC_DELEGATE( RequestLeafSet, handleRequestLeafSetCall );
490  RPC_DELEGATE( RequestRoutingRow, handleRequestRoutingRowCall );
491  RPC_SWITCH_END( )
492 
493  return RPC_HANDLED;
494 }
495 
496 
498  cPolymorphic* context, int rpcId,
499  simtime_t rtt)
500 {
501  RPC_SWITCH_START(msg)
502  RPC_ON_RESPONSE( RequestLeafSet ) {
503  EV << "[BasePastry::handleRpcResponse() @ " << thisNode.getIp()
504  << " (" << thisNode.getKey().toString(16) << ")]\n"
505  << " Received a RequestLeafSet RPC Response: id=" << rpcId << "\n"
506  << " msg=" << *_RequestLeafSetResponse << " rtt=" << SIMTIME_DBL(rtt)
507  << endl;
508  BasePastry::handleRequestLeafSetResponse(_RequestLeafSetResponse);
509  break;
510  }
511  RPC_ON_RESPONSE( RequestRoutingRow ) {
512  EV << "[BasePastry::handleRpcResponse() @ " << thisNode.getIp()
513  << " (" << thisNode.getKey().toString(16) << ")]\n"
514  << " Received a RequestRoutingRow RPC Response: id=" << rpcId << "\n"
515  << " msg=" << *_RequestRoutingRowResponse << " rtt=" << SIMTIME_DBL(rtt)
516  << endl;
517  BasePastry::handleRequestRoutingRowResponse(_RequestRoutingRowResponse);
518  break;
519  }
520  RPC_SWITCH_END( )
521 }
522 
523 
525  const TransportAddress& dest,
526  cPolymorphic* context, int rpcId,
527  const OverlayKey& key)
528 {
529  EV << "[BasePastry::handleRpcTimeout() @ " << thisNode.getIp()
530  << " (" << thisNode.getKey().toString(16) << ")]\n"
531  << " Timeout of RPC Call: id=" << rpcId << "\n"
532  << " msg=" << *call << " key=" << key
533  << endl;
534 
535  if (state == JOIN) {
536  join();
537  } else if (state == READY && !dest.isUnspecified() && key.isUnspecified()) {
538  handleFailedNode(dest);
539  }
540 }
541 
542 
544 {
545  EV << "[BasePastry::handleRequestLeafSetCall() @ " << thisNode.getIp()
546  << " (" << thisNode.getKey().toString(16) << ")]"
547  << endl;
548 
550  leafsetReqBytesReceived += call->getByteLength());
551 
552  if (state != READY) {
553  EV << " local node is NOT in READY state (call deleted)!" << endl;
554  delete call;
555  return;
556  }
557 
558  RequestLeafSetResponse* response =
559  new RequestLeafSetResponse("REQUEST LEAFSET Response");
560  response->setTimestamp(simTime());
561  response->setStatType(MAINTENANCE_STAT);
562 
563  response->setBitLength(PASTRYREQUESTLEAFSETRESPONSE_L(response));
564  response->encapsulate(createStateMessage(PASTRY_STATE_LEAFSET));
565 
566  //merge leafset and sender's NodeHandle from call (if exists: PULL)
567  if (call->getEncapsulatedPacket()) {
568  EV << " ... it's a leafSet PULL message!" << endl;
569 
570  PastryStateMessage* stateMsg =
571  check_and_cast<PastryStateMessage*>(call->decapsulate());
572 
573  stateMsg->setLeafSetArraySize(stateMsg->getLeafSetArraySize() + 1);
574  stateMsg->setLeafSet(stateMsg->getLeafSetArraySize() - 1,
575  stateMsg->getSender());
576 
577  handleStateMessage(stateMsg);
578 
579  //set sender's NodeHandle in response if it was set in call (PULL/PUSH)
580  //response->setSender(thisNode);
581  response->setName("LeafSet PUSH");
582  }
583 
585  leafsetBytesSent += response->getByteLength());
586 
587  sendRpcResponse(call, response);
588 }
589 
590 
592 {
593  EV << "[BasePastry::handleRequestRoutingRowCall() @ " << thisNode.getIp()
594  << " (" << thisNode.getKey().toString(16) << ")]"
595  << endl;
596 
598  routingTableRowReqBytesReceived += call->getByteLength());
599 
600  if (state == READY) {
601  // fill response with row entries
602  if (call->getRow() > routingTable->getLastRow()) {
603  EV << " received request for nonexistent routing"
604  << "table row, dropping message!"
605  << endl;
606  delete call;
607  } else {
608  RequestRoutingRowResponse* response =
609  new RequestRoutingRowResponse("REQUEST ROUTINGROW Response");
610  response->setStatType(MAINTENANCE_STAT);
611 
612  assert(call->getRow() >= -1 &&
613  call->getRow() <= routingTable->getLastRow());
614 
615  response->setBitLength(PASTRYREQUESTROUTINGROWRESPONSE_L(response));
616  response->encapsulate(createStateMessage(PASTRY_STATE_ROUTINGROW,
617  -1,
618  (call->getRow() == -1) ?
620  call->getRow()));
621 
623  routingTableRowBytesSent += call->getByteLength());
624  sendRpcResponse(call, response);
625  }
626  } else {
627  EV << " received routing table request before reaching "
628  << "READY state, dropping message!" << endl;
629  delete call;
630  }
631 }
632 
633 
635 {
636  EV << "[BasePastry::handleRequestLeafSetResponse() @ " << thisNode.getIp()
637  << " (" << thisNode.getKey().toString(16) << ")]"
638  << endl;
639 
641  response->getByteLength());
642 
643  if (state == READY) {
644  handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
645  }
646 }
647 
648 
650 {
651  EV << "[BasePastry::handleRequestRoutingRowResponse() @ " << thisNode.getIp()
652  << " (" << thisNode.getKey().toString(16) << ")]"
653  << endl;
654 
656  routingTableRowBytesReceived += response->getByteLength());
657 
658  if (state == READY) {
659  handleStateMessage(check_and_cast<PastryStateMessage*>(response->decapsulate()));
660  }
661 }
662 
663 
665  simtime_t timestamp,
666  int16_t row,
667  bool lastHop)
668 {
669  //checks
670  if ((type & (PASTRY_STATE_JOIN | PASTRY_STATE_MINJOIN)) &&
671  ((row == -1) || (timestamp != -1))) {
672  throw cRuntimeError("Creating JOIN State without hopCount"
673  " or with timestamp!");
674  } else if ((type & (PASTRY_STATE_ROUTINGROW)) && row == -1) {
675  throw cRuntimeError("Creating ROUTINGROW State without row!");
676  } else if ((type & (PASTRY_STATE_UPDATE)) &&
677  timestamp == -1) {
678  throw cRuntimeError("Creating UPDATE/REPAIR State without timestamp!");
679  } else if ((type & (PASTRY_STATE_LEAFSET | PASTRY_STATE_STD |
681  ((row != -1) || (timestamp != -1))) {
682  throw cRuntimeError("Creating STD/UPDATE State with row/hopCount"
683  " or timestamp!");
684  }
685 
686  std::string typeStr = cEnum::get("PastryStateMsgType")->getStringFor(type);
687  typeStr.erase(0, 13);
688 
689  EV << "[BasePastry::sendStateTables() @ " << thisNode.getIp()
690  << " (" << thisNode.getKey().toString(16) << ")]\n"
691  << " creating new state message (" << typeStr << ")" //TODO
692  << endl;
693 
694  // create new state msg and set special fields for some types:
695  PastryStateMessage* stateMsg =
696  new PastryStateMessage((std::string("STATE (") + typeStr + ")").c_str());
697 
698  stateMsg->setTimestamp((timestamp == -1) ? simTime() : timestamp);
699 
700  if (type == PASTRY_STATE_JOIN || type == PASTRY_STATE_MINJOIN) {
701  stateMsg->setRow(row);
702  stateMsg->setLastHop(lastHop);
703  }
704 
705  // fill in standard content:
706  stateMsg->setStatType(MAINTENANCE_STAT);
707  stateMsg->setPastryStateMsgType(type);
708  stateMsg->setSender(thisNode);
709 
710  // the following part of the new join works on the assumption,
711  // that the node routing the join message is close to the joining node
712  // therefore it should be switched on together with the discovery algorithm
713  if (type == PASTRY_STATE_MINJOIN) {
714  //send just the needed row for new join protocol
715  routingTable->dumpRowToMessage(stateMsg, row);
716  if (lastHop) {
717  leafSet->dumpToStateMessage(stateMsg);
718  } else {
719  stateMsg->setLeafSetArraySize(0);
720  }
721  if (row == 1) {
723  } else {
724  stateMsg->setNeighborhoodSetArraySize(0);
725  }
726  } else if (type == PASTRY_STATE_LEAFSET) {
727  leafSet->dumpToStateMessage(stateMsg);
728  } else if (type == PASTRY_STATE_ROUTINGROW) {
729  routingTable->dumpRowToMessage(stateMsg, row);
730  } else {
731  routingTable->dumpToStateMessage(stateMsg);
732  leafSet->dumpToStateMessage(stateMsg);
734  }
735 
736  stateMsg->setBitLength(PASTRYSTATE_L(stateMsg));
737 
738  return stateMsg;
739 }
740 
741 
743  const OverlayKey& key,
744  int numSiblings,
745  bool* err)
746 {
747  if (key.isUnspecified())
748  error("Pastry::isSiblingFor(): key is unspecified!");
749 
750  if ((numSiblings == 1) && (node == thisNode)) {
751  if (leafSet->isClosestNode(key)) {
752  *err = false;
753  return true;
754  } else {
755  *err = false;
756  return false;
757  }
758  }
759 
760  NodeVector* result = leafSet->createSiblingVector(key, numSiblings);
761 
762  if (result == NULL) {
763  *err = true;
764  return false;
765  }
766 
767  if (result->contains(node.getKey())) {
768  delete result;
769  *err = false;
770  return true;
771  } else {
772  delete result;
773  *err = true;
774  return false;
775  }
776 }
777 
778 
780 {
781  delete msg;
782 }
783 
784 
786 {
787  if (ev.isGUI()) {
788  std::stringstream ttString;
789 
790  // show our predecessor and successor in tooltip
791  ttString << leafSet->getPredecessor() << endl << thisNode << endl
792  << leafSet->getSuccessor();
793 
794  getParentModule()->getParentModule()->getDisplayString().
795  setTagArg("tt", 0, ttString.str().c_str());
796  getParentModule()->getDisplayString().
797  setTagArg("tt", 0, ttString.str().c_str());
798  getDisplayString().setTagArg("tt", 0, ttString.str().c_str());
799 
800  // draw arrows:
802  "m=m,50,0,50,0;ls=red,1");
804  "m=m,50,100,50,100;ls=green,1");
805  }
806 }
807 
809 {
810  purgeVectors();
811 }
812 
814 {
815  // remove this node from the bootstrap list
817 
818  // collect statistics
820  if (time < GlobalStatistics::MIN_MEASURED) return;
821 
822  // join statistics: only evaluate join from nodes
823  // that have been created in measurement phase
825  time >= (simTime() - creationTime)) {
826  // join is on the way...
827  if (joinTries > 0 && state == JOIN) joinTries--;
828  if (joinTries > 0) {
829  globalStatistics->addStdDev("BasePastry: join success ratio", (double)joins / (double)joinTries);
830  globalStatistics->addStdDev("BasePastry: join tries", joinTries);
831  }
832  }
833 
834  // TODO new methods Pastry::finishOverlay() / Bamboo::finishOverlay()
835  globalStatistics->addStdDev("Pastry: joins with missing replies from routing path/s",
836  joinPartial / time);
837  globalStatistics->addStdDev("Pastry: JOIN Messages seen/s", joinSeen / time);
838  globalStatistics->addStdDev("Pastry: bytes of JOIN Messages seen/s", joinBytesSeen / time);
839  globalStatistics->addStdDev("Pastry: JOIN Messages received/s", joinReceived / time);
840  globalStatistics->addStdDev("Pastry: bytes of JOIN Messages received/s",
841  joinBytesReceived / time);
842  globalStatistics->addStdDev("Pastry: JOIN Messages sent/s", joinSent / time);
843  globalStatistics->addStdDev("Pastry: bytes of JOIN Messages sent/s", joinBytesSent / time);
844 
845  globalStatistics->addStdDev("Pastry: REPAIR Requests sent/s", repairReqSent / time);
846  globalStatistics->addStdDev("Pastry: bytes of REPAIR Requests sent/s",
847  repairReqBytesSent / time);
848  globalStatistics->addStdDev("Pastry: REPAIR Requests received/s", repairReqReceived / time);
849  globalStatistics->addStdDev("Pastry: bytes of REPAIR Requests received/s",
850  repairReqBytesReceived / time);
851 
852  globalStatistics->addStdDev("Pastry: STATE Requests sent/s", stateReqSent / time);
853  globalStatistics->addStdDev("Pastry: bytes of STATE Requests sent/s", stateReqBytesSent / time);
854  globalStatistics->addStdDev("Pastry: STATE Requests received/s", stateReqReceived / time);
855  globalStatistics->addStdDev("Pastry: bytes of STATE Requests received/s",
856  stateReqBytesReceived / time);
857  globalStatistics->addStdDev("Pastry: STATE Messages sent/s", stateSent / time);
858  globalStatistics->addStdDev("Pastry: bytes of STATE Messages sent/s", stateBytesSent / time);
859  globalStatistics->addStdDev("Pastry: STATE Messages received/s", stateReceived / time);
860  globalStatistics->addStdDev("Pastry: bytes of STATE Messages received/s",
861  stateBytesReceived / time);
862 
863  globalStatistics->addStdDev("BasePastry: LEAFSET Requests sent/s", leafsetReqSent / time);
864  globalStatistics->addStdDev("BasePastry: bytes of LEAFSET Requests sent/s", leafsetReqBytesSent / time);
865  globalStatistics->addStdDev("BasePastry: LEAFSET Requests received/s", leafsetReqReceived / time);
866  globalStatistics->addStdDev("BasePastry: bytes of LEAFSET Requests received/s",
867  leafsetReqBytesReceived / time);
868  globalStatistics->addStdDev("BasePastry: LEAFSET Messages sent/s", leafsetSent / time);
869  globalStatistics->addStdDev("BasePastry: bytes of LEAFSET Messages sent/s", leafsetBytesSent / time);
870  globalStatistics->addStdDev("BasePastry: LEAFSET Messages received/s", leafsetReceived / time);
871  globalStatistics->addStdDev("BasePastry: bytes of LEAFSET Messages received/s",
872  leafsetBytesReceived / time);
873 
874  globalStatistics->addStdDev("BasePastry: ROUTING TABLE ROW Requests sent/s", routingTableRowReqSent / time);
875  globalStatistics->addStdDev("BasePastry: bytes of ROUTING TABLE ROW Requests sent/s", routingTableRowReqBytesSent / time);
876  globalStatistics->addStdDev("BasePastry: ROUTING TABLE ROW Requests received/s", routingTableRowReqReceived / time);
877  globalStatistics->addStdDev("BasePastry: bytes of ROUTING TABLE ROW Requests received/s",
879  globalStatistics->addStdDev("BasePastry: ROUTING TABLE ROW Messages sent/s", routingTableRowSent / time);
880  globalStatistics->addStdDev("BasePastry: bytes of ROUTING TABLE ROW Messages sent/s", routingTableRowBytesSent / time);
881  globalStatistics->addStdDev("BasePastry: ROUTING TABLE ROW Messages received/s", routingTableRowReceived / time);
882  globalStatistics->addStdDev("BasePastry: bytes of ROUTING TABLE ROW Messages received/s",
884 
885  globalStatistics->addStdDev("BasePastry: total number of lookups", totalLookups);
886  globalStatistics->addStdDev("BasePastry: responsible lookups", responsibleLookups);
887  globalStatistics->addStdDev("BasePastry: lookups in routing table", routingTableLookups);
888  globalStatistics->addStdDev("BasePastry: lookups using closerNode()", closerNodeLookups);
889  globalStatistics->addStdDev("BasePastry: lookups using closerNode() with result from "
890  "neighborhood set", closerNodeLookupsFromNeighborhood);
891 }
892 
893 
895 {
896  return (int)floor(numberOfLeaves / 2.0);
897 }
898 
899 
901 {
902  return (int)floor(numberOfLeaves);
903 }
904 
905 
907  int numRedundantNodes,
908  int numSiblings,
909  BaseOverlayMessage* msg)
910 {
911  if ((numRedundantNodes > getMaxNumRedundantNodes()) ||
912  (numSiblings > getMaxNumSiblings())) {
913 
914  opp_error("(Pastry::findNode()) numRedundantNodes or numSiblings "
915  "too big!");
916  }
918 
919  NodeVector* nextHops = new NodeVector(numRedundantNodes);
920 
921  if (state != READY) {
922  return nextHops;
923  } else if (key.isUnspecified() || leafSet->isClosestNode(key)) {
925  nextHops->add(thisNode);
926  } else {
927  const NodeHandle* next = &(leafSet->getDestinationNode(key));
928 
929  if (next->isUnspecified()) {
930  next = &(routingTable->lookupNextHop(key));
931  if (!next->isUnspecified()) {
933  }
934  } else {
936  }
937 
938  if (next->isUnspecified()) {
940  // call findCloserNode() on all state objects
941  if (optimizeLookup) {
942  const NodeHandle* tmp;
943  next = &(routingTable->findCloserNode(key, true));
944  tmp = &(neighborhoodSet->findCloserNode(key, true));
945 
946  if ((! tmp->isUnspecified()) &&
947  (leafSet->isCloser(*tmp, key, *next))) {
949  next = tmp;
950  }
951 
952  tmp = &(leafSet->findCloserNode(key, true));
953  if ((! tmp->isUnspecified()) &&
954  (leafSet->isCloser(*tmp, key, *next))) {
956  next = tmp;
957  }
958  } else {
959  next = &(routingTable->findCloserNode(key));
960 
961  if (next->isUnspecified()) {
963  next = &(neighborhoodSet->findCloserNode(key));
964  }
965 
966  if (next->isUnspecified()) {
968  next = &(leafSet->findCloserNode(key));
969  }
970  }
971  }
972 
973  iterativeJoinHook(msg, !next->isUnspecified());
974 
975  if (!next->isUnspecified()) {
976  nextHops->add(*next);
977  }
978  }
979 
980  bool err;
981 
982  // if we're a sibling, return all numSiblings
983  if ((numSiblings >= 0) && isSiblingFor(thisNode, key, numSiblings, &err)) {
984  if (err == false) {
985  delete nextHops;
986  return leafSet->createSiblingVector(key, numSiblings);
987  }
988  }
989 
990  if (/*(nextHops->size() > 0) &&*/ (numRedundantNodes > 1)) {
991 
992  //memleak... comp should be a ptr and deleted in NodeVector::~NodeVector()...
993  //KeyDistanceComparator<KeyRingMetric>* comp =
994  // new KeyDistanceComparator<KeyRingMetric>( key );
995 
997  //KeyDistanceComparator<KeyPrefixMetric> comp(key);
998  NodeVector* additionalHops = new NodeVector( numRedundantNodes, &comp );
999 
1000  routingTable->findCloserNodes(key, additionalHops);
1001  leafSet->findCloserNodes(key, additionalHops);
1002  neighborhoodSet->findCloserNodes(key, additionalHops);
1003 
1004  if (useRegularNextHop && (nextHops->size() > 0) &&
1005  (*additionalHops)[0] != (*nextHops)[0]) {
1006  for (uint32_t i = 0; i < additionalHops->size(); i++) {
1007  if ((*additionalHops)[i] != (*nextHops)[0])
1008  nextHops->push_back((*additionalHops)[i]);
1009  }
1010  delete additionalHops;
1011  } else {
1012  delete nextHops;
1013  return additionalHops;
1014  }
1015  }
1016  return nextHops;
1017 }
1018 
1019 
1021  const BaseOverlayMessage* msg,
1022  const cPacket* dummy,
1023  bool appLookup)
1024 {
1025  assert(dummy == NULL);
1026 
1027  PastryFindNodeExtData* findNodeExt =
1028  new PastryFindNodeExtData("findNodeExt");
1029 
1030  if (msg &&
1031  dynamic_cast<const PastryJoinCall*>(msg->getEncapsulatedPacket())) {
1032  const PastryJoinCall* joinCall =
1033  static_cast<const PastryJoinCall*>(msg->getEncapsulatedPacket());
1034  assert(!joinCall->getSrcNode().isUnspecified());
1035  findNodeExt->setSendStateTo(joinCall->getSrcNode());
1036  findNodeExt->setJoinHopCount(1);
1037  }
1038  findNodeExt->setBitLength(PASTRYFINDNODEEXTDATA_L);
1039 
1040  AbstractLookup* newLookup = BaseOverlay::createLookup(routingType,
1041  msg, findNodeExt,
1042  appLookup);
1043 
1044  delete findNodeExt;
1045  return newLookup;
1046 }
1047 
1048 
1050  const PastryStateMsgHandle& hnd2)
1051 {
1052  return (hnd1.msg->getRow() < hnd2.msg->getRow());
1053 }
1054 
1055 
1057 {
1058  return routingTable->getLastRow();
1059 };
1060 
1061 
1062 std::vector<TransportAddress>* BasePastry::getRTRow(uint8_t index) const
1063 {
1064  return routingTable->getRow(index);
1065 };
1066 
1067 
1068 std::vector<TransportAddress>* BasePastry::getLeafSet() const
1069 {
1070  std::vector<TransportAddress>* ret = new std::vector<TransportAddress>;
1071  leafSet->dumpToVector(*ret);
1072 
1073  return ret;
1074 };
1075 
1076 
1077 std::ostream& operator<<(std::ostream& os, const PastryStateMsgProximity& pr)
1078 {
1079  os << "PastryStateMsgProximity {" << endl;
1080  os << " pr_rt {" << endl;
1081  for (std::vector<simtime_t>::const_iterator i = pr.pr_rt.begin();
1082  i != pr.pr_rt.end(); ++i) {
1083  os << " " << *i << endl;
1084  }
1085  os << " }" << endl;
1086  os << " pr_ls {" << endl;
1087  for (std::vector<simtime_t>::const_iterator i = pr.pr_ls.begin();
1088  i != pr.pr_ls.end(); ++i) {
1089  os << " " << *i << endl;
1090  }
1091  os << " }" << endl;
1092  os << " pr_ns {" << endl;
1093  for (std::vector<simtime_t>::const_iterator i = pr.pr_ns.begin();
1094  i != pr.pr_ns.end(); ++i) {
1095  os << " " << *i << endl;
1096  }
1097  os << " }" << endl;
1098  os << "}" << endl;
1099  return os;
1100 }
1101 
1102 
1104 {
1105  return leafSet->estimateMeanDistance();
1106 }
1107 
1108 
1109 //virtual public: distance metric
1111  const OverlayKey& y,
1112  bool useAlternative) const
1113 {
1114  if (!useAlternative) return KeyRingMetric().distance(x, y);
1115  return KeyPrefixMetric().distance(x, y);
1116 }