OverSim
Kademlia.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 
23 #include "Kademlia.h"
24 #include "KademliaMessage_m.h"
25 
26 #include <assert.h>
27 #include <algorithm>
28 
29 #include <IPAddressResolver.h>
30 #include <IPvXAddress.h>
31 #include <IInterfaceTable.h>
32 #include <IPv4InterfaceData.h>
33 #include "TopologyVis.h"
34 #include <AbstractLookup.h>
35 #include <LookupListener.h>
36 #include <RpcMacros.h>
37 #include <BootstrapList.h>
38 
39 
40 #if 0
41 #define BUCKET_CONSISTENCY(msg) \
42  do {\
43  bool stFull = false;\
44  int z = 0;\
45  if (siblingTable != NULL && siblingTable->size() > 0) {\
46  stFull = siblingTable->isFull();\
47  z = routingBucketIndex(siblingTable->back().getKey()) - 1;\
48  if (routingTable[z - 1] != NULL && routingTable[z - 1]->size())\
49  breakpoint(#msg);\
50  }\
51  for (int y = 0; y < (z - 2); ++y) {\
52  if ((routingTable[y] != NULL && routingTable[y]->size() > k) ||\
53  (routingTable[y] != NULL && routingTable[y]->size() && !stFull))\
54  breakpoint(#msg);\
55  }\
56  } while (false)
57 #else
58 #define BUCKET_CONSISTENCY(msg)
59 #endif
60 
62 
63 std::ostream& operator<<(std::ostream& os, const KademliaBucket* n)
64 {
65  if (n == NULL)
66  os << "NULL";
67  else {
68  for (KademliaBucket::const_iterator i=n->begin(); i !=n->end(); i++) {
69  os << *i << endl;
70  }
71  os << "last usage = " << n->getLastUsage();
72  }
73  return os;
74 };
75 
77 {
78 private:
80 public:
82  {
83  this->overlay = overlay;
84  }
85 
86  virtual void lookupFinished(AbstractLookup *lookup)
87  {
88  overlay->lookupFinished(lookup->isValid());
89  delete this;
90  }
91 };
92 
93 //-----------------------------------------------------------------------------
94 
96 {
97  if (stage != MIN_STAGE_OVERLAY)
98  return;
99 
100  // Kademlia provides KBR services
101  kbr = true;
102 
103  // setup kademlia parameters
104  minSiblingTableRefreshInterval = par("minSiblingTableRefreshInterval");
105  minBucketRefreshInterval = par("minBucketRefreshInterval");
106  siblingPingInterval = par("siblingPingInterval");
107  exhaustiveRefresh = par("exhaustiveRefresh");
108  maxStaleCount = par("maxStaleCount");
109  pingNewSiblings = par("pingNewSiblings");
110  enableReplacementCache = par("enableReplacementCache");
111  replacementCachePing = par("replacementCachePing");
112  replacementCandidates = par("replacementCandidates");
113  secureMaintenance = par("secureMaintenance");
114  newMaintenance = par("newMaintenance");
115 
116  // R/Kademlia
117  activePing = par("activePing");
118  proximityRouting = par("proximityRouting");
119  proximityNeighborSelection = par("proximityNeighborSelection");
120  altRecMode = recordRoute = par("altRecMode");
121 
122  k = par("k");
123  b = par("b");
124  s = par("s");
125 
126  siblingRefreshNodes = par("siblingRefreshNodes");
127 
128  if (siblingRefreshNodes <= 0) {
129  siblingRefreshNodes = 5 * s;
130  }
131 
132  bucketRefreshNodes = par("bucketRefreshNodes");
133 
134  if (bucketRefreshNodes <= 0) {
136  }
137 
138  // calculate number of buckets: ( (2^b)-1 ) * ( keylength / b )
139  numBuckets = ((1L << b) - 1L) * (OverlayKey::getLength() / b);
140 
141  // init routing and sibling table
142  siblingTable = new KademliaBucket(s * 5, NULL);
143 
144  // initialize pointers
145  routingTable.assign(numBuckets, (KademliaBucket*)NULL);
146 
147  WATCH_VECTOR(*siblingTable);
148  WATCH_VECTOR(routingTable);
149 
150  // self-message
151  bucketRefreshTimer = new cMessage("bucketRefreshTimer");
152  siblingPingTimer = new cMessage("siblingPingTimer");
153 
154  // statistics
155  bucketRefreshCount = 0;
157  nodesReplaced = 0;
158 
159  comparator = NULL;
160 }
161 
163 {
164  siblingTable = NULL;
165  comparator = NULL;
166  bucketRefreshTimer = NULL;
167  siblingPingTimer = NULL;
168 }
169 
171 {
172  routingDeinit();
173 
174  delete siblingTable;
175  delete comparator;
176  cancelAndDelete(bucketRefreshTimer);
177  cancelAndDelete(siblingPingTimer);
178 }
179 
181 {
183  if (time < GlobalStatistics::MIN_MEASURED) return;
184 
185  globalStatistics->addStdDev("Kademlia: Nodes replaced in buckets/s",
186  nodesReplaced / time);
187  globalStatistics->addStdDev("Kademlia: Bucket Refreshes/s",
188  bucketRefreshCount / time);
189  globalStatistics->addStdDev("Kademlia: Sibling Table Refreshes/s",
190  siblingTableRefreshCount / time);
191 }
192 
194 {
195  FindNodeCall* call = new FindNodeCall("FindNodeCall");
196  call->setExhaustiveIterative(true);
197  call->setLookupKey(thisNode.getKey());
200  call->setBitLength(FINDNODECALL_L(call));
201  sendUdpRpcCall(dest, call);
202 }
203 
205 {
206  // remove current node handle from the bootstrap list
207  if (!thisNode.getKey().isUnspecified()) {
209  }
210 
211  // initialize routing
212  routingDeinit();
213  routingInit();
214 
216 
217  if (!handle.isUnspecified()) {
218  if (secureMaintenance) {
219  sendSiblingFindNodeCall(handle);
220  } else {
221  // ping the bootstrap node to start bootstrapping
222  pingNode(handle);
223  }
224  } else {
225  // we're the only node in the network
226  state = READY;
227  setOverlayReady(true);
228 
229  // schedule bucket refresh timer
230  cancelEvent(bucketRefreshTimer);
231  scheduleAt(simTime(), bucketRefreshTimer);
232  cancelEvent(siblingPingTimer);
233  scheduleAt(simTime() + siblingPingInterval, siblingPingTimer);
234  }
235 }
236 
237 //-----------------------------------------------------------------------------
238 
240 {
241  // set join state
242  state = INIT;
243 
244  setOverlayReady(false);
245 
246  // setup comparator
248 
250 
251  updateTooltip();
253 }
254 
256 {
257  // delete buckets
258  for (uint32_t i = 0; i < routingTable.size(); i++) {
259  if (routingTable[i] != NULL) {
260  delete routingTable[i];
261  routingTable[i] = NULL;
262  }
263  }
264 
265  if (siblingTable != NULL) {
266  siblingTable->clear();
267  }
268 
269  if (comparator != NULL) {
270  delete comparator;
271  comparator = NULL;
272  }
273 }
274 
276 {
277  return s;
278 }
279 
281 {
282  return k;
283 }
284 
285 int Kademlia::routingBucketIndex(const OverlayKey& key, bool firstOnLayer)
286 {
287  // calculate XOR distance
288  OverlayKey delta = key ^ getThisNode().getKey();
289 
290  // find first subinteger that is not zero...
291  int i;
292  for (i = key.getLength() - b; i >= 0 && delta.getBitRange(i, b) == 0;
293  i -= b);
294 
295  if (i < 0)
296  return -1;
297 
298  if (!firstOnLayer)
299  return (i / b) * ((1 << b) - 1) + (delta.getBitRange(i, b) - 1);
300  else
301  return (i / b) * ((1 << b) - 1) + (pow(2, b) - 2);
302 }
303 
305 {
306  // get bucket index
307  int num = routingBucketIndex(key);
308  if (num < 0)
309  return NULL;
310 
311  // get bucket and allocate if necessary
312  KademliaBucket* bucket = routingTable[ num ];
313  if (bucket == NULL && ensure)
314  bucket = routingTable[ num ] = new KademliaBucket( k, comparator );
315 
316  // return bucket
317  return bucket;
318 }
319 
320 bool Kademlia::routingAdd(const NodeHandle& handle, bool isAlive,
321  simtime_t rtt, bool maintenanceLookup)
322 {
324  // never add unspecified node handles
325  if (handle.isUnspecified() || handle.getKey() == getThisNode().getKey()) {
326  return false;
327  }
328 
329  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
330  << " (" << thisNode.getKey().toString(16) << ")]\n"
331  << " inserting node " << handle << " (rtt = ";
332  if (rtt == MAXTIME) {
333  EV << "<unknown>";
334  } else {
335  EV << SIMTIME_DBL(rtt);
336  }
337  EV << ", isAlive = " << (isAlive?"true":"false")
338  << ") ..." << endl;
339 
340  // bucket index
342  bool result = false;
343  bool authenticated = (isAlive && (rtt != MAXTIME));
344 
345  bool needsRtt = (activePing && ((rtt == MAXTIME) ? true : false));
346 
347  // convert node handle
348  KademliaBucketEntry kadHandle = handle;
349  kadHandle.setRtt(rtt);
350  kadHandle.setLastSeen(simTime());
351 
352  /* check if node is already a sibling -----------------------------------*/
353  if ((i = siblingTable->findIterator(kadHandle.getKey()))
354  != siblingTable->end()) {
355  // not alive? -> do not change routing information
356  if (isAlive) {
357  if (!secureMaintenance || authenticated) {
358  if (kadHandle.getRtt() == MAXTIME) {
359  kadHandle.setRtt(i->getRtt());
360  }
361  // refresh sibling
362  (*i) = kadHandle;
363  } else {
364  if (maintenanceLookup) {
365  return false;
366  }
367 
368  if ((i->getIp() != kadHandle.getIp()) ||
369  (i->getPort() != kadHandle.getPort())) {
370  // sibling could have changed transport address
371  // ping new address for authentication
372  pingNode(kadHandle);
373  return false;
374  }
375  }
376  }
377  BUCKET_CONSISTENCY(routingAdd: node is sibling);
378  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
379  << " (" << thisNode.getKey().toString(16) << ")]\n"
380  << " ... node is already in the sibling table." << endl;
381  return true;
382  }
383 
384  /* check if node is already in a bucket ---------------------------------*/
385  KademliaBucket* bucket = routingBucket(kadHandle.getKey(), false);
386  if (bucket != NULL && (i = bucket->findIterator(kadHandle.getKey() ) )
387  != bucket->end() ) {
388  // not alive? -> do not change routing information
389  if (isAlive) {
390  if (!secureMaintenance || authenticated) {
391  if (kadHandle.getRtt() == MAXTIME) {
392  kadHandle.setRtt(i->getRtt());
393  }
394 
395  // R/Kademlia
396  if (needsRtt && (kadHandle.getRtt() == MAXTIME)) {
397  Prox prox =
399  this, NULL);
400  if (prox != Prox::PROX_SELF &&
401  prox != Prox::PROX_UNKNOWN &&
402  prox != Prox::PROX_WAITING &&
403  prox != Prox::PROX_TIMEOUT) {
404  kadHandle.setProx(prox);
405  //routingAdd(handle, true, prox.proximity);//ctrlInfo->getSrcRoute() //TODO
406  } /*else if (prox == Prox::PROX_TIMEOUT) {
407  // remove from bucket
408  bucket->erase(i);
409  return false;
410  }*/ //TODO inform NC that node is alive
411  else {
412  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
413  << " (" << thisNode.getKey().toString(16) << ")]\n"
414  " ... node not added, but ping sent." << endl;
415  return false;
416  }
417  }
418 
419  // remove old handle
420  bucket->erase(i);
421  // re-add to tail
422  bucket->push_back(kadHandle);
423  } else {
424  if (maintenanceLookup) {
425  return false;
426  }
427 
428  if ((i->getIp() != kadHandle.getIp()) ||
429  (i->getPort() != kadHandle.getPort())) {
430  // sibling could have changed transport address
431  // ping new address for authentication
432  pingNode(kadHandle);
433  return false;
434  }
435  }
436  }
437  BUCKET_CONSISTENCY(routingAdd: node is in bucket);
438  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
439  << " (" << thisNode.getKey().toString(16) << ")]\n"
440  << " ... node is already in bucket "
441  << routingBucketIndex(kadHandle.getKey()) << "." << endl;
442  return true;
443  }
444 
445  /* check if node can be added to the sibling list -----------------------*/
446  if (siblingTable->isAddable(kadHandle) ) {
447  if (secureMaintenance && !authenticated) {
448  if (!maintenanceLookup || (isAlive && (rtt == MAXTIME))) {
449  // received a FindNodeCall or PingCall from a potential sibling
450  // or new nodes from a FindNodeResponse app lookup
451  pingNode(kadHandle);
452  } else if (newMaintenance) {
453  // new node from sibling table refresh
454  //sendSiblingFindNodeCall(kadHandle);
455  pingNode(kadHandle);
456  }
457  return false;
458  }
459 
460  // ping new siblings
461  if (pingNewSiblings && !isAlive) {
462  pingNode(kadHandle);
463  }
464 
465  // R/Kademlia
466  else if (needsRtt) {
467  // old version: pingNode(), now:
468  Prox prox =
470  this, NULL);
471  if (prox != Prox::PROX_SELF &&
472  prox != Prox::PROX_UNKNOWN &&
473  prox != Prox::PROX_TIMEOUT) {
474  kadHandle.setProx(prox);
475  } else if (prox == Prox::PROX_TIMEOUT) {
476  // do not put handle into sibling table
477  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
478  << " (" << thisNode.getKey().toString(16) << ")]\n"
479  << " ... node not added (node timeout)." << endl;
480  return false;
481  }
482  }
483 
484  bool finished = false;
485  int siblingPos = -1;
486 
487  // check if sibling list is full so a handle is preemted from the list
488  if (siblingTable->isFull()) {
489  // get handle thats about to be preempted
490  KademliaBucketEntry oldHandle = siblingTable->back();
491  assert(oldHandle.getKey() != kadHandle.getKey());
492 
493  // add handle to the sibling list
494  siblingPos = siblingTable->add(kadHandle);
495  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
496  << " (" << thisNode.getKey().toString(16) << ")]\n"
497  << " ... node added to sibling table, replacing "
498  << (NodeHandle)oldHandle << endl;
499 
500  // change, so that the preempted handle is added to a bucket
501  kadHandle = oldHandle;
502 
503  // call update() for removed sibling
504  if (!secureMaintenance) {
505  deleteOverlayNeighborArrow(oldHandle);
506  callUpdate(oldHandle, false);
507  }
508 
509  // return always true, since the handle has been added
510  result = true;
511  } else {
512  // simply add the handle and stop
513  siblingPos = siblingTable->add(kadHandle);
514  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
515  << " (" << thisNode.getKey().toString(16) << ")]\n"
516  << " ... node added to sibling table." << endl;
517 
518  // don't need to add kadHandle also to regular buckets
519  finished = true;
520  }
521  assert(siblingPos > -1);
522 
523  updateTooltip();
524 
525  // call update() for new sibling
526  showOverlayNeighborArrow(siblingTable->at(siblingPos), false,
527  "m=m,50,100,50,100;ls=green,1");
528  callUpdate(siblingTable->at(siblingPos), true);
529 
530  if (finished) {
531  BUCKET_CONSISTENCY(routingAdd: node is now sibling);
532  return true;
533  }
534  }
535 
536  /* add node to the appropriate bucket, if not full ---------------------*/
537  bucket = routingBucket(kadHandle.getKey(), true);
538  if (!bucket->isFull()) {
539  if (secureMaintenance && !authenticated) {
540  if ((isAlive && (rtt == MAXTIME))) {
541  // received a FindNodeCall or PingCall from a potential new bucket entry
542  // or new nodes from a FindNodeReponse app lookup
543  // optimization: don't send a ping for nodes from FindNodeResponse for app lookups
544  pingNode(kadHandle);
545  }
546  return false;
547  }
548 
549  //EV << "[Kademlia::routingAdd()]\n"
550  // << " Adding new node " << kadHandle
551  // << " to bucket " << routingBucketIndex(kadHandle.getKey())
552  // << endl;
553 
554  // PNS
555  if (needsRtt || proximityNeighborSelection) {
556  //pingNode(handle, -1, 0, NULL, NULL, NULL, -1, UDP_TRANSPORT, false);
557  Prox prox =
559  this, NULL);
560  if (prox != Prox::PROX_SELF &&
561  prox != Prox::PROX_UNKNOWN &&
562  prox != Prox::PROX_TIMEOUT) {
563  //routingAdd(handle, true, prox.proximity);//ctrlInfo->getSrcRoute() //TODO
564  kadHandle.setProx(prox);
565  }
566  }
567 
568  bucket->push_back(kadHandle);
569  result = true;
570  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
571  << " (" << thisNode.getKey().toString(16) << ")]\n"
572  << " ... node added to bucket "
573  << routingBucketIndex(kadHandle.getKey())
574  << " which was not yet full." << endl;
575  } else if (isAlive) {
576  //PNS node replacement
578  kadHandle.getProx() != Prox::PROX_UNKNOWN) {
579  KademliaBucket::iterator kickHim, it;
580  kickHim = it = bucket->begin();
581  ++it;
582  while (it != bucket->end()) {
583  if (it->getRtt() > kickHim->getRtt()) {
584  kickHim = it;
585  }
586  ++it;
587  }
588  if (kickHim->getRtt() > kadHandle.getRtt()) {
589  KademliaBucketEntry temp = *kickHim;
590  bucket->erase(kickHim);
591  bucket->push_back(kadHandle);
592  kadHandle = temp;
593  EV << "[Kademlia::routingAdd()] @ " << thisNode.getIp()
594  << " (" << thisNode.getKey().toString(16) << ")]\n"
595  << " ... added to bucket "
596  << routingBucketIndex(kadHandle.getKey())
597  << " (PNS: replacing another node)." << endl;
598  }
599  }
600 
601  if (enableReplacementCache && (!secureMaintenance || authenticated)) {
602  bucket->replacementCache.push_front(kadHandle);
603  if (bucket->replacementCache.size() > replacementCandidates) {
604  bucket->replacementCache.pop_back();
605  }
606 
607  if (replacementCachePing) {
608  KademliaBucket::iterator it = bucket->begin();
609  while (it != bucket->end() && (it->getPingSent() == true)) {
610  it++;
611  }
612  if (it != bucket->end()) {
613  pingNode(*it);
614  it->setPingSent(true);
615  }
616  }
617  }
618  }
619 
620  // PNS
621  else if (proximityNeighborSelection) {
622  neighborCache->getProx(kadHandle, NEIGHBORCACHE_QUERY, -1, this, NULL);
623  //pingNode(kadHandle);
624  }
625 
627  return result;
628 }
629 
630 bool Kademlia::routingTimeout(const OverlayKey& key, bool immediately)
631 {
633  // key unspecified? yes -> ignore
634  if (key.isUnspecified())
635  return false;
636 
637  // bucket index
639 
640  /* check if the node is one of the siblings -----------------------------*/
641  if ((i = siblingTable->findIterator(key)) != siblingTable->end()) {
642 
643  i->incStaleCount();
644  i->setPingSent(false);
645 
646  if (i->getStaleCount() > maxStaleCount || immediately) {
647  // remove from sibling table
648  NodeHandle oldSibling = *i;
649  siblingTable->erase(i);
650 
651  // lost last sibling?
652  if (siblingTable->size() == 0) {
653  join();
654  return true;
655  }
656 
657  BUCKET_CONSISTENCY(routingTimeout: is sibling);
658 
659  // try to refill with new closest contact
661 
662  // call update() for removed sibling
663  deleteOverlayNeighborArrow(oldSibling);
664  callUpdate(oldSibling, false);
665 
666  updateTooltip();
667 
668  return true;
669  }
670  }
671 
672  /* check if node is already in a bucket ---------------------------------*/
673  KademliaBucket* bucket = routingBucket(key, false);
674  if (bucket != NULL && (i = bucket->findIterator(key) ) != bucket->end() ) {
675 
676  i->incStaleCount();
677  i->setPingSent(false);
678 
679  if (i->getStaleCount() > maxStaleCount || immediately) {
680  // remove from routing table
681  bucket->erase(i);
682 
684  if (bucket->replacementCache.size()) {
685  routingAdd(bucket->replacementCache.front(), true,
686  bucket->replacementCache.front().getRtt());
687  bucket->replacementCache.pop_front();
688  nodesReplaced++;
689  }
690  }
691  }
692 
693  BUCKET_CONSISTENCY(routingTimeout: is in bucket);
694  return true;
695  }
697  return false;
698 }
699 
701 {
702  if (siblingTable->size() == 0 ||
703  siblingTable->isFull())
704  return;
705 
706  int index = routingBucketIndex(siblingTable->back().getKey()) - 1;
707  assert(index > 0);
708 
709  while ((routingTable[index] == NULL ||
710  routingTable[index]->empty()) &&
711  index < (int)(OverlayKey::getLength() - 1)) {
712  index++;
713  }
714  if (index < (int)OverlayKey::getLength() &&
715  routingTable[index] != NULL && routingTable[index]->size()) {
716 
717  KademliaBucket sortedBucket(k, comparator);
718  for (uint32_t i = 0; i < routingTable[index]->size(); ++i) {
719  sortedBucket.add(routingTable[index]->at(i));
720  }
721 
722  siblingTable->add(sortedBucket.front());
723 
724  // call update() for new sibling
725  if (!secureMaintenance) {
726  showOverlayNeighborArrow(sortedBucket.front(), false,
727  "m=m,50,100,50,100;ls=green,1");
728  callUpdate(sortedBucket.front(), true);
729  }
730 
731  // remove node from bucket
732  routingTable[index]->erase(routingTable[index]->
733  findIterator(sortedBucket.front().getKey()));
734  assert(siblingTable->isFull());
736  }
737 }
738 
740 {
741  KademliaBucket* bucket = routingBucket(key, true);
742 
743  if (bucket) {
744  bucket->setLastUsage(simTime());
745  }
746 
747  if ((siblingTable->size() < (uint32_t)getMaxNumSiblings())
748  || ((siblingTable->at(getMaxNumSiblings() - 1).getKey() ^ thisNode.getKey())
749  >= (key ^ thisNode.getKey()))) {
750  siblingTable->setLastUsage(simTime());
751  }
752 
753 }
754 
755 bool Kademlia::isSiblingFor(const NodeHandle& node, const OverlayKey& key,
756  int numSiblings, bool* err)
757 {
758  if (key.isUnspecified())
759  error("Kademlia::isSiblingFor(): key is unspecified!");
760 
761  if (state != READY) {
762  EV << "[Kademlia::isSiblingFor()] @ "
763  << thisNode.getIp()
764  << " (" << thisNode.getKey().toString(16) << ")]\n"
765  << " state != READY"
766  << endl;
767  *err = true;
768  return false;
769  }
770 
771  if (numSiblings > getMaxNumSiblings()) {
772  opp_error("Kademlia::isSiblingFor(): numSiblings too big!");
773  }
774 
775  // set default number of siblings to consider
776  if (numSiblings == -1) {
777  numSiblings = getMaxNumSiblings();
778  }
779 
780  if (numSiblings == 0) {
781  *err = false;
782  return (node.getKey() == key);
783  }
784 
785  if (siblingTable->size() < (uint)numSiblings) {
786  *err = false;
787  return true;
788  }
789 
790  if (siblingTable->isFull() &&
791  ((thisNode.getKey() ^ key) >
792  (thisNode.getKey() ^ siblingTable->back().getKey()))) {
793  EV << "[Kademlia::isSiblingFor()] @ "
794  << thisNode.getIp()
795  << " (" << thisNode.getKey().toString(16) << ")]\n"
796  << " Not sure if I am sibling for " << key << " !\n"
797  << " (" << key << " is not closer to me than "
798  << siblingTable->back().getKey() << ")"
799  << endl;
800  *err = true;
801  return false;
802  }
803 
806 
807  // create result vector
808  NodeVector* result = new NodeVector(numSiblings, comp);
809 
810  for (KademliaBucket::iterator i=siblingTable->begin();
811  i != siblingTable->end(); i++) {
812  result->add( *i);
813  }
814 
815  // add local node
816  result->add(thisNode);
817 
818  *err = false;
819  delete comp;
820 
821  if (result->contains(node.getKey())) {
822  delete result;
823  return true;
824  } else {
825  delete result;
826  assert(!(numSiblings == 1 && key == node.getKey()));
827  return false;
828  }
829 }
830 
832 {
833  // send failed node call to all siblings
834  FailedNodeCall* call = new FailedNodeCall();
835  call->setFailedNode(getThisNode());
836  call->setBitLength(FAILEDNODECALL_L(call));
837  for (KademliaBucket::iterator i = siblingTable->begin();
838  i != siblingTable->end(); i++) {
839  countFailedNodeCall(call);
840  sendUdpRpcCall(*i, call->dup());
841  }
842 
843  delete call;
844 }
845 
847 {
848  assert(!failed.isUnspecified());
849 
851  // check sibling table
852  for (i = siblingTable->begin(); i != siblingTable->end(); ++i) {
853  if (failed == *i) break;
854  }
855 
856  if (i != siblingTable->end()) {
857  // remove from sibling table
858  NodeHandle oldSibling = *i;
859  siblingTable->erase(i);
860 
861  // call update() for removed sibling
862  deleteOverlayNeighborArrow(oldSibling);
863  callUpdate(oldSibling, false);
864 
865  updateTooltip();
866 
867  // try to refill with new closest contact
869  } else {
870  // check buckets
871  uint32_t m;
872  for (m = 0; m < routingTable.size(); ++m) {
873  if (routingTable[m] != NULL) {
874  for (i = routingTable[m]->begin(); i != routingTable[m]->end();
875  ++i) {
876  if (failed == *i) {
877  // remove from routing table
878  routingTable[m]->erase(i);
879  return (siblingTable->size() != 0);
880  }
881  }
882  }
883  }
884  }
885  return (siblingTable->size() != 0);
886 }
887 
888 // R/Kademlia
890  BaseRouteMessage* msg)
891 {
892  if (msg->getSrcNode() != thisNode) {
893  if (!msg->getDestKey().isUnspecified()) {
894  routingAdd(msg->getSrcNode(), true);
895 
896  if (altRecMode && dest != thisNode) return true;
897 
898  NodeVector* nextHops = findNode(msg->getDestKey(), /*recursiveLookupConfig.redundantNodes*/ k, s, msg);
899  KademliaRoutingInfoMessage* kadRoutingInfoMsg =
901 
902  kadRoutingInfoMsg->setSrcNode(thisNode);
903  kadRoutingInfoMsg->setDestKey(msg->getDestKey());
904  kadRoutingInfoMsg->setNextHopsArraySize(nextHops->size());
905  kadRoutingInfoMsg->setName("KadRoutingInfoMsg");
906  for (uint32_t i = 0; i < nextHops->size(); i++) {
907  kadRoutingInfoMsg->setNextHops(i, (*nextHops)[i]);
908  if (thisNode == kadRoutingInfoMsg->getNextHops(i)) {
909  kadRoutingInfoMsg->getNextHops(i).setIsAlive(true);
910  }
911  }
912  kadRoutingInfoMsg->setBitLength(KADEMLIAROUTINGINFO_L(kadRoutingInfoMsg));
913 
914  delete nextHops;
915 
916  if (!altRecMode) {
917  sendMessageToUDP(msg->getSrcNode(), kadRoutingInfoMsg, 0.000001);
918  } else {
919  // alternative maintenance mode
920  std::vector<TransportAddress> sourceRoute;
921  for (int i = msg->getVisitedHopsArraySize() - 1/*2*/; i >= 0; i--) {
922  //TODO remove loops
923  sourceRoute.push_back(msg->getVisitedHops(i));
924  }
925  //sourceRoute.push_back(msg->getSrcNode());
926  sendToKey(OverlayKey::UNSPECIFIED_KEY, kadRoutingInfoMsg, 0,
927  sourceRoute, NO_OVERLAY_ROUTING);
928  }
929  //TODO should be sent after baseroutemsg
930  } else if (altRecMode &&
931  dynamic_cast<KademliaRoutingInfoMessage*>(msg->
933  // alternative mode: infoMsg on its way back
934  KademliaRoutingInfoMessage* infoMsg =
935  static_cast<KademliaRoutingInfoMessage*>(msg->decapsulate());
936  NodeVector* nextHops = findNode(infoMsg->getDestKey(), k, s, msg);
937 
938  // merge vectors
940  MarkedNodeVector temp(UINT16_MAX, &comp);
941 
942  for (uint32_t i = 0; i < nextHops->size(); i++) {
943  temp.push_back((*nextHops)[i]);
944  }
945  delete nextHops;
946 
947  for (uint32_t i = 0; i < infoMsg->getNextHopsArraySize(); i++) {
948  routingAdd(infoMsg->getNextHops(i),
949  infoMsg->getNextHops(i).getIsAlive());
950  temp.add(infoMsg->getNextHops(i));
951  }
952 
953  infoMsg->setNextHopsArraySize(temp.size());
954  for (uint32_t i = 0; i < temp.size(); ++i) {
955  infoMsg->setNextHops(i, temp[i]);
956  if (thisNode == infoMsg->getNextHops(i)) {
957  infoMsg->getNextHops(i).setIsAlive(true);
958  }
959  }
960  infoMsg->setBitLength(KADEMLIAROUTINGINFO_L(infoMsg));
961 
962  msg->encapsulate(infoMsg);
963  }
964  }
965  return true;
966 }
967 
968 
969 NodeVector* Kademlia::findNode(const OverlayKey& key, int numRedundantNodes,
970  int numSiblings, BaseOverlayMessage* msg)
971 {
972  if (numSiblings > getMaxNumSiblings()) {
973  opp_error("(Kademlia::findNode()) numRedundantNodes or numSiblings "
974  "too big!");
975  }
976 
977 #if 0
978  if (numRedundantNodes < 2) {
979  throw cRuntimeError("Kademlia::findNode(): For Kademlia "
980  "redundantNodes must be at least 2 "
981  "and lookupMerge should be true!");
982  }
983 #endif
984 
985  // create temporary comparator
988 
989  // select result set size
990  bool err;
991  int resultSize;
992 
993  if (numSiblings < 0) {
994  // exhaustive iterative doesn't care about siblings
995  resultSize = numRedundantNodes;
996  } else {
997  resultSize = isSiblingFor(thisNode, key, numSiblings, &err) ?
998  (numSiblings ? numSiblings : 1) : numRedundantNodes;
999  }
1000  assert(numSiblings || numRedundantNodes);
1001 
1002  NodeVector* result = new NodeVector(resultSize, comp);
1003 
1004  if (siblingTable->isEmpty()) {
1005  result->add(thisNode);
1006  delete comp;
1007  return result;
1008  }
1009 
1010  // R/Kademlia: in recursive mode just speed up route messages //TODO iterative PR
1011  bool returnProxNodes = false;
1012  if (proximityRouting) {
1013  if (msg &&
1014  (!dynamic_cast<FindNodeCall*>(msg->getEncapsulatedPacket()) &&
1015  !dynamic_cast<FindNodeCall*>(msg))) {
1016  returnProxNodes = true;
1017  }
1018  }
1019 
1020  ProxNodeVector* resultProx = NULL;
1021  KademliaPRComparator* compProx = NULL;
1022  if (returnProxNodes) {
1023  compProx = new KademliaPRComparator(key);
1024  resultProx = new ProxNodeVector(resultSize, NULL, NULL, compProx, 0, resultSize);
1025  }
1026 
1027  // add items from buckets
1028  int index;
1029  int mainIndex = routingBucketIndex(key);
1030  int startIndex = routingBucketIndex(key, true);
1031  int endIndex = routingBucketIndex(siblingTable->back().getKey());
1032 
1033  // add nodes from best fitting bucket
1034  if (mainIndex != -1) {
1035  KademliaBucket* bucket = routingTable[mainIndex];
1036  if (bucket != NULL && bucket->size()) {
1037  for (KademliaBucket::iterator i=bucket->begin(); i!=bucket->end(); i++) {
1038  result->add(*i);
1039  if (returnProxNodes)
1040  resultProx->add(*i);
1041  //EV << "Kademlia::findNode(): Adding "
1042  // << *i << " from bucket " << mainIndex << endl;
1043  }
1044  }
1045  }
1046 
1047  // add most fitting buckets
1048  if (startIndex >= endIndex || !result->isFull()) {
1049  for (index = startIndex; index >= endIndex; --index) {
1050  // add bucket to result vector
1051  if (index == mainIndex) continue;
1052  KademliaBucket* bucket = routingTable[index];
1053  if (bucket != NULL && bucket->size()) {
1054  for (KademliaBucket::iterator i=bucket->begin(); i!=bucket->end(); i++) {
1055  result->add(*i);
1056  if (returnProxNodes)
1057  resultProx->add(*i);//std::make_pair(*i, i->getRtt()));
1058  //EV << "Kademlia::routingGetClosestNodes(): Adding "
1059  // << *i << " from bucket " << index << endl;
1060  }
1061  }
1062  }
1063 
1064  // add nodes from sibling table
1065  for (KademliaBucket::iterator i = siblingTable->begin();
1066  i != siblingTable->end(); i++) {
1067  result->add(*i);
1068  if (returnProxNodes)
1069  resultProx->add(*i);
1070  }
1071  // add local node
1072  result->add(thisNode);
1073  if (returnProxNodes) {
1075  if (!result->size() || (*result)[0] == thisNode) {
1076  temp.setProx(Prox::PROX_SELF);
1077  resultProx->add(temp);
1078  } else {
1080  resultProx->add(temp);
1081  }
1082  }
1083  }
1084 
1085  // add more distant buckets
1086  for (index = mainIndex + 1; !result->isFull() && index < numBuckets;
1087  ++index) {
1088  // add bucket to result vector
1089  KademliaBucket* bucket = routingTable[index];
1090  if (bucket != NULL && bucket->size()) {
1091  for (KademliaBucket::iterator i=bucket->begin(); i!=bucket->end(); i++) {
1092  result->add(*i);
1093  if (returnProxNodes)
1094  resultProx->add(*i);
1095  //EV << "[Kademlia::routingGetClosestNodes()]\n"
1096  // << " Adding " << *i << " from bucket " << index
1097  // << endl;
1098  }
1099  }
1100  }
1101 
1102  if (returnProxNodes && resultProx->size() && result->size() &&
1103  (KeyPrefixMetric().distance(key, (*resultProx)[0].getKey()) <
1104  KeyPrefixMetric().distance(key, (*result)[0].getKey()))) {
1105  result->clear();
1106  for (uint32_t i = 0; i < resultProx->size(); ++i) {
1107  result->push_back((*resultProx)[i]/*.first*/);
1108  }
1109  }
1110  delete compProx;
1111  delete resultProx;
1112 
1113  delete comp;
1114 
1115  return result;
1116 }
1117 
1118 //-----------------------------------------------------------------------------
1119 
1120 
1121 void Kademlia::handleTimerEvent(cMessage* msg)
1122 {
1123  if (msg == bucketRefreshTimer) {
1125  } else if (msg == siblingPingTimer) {
1126  if (siblingPingInterval == 0) {
1127  return;
1128  }
1129 
1130  for (KademliaBucket::iterator i = siblingTable->begin();
1131  i != siblingTable->end(); i++) {
1132  pingNode(*i);
1133  }
1134  scheduleAt(simTime() + siblingPingInterval, msg);
1135  }
1136 }
1137 
1138 // R/Kademlia
1140 {
1141  // only used for recursive Kademlia
1142  OverlayCtrlInfo* ctrlInfo =
1143  check_and_cast<OverlayCtrlInfo*>(msg->removeControlInfo());
1144  KademliaRoutingInfoMessage* kadRoutingInfoMsg =
1145  check_and_cast<KademliaRoutingInfoMessage*>(msg);
1146 
1147  routingAdd(kadRoutingInfoMsg->getSrcNode(), true);
1148 
1149  for (uint32_t i = 0; i < kadRoutingInfoMsg->getNextHopsArraySize(); i++) {
1150  routingAdd(kadRoutingInfoMsg->getNextHops(i),
1151  kadRoutingInfoMsg->getNextHops(i).getIsAlive());
1152  }
1153 
1154  delete ctrlInfo;
1155  delete msg;
1156 }
1157 
1158 //In Kademlia this method is used to maintain the routing table.
1160 {
1161  bool maintenanceLookup = (msg->getStatType() == MAINTENANCE_STAT);
1162  RPC_SWITCH_START(msg)
1163  RPC_ON_CALL(Ping) {
1164  // add active node
1165  OverlayCtrlInfo* ctrlInfo =
1166  check_and_cast<OverlayCtrlInfo*>(msg->getControlInfo());
1167  routingAdd(ctrlInfo->getSrcRoute(), true, MAXTIME, maintenanceLookup);
1168  break;
1169  }
1170  RPC_ON_CALL(FindNode)
1171  {
1172  // add active node
1173  OverlayCtrlInfo* ctrlInfo =
1174  check_and_cast<OverlayCtrlInfo*>(msg->getControlInfo());
1175  routingAdd(ctrlInfo->getSrcRoute(), true, MAXTIME, maintenanceLookup);
1176  break;
1177  }
1178  RPC_SWITCH_END()
1179  return false;
1180 }
1181 
1182 //In Kademlia this method is used to maintain the routing table.
1184  cPolymorphic* context, int rpcId,
1185  simtime_t rtt)
1186 {
1187  bool maintenanceLookup = (msg->getStatType() == MAINTENANCE_STAT);
1188  OverlayCtrlInfo* ctrlInfo =
1189  dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
1190  NodeHandle srcRoute = (ctrlInfo ? ctrlInfo->getSrcRoute()
1191  : msg->getSrcNode());
1192 
1193  RPC_SWITCH_START(msg)
1194  RPC_ON_RESPONSE(Ping) {
1195  if (state == INIT) {
1196  // schedule bucket refresh timer
1197  cancelEvent(bucketRefreshTimer);
1198  scheduleAt(simTime(), bucketRefreshTimer);
1199  cancelEvent(siblingPingTimer);
1200  scheduleAt(simTime() + siblingPingInterval, siblingPingTimer);
1201  state = JOIN;
1202  }
1203  }
1204  RPC_ON_RESPONSE(FindNode)
1205  {
1206  if (state == INIT) {
1207  state = JOIN;
1208 
1209  // bootstrap node is trustworthy: add all nodes immediately
1210  routingAdd(srcRoute, true, rtt, maintenanceLookup);
1211  for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
1212  routingAdd(_FindNodeResponse->getClosestNodes(i), true,
1213  MAXTIME-1, maintenanceLookup);
1214 
1215  if (newMaintenance) {
1216  createLookup()->lookup(getThisNode().getKey(), s, hopCountMax, 0,
1217  new KademliaLookupListener(this));
1218  } else {
1219  // schedule bucket refresh timer
1220  cancelEvent(bucketRefreshTimer);
1221  scheduleAt(simTime(), bucketRefreshTimer);
1222  cancelEvent(siblingPingTimer);
1223  scheduleAt(simTime() + siblingPingInterval, siblingPingTimer);
1224  }
1225 
1226  break;
1227  }
1228 
1229  // add active node
1233  rtt = MAXTIME;
1234  }
1235  setBucketUsage(srcRoute.getKey());
1236 
1237  // add inactive nodes
1238  for (uint32_t i=0; i<_FindNodeResponse->getClosestNodesArraySize(); i++)
1239  routingAdd(_FindNodeResponse->getClosestNodes(i), false,
1240  MAXTIME, maintenanceLookup);
1241  break;
1242  }
1243  RPC_SWITCH_END()
1244 
1245  // add node that responded
1246  routingAdd(srcRoute, true, rtt, maintenanceLookup);
1247 }
1248 
1249 // In Kademlia this method is used to maintain the routing table.
1251  const TransportAddress& dest,
1252  cPolymorphic* context, int rpcId,
1253  const OverlayKey& destKey)
1254 {
1255  if (dest.isUnspecified()) return;
1256  try {
1257  RPC_SWITCH_START(msg)
1258  RPC_ON_CALL(Ping) {
1259  if (state == INIT) {
1260  joinOverlay();
1261  return;
1262  }
1263 
1264  const NodeHandle& handle = dynamic_cast<const NodeHandle&>(dest);
1265  routingTimeout(handle.getKey());
1266  break;
1267  }
1268  RPC_ON_CALL(FindNode) {
1269  if (state == INIT) {
1270  joinOverlay();
1271  return;
1272  }
1273 
1274  const NodeHandle& handle = dynamic_cast<const NodeHandle&>(dest);
1275  routingTimeout(handle.getKey());
1276  setBucketUsage(handle.getKey());
1277  break;
1278  }
1279  RPC_SWITCH_END()
1280  } catch (...) {
1281  EV << "[Kademlia:handleRpcTimout() @ " << thisNode.getIp()
1282  << " (" << thisNode.getKey().toString(16) << ")]\n"
1283  << " ERROR: RPC timeout without key ("
1284  << msg << " -> " << dest << ")" << endl;
1285  return;
1286  }
1287 }
1288 
1289 // R/Kademlia
1290 void Kademlia::proxCallback(const TransportAddress& node, int rpcId,
1291  cPolymorphic *contextPointer, Prox prox)
1292 {
1293  Enter_Method_Silent();
1294 
1295  if (prox != Prox::PROX_TIMEOUT) {
1296  routingAdd((const NodeHandle&)node, true, prox.proximity);
1297  } else {
1298  routingTimeout(((const NodeHandle&)node).getKey());
1299  }
1300 }
1301 
1302 void Kademlia::lookupFinished(bool isValid)
1303 {
1304  if (state == JOIN) {
1305  cancelEvent(bucketRefreshTimer);
1306 
1307  if (siblingTable->size() == 0) {
1308  // initial lookup failed - get new bootstrap node
1309  joinOverlay();
1310  return;
1311  }
1312 
1313  scheduleAt(simTime(), bucketRefreshTimer);
1314 
1315  if (!newMaintenance) {
1316  state = READY;
1317  setOverlayReady(true);
1318  }
1319  }
1320 }
1321 
1322 // handle a expired bucket refresh timer
1324 {
1325  // refresh buckets
1326  if (state != READY || (((simTime() - siblingTable->getLastUsage()) >
1328  // R/Kademlia
1332  //TODO real exhaustive-recursive lookup
1334  0, hopCountMax, 0,
1335  new KademliaLookupListener(this));
1336  } else if (exhaustiveRefresh) {
1337  //TODO config shit
1338  int baseRedundantNodes = iterativeLookupConfig.redundantNodes;
1341  getThisNode().getKey(), siblingRefreshNodes,
1342  hopCountMax, 0, new KademliaLookupListener(this));
1343  iterativeLookupConfig.redundantNodes = baseRedundantNodes;
1344  } else if (newMaintenance) {
1345  //for (KademliaBucket::iterator i = siblingTable->begin();
1346  // i != siblingTable->end(); i++) {
1347 
1348  // sendSiblingFindNodeCall(*i);
1349  //}
1350  if (siblingTable->size()) {
1351  sendSiblingFindNodeCall(siblingTable->at(intuniform(0,siblingTable->size()-1)));
1352  }
1353  state = READY;
1354  setOverlayReady(true);
1355  } else {
1356  createLookup()->lookup(getThisNode().getKey(), s, hopCountMax, 0,
1357  new KademliaLookupListener(this));
1358  }
1359  siblingTable->setLastUsage(simTime());
1361  }
1362 
1363  if (state == READY) {
1364  if (siblingTable->size()) {
1365  // get bit index of most significant digit that differs
1366  // from our next sibling's key to prevent us from refreshing
1367  // buckets, which can't contain any nodes
1368  int32_t diff = OverlayKey::getLength() - b*(getThisNode().getKey().
1369  sharedPrefixLength(siblingTable->front().getKey(), b) + 1);
1370  int bucketsRefreshedPerTask = 0;
1371  for (int32_t i = OverlayKey::getLength() - b; i >= diff; i -=b ) {
1372  for (int32_t d=0; d < ((1 << b) - 1); d++) {
1373  int32_t index = (i / b) * ((1 << b) - 1) + d;
1374  if (index < 0) continue;
1375  if ((routingTable[index] == NULL) ||
1376  ((simTime() - routingTable[index]->getLastUsage()) >
1378 
1379  OverlayKey refreshKey =
1380  getThisNode().getKey() ^ (OverlayKey(d+1) << i);
1381 
1382  // R/Kademlia
1386  //TODO real exhaustive-recursive lookup
1387  createLookup()->lookup(refreshKey, 0,
1388  hopCountMax, 0,
1389  new KademliaLookupListener(this));
1390  } else if (exhaustiveRefresh) {
1391  //TODO config shit
1392  int baseRedundantNodes = iterativeLookupConfig.redundantNodes;
1395  refreshKey, bucketRefreshNodes, hopCountMax,
1396  0, new KademliaLookupListener(this));
1397  iterativeLookupConfig.redundantNodes = baseRedundantNodes;
1398  } else {
1399  createLookup()->lookup(refreshKey, s, hopCountMax, 0,
1400  new KademliaLookupListener(this));
1401  }
1402 
1403  ++bucketsRefreshedPerTask;
1405  setBucketUsage(refreshKey);
1406  }
1407  }
1408  }
1410  "Kademlia: Buckets Refreshed Per Task",
1411  bucketsRefreshedPerTask));
1412  }
1413  // schedule next bucket refresh process
1414  cancelEvent(bucketRefreshTimer);
1415  scheduleAt(simTime() + (std::min(minSiblingTableRefreshInterval,
1417  }
1418 }
1419 
1420 //virtual public: xor metric
1422  const OverlayKey& y,
1423  bool useAlternative) const
1424 {
1425  if (!useAlternative) return x^y; // KeyXorMetric().distance(x, y);
1426  return KeyPrefixMetric().distance(x, y);
1427 }
1428 
1430 {
1431  if (ev.isGUI()) {
1432  std::stringstream ttString;
1433 
1434  // show our nodeId in a tooltip
1435  ttString << "This: " << thisNode << endl << "Siblings: "
1436  << siblingTable->size();
1437 
1438  getParentModule()->getParentModule()->getDisplayString().
1439  setTagArg("tt", 0, ttString.str().c_str());
1440  getParentModule()->getDisplayString().
1441  setTagArg("tt", 0, ttString.str().c_str());
1442  getDisplayString().setTagArg("tt", 0, ttString.str().c_str());
1443  }
1444 }
1445