OverSim
NeighborCache.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
25 #include <cassert>
26 
27 #include <TransportAddress.h>
28 #include <NodeHandle.h>
29 #include <PeerInfo.h>
30 #include <GlobalStatisticsAccess.h>
32 #include <CoordMessages_m.h>
33 #include <GlobalNodeListAccess.h>
34 #include <hashWatch.h>
35 #include <BootstrapList.h>
36 #include <DiscoveryMode.h>
37 
38 #include "NeighborCache.h"
39 #include <GlobalViewBuilder.h>
40 #include <UnderlayConfigurator.h>
41 
42 
43 const std::vector<double> NeighborCache::coordsDummy;
44 
45 std::ostream& operator<<(std::ostream& os,
47 {
49  os << entry.rtt;
50  } else {
51  if (entry.rttState == NeighborCache::RTTSTATE_TIMEOUT) os << "TIMEOUT";
52  else if (entry.rttState == NeighborCache::RTTSTATE_UNKNOWN) os << "UNKNOWN";
53  else if (entry.rttState == NeighborCache::RTTSTATE_WAITING) os << "WAITING";
54  }
55  os << " (inserted: " << entry.insertTime;
56 
57  os << ", #contexts: "
58  << entry.waitingContexts.size();
59 
60  if (!entry.nodeRef.isUnspecified()) os << ", <KEY>";
61 
62 
63  //TODO entry.coordsInfo
64 
65  os << ")";
66 
67  return os;
68 }
69 
70 
72 
74 {
75  if (stage == MAX_STAGE_COMPONENTS) {
76  neighborCache.clear();
78 
79  enableNeighborCache = par("enableNeighborCache");
80  rttExpirationTime = par("rttExpirationTime");
81  maxSize = par("maxSize");
82  //doDiscovery = par("doDiscovery");
83  collectClosestNodes = par("collectClosestNodes");
84  ncsPiggybackOwnCoords = par("ncsPiggybackOwnCoords");
85  useNcsForTimeout = par("useNcsForTimeout");
86 
87  // set default query types
88  std::string temp = par("defaultQueryType").stdstringValue();
89  if (temp == "exact")
91  else if (temp == "exact_timeout")
93  else if (temp == "available")
95  else if (temp == "estimated")
97  else throw cRuntimeError((std::string("Wrong query type: ")
98  + temp).c_str());
99 
100  temp = par("defaultQueryTypeI").stdstringValue();
101  if (temp == "available")
103  else if (temp == "estimated")
105  else throw cRuntimeError((std::string("Wrong query type (I): ")
106  + temp).c_str());
107 
108  temp = par("defaultQueryTypeQ").stdstringValue();
109  if (temp == "exact")
111  else if (temp == "exact_timeout")
113  else if (temp == "query")
115  else throw cRuntimeError((std::string("Wrong query type (Q): ")
116  + temp).c_str());
117 
118  temp = par("ncsType").stdstringValue();
119  if (temp == "none") ncs = NULL;
120  else if (temp == "vivaldi") ncs = new Vivaldi();
121  else if (temp == "svivaldi") ncs = new SVivaldi();
122  else if (temp == "gnp") ncs = new Nps(); //TODO
123  else if (temp == "nps") ncs = new Nps();
124  else if (temp == "simple") ncs = new SimpleNcs();
125  else if (temp == "simpleunderlayncs") ncs = new SimpleUnderlayNCS(); //TODO
126  else throw cRuntimeError((std::string("Wrong NCS type: ")
127  + temp).c_str());
128 
129  if (par("doDiscovery")) {
130  if (collectClosestNodes == 0) {
131  throw cRuntimeError("Discovery Mode with collectClosestNodes = 0");
132  }
134  discoveryMode->init(this);
135  discoveryFinished = false;
136  } else {
137  discoveryMode = NULL;
138  }
139  if (collectClosestNodes > 0) {
142  proxComparator, NULL,
144  WATCH_VECTOR(*closestNodes);
145  } else {
146  proxComparator = NULL;
147  closestNodes = NULL;
148  }
149 
150 
151  globalViewBuilder = NULL;
152  treeManager = NULL;
153 
154  if(par("treeMgmtEnableTreeManagement")) {
155  treeManager = new TreeManagement();
156  treeManager->init(this);
157 
158  if(par("gvbEnableGlobalViewBuilder")) {
161  capReqFinished = false;
162  }
163 
164  treeManager->addMsgClient("ViewBuilder", globalViewBuilder);
165  }
166 
167 
170 
171  cbrTimer = new cMessage("cbrTimer");
172 
173  misses = 0;
174  hits = 0;
175 
176  rttHistory = par("rttHistory");
177  timeoutAccuracyLimit = par("timeoutAccuracyLimit");
178 
179  numMsg = 0;
180  absoluteError = 0.0;
181  relativeError = 0.0;
182  numRttErrorToHigh = 0;
183  numRttErrorToLow = 0;
184  lastAbsoluteErrorPerNode.clear();
185  WATCH(absoluteError);
186  WATCH(relativeError);
187  WATCH(numMsg);
188  } else if (stage == MIN_STAGE_TIER_1) {
189  if (ncs) ncs->init(this);
190  }
191 }
192 
193 
195 {
196  if ((misses + hits) != 0) {
198  ->addStdDev("NeighborCache: Ping hit rate",
199  ((double)hits / (double)(misses + hits)));
200  }
201 
202 
203  if (ncs && numMsg > 0) {
204  globalStatistics->addStdDev("NeighborCache: NCS absolute RTT error",
205  absoluteError / (double)numMsg);
206  globalStatistics->addStdDev("NeighborCache: NCS relative RTT error",
207  relativeError / (double)numMsg);
208  globalStatistics->addStdDev("NeighborCache: number of messages/s",
209  numMsg / SIMTIME_DBL(simTime() - creationTime));
210  globalStatistics->addStdDev("NeighborCache: NCS percentage of RTT errors to high",
211  (double)numRttErrorToHigh / (double)numMsg);
212  globalStatistics->addStdDev("NeighborCache: NCS percentage of RTT errors to low",
213  (double)numRttErrorToLow / (double)numMsg);
214  }
215 
216  if(treeManager) {
218  }
219 }
220 
221 
223 {
224  delete ncs;
225  delete treeManager;
226  delete globalViewBuilder;
227  delete discoveryMode;
228  delete closestNodes;
229  delete proxComparator;
230  cancelAndDelete(cbrTimer);
231 }
232 
234  cPolymorphic* context,
235  ProxListener* rpcListener,
236  int rpcId)
237 {
238  if (!enableNeighborCache) return false;
239  if (neighborCache.count(handle) == 0) {
240  NeighborCacheEntry& entry = neighborCache[handle];
241 
242  entry.insertTime = simTime();
243  entry.rttState = RTTSTATE_WAITING;
244 
245  neighborCacheExpireMap.insert(std::make_pair(entry.insertTime,
246  handle));
247 
248  cleanupCache();
249 
250  assert(neighborCache.size() == neighborCacheExpireMap.size());
251  return false;
252  } else {
253  NeighborCacheEntry& entry = neighborCache[handle];
254 
255  // waiting?
256  if (entry.rttState == RTTSTATE_WAITING) {
257  WaitingContext temp(rpcListener, context, rpcId);
258  entry.waitingContexts.push_back(temp);
259 
260  return true;
261  } else {
262  if (entry.waitingContexts.size() > 0) {
263  throw cRuntimeError("not waiting for response,"
264  " but additional contexts found!");
265  }
266 
267  updateEntry(handle, entry.insertTime);
268 
269  entry.rttState = RTTSTATE_WAITING;
270  entry.insertTime = simTime();
271 
272  return false;
273  }
274  }
275 }
276 
277 
279 {
280  if (neighborCache.count(handle) == 0)
281  throw cRuntimeError("NeighborCache error!");
282  WaitingContexts temp = neighborCache[handle].waitingContexts;
283  neighborCache[handle].waitingContexts.clear();
284 
285  return temp;
286 }
287 
288 
290 {
291  //if (!enableNeighborCache) return;
292 
293  if (neighborCache.count(handle) == 0) {
294  NeighborCacheEntry& entry = neighborCache[handle];
295 
296  entry.insertTime = simTime();
297  entry.rttState = RTTSTATE_TIMEOUT;
298 
299  neighborCacheExpireMap.insert(std::make_pair(entry.insertTime,
300  handle));
301  cleanupCache();
302  } else {
303  NeighborCacheEntry& entry = neighborCache[handle];
304 
305  updateEntry(handle, entry.insertTime);
306 
307  entry.insertTime = simTime();
308  entry.rttState = RTTSTATE_TIMEOUT;
309 
310  WaitingContexts waitingContexts = getNodeContexts(handle);
311 
312  for (uint32_t i = 0; i < waitingContexts.size(); ++i) {
313  assert(waitingContexts[i].proxListener || !waitingContexts[i].proxContext);
314  if (waitingContexts[i].proxListener) {
315  waitingContexts[i].proxListener->proxCallback(handle,
316  waitingContexts[i].id,
317  waitingContexts[i].proxContext,
319  }
320  }
321  }
322  assert(neighborCache.size() == neighborCacheExpireMap.size());
323 }
324 
325 
326 void NeighborCache::updateNode(const NodeHandle& add, simtime_t rtt,
327  const NodeHandle& srcRoute,
328  AbstractNcsNodeInfo* ncsInfo)
329 {
330  Enter_Method_Silent();
331 
332  thisNode = overlay->getThisNode(); // Daniel TODO
333 
334  EV << "[NeighborCache::updateNode() @ " << thisNode.getIp()
335  << " (" << thisNode.getKey().toString(16) << ")]\n"
336  << " inserting rtt(" << SIMTIME_DBL(rtt) << ") of node " << add.getIp()
337  << endl;
338 
339  if (rtt <= 0) {
340  delete ncsInfo;
341  return; //TODO broose
342  }
343 
344  bool deleteInfo = false;
345 
346  //if (enableNeighborCache) {
347  if (neighborCache.count(add) == 0) {
348 
349  NeighborCacheEntry& entry = neighborCache[add];
350 
351  if (closestNodes) {
352  //std::cout << "closestNodes->add(ProxTransportAddress(" << add << ", rtt));" << std::endl;
354  }
355 
356  entry.insertTime = simTime();
357  entry.rtt = rtt;
358  entry.rttState = RTTSTATE_VALID;
359  entry.nodeRef = add;
360  entry.coordsInfo = ncsInfo;
361  entry.lastRtts.push_back(rtt);
362 
363  neighborCacheExpireMap.insert(std::make_pair(entry.insertTime, add));
364 
365  cleanupCache();
366  } else {
367 
368  updateEntry(add, neighborCache[add].insertTime);
369 
370  NeighborCacheEntry& entry = neighborCache[add];
371 
372  if (closestNodes) {
373  //std::cout << "closestNodes->add(ProxTransportAddress(" << add << ", rtt));" << std::endl;
375  }
376 
377  entry.insertTime = simTime();
378  if (entry.rttState != RTTSTATE_VALID || entry.rtt > rtt)
379  entry.rtt = rtt;
380  entry.rttState = RTTSTATE_VALID;
381  entry.nodeRef = add;
382 
383  entry.lastRtts.push_back(rtt);
384  if (entry.lastRtts.size() > rttHistory) {
385  entry.lastRtts.pop_front();
386  }
387 
388  if (ncsInfo) {
389  if (entry.coordsInfo) {
390  entry.coordsInfo->update(*ncsInfo);
391  deleteInfo = true;
392  } else {
393  entry.coordsInfo = ncsInfo;
394  }
395  }
396 
397  WaitingContexts waitingContexts = getNodeContexts(add);
398 
399  for (uint32_t i = 0; i < waitingContexts.size(); ++i) {
400  if (waitingContexts[i].proxListener) {
401  waitingContexts[i].proxListener
402  ->proxCallback(add,
403  waitingContexts[i].id,
404  waitingContexts[i].proxContext,
405  Prox(rtt, 1));
406  }
407  }
408  }
409  assert(neighborCache.size() == neighborCacheExpireMap.size());
410 
411  recordNcsEstimationError(add, rtt);
412 
413 
414  if (ncs) ncs->processCoordinates(rtt, *ncsInfo);
415 
416  // delete ncsInfo if old info is used
417  if (deleteInfo) delete ncsInfo;
418 }
419 
420 
422  AbstractNcsNodeInfo* ncsInfo)
423 {
424  Enter_Method_Silent();
425 
426  EV << "[NeighborCache::updateNcsInfo() @ " << thisNode.getIp()
427  << " (" << thisNode.getKey().toString(16) << ")]\n"
428  << " inserting new NcsInfo of node " << node.getIp()
429  << endl;
430 
431  if (neighborCache.count(node) == 0) {
432  NeighborCacheEntry& entry = neighborCache[node];
433 
434  entry.insertTime = simTime();
435  entry.coordsInfo = ncsInfo;
436 
437  neighborCacheExpireMap.insert(std::make_pair(entry.insertTime, node));
438 
439  cleanupCache();
440  } else {
441  updateEntry(node, neighborCache[node].insertTime);
442 
443  NeighborCacheEntry& entry = neighborCache[node];
444 
445  if (ncsInfo) {
446  if (entry.coordsInfo) {
447  entry.coordsInfo->update(*ncsInfo);
448  delete ncsInfo;
449  } else {
450  entry.coordsInfo = ncsInfo;
451  }
452  }
453  }
454 }
455 
456 
458 {
459  // cache disabled or entry not there
460  if (!enableNeighborCache ||
461  add.isUnspecified() ||
462  (neighborCache.count(add) == 0)) {
463  misses++;
464  return std::make_pair(0.0, RTTSTATE_UNKNOWN);
465  }
466 
467  NeighborCacheEntry &entry = neighborCache[add];
468 
469  if (entry.rttState == RTTSTATE_WAITING ||
470  entry.rttState == RTTSTATE_UNKNOWN)
471  return std::make_pair(entry.rtt, entry.rttState);
472  // entry expired
473  if ((simTime() - entry.insertTime) >= rttExpirationTime) {
474  entry.rttState = RTTSTATE_UNKNOWN;
475  return std::make_pair(entry.rtt, RTTSTATE_UNKNOWN);
476  }
477  hits++;
478  assert(!(entry.rtt == 0.0 && entry.rttState == RTTSTATE_VALID));
479  return std::make_pair(entry.rtt, entry.rttState);
480 }
481 
482 
484 {
485  if (neighborCache.count(add) == 0) {
486  throw cRuntimeError("NeighborCache.cc: getNodeHandle was asked for "
487  "a non-existent node reference.");
488  }
489  return neighborCache[add].nodeRef;
490 }
491 
493 {
494  if (neighborCache.count(address) == 0) {
495  throw cRuntimeError("NeighborCache.cc: getNodeAge was asked for "
496  "a non-existent address.");
497  }
498  return neighborCache[address].insertTime;
499 }
500 
502 {
503  bool result = false;
504  uint32_t size = neighborCache.size();
505 
506  if (size > maxSize) {
508  for (uint32_t i = 0; i < (size - (maxSize / 2)); ++i) {
509  it = neighborCacheExpireMap.begin();
510  if ((neighborCache[it->second].rttState == RTTSTATE_WAITING) ||
511  (neighborCache[it->second].insertTime == simTime())) {
512  break;
513  }
514  neighborCache.erase(it->second);
515  neighborCacheExpireMap.erase(it);
516  result = true;
517  }
518  }
519  assert(neighborCache.size() == neighborCacheExpireMap.size());
520  return result;
521 }
522 
523 
525  simtime_t insertTime)
526 {
528  neighborCacheExpireMap.lower_bound(insertTime);
529  while (it->second != address) ++it;
530  neighborCacheExpireMap.erase(it);
531  neighborCacheExpireMap.insert(std::make_pair(simTime(),
532  address));
533  assert(neighborCache.size() == neighborCacheExpireMap.size());
534 }
535 
536 //TODO
538 {
540  simtime_t nearestNodeRtt = MAXTIME;
542 
543  for(it = neighborCache.begin(); it != neighborCache.end(); it++ ) {
544  if (it->second.rtt < nearestNodeRtt &&
545  it->second.rtt > 0 /*&&
546  it->second.coordsInfo.npsLayer < maxLayer+1 &&
547  it->second.coordsInfo.npsLayer > 0*/) {
548  nearestNode.setIp(it->first.getIp());
549  nearestNodeRtt = it->second.rtt;
550  nearestNode.setPort(it->second.nodeRef.getPort());
551  }
552  }
553 
554  return nearestNode;
555 }
556 
557 
558 std::vector<TransportAddress>* NeighborCache::getClosestNodes(uint8_t number)
559 {
560  std::vector<TransportAddress>* nodes =
561  new std::vector<TransportAddress>();
562 
563  for (uint8_t i = 0; (i < number && i < closestNodes->size()); ++i) {
564  nodes->push_back((*closestNodes)[i]);
565  }
566 
567  return nodes;
568 }
569 
570 std::vector<TransportAddress>* NeighborCache::getSpreadedNodes(uint8_t number)
571 {
572  std::vector<TransportAddress>* nodes =
573  new std::vector<TransportAddress>;
574 
576  for (uint8_t i = 0; (i < number && i < neighborCache.size()); ++i) {
577  nodes->push_back((it++)->first);
578  }
579 
580  /*if (dynamic_cast<BasePastry*>(overlay)) {
581  BasePastry* pastry = static_cast<BasePastry*>(overlay);
582  std::vector<TransportAddress>* spreadedNodes = pastry->get
583  }*/
584 
585  return nodes;
586 }
587 
588 
590 {
591  if (neighborCache.count(node) > 0) return true;
592  return false;
593 }
594 
595 
596 // Vivaldi stuff
598 {
599  /*
600  //TODO retain and consider the last measured RTTs not the last error(s)
601  double absoluteDiff = 0.0;
602  uint32_t numNeighbors = 0;
603 
604  for (std::map<TransportAddress, std::vector<double> >::iterator it =
605  lastAbsoluteErrorPerNode.begin(); it != lastAbsoluteErrorPerNode.end();
606  it++) {
607  double tempAbsoluteDiff = 0.0;
608  for (uint32_t i = 0; i < it->second.size(); i++) {
609  tempAbsoluteDiff += it->second.at(i);
610  }
611  absoluteDiff += (tempAbsoluteDiff / it->second.size());
612  numNeighbors++;
613  }
614 
615  absoluteDiff /= numNeighbors;
616  return (absoluteDiff > 1.0) ? 1.0 : absoluteDiff;
617  */
618 
619  // old version
620  //if (neighborCache.size() < 2 || sampleSize == 0) return 1.0;
621 
622  double absoluteDiff = 0;
623  uint32_t numNeighbors = 0;
624  uint32_t sampleSize = 10; //test
625 
626  for (std::map<simtime_t, TransportAddress>::reverse_iterator it =
627  neighborCacheExpireMap.rbegin();
628  it != neighborCacheExpireMap.rend() &&
629  numNeighbors < sampleSize; ++it) {
630  NeighborCacheEntry& cacheEntry = neighborCache[it->second];
631 
632 
633  double dist = ncs->getOwnNcsInfo().getDistance(*cacheEntry.coordsInfo);
634 
635  if (dist != 0 && cacheEntry.rttState == RTTSTATE_VALID) {
636  double predictionError = fabs(dist - SIMTIME_DBL(cacheEntry.rtt));
637 
638  //test: error weighted
639  //if (it->second.coordErr < 1) {
640  // predictionError /= it->second.coordErr;
641  //}
642  //test: age weighted
643  //if ((simTime() - it->second.insertTime) > 1) {
644  // predictionError /= (simTime() - it->second.insertTime);
645  //}
646 
647  numNeighbors++;
648  absoluteDiff += predictionError;
649  }
650  }
651  assert(numNeighbors != 0);
652  absoluteDiff /= numNeighbors;
653 
654  return (absoluteDiff > 1.0) ? 1.0 : absoluteDiff;
655 }
656 
657 
659 {
660  // bootstrap list is ready
661  if (readyMsg->getReady() && readyMsg->getComp() == BOOTSTRAPLIST_COMP) {
662  if (discoveryMode) {
664  } else {
665  // inform overlay
666  prepareOverlay();
667  }
668  } else if (readyMsg->getReady() && readyMsg->getComp() == OVERLAY_COMP) {
669  // overlay is ready, build up tree
671  if(treeManager) {
672  //std::cout << thisNode << "treeManager->startTreeBuilding()" << std::endl;
674  if (globalViewBuilder) {
677  }
678  }
679  } //else {
680  //TODO
681  //}
682  delete readyMsg;
683 }
684 /*
685 void NeighborCache::callbackDiscoveryFinished(const TransportAddress& nearNode)
686 {
687  // handle bootstrapNode
688  getParentModule()->bubble("Discovery Mode finished!");
689  discoveryFinished = true;
690  //sendReadyMessage();
691 
692  RECORD_STATS(
693  globalStatistics->addStdDev("NeighborCache: Discovery Mode Improvement",
694  discoveryMode->getImprovement());
695  if (dynamic_cast<Vivaldi*>(ncs)) {
696  globalStatistics->addStdDev("NeighborCache: Vivaldi-Error after Discovery Mode",
697  static_cast<Vivaldi*>(ncs)->getOwnError());
698  }
699  );
700 
701  prepareOverlay();
702 }
703 */
704 
706 {
707  if ((!discoveryMode || discoveryMode->isFinished()) &&
709  (!ncs || ncs->isReady())) {
710  if (ncs && coordBasedRouting) {
713  ((dynamic_cast<Nps*>(ncs) || dynamic_cast<SimpleNcs*>(ncs) ||
714  (dynamic_cast<SVivaldi*>(ncs) &&
715  static_cast<SVivaldi*>(ncs)->getLoss() > 0.95)))) { //TODO
716  setCbrNodeId();
718  return;
719  } else {
722  scheduleAt(uniform(coordBasedRouting->getChangeIdStart(),
724  cbrTimer); //TODO
725  }
726  }
727  }
729  return;
730  }
731  //std::cout << "nc" << std::endl;
732 
733  if ((!discoveryMode || discoveryMode->isFinished()) &&
735  (ncs && ncs->isReady())) {
736  const TransportAddress& bootstrapNode =
738  if (globalViewBuilder && !bootstrapNode.isUnspecified() && true) { //TODO
739  globalViewBuilder->sendCapRequest(bootstrapNode);
740  return;
741  }
742  // first node
744  scheduleAt(uniform(coordBasedRouting->getChangeIdStart(),
746  cbrTimer); //TODO
747  }
749  }
750 }
751 
752 
754 {
755  const std::vector<double>& coords = ncs->getOwnNcsInfo().getCoords();
756  const AP* cap = (globalViewBuilder ? globalViewBuilder->getCAP() : NULL);
760  cap));
761 
762  EV << "[NeighborCache::setCbrNodeId() @ "
763  << thisNode.getIp()
764  << " (" << thisNode.getKey().toString(16) << ")]"
765  << "\n -> nodeID ( 2): "
766  << thisNode.getKey().toString(2)
767  << "\n -> nodeID (16): "
768  << thisNode.getKey().toString(16) << endl;
769  /*
770  std::cout << "[NeighborCache::setCbrNodeId() @ "
771  << thisNode.getIp()
772  << " (" << thisNode.getKey().toString(16) << ")]"
773  << "\n -> nodeID ( 2): "
774  << thisNode.getKey().toString(2)
775  << "\n -> nodeID (16): "
776  << thisNode.getKey().toString(16) << std::endl;*/
777 }
778 
779 
781 {
782  if (msg == cbrTimer) {
783  // TODO duplicate code
784  if (globalViewBuilder->isCapValid() &&
785  (dynamic_cast<SimpleNcs*>(ncs) || (dynamic_cast<SVivaldi*>(ncs) &&
786  static_cast<SVivaldi*>(ncs)->getLoss() > 0.95 &&
787  static_cast<SVivaldi*>(ncs)->getOwnError() < 0.2))) { //TODO
788  setCbrNodeId();
789  //std::cout << thisNode.getIp() << " NeighborCache::handleTimerEvent(): setCbrNodeId(); overlay->join(thisNode.getKey())" << std::endl;
791  return;
792  } else {
793  //assert(false);
794  overlay->join();
795  return;
796  //scheduleAt(simTime() + uniform(0, 5000), cbrTimer); //TODO
797  }
798  return;
799  }
800  if (ncs) {
801  ncs->handleTimerEvent(msg);
802  }
803 
804  if (treeManager) {
806  }
807 
808  if (globalViewBuilder) {
810  }
811 }
812 
813 
815 {
816  bool messageHandled = false;//TODO
817 
818  if (ncs && !messageHandled) {
819  messageHandled = ncs->handleRpcCall(msg); //TODO
820  }
821  if (discoveryMode && !messageHandled) {
822  messageHandled = discoveryMode->handleRpcCall(msg);
823  }
824 
825  if (treeManager && !messageHandled) {
826  messageHandled = treeManager->handleRpcCall(msg);
827 
828  if(globalViewBuilder && !messageHandled) {
829  messageHandled = globalViewBuilder->handleRpcCall(msg);
830  }
831  }
832 
833  return messageHandled;
834 }
835 
836 void NeighborCache::pingResponse(PingResponse* response, cPolymorphic* context,
837  int rpcId, simtime_t rtt) {
838  if(treeManager) {
839  //treeManager->pingResponse(response, context, rpcId, rtt);
840  }
841 
842 }
843 
845  cPolymorphic* context, int rpcId) {
846  if(treeManager) {
847  //treeManager->pingTimeout(call, dest, context, rpcId);
848  }
849 }
850 
851 
852 // Prox stuff
855  int rpcId,
856  ProxListener *listener,
857  cPolymorphic *contextPointer)
858 {
859  Enter_Method("getProx()");
860 
861  if (!enableNeighborCache) {
862  queryProx(node, rpcId, listener, contextPointer);
863  return Prox::PROX_WAITING;
864  }
865 
866  if (node == overlay->getThisNode()) {
867  delete contextPointer;
868  return Prox::PROX_SELF;
869  }
870 
871  if (node.isUnspecified()) {
872  throw cRuntimeError("Prox queried for undefined TransportAddress!");
873  delete contextPointer;
874  return Prox::PROX_TIMEOUT;
875  }
876 
877  bool sendQuery = false;
878  Prox result = Prox::PROX_UNKNOWN;
879  Rtt rtt = getNodeRtt(node);
880 
881  //countGetProxTotal++;
882  if (type == NEIGHBORCACHE_DEFAULT) type = defaultQueryType;
883  else if (type == NEIGHBORCACHE_DEFAULT_IMMEDIATELY) type = defaultQueryTypeI;
884  else if (type == NEIGHBORCACHE_DEFAULT_QUERY) type = defaultQueryTypeQ;
885 
886  switch(type) {
887  case NEIGHBORCACHE_EXACT:
888  if (rtt.second == RTTSTATE_TIMEOUT) {
889  if (getNodeAge(node) == simTime()) {
890  // right now, we got a time-out, so no new ping is sent
891  result = Prox::PROX_TIMEOUT;
892  } else{
893  // if timeout, return unknown???, and send a query!
894  result = Prox::PROX_WAITING;
895  sendQuery = true;
896  }
897  } else if (rtt.second == RTTSTATE_WAITING) {
898  // if a query was sent, return WAITING
899  result = Prox::PROX_WAITING;
900  sendQuery = true; //just inserting a context, no real ping is sent
901  } else if (rtt.second == RTTSTATE_UNKNOWN) {
902  // if no entry known, send a query and return UNKNOWN
903  result = Prox::PROX_WAITING; //???
904  sendQuery = true;
905  } else {
906  // else, return whatever we have
907  result = rtt.first;
908  }
909  break;
911  if (rtt.second == RTTSTATE_TIMEOUT) {
912  // if timeout, return that
913  result = Prox::PROX_TIMEOUT;
914  } else if (rtt.second == RTTSTATE_WAITING) {
915  // if a query was sent, return WAITING;
916  result = Prox::PROX_WAITING;
917  sendQuery = true; //just inserting a context, no real ping is sent
918  } else if (rtt.second == RTTSTATE_UNKNOWN) {
919  // if no entry known, send a query and return UNKNOWN
920  result = Prox::PROX_WAITING; //???
921  sendQuery = true;
922  } else {
923  // else, return whatever we have
924  result = rtt.first;
925  }
926  break;
928  if (rtt.second == RTTSTATE_TIMEOUT) {
929  // if timeout, return that
930  result = Prox::PROX_TIMEOUT;
931  } else if (rtt.second == RTTSTATE_WAITING) {
932  // if a query was sent, return an estimate
933  result = estimateProx(node);
934  } else if (rtt.second == RTTSTATE_UNKNOWN) {
935  // if no entry known, return an estimate
936  result = estimateProx(node);
937  } else {
938  // else return whatever we have
939  result = rtt.first;
940  }
941  break;
943  if (rtt.second == RTTSTATE_TIMEOUT) {
944  // if timeout, return that.
945  result = Prox::PROX_TIMEOUT;
946  } else if (rtt.second == RTTSTATE_WAITING) {
947  // if a query was sent return WAITING
948  result = Prox::PROX_WAITING;
949  } else if (rtt.second == RTTSTATE_UNKNOWN) {
950  // if a query was sent return WAITING
951  result = Prox::PROX_UNKNOWN;
952  } else {
953  // else return what we have
954  result = rtt.first;
955  }
956  break;
957  case NEIGHBORCACHE_QUERY:
958  // simply send a query and return WAITING
959  result = Prox::PROX_WAITING;
960  sendQuery = true;
961  break;
962  default:
963  throw cRuntimeError("Unknown query type!");
964  break;
965 
966  }
967  if (sendQuery) {
968  if (!insertNodeContext(node, contextPointer, listener, rpcId)) {
969  queryProx(node, rpcId, listener, contextPointer);
970  }
971  } else delete contextPointer;
972 
973  return result;
974 }
975 
977 {
978  Enter_Method("estimateProx()");
979 
980  Rtt rtt = getNodeRtt(node);
981 
982  if (rtt.second != RTTSTATE_UNKNOWN) return rtt.first;
983 
984  if (ncs && neighborCache.count(node)) {
985  return getCoordinateBasedProx(node);
986  }
987 
988  return Prox::PROX_UNKNOWN;
989 }
990 
992  int rpcId,
993  ProxListener *listener,
994  cPolymorphic *contextPointer)
995 {
996  Enter_Method("queryProx()");
997 
998  WaitingContext temp(listener, contextPointer, rpcId);
999 
1000  if (neighborCache.count(node) == 0) {
1001  NeighborCacheEntry& entry = neighborCache[node];
1002 
1003  entry.waitingContexts.push_back(temp);
1004  neighborCacheExpireMap.insert(std::make_pair(entry.insertTime, node));
1005  cleanupCache();
1006  } else {
1007  NeighborCacheEntry& entry = neighborCache[node];
1008  entry.waitingContexts.push_back(temp);
1009  }
1010  assert(neighborCache.size() == neighborCacheExpireMap.size());
1011 
1012  // TODO: this ping traffic is accounted application data traffic!
1013  pingNode(node, -1, 0, NULL, "PING");
1014 }
1015 
1017 {
1018  if (ncs && isEntry(node)) {
1019  const AbstractNcsNodeInfo* temp = getNodeCoordsInfo(node);
1020  if (temp) return ncs->getCoordinateBasedProx(*temp);
1021  }
1022  return Prox::PROX_UNKNOWN;
1023 }
1024 
1025 
1027 {
1028  if (neighborCache.count(node) == 0) {
1029  throw cRuntimeError("NeighborCache.cc: getNodeCoords was asked for "
1030  "a non-existent node reference.");
1031  }
1032  return neighborCache[node].coordsInfo;
1033 }
1034 
1035 
1037  simtime_t rtt)
1038 {
1039  if (!ncs) return;
1040 
1041  // Check if data collection can start
1042  if (!globalStatistics->isMeasuring()) return;
1043 
1044  Prox prox = getCoordinateBasedProx(handle);
1045  if (prox == Prox::PROX_UNKNOWN) return;
1046 
1047  //calculate absolute rtt error of the last message
1048  double tempRttError = prox.proximity - SIMTIME_DBL(rtt);
1049 
1050  /*
1051  std::cout << "prox.proximity = " << prox.proximity
1052  << ", SIMTIME_DBL(rtt) = " << SIMTIME_DBL(rtt)
1053  << ", error = " << tempRttError
1054  << ", relativeError = " << (tempRttError / SIMTIME_DBL(rtt))
1055  << std::endl;
1056  */
1057 
1058  if (tempRttError < 0){
1059  tempRttError *= -1;
1060  ++numRttErrorToLow;
1061  } else ++numRttErrorToHigh;
1062 
1063  numMsg++;
1064  absoluteError += tempRttError;
1065  relativeError += (tempRttError / SIMTIME_DBL(rtt));
1066 
1067  globalStatistics->recordOutVector("NCS: measured RTTs",
1068  SIMTIME_DBL(rtt));
1069  globalStatistics->recordOutVector("NCS: absolute Rtt Error",
1070  tempRttError);
1071  globalStatistics->recordOutVector("NCS: relative Rtt Error",
1072  (tempRttError / SIMTIME_DBL(rtt)));
1073 }
1074 
1075 
1076 std::pair<simtime_t, simtime_t> NeighborCache::getMeanVarRtt(const TransportAddress &node,
1077  bool returnVar)
1078 {
1079  if (neighborCache.count(node) == 0) {
1080  throw cRuntimeError("NeighborCache.cc: getMeanVarRtt was asked for"
1081  "a non-existent node reference.");
1082  }
1083 
1084  uint16_t size = neighborCache[node].lastRtts.size();
1085  if (size == 0) return std::make_pair(-1.0,-1.0);
1086 
1087  simtime_t rttSum = 0;
1088  for (int i = 0; i < size; i++){
1089  rttSum += neighborCache[node].lastRtts[i];
1090  }
1091  simtime_t meanRtt = rttSum / size;
1092  if (!returnVar) {
1093  return std::make_pair(meanRtt, -1.0);
1094  }
1095  if (size == 1) {
1096  return std::make_pair(meanRtt, 0.0);
1097  }
1098 
1099  double sum = 0.0;
1100  for (int i = 0; i < size; i++){
1101  simtime_t tempRtt = neighborCache[node].lastRtts.at(i) - meanRtt;
1102  sum += (SIMTIME_DBL(tempRtt) * SIMTIME_DBL(tempRtt));
1103  }
1104 
1105  return std::make_pair(meanRtt, (sum / size));
1106 }
1107 
1108 
1110 {
1111  simtime_t timeout = getRttBasedTimeout(node);
1112  if (timeout == -1 && useNcsForTimeout && ncs) return getNcsBasedTimeout(node);
1113  return timeout;
1114 }
1115 
1116 
1117 //Calculate timeout with RTT
1119 {
1120  simtime_t timeout = -1;
1121 
1122  // check if an entry is available in NeighborCache
1123  if (isEntry(node)) {
1124  std::pair<simtime_t, simtime_t> temp = getMeanVarRtt(node, true);
1125  simtime_t meanRtt = temp.first;
1126  simtime_t varRtt = temp.second;
1127 
1128  // TODO return value even if node has timed out
1129  if (meanRtt == -1) return -1;
1130  if (varRtt > 0) {
1131  // like TCP
1132  timeout = meanRtt + 4 * varRtt;
1133  } else {
1134  // only one RTT is available
1135  timeout = meanRtt * 1.2;
1136  }
1137  // adjustment
1138  timeout *= RTT_TIMEOUT_ADJUSTMENT;
1139  //if (timeout > SIMTIME_DBL(defaultTimeout)) return -1;
1140  }
1141  return timeout;
1142 }
1143 
1144 //Calculate timeout with NCS
1146 {
1147  double timeout = -1;
1148  Prox prox = Prox::PROX_UNKNOWN;
1149 
1150  // check if an entry is available in NeighborCache
1151  if (isEntry(node)) {
1152  prox = getProx(node, NEIGHBORCACHE_ESTIMATED);
1153 
1154  if (prox != Prox::PROX_UNKNOWN && prox != Prox::PROX_TIMEOUT &&
1155  prox.proximity > 0 && prox.accuracy > timeoutAccuracyLimit) {
1156  timeout = prox.proximity + (6 * (1 - prox.accuracy));
1157  timeout += NCS_TIMEOUT_CONSTANT;
1158  } else return -1;
1159 
1160  if (/*timeout > SIMTIME_DBL(defaultTimeout) ||*/ timeout < 0)
1161  return -1;
1162  }
1163  return timeout;
1164 }
1165 
1166 
1168 {
1169  return treeManager;
1170 }
1171 
1173 {
1175  //TODO failed bootstrap nodes are only detected and removed from nc
1176  // if rpcs are used for joining
1177 
1178  // return close node (discovery mode)
1179  if (closestNodes) {
1180  for (uint16_t i = 0; i < closestNodes->size(); ++i) {
1181  if (neighborCache.count((*closestNodes)[i]) > 0 &&
1182  neighborCache[(*closestNodes)[i]].rttState == RTTSTATE_VALID) {
1183  //std::cout << (*closestNodes)[i].getProx().proximity << "\n"<< std::endl;
1184  return (*closestNodes)[i];
1185  }
1186  }
1187  }
1188 
1189  // return known alive node
1190  if (neighborCache.size() > 0) {
1192  while (it != neighborCache.end() &&
1193  it->second.rttState != RTTSTATE_VALID) ++it;
1194  if (it != neighborCache.end()) {
1195  return neighborCache.begin()->first;
1196  }
1197  }
1199 }
1200