OverSim
IterativeLookup.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <iostream>
25 #include <assert.h>
26 
27 #include <UnderlayConfigurator.h>
28 #include <LookupListener.h>
29 #include <BaseOverlay.h>
30 #include <GlobalStatistics.h>
31 
32 #include "IterativeLookup.h"
33 
34 using namespace std;
35 
36 //----------------------------------------------------------------------------
37 
39 {}
40 
41 //----------------------------------------------------------------------------
42 
44 {}
45 
46 std::ostream& operator<<(std::ostream& os, const LookupEntry& n)
47 {
48  os << "handle: " << n.handle << " source: " << n.source
49  << " alreadyUsed: " << n.alreadyUsed;
50 
51  return os;
52 };
53 
54 std::ostream& operator<<(std::ostream& os, const LookupVector& n)
55 {
56  for (LookupVector::const_iterator i=n.begin(); i !=n.end(); i++) {
57  os << *i;
58  if (i+1 != n.end()) {
59  os << endl;
60  }
61  }
62 
63  return os;
64 };
65 
66 //----------------------------------------------------------------------------
67 //- Construction & Destruction -----------------------------------------------
68 //----------------------------------------------------------------------------
70  RoutingType routingType,
71  const IterativeLookupConfiguration& config,
72  const cPacket* findNodeExt,
73  bool appLookup) :
74 overlay(overlay),
75 routingType(routingType),
76 config(config),
77 firstCallExt(NULL),
78 finished(false),
79 success(false),
80 running(false),
81 appLookup(appLookup)
82 {
83  if (findNodeExt) firstCallExt = static_cast<cPacket*>(findNodeExt->dup());
84 
85  if ((config.parallelPaths > 1) && (!config.merge)) {
86  throw cRuntimeError("IterativeLookup::IterativeLookup(): "
87  "config.merge must be enabled for "
88  "using parallel paths!");
89  }
90 
91  if (config.verifySiblings && (!config.merge)) {
92  throw cRuntimeError("IterativeLookup::IterativeLookup(): "
93  "config.merge must be enabled for "
94  "using secure lookups!");
95  }
96 
97 
98  if (config.majoritySiblings && (!config.merge)) {
99  throw cRuntimeError("IterativeLookup::IterativeLookup(): "
100  "config.merge must be enabled for "
101  "using majority decision for sibling selection!");
102  }
103 /*
104  if (config.useAllParallelResponses && (!config.merge)) {
105  throw cRuntimeError("IterativeLookup::IterativeLookup(): "
106  "config.merge must be enabled if "
107  "config.useAllParallelResponses is true!");
108  }*/
109 }
110 
112 {
113  stop();
114  delete firstCallExt;
115  overlay->removeLookup(this);
116 }
117 
119 {
120  if (listener != NULL) {
121  delete listener;
122  listener = NULL;
123  }
124  for (RpcInfoMap::iterator i = rpcs.begin(); i != rpcs.end(); i++) {
125  overlay->cancelRpcMessage(i->second.nonce);
126  }
127  rpcs.clear();
128  delete this;
129 }
130 
132 {
133  // init params
134  successfulPaths = 0;
135  finishedPaths = 0;
136  accumulatedHops = 0;
137 
138  // init flags
139  finished = false;
140  success = false;
141  running = true;
142 
143  // init siblings vector
144  siblings = NodeVector(numSiblings == 0 ? 1 : numSiblings, this);
145  visited.clear();
146  dead.clear();
147  pinged.clear();
148 
149  startTime = simTime();
150 
151  // get local closest nodes
155 
156  bool err;
157 
159 
160  // if this node is new and no nodes are known -> stop lookup
161  if (nextHops->size() == 0) {
162  //std::cout << "IterativeLookup: No next hops known" << endl;
163  finished = true;
164  success = false;
165  } else if ((numSiblings == 0)
167  key, numSiblings,
168  &err)) {
169  if (overlay->getThisNode().getKey() == key) {
170  addSibling(overlay->getThisNode(), true);
171  success = true;
172  } else {
173  std::cout << "IterativeLookup: numSiblings==0 and no node with this id"
174  << endl;
175  success = false;
176  }
177  finished = true;
178  }
179  // finish lookup if the key is local and siblings are needed
180  else if (numSiblings != 0 && routingType != EXHAUSTIVE_ITERATIVE_ROUTING &&
182  numSiblings, &err)) {
183 
184  for (uint32_t i=0; i<nextHops->size(); i++) {
185  addSibling(nextHops->at(i), true);
186  }
187 
188  success = finished = true;
189  }
190 
191  // if the key was local or belongs to one of our siblings we are finished
192  if (finished) {
193  // calls stop and finishes the lookup
194  delete nextHops;
195  delete call;
196  delete this;
197  return;
198  }
199 
200  // extract find node extensions
201  cPacket* findNodeExt = NULL;
202  if (call->hasObject("findNodeExt"))
203  findNodeExt = (cPacket*)call->removeObject("findNodeExt");
204  delete call;
205 
206  // not enough nodes for all paths? -> reduce number of parallel paths
207  if ((uint32_t)config.parallelPaths > nextHops->size()) {
208  config.parallelPaths = nextHops->size();
209  }
210 
211  // create parallel paths and distribute nodes to paths
212  for (int i = 0; i < config.parallelPaths; i++) {
213 
214  // create state
215  IterativePathLookup* pathLookup = new IterativePathLookup(this);
216  paths.push_back(pathLookup);
217 
218  // populate next hops
219  for (uint32_t k=0; (k * config.parallelPaths + i) < nextHops->size(); k++) {
220  pathLookup->add(nextHops->at(k * config.parallelPaths + i));
221  }
222 
223  // send initial rpcs
224  pathLookup->sendRpc(config.parallelRpcs, findNodeExt);
225  }
226 
227 
228  //std::cout << "nextHops size: " << nextHops->size()
229  //<< " numSiblings: " << numSiblings
230  //<< " redundantNodes: " << config.redundantNodes
231  //<< " thisNode " << overlay->getThisNode().ip
232  //<< " nextHop " << nextHops->at(0).ip << endl;
233 
234  delete nextHops;
235  delete findNodeExt;
236 
237  checkStop();
238 }
239 
241 {
242  // only stop if running
243  if (!running)
244  return;
245 
246  for (uint32_t i=0; i<paths.size(); i++) {
247  success |= paths[i]->success;
248  }
249 
250  // cancel pending rpcs
251  for (RpcInfoMap::iterator i = rpcs.begin(); i != rpcs.end(); i++) {
252  overlay->cancelRpcMessage(i->second.nonce);
253  }
254  rpcs.clear();
255 
256  // cancel pending ping rpcs
257  for (PendingPings::iterator i = pendingPings.begin(); i != pendingPings.end(); i++) {
258  overlay->cancelRpcMessage(i->second);
259  }
260  pendingPings.clear();
261 
262  // delete path lookups
263  for (uint32_t i = 0; i < paths.size(); ++i) {
264  delete paths[i];
265  }
266  paths.clear();
267 
268  // reset running flag
269  running = false;
270  finished = true;
271 
272 #if 0
273  EV << "[IterativeLookup::stop() @ " << overlay->getThisNode().getIp()
274  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
275  << " Lookup " << (isValid() ? "succeeded\n" : "failed\n")
276  << " (" << successfulPaths << " successful paths)\n"
277  << " (" << finishedPaths << " finished paths)"
278  << endl;
279 #endif
280 
281  // inform listener
282  if (listener != NULL) {
283  listener->lookupFinished(this);
284  listener = NULL;
285  }
286 }
287 
288 //inline
290 {
291  bool finishLookup = false;
292 
293  if (config.majoritySiblings && (numSiblings > 0) &&
294  finishedPaths == (uint32_t)config.parallelPaths) {
295  if (majoritySiblings.size() <= (uint)numSiblings) {
296  // TODO: better check that all nodes have sent the same siblings
297  MajoritySiblings::iterator it;
298  for (it = majoritySiblings.begin(); it != majoritySiblings.end(); it++) {
299  siblings.add(*it);
300  }
301  success = true;
302  finishLookup = true;
303  }
304  }
305 
306  // check if there are rpcs pending or lookup finished
307  if (((successfulPaths >= 1) && (numSiblings == 0) && (siblings.size() >= 1)) ||
308  ((finishedPaths == (uint32_t)config.parallelPaths) &&
309  (numSiblings > 0) && (pendingPings.size() == 0))) {
310  /*
311  std::cout << "successfulPaths: " << successfulPaths
312  << ", numSiblings: " << numSiblings
313  << ", siblings.size(): " << siblings.size()
314  << ", finishedPaths: " << finishedPaths
315  << ", config.parallelPaths: " << config.parallelPaths
316  << ", pendingPings.size(): " << pendingPings.size()
317  << std::endl;
318  */
319  for (uint32_t i=0; i<paths.size(); i++) {
320  success |= paths[i]->success;
321  }
322  finishLookup = true;
323 
324  } else if ((rpcs.size() == 0) && (pendingPings.size() == 0)) {
325  finishLookup = true;
326  }
327 
328  if (finishLookup == true) {
329  // TODO: should not be needed, but sometimes finishedPaths seems
330  // to be smaller than config.parallelPaths
331  if (successfulPaths >= 1) {
332  success = true;
333  }
334 
335  if (success == false) {
336  //std::cout << overlay->getThisNode().getKey() << " lookup of " << key << " failed: hops :" << accumulatedHops << endl;
337  }
338 
339  if (success == false && retries > 0) {
340  //std::cout << "IterativeLookup::checkStop(): Retry..." << endl;
341  retries--;
342  LookupListener* oldListener = listener;
343  listener = NULL;
344  stop();
345  listener = oldListener;
346  start();
347  } else {
348  //std::cout << "IterativeLookup::checkStop(): No retry!" << endl;
349  for (RpcInfoMap::iterator i = rpcs.begin(); i != rpcs.end(); i++) {
350  overlay->cancelRpcMessage(i->second.nonce);
351  }
352  rpcs.clear();
353  delete this;
354  }
355  }
356 }
357 
358 //----------------------------------------------------------------------------
359 //- Enhanceable methods ------------------------------------------------------
360 //----------------------------------------------------------------------------
362 {
363  return new IterativePathLookup(this);
364 }
365 
367 {
368  FindNodeCall* call = new FindNodeCall("FindNodeCall");
369  if (appLookup) {
371  } else {
373  }
374 
376  call->setExhaustiveIterative(true);
377  } else {
378  call->setExhaustiveIterative(false);
379  }
380 
381  call->setLookupKey(key);
385  call->setExhaustiveIterative(true);
386  } else {
387  call->setExhaustiveIterative(false);
388  }
389  call->setBitLength(FINDNODECALL_L(call));
390 
391  // duplicate extension object
392  if (findNodeExt) {
393  call->addObject(static_cast<cObject*>(findNodeExt->dup()));
394  call->addBitLength(findNodeExt->getBitLength());
395  }
396 
397  return call;
398 }
399 
400 //----------------------------------------------------------------------------
401 //- Base configuration and state ---------------------------------------------
402 //----------------------------------------------------------------------------
403 //virtual public
405  const OverlayKey& rhs /*,
406  bool useAlternativeMetric*/) const
407 {
408  //std::cout << key << ": l=" << lhs << ", r=" << rhs << std::endl;
409  return overlay->distance(lhs, key/*, useAlternativeMetric*/).compareTo(overlay->distance(rhs, key/*, useAlternativeMetric*/));
410 }
411 
412 
413 //----------------------------------------------------------------------------
414 //- Siblings and visited nodes management -----------------------------------
415 //----------------------------------------------------------------------------
416 bool IterativeLookup::addSibling(const NodeHandle& handle, bool assured)
417 {
418  bool result = false;
419 
420  EV << "[IterativeLookup::addSibling() @ " << overlay->getThisNode().getIp()
421  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
422  << " Adding potential sibling " << handle
423  << endl;
424 
425  majoritySiblings.insert(handle);
426 
427  if (config.verifySiblings && !assured && !getVisited(handle)) {
428  // ping potential sibling for authentication
429  if (siblings.isAddable(handle) && !getPinged(handle)) {
430  int id = intuniform(1, 2147483647);
431  int nonce = overlay->pingNode(handle, -1, 0, NULL, NULL, this, id);
432  pendingPings.insert(make_pair(id, nonce));
433  setPinged(handle);
434  }
435 
436  return false;
437  }
438 
439  if (numSiblings == 0) {
440  if (handle.getKey() == key) {
441  siblings.clear();
442  siblings.push_back( handle );
443  result = true;
444  }
445  } else {
446  if (config.parallelPaths == 1 && !config.verifySiblings) {
447  result = true;
448  if (!siblings.isFull()) {
449  siblings.push_back(handle);
450  }
451  } else {
452  if (siblings.add(handle) >= 0) {
453  result = true;
454  }
455  }
456  }
457 
458  return result;
459 }
460 
461 void IterativeLookup::setVisited(const TransportAddress& addr, bool visitedFlag)
462 {
463  if (visitedFlag)
464  this->visited.insert(addr);
465  else
466  this->visited.erase(addr);
467 }
468 
470 {
471  return (visited.count(addr) != 0);
472 }
473 
475 {
476  dead.insert(addr);
477 }
478 
480 {
481  return (dead.count(addr) != 0);
482 }
483 
485 {
486  pinged.insert(addr);
487 }
488 
490 {
491  return (pinged.count(addr) != 0);
492 }
493 
494 
495 //----------------------------------------------------------------------------
496 //- Parallel RPC distribution ------------------------------------------------
497 //----------------------------------------------------------------------------
499  cPolymorphic* context,
500  int rpcId, simtime_t rtt)
501 {
502  // check flags
503  if (finished || !running)
504  return;
505 
506  // get source, cast messages and mark node as visited
507  const TransportAddress& src = msg->getSrcNode();
508  FindNodeResponse* findNodeResponse = dynamic_cast<FindNodeResponse*>(msg);
509  PingResponse* pingResponse = dynamic_cast<PingResponse*>(msg);
510  FailedNodeResponse* failedNodeResponse =
511  dynamic_cast<FailedNodeResponse*>(msg);
512 
513  if (pingResponse != NULL) {
514  pendingPings.erase(rpcId);
515  // add authenticated sibling
516  addSibling(pingResponse->getSrcNode(), true);
517  }
518 
519  // handle find node response
520  if (findNodeResponse != NULL) {
521  // std::cout << "time: " << simTime() << " node: " << overlay->thisNode << " this: " << this << " received rpc with nonce: " << findNodeResponse->getNonce() << " from: " << findNodeResponse->getSrcNode() << endl;
522 
523  // check if rpc info is available, no -> exit
524  if (rpcs.count(src) == 0)
525  return;
526 
527  // get info
528  RpcInfoVector infos = rpcs[src];
529  rpcs.erase(src);
530 
531  // iterate
532  bool rpcHandled = false;
533 
534  for (uint32_t i=0; i<infos.size(); i++) {
535  // get info
536  const RpcInfo& info = infos[i];
537 
538  // do not handle finished paths
539  if (info.path->finished)
540  continue;
541 
542  // check if path accepts the message
543  // make an exception for responses with siblings==true
544  if (!rpcHandled &&
545  (info.path->accepts(info.vrpcId) ||
547  || (findNodeResponse->getSiblings() &&
549  info.path->handleResponse(findNodeResponse);
550  rpcHandled = true;
551  } else {
552  EV << "[IterativeLookup::handleRpcResponse()]\n"
553  << " Path does not accept message with id " << info.vrpcId
554  << endl;
555 
556  info.path->handleTimeout(NULL, findNodeResponse->getSrcNode(),
557  info.vrpcId);
558  }
559 
560  // count finished and successful paths
561  if (info.path->finished) {
562  finishedPaths++;
563 
564  // count total number of hops
565  accumulatedHops += info.path->hops;
566 
567  if (info.path->success)
568  successfulPaths++;
569  }
570 
571  }
572  }
573 
574 
575  // handle failed node response
576  if (failedNodeResponse != NULL) {
577  cPacket* findNodeExt = NULL;
578  if (failedNodeResponse->hasObject("findNodeExt")) {
579  findNodeExt =
580  (cPacket*)failedNodeResponse->removeObject("findNodeExt");
581  }
582 
583  for (std::vector<IterativePathLookup*>::iterator i = paths.begin();
584  i != paths.end(); i++) {
585 
586  (*i)->handleFailedNodeResponse(failedNodeResponse->getSrcNode(),
587  findNodeExt,
588  failedNodeResponse->getTryAgain());
589  }
590  }
591 
592  checkStop();
593 }
594 
595 
597  const TransportAddress& dest,
598  cPolymorphic* context, int rpcId,
599  const OverlayKey& destKey)
600 {
601  // check flags
602  if (finished || !running)
603  return;
604 
605  if (dynamic_cast<PingCall*>(msg) != NULL) {
606  pendingPings.erase(rpcId);
607  checkStop();
608  return;
609  }
610 
611  // check if rpc info is available
612  if (rpcs.count(dest)==0) {
613  //std::cout << "IterativeLookup::handleRpcTimeout(): RPC Timeout, but node"
614  // << " is not in rpcs structure!" << endl;
615  return;
616  }
617 
618  // mark the node as dead
619  setDead(dest);
620 
621  RpcInfoVector infos = rpcs[dest];
622  rpcs.erase(dest);
623 
624  // iterate
625  for (uint32_t i=0; i < infos.size(); i++) {
626 
627  const RpcInfo& info = infos[i];
628 
629  // do not handle finished paths
630  if (info.path->finished)
631  continue;
632 
633  // delegate timeout
634  info.path->handleTimeout(msg, dest, info.vrpcId);
635 
636  // count finished and successful paths
637  if (info.path->finished) {
638  finishedPaths++;
639 
640  // count total number of hops
641  accumulatedHops += info.path->hops;
642 
643  if (info.path->success)
644  successfulPaths++;
645  }
646  }
647  checkStop();
648 }
649 
651  IterativePathLookup* listener, int rpcId)
652 {
653  // check flags
654  if (finished || !running) {
655  delete call;
656  return;
657  }
658 
659  // create rpc info
660  RpcInfo info;
661  info.path = listener;
662  info.vrpcId = rpcId;
663 
664  // send new message
665  if (rpcs.count(handle) == 0) {
666  RpcInfoVector newVector;
667 
668  overlay->countFindNodeCall(call);
669  newVector.nonce = overlay->sendUdpRpcCall(handle, call, NULL,
670  -1, 0, -1, this);
671 
672  // std::cout << "time: " << simTime() << " node: " << overlay->thisNode << " new rpc with nonce: " << newVector.nonce << " to: " << handle << endl;
673  rpcs[handle] = newVector;
674  } else {
675  EV << "[IterativeLookup::sendRpc()]\n"
676  << " RPC already sent...not sent again"
677  << endl;
678  delete call;
679  }
680 
681  // register info
682  rpcs[handle].push_back(info);
683 }
684 
685 //----------------------------------------------------------------------------
686 //- AbstractLookup implementation --------------------------------------------
687 //----------------------------------------------------------------------------
688 
689 void IterativeLookup::lookup(const OverlayKey& key, int numSiblings,
690  int hopCountMax, int retries, LookupListener* listener)
691 {
692  EV << "[IterativeLookup::lookup() @ " << overlay->overlay->getThisNode().getIp()
693  << " (" << overlay->overlay->getThisNode().getKey().toString(16) << ")]\n"
694  << " Lookup of key " << key
695  << endl;
696 
697  // check flags
698  if (finished || running)
699  return;
700 
701  // set params
702  this->key = key;
703  this->numSiblings = numSiblings;
704  this->hopCountMax = hopCountMax;
705  this->listener = listener;
706  this->retries = retries;
707 
709  && (numSiblings > config.redundantNodes)) {
710  throw cRuntimeError("IterativeLookup::lookup(): "
711  "With EXHAUSTIVE_ITERATIVE_ROUTING numRedundantNodes "
712  "must be >= numSiblings!");
713  }
714 
715  // start lookup
716  start();
717 }
718 
720 {
721  // return sibling vector
722  return siblings;
723 }
724 
726 {
727  return success && finished;
728 }
729 
731 {
732  return accumulatedHops;
733 }
734 
736 {
737  this->lookup = lookup;
738  this->hops = 0;
739  this->step = 0;
740  this->pendingRpcs = 0;
741  this->finished = false;
742  this->success = false;
743  this->overlay = lookup->overlay;
744 
745  if (lookup->routingType == EXHAUSTIVE_ITERATIVE_ROUTING) {
746  // need to add some extra space for backup nodes, if we have to
747  // remove failed nodes from the nextHops vector
748  this->nextHops = LookupVector(2*(lookup->config.redundantNodes),
749  lookup);
750  } else {
751  this->nextHops = LookupVector((lookup->config.redundantNodes),
752  lookup);
753  }
754 }
755 
757 {}
758 
760 {
761  if (finished) {
762  return false;
763  }
764 
765  // shall we use all responses, or only
766  // the first one (rpcId == step)?
768  && lookup->config.merge) {
769 
770  return true;
771  }
772 
773  return (rpcId == step);
774 }
775 
777 {
778  if (finished)
779  return;
780 
781  if (simTime() > (lookup->startTime + LOOKUP_TIMEOUT)) {
782  EV << "[IterativePathLookup::handleResponse()]\n"
783  << " Iterative lookup path timed out!"
784  << endl;
785  finished = true;
786  success = false;
787  return;
788  }
789 
790  const NodeHandle& source = msg->getSrcNode();
791  std::map<TransportAddress, NodeHandle>::iterator oldPos;
792  oldPos = oldNextHops.find(source);
793  if (oldPos != oldNextHops.end()) oldNextHops.erase(oldPos);
794 
795  // don't count local hops
796  if (lookup->overlay->getThisNode() != source) {
797  hops++;
798  }
799 
800  lookup->setVisited(source);
801 
802 // if (source.getKey() == lookup->key) {
803 // cout << "received response from destination for key " << lookup->key
804 // << " with isSibling = " << msg->getSiblings() << endl;
805 // }
806 
807  step++;
808 
809  // decrease pending rpcs
810  pendingRpcs--;
811 
812  if (msg->getClosestNodesArraySize() != 0) {
813  // mode: merge or replace
814  if (!lookup->config.merge) {
815  nextHops.clear();
816  }
817  } else {
818  //cout << "findNode() returned 0 nodes!" << endl;
819  }
820 
821  int numNewRpcs = 0;
822 
823  // add new next hops
824  for (uint32_t i=0; i < msg->getClosestNodesArraySize(); i++) {
825  const NodeHandle& handle = msg->getClosestNodes(i);
826 
827  // add NodeHandle to next hops and siblings
828  int pos = add(handle, source);
829 
830  // only send new rpcs if we've learned about new nodes
831  if ((pos >= 0) && (pos < lookup->config.redundantNodes)) {
832  numNewRpcs++;
833  }
834 
835  // check if node was found
836  if ((lookup->numSiblings == 0) && (handle.getKey() == lookup->key)) {
837 
838  lookup->addSibling(handle);
839 
840  // TODO: how do we resume, if the potential sibling doesn't authenticate?
841  finished = true;
842  success = true;
843  return;
844  } else {
845  if (lookup->numSiblings != 0 &&
847  msg->getSiblings()) {
848 
849  //cout << "adding sibling " << handle << endl;
850  lookup->addSibling(handle);
851  }
852  }
853  }
854 
855 #if 0
856  cout << "nextHops.size " << nextHops.size()
857  << " find node response " << msg->getClosestNodesArraySize()
858  << " config " << lookup->config.redundantNodes << endl;
859 
860  cout << "looking for " << lookup->key << endl;
861 
862  for (uint32_t i=0; i < msg->getClosestNodesArraySize(); i++) {
863  cout << "find node " << msg->getClosestNodes(i) << endl;
864  }
865 
866  cout << "next Hops " << nextHops << endl;
867 #endif
868 
869  // check if sibling lookup is finished
871  && msg->getSiblings()
872  && msg->getClosestNodesArraySize() != 0 &&
873  lookup->numSiblings != 0) {
874 
875  finished = true;
876  success = true;
877  return;
878  }
879 
880  // extract find node extension object
881  cPacket* findNodeExt = NULL;
882  if (msg->hasObject("findNodeExt")) {
883  findNodeExt = (cPacket*)msg->removeObject("findNodeExt");
884  }
885 
886  // If config.newRpcOnEveryResponse is true, send a new RPC
887  // even if there was no lookup progress
888  if ((numNewRpcs == 0) && lookup->config.newRpcOnEveryResponse) {
889  numNewRpcs = 1;
890  }
891 
892  // send next rpcs
893  sendRpc(min(numNewRpcs, lookup->config.parallelRpcs), findNodeExt);
894 
895  delete findNodeExt;
896 }
897 
899 {
900  // two alternatives to react on a timeout
902  // always send one new RPC for every timeout
903  sendRpc(1, findNodeExt);
904  } else if (pendingRpcs == 0) {
905  // wait until all RPCs have timed out and then send alpha new RPCs
906  sendRpc(lookup->config.parallelRpcs, findNodeExt);
907  }
908 }
909 
911  const TransportAddress& dest, int rpcId)
912 {
913  if (finished)
914  return;
915 
916  EV << "[IterativePathLookup::handleTimeout()]\n"
917  << " Timeout of RPC " << rpcId
918  << endl;
919 
920  //std::cout << lookup->overlay->getThisNode() << ": Path timeout for node"
921  // << dest << endl;
922 
923  // For exhaustive-iterative remove dead nodes from nextHops vector
924  // (which is also our results vector)
926  && lookup->getDead(dest)) {
928  (dynamic_cast<const NodeHandle&>(dest)).getKey());
929  if (it != nextHops.end()) {
930  nextHops.erase(it);
931  }
932  }
933 
934  std::map<TransportAddress, NodeHandle>::iterator oldPos;
935  oldPos = oldNextHops.find(dest);
936 
937  // decrease pending rpcs
938  pendingRpcs--;
939 
940  cPacket* findNodeExt = NULL;
941  if (msg && msg->hasObject("findNodeExt")) {
942  findNodeExt = static_cast<cPacket*>(
943  msg->removeObject("findNodeExt"));
944  }
945 
946  if (simTime() > (lookup->startTime + LOOKUP_TIMEOUT)) {
947  EV << "[IterativePathLookup::handleTimeout()]\n"
948  << " Iterative lookup path timed out!"
949  << endl;
950  delete findNodeExt;
951  finished = true;
952  success = false;
953  return;
954  }
955 
956  if (oldPos == oldNextHops.end() || (!lookup->config.failedNodeRpcs)) {
957  sendNewRpcAfterTimeout(findNodeExt);
958  delete findNodeExt;
959  } else {
960  if (oldPos->second.isUnspecified()) {
961  // TODO: handleFailedNode should always be called for local
962  // nodes, independent of config.failedNodeRpcs
963  // Attention: currently this method is also called,
964  // if a node responded and the path doesn't accept a message
965  FindNodeCall* findNodeCall = dynamic_cast<FindNodeCall*>(msg);
966  // answer was from local findNode()
967 
968  if (findNodeCall && lookup->overlay->handleFailedNode(dest)) {
969  NodeVector* retry = lookup->overlay->findNode(
970  findNodeCall->getLookupKey(), -1, lookup->numSiblings, msg);
971 
972  for (NodeVector::iterator i = retry->begin(); i != retry->end(); i++) {
974  }
975 
976  delete retry;
977  }
978 
979  sendNewRpcAfterTimeout(findNodeExt);
980  delete findNodeExt;
981 
982  } else {
983  FailedNodeCall* call = new FailedNodeCall("FailedNodeCall");
984  call->setFailedNode(dest);
985  call->setBitLength(FAILEDNODECALL_L(call));
986  if (findNodeExt) {
987  call->addObject(findNodeExt);
988  call->addBitLength(findNodeExt->getBitLength());
989  }
991  lookup->overlay->sendUdpRpcCall(oldPos->second, call, NULL,
992  -1, 0, -1, lookup);
993  }
994  }
995 }
996 
998  cPacket* findNodeExt, bool retry)
999 {
1000  if (finished) {
1001  return;
1002  }
1003 
1004  std::map<TransportAddress, NodeHandle>::iterator oldPos;
1005  for (oldPos = oldNextHops.begin(); oldPos != oldNextHops.end(); oldPos++) {
1006  if ((! oldPos->second.isUnspecified()) &&
1007  (oldPos->second == src)) break;
1008  }
1009 
1010  if (oldPos == oldNextHops.end()) {
1011  return;
1012  }
1013 
1014  std::map<TransportAddress, NodeHandle>::iterator oldSrcPos =
1015  oldNextHops.find(src);
1016  const NodeHandle* oldSrc = &NodeHandle::UNSPECIFIED_NODE;
1017 
1018  if (oldSrcPos != oldNextHops.end()) {
1019  oldSrc = &(oldSrcPos->second);
1020  }
1021 
1022  if (retry) {
1023  // FIXME: This is needed for a node to be asked again when detecting
1024  // a failed node. It could pose problems when parallel lookups and
1025  // failed node recovery are both needed at the same time!
1026  lookup->setVisited(src, false);
1027 
1028  nextHops.add(LookupEntry(src, *oldSrc, false));
1029  }
1030 
1031  oldNextHops.erase(oldPos);
1032 
1033  sendNewRpcAfterTimeout(findNodeExt);
1034 }
1035 
1036 void IterativePathLookup::sendRpc(int num, cPacket* findNodeExt)
1037 {
1038  // path finished? yes -> quit
1039  if (finished)
1040  return;
1041 
1042  // check for maximum hop count
1043  if (lookup->hopCountMax && (hops >= lookup->hopCountMax)) {
1044  EV << "[IterativePathLookup::sendRpc()]\n"
1045  << " Max hop count exceeded - lookup failed!"
1046  << endl;
1047  //cout << "[IterativePathLookup::sendRpc()]\n"
1048  // << " Max hop count exceeded - lookup failed!"
1049  // << endl;
1050 
1051  finished = true;
1052  success = false;
1053 
1054  return;
1055  }
1056 
1057  // if strictParallelRpcs is true, limit concurrent in-flight requests
1058  // to config.parallelRpcs
1060  num = min(num, lookup->config.parallelRpcs - pendingRpcs);
1061  }
1062 
1063  // try all remaining nodes
1064  if ((num == 0) && (pendingRpcs == 0)
1066  num = lookup->config.parallelRpcs;
1067  //cout << "trying all remaining nodes ("
1068  // << lookup->numSiblings << ")" << endl;
1069  }
1070 
1071  // send rpc messages
1072  int i = 0;
1073  for (LookupVector::iterator it = nextHops.begin();
1074  ((num > 0) && (i < lookup->config.redundantNodes)
1075  && (it != nextHops.end())); it++, i++) {
1076 
1077  // ignore nodes to which we've already sent an RPC
1078  if (it->alreadyUsed || lookup->getDead(it->handle)) continue;
1079 
1080  // check if node has already been visited? no ->
1081  // TODO: doesn't work with Broose
1082  if ((!lookup->config.visitOnlyOnce) || (!lookup->getVisited(it->handle))) {
1083  // send rpc to node increase pending rpcs
1084  pendingRpcs++;
1085  num--;
1086  FindNodeCall* call = lookup->createFindNodeCall(findNodeExt);
1087  lookup->sendRpc(it->handle, call, this, step);
1088  oldNextHops[it->handle] = it->source;
1089 
1090  //cout << "Sending RPC to " << it->handle
1091  // << " ( " << num << " more )"
1092  // << " thisNode = " << lookup->overlay->getThisNode().getKey() << endl;
1093 
1094  // mark node as already used
1095  it->alreadyUsed = true;
1096  } else {
1097  //EV << "[IterativePathLookup::sendRpc()]\n"
1098  // << " Last next hop ("
1099  // << it->handle
1100  // << ") already visited."
1101  // << endl;
1102 
1103 // std::cout << "visited:" << std::endl;
1104 // for (TransportAddress::Set::iterator it = lookup->visited.begin();
1105 // it != lookup->visited.end(); it++)
1106 // std::cout << *it << std::endl;
1107 //
1108 // std::cout << "nextHops:" << std::endl;
1109 // for (NodePairVector::iterator it = nextHops.begin();
1110 // it != nextHops.end(); it++)
1111 // std::cout << it->first << std::endl;
1112  }
1113  }
1114 
1115  // no rpc sent, no pending rpcs?
1116  // -> failed for normal lookups
1117  // -> exhaustive lookups are always successful
1118  if (pendingRpcs == 0) {
1120  int i = 0;
1121  for (LookupVector::iterator it = nextHops.begin();
1122  ((i < lookup->config.redundantNodes)
1123  && (it != nextHops.end())); it++, i++) {
1124  lookup->addSibling(it->handle);
1125  }
1126 
1127  success = true;
1128  } else {
1129  success = false;
1130  //cout << "failed nextHops for key " << lookup->key << endl;
1131  //cout << nextHops << endl;
1132  EV << "[IterativePathLookup::sendRpc() @ " << overlay->getThisNode().getIp()
1133  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
1134  << " Path failed (no further nodes to query) "
1135  << endl;
1136  }
1137 
1138  finished = true;
1139  }
1140 }
1141 
1142 int IterativePathLookup::add(const NodeHandle& handle, const NodeHandle& source)
1143 {
1144  if (lookup->config.merge) {
1145  return nextHops.add(LookupEntry(handle, source, false));
1146  } else {
1147  nextHops.push_back(LookupEntry(handle, source, false));
1148  return (nextHops.size() - 1);
1149  }
1150 }
1151