OverSim
KBRTestApp.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <IPAddressResolver.h>
25 #include <CommonMessages_m.h>
26 #include <GlobalStatistics.h>
27 #include <UnderlayConfigurator.h>
28 #include <GlobalNodeList.h>
29 
30 #include "KBRTestApp.h"
31 #include "KBRTestMessage_m.h"
32 
34 
36 {
// Starts with no timers allocated: all four timer pointers are nulled here.
// NOTE(review): the signature line (listing line 35) is absent from this
// extract -- presumably the KBRTestApp constructor; confirm against upstream.
37  onewayTimer = NULL;
38  rpcTimer = NULL;
39  lookupTimer = NULL;
40  underlayTimer = NULL;
41 }
42 
44 {
// Passes each of the four test timers to cancelAndDelete().
// NOTE(review): the signature line (listing line 43) is absent from this
// extract -- presumably the KBRTestApp destructor; confirm against upstream.
45  cancelAndDelete(onewayTimer);
46  cancelAndDelete(rpcTimer);
47  cancelAndDelete(lookupTimer);
48  cancelAndDelete(underlayTimer);
49 }
50 
52 {
// Does its work only in stage MIN_STAGE_APP: reads the module parameters,
// zeroes and WATCHes the statistics counters, sizes the duplicate-detection
// buffer, binds port 1025 and schedules one timer per enabled test.
// NOTE(review): the signature line (listing line 51) is absent from this
// extract; the error string below names it KBRTestApp::initializeApp().
53  if (stage != MIN_STAGE_APP) {
54  return;
55  }
56 
57  kbrOneWayTest = par("kbrOneWayTest");
58  kbrRpcTest = par("kbrRpcTest");
59  kbrLookupTest = par("kbrLookupTest");
60  underlayTest = par("underlayTest");
61 
// NOTE(review): listing line 62 (the condition that opens this block) is
// missing; judging by the message it presumably fires when none of the four
// tests above is enabled -- restore from upstream.
63  throw cRuntimeError("KBRTestApp::initializeApp(): "
64  "no tests are configured!");
65  }
66 
67  failureLatency = par("failureLatency");
68 
69  testMsgSize = par("testMsgSize");
70  lookupNodeIds = par("lookupNodeIds");
// timer intervals are drawn with mean testMsgInterval and a deviation of one
// tenth of that mean (see the truncnormal() calls below)
71  mean = par("testMsgInterval");
72  deviation = mean / 10;
73  activeNetwInitPhase = par("activeNetwInitPhase");
74  msgHandleBufSize = par("msgHandleBufSize");
75  onlyLookupInoffensiveNodes = par("onlyLookupInoffensiveNodes");
76 
77  //rpcTimeout = par("rpcTimeout");
78  rpcRetries = par("rpcRetries");
79 
// one-way test counters
80  numSent = 0;
81  bytesSent = 0;
82  numDelivered = 0;
83  bytesDelivered = 0;
84  numDropped = 0;
85  bytesDropped = 0;
86  WATCH(numSent);
87  WATCH(bytesSent);
88  WATCH(numDelivered);
89  WATCH(bytesDelivered);
90  WATCH(numDropped);
91  WATCH(bytesDropped);
92 
// RPC test counters
// NOTE(review): listing lines 96, 99 and 101 are missing; bytesRpcDelivered is
// WATCHed below but its reset is not visible here.
93  numRpcSent = 0;
94  bytesRpcSent = 0;
95  numRpcDelivered = 0;
97  numRpcDropped = 0;
98  bytesRpcDropped = 0;
100  rpcSuccLatencySum = 0;
102  rpcTotalLatencySum = 0;
103  WATCH(numRpcSent);
104  WATCH(bytesRpcSent);
105  WATCH(numRpcDelivered);
106  WATCH(bytesRpcDelivered);
107  WATCH(numRpcDropped);
108  WATCH(bytesRpcDropped);
109 
// lookup test counters
110  numLookupSent = 0;
111  numLookupSuccess = 0;
112  numLookupFailed = 0;
113  WATCH(numLookupSent);
114  WATCH(numLookupSuccess);
115  WATCH(numLookupFailed);
116 
// underlay test counters
// NOTE(review): listing lines 119-120 are missing; numUnderlayDelivered and
// bytesUnderlayDelivered are WATCHed below but their resets are not visible.
117  numUnderlaySent = 0;
118  bytesUnderlaySent = 0;
121  WATCH(numUnderlaySent);
122  WATCH(bytesUnderlaySent);
123  WATCH(numUnderlayDelivered);
124  WATCH(bytesUnderlayDelivered);
125 
126  sequenceNumber = 0;
127 
128  nodeIsLeavingSoon = false;
129 
130  // initialize circular buffer
// NOTE(review): listing line 135 is missing; mhBufNext (written through in
// checkSeen()) is not assigned anywhere in the visible lines -- presumably it
// is set to mhBufBegin here; confirm against upstream.
131  if (msgHandleBufSize > 0) {
132  mhBuf.resize(msgHandleBufSize);
133  mhBufBegin = mhBuf.begin();
134  mhBufEnd = mhBuf.end();
136  }
137 
// the same fixed port number is used for the bind and for thisNode
138  bindToPort(1025);
139  thisNode.setPort(1025);
140 
141  // start periodic timer
// each enabled test gets its own self-message; handleTimerEvent() tells the
// tests apart by comparing against these pointers
142  if (kbrOneWayTest) {
143  onewayTimer = new cMessage("onewayTimer");
144  scheduleAt(simTime() + truncnormal(mean, deviation), onewayTimer);
145  }
146  if (kbrRpcTest) {
147  rpcTimer = new cMessage("rpcTimer");
148  scheduleAt(simTime() + truncnormal(mean, deviation), rpcTimer);
149  }
150  if (kbrLookupTest) {
151  lookupTimer = new cMessage("lookupTimer");
152  scheduleAt(simTime() + truncnormal(mean, deviation), lookupTimer);
153  }
154  if (underlayTest) {
155  underlayTimer = new cMessage("underlayTimer");
156  scheduleAt(simTime() + truncnormal(mean, deviation), underlayTimer);
157  }
158 }
159 
// Timer callback shared by all four tests. The fired timer is re-armed first,
// then the branch matching the timer pointer builds and sends one test
// message / call (one-way route, routed RPC, internal lookup, or UDP RPC).
// NOTE(review): this extract dropped listing lines 166-167, 182, 184, 201,
// 203, 207, 216, 222, 225, 232, 241, 246, 248, 253 and 258; several visible
// statements are therefore truncated (unbalanced parentheses, dangling
// arguments). Restore them from upstream before building.
160 void KBRTestApp::handleTimerEvent(cMessage* msg)
161 {
162  // schedule next timer event
163  scheduleAt(simTime() + truncnormal(mean, deviation), msg);
164 
165  // do nothing if the network is still in the initialization phase
// (the opening of this condition is in the missing lines; the visible tail
// also skips the test once nodeIsLeavingSoon is set)
168  || nodeIsLeavingSoon) {
169  return;
170  }
171 
172  if (msg == onewayTimer) {
173  // TEST 1: route a test message to a key (one-way)
174  // do nothing if there are currently no nodes in the network
175  std::pair<OverlayKey,TransportAddress> dest = createDestKey();
176  if (!dest.first.isUnspecified()) {
177  // create a test message of testMsgSize bytes
178  KBRTestMessage* testMsg = new KBRTestMessage("KBRTestMessage");
// the sender's module id travels with the message; deliver() uses it to find
// the sending module again
179  testMsg->setId(getId());
180  testMsg->setSeqNum(sequenceNumber++);
181  testMsg->setByteLength(testMsgSize);
183 
// NOTE(review): the start of this statement is missing (trailing ')' has no
// opener in the visible text)
185  numSent++; bytesSent += testMsg->getByteLength());
186 
187  callRoute(dest.first, testMsg);
188  }
189  } else if (msg == rpcTimer) {
190  // TEST 2: send a remote procedure call to a specific key and wait for a response
191  // do nothing if there are currently no nodes in the network
192  std::pair<OverlayKey,TransportAddress> dest = createDestKey();
193  if (!dest.first.isUnspecified()) {
194  KbrTestCall* call = new KbrTestCall;
195  call->setByteLength(testMsgSize);
// the context carries the intended destination so the response handler can
// compare it with the node that actually answered
196  KbrRpcContext* context = new KbrRpcContext;
197  context->setDestKey(dest.first);
198  if (lookupNodeIds) {
199  context->setDestAddr(dest.second);
200  }
202 
204  bytesRpcSent += call->getByteLength());
205 
// NOTE(review): the remaining arguments of this call are in missing line 207
206  sendRouteRpcCall(TIER1_COMP, dest.first, call, context,
208  }
209  } else if (msg == lookupTimer) {
210  // TEST 3: perform a lookup of a specific key
211  // do nothing if there are currently no nodes in the network
212  std::pair<OverlayKey,TransportAddress> dest = createDestKey();
213  if (!dest.first.isUnspecified()) {
214  LookupCall* call = new LookupCall();
215  call->setKey(dest.first);
217  KbrRpcContext* context = new KbrRpcContext;
218  context->setDestKey(dest.first);
219  if (lookupNodeIds) {
220  context->setDestAddr(dest.second);
221  }
223  sendInternalRpcCall(OVERLAY_COMP, call, context);
224 
226  }
// any timer other than the three above lands here (the explicit comparison is
// commented out)
227  } else /*if (msg == underlayRpcTimer)*/ {
228  // TEST 4: send a remote procedure call to a specific transportAddress
229  // and wait for a response, do nothing if there are currently no nodes
230  // in the network
// NOTE(review): the initializer of 'address' is in missing line 232
231  TransportAddress* address =
233 
234  if (address == NULL) {
235  EV << "no node!!!" << std::endl;
236  return;
237  }
238 
// the target is addressed on the same port number this node uses
239  address->setPort(thisNode.getPort());
240  if (!address->isUnspecified()) {
// NOTE(review): the declaration of 'call' is in missing line 241
242  call->setByteLength(testMsgSize);
// the send time is stored in the call; underlayTestCall() subtracts it from
// the receive time
243  call->setSendTime(simTime());
244 
245  KbrRpcContext* context = new KbrRpcContext;
247 
249  bytesUnderlaySent += call->getByteLength());
250 
251  //dest.second.setPort(thisNode.getPort());
// NOTE(review): the remaining arguments of this call are in missing line 253
252  sendUdpRpcCall(*address, call, context,
254  }
255  }
// disabled ping experiment, excluded from compilation by '#if 0'
256 #if 0
257  thisNode.setPort(1025);
259  handle.setPort(1025);
260  pingNode(handle, -1, -1, NULL, "TestPING", NULL, -1, UDP_TRANSPORT);
261 #endif
262 
263 }
264 
265 
266 void KBRTestApp::pingResponse(PingResponse* response, cPolymorphic* context,
267  int rpcId, simtime_t rtt)
268 {
269  //std::cout << rtt << std::endl;
270 }
271 
272 
274 {
// Hands the incoming call 'msg' to the RPC switch macros, which name two call
// types: KbrTest -> kbrTestCall and UnderlayTest -> underlayTestCall.
// The visible return statement yields RPC_HANDLED.
// NOTE(review): the signature line (listing line 273) is absent from this
// extract; what the macros do for other call types cannot be seen from here.
275  RPC_SWITCH_START( msg );
276  RPC_DELEGATE( KbrTest, kbrTestCall );
277  RPC_DELEGATE( UnderlayTest, underlayTestCall );
278  RPC_SWITCH_END( );
279 
280  return RPC_HANDLED;
281 }
282 
283 
285 {
// Answers 'call' with a new KbrTestResponse whose byte length equals the
// byte length of the call.
// NOTE(review): the signature line (listing line 284) is absent from this
// extract -- presumably kbrTestCall, the KbrTest delegate; confirm.
286  KbrTestResponse* response = new KbrTestResponse;
287  response->setByteLength(call->getByteLength());
288  sendRpcResponse(call, response);
289 }
290 
291 
293 {
// Answers 'call' with a response of the same byte length and stores in it the
// one-way latency, computed as the current simulation time minus the send
// time carried in the call.
// NOTE(review): the signature line (listing line 292) and the declaration of
// 'response' (listing line 294) are absent from this extract.
295  response->setByteLength(call->getByteLength());
296  response->setOneWayLatency(simTime() - call->getSendTime());
297  sendRpcResponse(call, response);
298 }
299 
300 
// Response handler for the RPCs issued by the tests (the log text below names
// it KBRTestApp::handleRpcResponse()). Statistics are touched only when the
// KbrRpcContext attached to the call reports getMeasurementPhase() == true.
// The KbrTest and UnderlayTest branches delete the context themselves; the
// lookup branch passes it to handleLookupResponse().
// NOTE(review): this extract dropped listing lines 301 (first signature
// line), 306, 322, 324, 326, 328, 330, 337, 345, 348, 351, 361, 363 and 365,
// so most statistics statements below are truncated.
302  cPolymorphic* context, int rpcId,
303  simtime_t rtt)
304 {
305  RPC_SWITCH_START(msg)
// (the opener of the lookup-response case is in missing line 306)
307  EV << "[KBRTestApp::handleRpcResponse() @ " << overlay->getThisNode().getIp()
308  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
309  << " Lookup RPC Response received: id=" << rpcId << "\n"
310  << " msg=" << *_LookupResponse << " rtt=" << rtt
311  << endl;
312  handleLookupResponse(_LookupResponse, context, rtt);
313  break;
314  }
315  RPC_ON_RESPONSE(KbrTest) {
316  KbrRpcContext* kbrRpcContext = check_and_cast<KbrRpcContext*>(context);
317  if (kbrRpcContext->getMeasurementPhase() == true) {
// success if node ids are not being looked up, or if the responder's key and
// address both match the destination stored in the context
318  if (!lookupNodeIds ||
319  (kbrRpcContext->getDestKey() == msg->getSrcNode().getKey() &&
320  kbrRpcContext->getDestAddr() == msg->getSrcNode())) {
321 
323  bytesRpcDelivered += msg->getByteLength());
325  "KBRTestApp: RPC Success Latency", SIMTIME_DBL(rtt)));
327  "KBRTestApp: RPC Total Latency", SIMTIME_DBL(rtt)));
329  rpcSuccLatencySum += SIMTIME_DBL(rtt));
331  rpcTotalLatencySum += SIMTIME_DBL(rtt));
// control info may be absent (dynamic_cast yields NULL); the hop sum then
// adds 1 instead of the control info's hop count
332  OverlayCtrlInfo* overlayCtrlInfo =
333  dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
334 
335  uint16_t hopSum = msg->getCallHopCount();
336  hopSum += (overlayCtrlInfo ? overlayCtrlInfo->getHopCount() : 1);
338  "KBRTestApp: RPC Hop Count", hopSum));
339 // RECORD_STATS(globalStatistics->recordHistogram(
340 // "KBRTestApp: RPC Hop Count Histogram", hopSum));
341  } else {
// a response from a node other than the intended one is counted as dropped
342  //std::cout << simTime() << " " << thisNode.getAddress()
343  // << " " << msg->getSrcNode().getAddress()
344  // << " " << kbrRpcContext->getDestKey() << std::endl;
346  bytesRpcDropped += msg->getByteLength());
347  // for failed RPCs add failureLatency to latency statistics vector
349  "KBRTestApp: RPC Total Latency",
350  SIMTIME_DBL(failureLatency)));
352  rpcTotalLatencySum += SIMTIME_DBL(failureLatency));
353  }
354  }
355  delete kbrRpcContext;
356  break;
357  }
358  RPC_ON_RESPONSE(UnderlayTest) {
359  KbrRpcContext* kbrRpcContext = check_and_cast<KbrRpcContext*>(context);
360  if (kbrRpcContext->getMeasurementPhase() == true) {
362  bytesUnderlayDelivered += msg->getByteLength());
364  "KBRTestApp: Underlay RTT", SIMTIME_DBL(rtt)));
366  "KBRTestApp: Underlay One-way Latency",
367  SIMTIME_DBL(_UnderlayTestResponse->getOneWayLatency())));
368  }
369  delete kbrRpcContext;
370  break;
371  }
372  RPC_SWITCH_END( )
373 }
374 
// Timeout handler for the test RPCs (the log text below names it
// KBRTestApp::handleRpcTimeout()). During the measurement phase a timed-out
// call is accounted as dropped/failed and failureLatency is recorded in the
// "Total Latency" vector in place of a measured value. The context object is
// deleted once at the end, after the switch.
// NOTE(review): this extract dropped listing lines 375 (first signature
// line), 398, 401, 404, 410, 413, 415 and 421, so the statistics statements
// and the opener of the second case are truncated.
376  const TransportAddress& dest,
377  cPolymorphic* context, int rpcId,
378  const OverlayKey& destKey)
379 {
380  RPC_SWITCH_START(msg)
381  RPC_ON_CALL(KbrTest) {
382  EV << "[KBRTestApp::handleRpcTimeout() @ "
383  << overlay->getThisNode().getIp()
384  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
385  << " KBR-Test RPC Response timeout: id=" << rpcId
386  << endl;
387 
388  /*
389  std::cout << simTime() << ": [KBRTestApp::handleRpcTimeout() @ "
390  << overlay->getThisNode().getIp()
391  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
392  << " KBR-Test RPC Response (" << *msg << ") timeout: id=" << rpcId
393  << std::endl;
394  */
395 
396  KbrRpcContext* kbrRpcContext = check_and_cast<KbrRpcContext*>(context);
397  if (kbrRpcContext->getMeasurementPhase() == true) {
399  bytesRpcDropped += msg->getByteLength());
400  // for failed RPCs add failureLatency to latency statistics vector
402  "KBRTestApp: RPC Total Latency",
403  SIMTIME_DBL(failureLatency)));
405  rpcTotalLatencySum += SIMTIME_DBL(failureLatency));
406 
407  }
408  break;
409  }
// (the opener of this case is in missing line 410; the statistic name below
// shows it handles timed-out lookups)
411  KbrRpcContext* kbrRpcContext = check_and_cast<KbrRpcContext*>(context);
412  if (kbrRpcContext->getMeasurementPhase() == true) {
414  // for failed lookups add failureLatency to latency statistics vector
416  "KBRTestApp: Lookup Total Latency",
417  SIMTIME_DBL(failureLatency)));
418  }
419  break;
420  }
422 
// unlike the response handler, the context is released here for every case
423  delete context;
424 }
425 
// Evaluates a lookup response (the log text below names it
// KBRTestApp::handleLookupResponse()). 'context' must be a KbrRpcContext
// (enforced by check_and_cast) and is deleted before returning. Statistics
// are recorded only when the context reports getMeasurementPhase() == true.
// NOTE(review): this extract dropped listing lines 426 (first signature
// line), 440-441, 443, 445, 457, 459 and 462, so the statistics statements
// are truncated.
427  cObject* context, simtime_t latency)
428 {
429  EV << "[KBRTestApp::handleLookupResponse() @ " << overlay->getThisNode().getIp()
430  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
431  << " Lookup response for key " << msg->getKey()<< " : ";
432 
433  KbrRpcContext* kbrRpcContext = check_and_cast<KbrRpcContext*>(context);
434 
435  if (kbrRpcContext->getMeasurementPhase() == true) {
// success requires a valid response; when node ids are looked up, the first
// sibling must additionally carry the looked-up key and equal the destination
// address stored in the context
436  if (msg->getIsValid() && (!lookupNodeIds ||
437  ((msg->getSiblingsArraySize() > 0) &&
438  (msg->getSiblings(0).getKey() == msg->getKey()) &&
439  (kbrRpcContext->getDestAddr() == msg->getSiblings(0))))) {
442  "KBRTestApp: Lookup Success Latency", SIMTIME_DBL(latency)));
444  "KBRTestApp: Lookup Total Latency", SIMTIME_DBL(latency)));
446  "KBRTestApp: Lookup Hop Count", msg->getHopCount()));
447  } else {
// disabled diagnostics naming the reason for the failure
448 #if 0
449  if (!msg->getIsValid()) {
450  std::cout << "invalid" << std::endl;
451  } else if (msg->getSiblingsArraySize() == 0) {
452  std::cout << "empty" << std::endl;
453  } else {
454  std::cout << "wrong key" << std::endl;
455  }
456 #endif
458  // for failed lookups add failureLatency to latency statistics vector
460  "KBRTestApp: Lookup Total Latency",
461  SIMTIME_DBL(failureLatency)));
463  "KBRTestApp: Failed Lookup Hop Count", msg->getHopCount()));
464  }
465  }
466 
467  delete kbrRpcContext;
468 }
469 
471 {
// Sets nodeIsLeavingSoon; handleTimerEvent() checks this flag and stops
// starting new tests once it is true.
// NOTE(review): the signature line (listing line 470) is absent from this
// extract -- presumably a leave-notification hook; confirm against upstream.
472  nodeIsLeavingSoon = true;
473 }
474 
// Receives a routed one-way test message. 'msg' must be a KBRTestMessage with
// OverlayCtrlInfo attached (both enforced by check_and_cast). Duplicates are
// dropped; otherwise the sending KBRTestApp module is located through the
// module id stored in the message and its evaluateData() is called directly.
// The control info and the message are deleted on every path that returns.
// NOTE(review): this extract dropped listing lines 481 and 507, so the
// condition guarding error("key") and the start of the dropped-counter
// statement are not visible.
475 void KBRTestApp::deliver(OverlayKey& key, cMessage* msg)
476 {
477  KBRTestMessage* testMsg = check_and_cast<KBRTestMessage*>(msg);
// removeControlInfo() detaches the control info, so it is deleted separately
// from the message below
478  OverlayCtrlInfo* overlayCtrlInfo =
479  check_and_cast<OverlayCtrlInfo*>(msg->removeControlInfo());
480 
482  error("key");
483 
484  // check for duplicate
// duplicate detection is skipped entirely when no buffer was configured
485  if ((msgHandleBufSize > 0 )
486  && checkSeen(overlayCtrlInfo->getSrcNode().getKey(), testMsg->getSeqNum())) {
487  EV << "[KBRTestApp::deliver() @ " << overlay->getThisNode().getIp()
488  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
489  << " Duplicate dropped."
490  << endl;
491  delete overlayCtrlInfo;
492  delete testMsg;
493  return;
494  }
495 
496  // Return statistical data to the sender.
// nothing is reported if the module no longer exists or is not a KBRTestApp
497  if (cModule* mod = simulation.getModule(testMsg->getId())) {
498  if (KBRTestApp* sender = dynamic_cast<KBRTestApp*>(mod)) {
// delivery counts as correct when node ids are not looked up, or when this
// node's key equals the destination key
499  if ((!lookupNodeIds) || (overlay->getThisNode().getKey() == key)) {
500  if (testMsg->getMeasurementPhase() == true) {
501  sender->evaluateData((simTime() - testMsg->getCreationTime()),
502  overlayCtrlInfo->getHopCount(),
503  testMsg->getByteLength());
504  }
// lookupNodeIds is necessarily true here (the branch above covers the false
// case); the drop is accounted on this receiving instance, not on 'sender'
505  } else if(lookupNodeIds) {
506  if (testMsg->getMeasurementPhase() == true) {
508  bytesDropped += testMsg->getByteLength());
509  }
510  EV << "[KBRTestApp::deliver() @ " << overlay->getThisNode().getIp()
511  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
512  << " Error: Lookup of NodeIDs and KBRTestMessage"
513  << " received with different destKey!"
514  << endl;
515  }
516  }
517  }
518 
519  EV << "[KBRTestApp::deliver() @ " << overlay->getThisNode().getIp()
520  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
521  << " Received \"" << testMsg->getName() << "(seqNr: "
522  << testMsg->getSeqNum() << ")\n"
523  << " with destination key: " << key.toString(16)
524  << endl;
525 
526  delete overlayCtrlInfo;
527  delete testMsg;
528 }
529 
530 void KBRTestApp::forward(OverlayKey* key, cPacket** msg,
531  NodeHandle* nextHopNode)
532 {
533  KBRTestMessage* tempMsg = dynamic_cast<KBRTestMessage*>(*msg);
534 
535  if (tempMsg == NULL) return;
536 
537  tempMsg->setVisitedNodesArraySize(tempMsg->getVisitedNodesArraySize() + 1);
538  tempMsg->setVisitedNodes(tempMsg->getVisitedNodesArraySize() - 1,
539  overlay->getThisNode().getIp());
540 }
541 
// Picks the destination for the next test.
// With lookupNodeIds set, a node is drawn from globalNodeList and the pair
// (its key, its handle) is returned; otherwise a random key is returned
// together with TransportAddress::UNSPECIFIED_NODE.
// Callers in handleTimerEvent() skip the test when the returned key is
// unspecified.
// NOTE(review): listing line 548 (the last argument(s) and closing of the
// getRandomNode() call) is missing from this extract -- restore from upstream.
542 std::pair<OverlayKey, TransportAddress> KBRTestApp::createDestKey()
543 {
544  if (lookupNodeIds) {
545  // TODO parameter to choose only nodes with own nodeType
546  // getPeerInfo(getThisNode())->getTypeID()
547  const NodeHandle& handle = globalNodeList->getRandomNode(-1, -1, true,
549  return std::make_pair(handle.getKey(), handle);
550  }
551  // generate random destination key
552  return std::make_pair(OverlayKey::random(), TransportAddress::UNSPECIFIED_NODE);
553 }
554 
555 bool KBRTestApp::checkSeen(const OverlayKey& key, int seqNum)
556 {
557  MsgHandle hdl(key, seqNum);
558 
559  for (MsgHandleBuf::iterator it = mhBufBegin; it != mhBufEnd; ++it) {
560  if (it->key.isUnspecified()) {
561  continue;
562  }
563  if (*it == hdl) {
564  return true;
565  }
566  }
567 
568  *(mhBufNext++) = hdl;
569  if (mhBufNext == mhBufEnd) {
571  }
572 
573  return false;
574 }
575 
576 void KBRTestApp::evaluateData(simtime_t latency, int hopCount, long int bytes)
577 {
578  // count the number and size of successfully delivered messages
581 
582  if (numSent < numDelivered) {
583  std::ostringstream tempString;
584  tempString << "KBRTestApp::evaluateData(): numSent ("
585  << numSent << ") < numDelivered (" << numDelivered << ")!";
586  throw cRuntimeError(tempString.str().c_str());
587  }
588 
589  RECORD_STATS(globalStatistics->recordOutVector("KBRTestApp: One-way Hop "
590  "Count", hopCount));
591  RECORD_STATS(globalStatistics->recordOutVector("KBRTestApp: One-way Latency",
592  SIMTIME_DBL(latency)));
593 }
594 
596 {
// Reports per-node rates and ratios to globalStatistics for every enabled
// test, but only if 'time' is at least GlobalStatistics::MIN_MEASURED.
// Ratios are emitted only when the matching "sent" counter is non-zero, which
// avoids a division by zero.
// NOTE(review): the signature line (listing line 595) and the definition of
// 'time' (listing line 597) are absent from this extract; listing line 636
// inside the disabled '#if 0' section is missing as well.
598 
599  if (time >= GlobalStatistics::MIN_MEASURED) {
600  if (kbrOneWayTest) {
601  globalStatistics->addStdDev("KBRTestApp: One-way Delivered Messages/s",
602  numDelivered / time);
603  globalStatistics->addStdDev("KBRTestApp: One-way Delivered Bytes/s",
604  bytesDelivered / time);
605  globalStatistics->addStdDev("KBRTestApp: One-way Dropped Messages/s",
606  numDropped / time);
607  globalStatistics->addStdDev("KBRTestApp: One-way Dropped Bytes/s",
608  bytesDropped / time);
609  if (numSent > 0) {
610  globalStatistics->addStdDev("KBRTestApp: One-way Delivery Ratio",
611  (float)numDelivered /
612  (float)numSent);
613 /* globalStatistics->addStdDev("KBRTestApp: One-way Total Latency",
614  globalStatistics->getv * (1 - ((float)numDelivered /
615  (float)numSent)));*/
616  }
617  }
618 
619  if (kbrRpcTest) {
620  globalStatistics->addStdDev("KBRTestApp: RPC Delivered Messages/s",
621  numRpcDelivered / time);
622  globalStatistics->addStdDev("KBRTestApp: RPC Delivered Bytes/s",
623  bytesRpcDelivered / time);
624  globalStatistics->addStdDev("KBRTestApp: RPC Dropped Messages/s",
625  numRpcDropped / time);
626  globalStatistics->addStdDev("KBRTestApp: RPC Dropped Bytes/s",
627  bytesRpcDropped / time);
// disabled per-session latency averages, excluded from compilation
628 #if 0
629  if (rpcSuccLatencyCount > 0) {
630  globalStatistics->addStdDev("KBRTestApp: RPC Success Session Latency",
631  SIMTIME_DBL(rpcSuccLatencySum) / rpcSuccLatencyCount);
632  }
633 
634  if (rpcTotalLatencyCount > 0) {
635  globalStatistics->addStdDev("KBRTestApp: RPC Total Session Latency",
637  }
638 #endif
639 
640  if (numRpcSent > 0) {
641  globalStatistics->addStdDev("KBRTestApp: RPC Delivery Ratio",
642  (float)numRpcDelivered /
643  (float)numRpcSent);
644  }
645  }
646 
647  if (kbrLookupTest) {
648  globalStatistics->addStdDev("KBRTestApp: Successful Lookups/s",
649  numLookupSuccess / time);
650  globalStatistics->addStdDev("KBRTestApp: Failed Lookups/s",
651  numLookupFailed / time);
652  if (numLookupSent > 0) {
653  globalStatistics->addStdDev("KBRTestApp: Lookup Success Ratio",
654  (float)numLookupSuccess /
655  (float)numLookupSent);
656  }
657  }
// the underlay test reports only a delivery ratio, no rates
658  if (underlayTest) {
659  if (numUnderlaySent > 0) {
660  globalStatistics->addStdDev("KBRTestApp: Underlay Delivery Ratio",
661  (float)numUnderlayDelivered /
662  (float)numUnderlaySent);
663  }
664  }
665  }
666 }