OverSim
BaseApp.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <IPAddressResolver.h>
25 #include <NotificationBoard.h>
26 #include <UDPAppBase.h>
27 #include <UDPSocket.h>
28 #include <cassert>
29 
30 #include <CommonMessages_m.h>
31 #include <BaseRpc.h>
32 #include <OverlayAccess.h>
33 #include <GlobalNodeListAccess.h>
34 #include <GlobalStatisticsAccess.h>
36 
37 #include "BaseApp.h"
38 
39 using namespace std;
40 
42 {
43  notificationBoard = NULL;
44 
45  overlay = NULL;
46 }
47 
49 {
50  finishRpcs();
51 }
52 
54 {
55  return MAX_STAGE_APP + 1;
56 }
57 
58 void BaseApp::initialize(int stage)
59 {
60  CompType compType = getThisCompType();
61  bool tier = (compType == TIER1_COMP ||
62  compType == TIER2_COMP ||
63  compType == TIER3_COMP);
64 
65  if (stage == REGISTER_STAGE) {
66  OverlayAccess().get(this)->registerComp(getThisCompType(), this);
67  return;
68  }
69 
70  if ((tier && stage == MIN_STAGE_APP) ||
71  (!tier && stage == MIN_STAGE_COMPONENTS)) {
72  // fetch parameters
73  debugOutput = par("debugOutput");
74 
75  globalNodeList = GlobalNodeListAccess().get();
76  underlayConfigurator = UnderlayConfiguratorAccess().get();
77  globalStatistics = GlobalStatisticsAccess().get();
78  notificationBoard = NotificationBoardAccess().get();
79 
80  // subscribe to the notification board
81  notificationBoard->subscribe(this, NF_OVERLAY_TRANSPORTADDRESS_CHANGED);
82  notificationBoard->subscribe(this, NF_OVERLAY_NODE_LEAVE);
83  notificationBoard->subscribe(this, NF_OVERLAY_NODE_GRACEFUL_LEAVE);
84 
85  // determine the terminal's transport address
86  if (getParentModule()->getSubmodule("interfaceTable", 0) != NULL) {
87  thisNode.setIp(IPAddressResolver()
88  .addressOf(getParentModule()));
89  } else {
90  thisNode.setIp(IPAddressResolver()
91  .addressOf(getParentModule()->getParentModule()));
92  }
93 
94  thisNode.setPort(-1);
95 
96  WATCH(thisNode);
97 
98  // statistics
99  numOverlaySent = 0;
100  numOverlayReceived = 0;
101  bytesOverlaySent = 0;
102  bytesOverlayReceived = 0;
103  numUdpSent = 0;
104  numUdpReceived = 0;
105  bytesUdpSent = 0;
106  bytesUdpReceived = 0;
107 
108  creationTime = simTime();
109 
110  WATCH(numOverlaySent);
111  WATCH(numOverlayReceived);
112  WATCH(bytesOverlaySent);
113  WATCH(bytesOverlayReceived);
114  WATCH(numUdpSent);
115  WATCH(numUdpReceived);
116  WATCH(bytesUdpSent);
117  WATCH(bytesUdpReceived);
118 
119  // init rpcs
120  initRpcs();
121 
122  // set TCP output gate
123  setTcpOut(gate("tcpOut"));
124  }
125 
126  if ((stage >= MIN_STAGE_APP && stage <= MAX_STAGE_APP) ||
127  (stage >= MIN_STAGE_COMPONENTS && stage <= MAX_STAGE_COMPONENTS)) //TODO
128  initializeApp(stage);
129 }
130 
131 void BaseApp::initializeApp(int stage)
132 {
133  // ...
134 }
135 
136 // Process messages passed up from the overlay.
137 void BaseApp::handleMessage(cMessage* msg)
138 {
139  if (internalHandleMessage(msg)) {
140  return;
141  }
142 
143  if (msg->arrivedOn("from_lowerTier") ||
144  msg->arrivedOn("direct_in")) {
145  CompReadyMessage* readyMsg = dynamic_cast<CompReadyMessage*>(msg);
146  if (readyMsg != NULL) {
147  handleReadyMessage(readyMsg);
148  return;
149  }
150  // common API
151  CommonAPIMessage* commonAPIMsg = dynamic_cast<CommonAPIMessage*>(msg);
152  if (commonAPIMsg != NULL) {
153  handleCommonAPIMessage(commonAPIMsg);
154  } else if (msg->arrivedOn("from_lowerTier")) {
155  // TODO: What kind of messages to we want to measure here?
156  cPacket* packet = check_and_cast<cPacket*>(msg);
157  RECORD_STATS(numOverlayReceived++;
158  bytesOverlayReceived += packet->getByteLength());
159  handleLowerMessage(msg);
160  }
161  else delete msg;
162  } else if (msg->arrivedOn("from_upperTier")) {
163  handleUpperMessage(msg);
164  } else if (msg->arrivedOn("udpIn")) {
165  cPacket* packet = check_and_cast<cPacket*>(msg);
166  RECORD_STATS(numUdpReceived++; bytesUdpReceived += packet->getByteLength());
167  // debug message
168  if (debugOutput && !ev.isDisabled()) {
169  UDPControlInfo* udpControlInfo =
170  check_and_cast<UDPControlInfo*>(msg->getControlInfo());
171  EV << "[BaseApp:handleMessage() @ " << thisNode.getIp()
172  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
173  << " Received " << *msg << " from "
174  << udpControlInfo->getSrcAddr() << endl;
175  }
176  handleUDPMessage(msg);
177  } else if(msg->arrivedOn("tcpIn")) {
178  handleTCPMessage(msg);
179  } else if (msg->arrivedOn("trace_in")) {
180  handleTraceMessage(msg);
181  }else {
182  delete msg;
183  }
184 }
185 
187 {
188  std::string name(this->getName());
189 
190  if (name == std::string("tier1")) {
191  return TIER1_COMP;
192  } else if (name == std::string("tier2")) {
193  return TIER2_COMP;
194  } else if (name == std::string("tier3")) {
195  return TIER3_COMP;
196  }
197 
198  std::string parentName(this->getParentModule()->getName());
199 
200  if (parentName == std::string("tier1")) {
201  return TIER1_COMP;
202  } else if (parentName == std::string("tier2")) {
203  return TIER2_COMP;
204  } else if (parentName == std::string("tier3")) {
205  return TIER3_COMP;
206  } else {
207  throw cRuntimeError("BaseApp::getThisCompType(): "
208  "Unknown module type!");
209  }
210 
211  return INVALID_COMP;
212 }
213 
214 void BaseApp::receiveChangeNotification(int category, const cPolymorphic * details)
215 {
216  Enter_Method_Silent();
217  if (category == NF_OVERLAY_TRANSPORTADDRESS_CHANGED) {
218  handleTransportAddressChangedNotification();
219  } else if (category == NF_OVERLAY_NODE_LEAVE) {
220  handleNodeLeaveNotification();
221  } else if (category == NF_OVERLAY_NODE_GRACEFUL_LEAVE) {
222  handleNodeGracefulLeaveNotification();
223  }
224 }
225 
227 {
228  // ...
229 }
230 
232 {
233  // ...
234 }
235 
237 {
238  // ...
239 }
240 
241 void BaseApp::callRoute(const OverlayKey& key, cPacket* msg,
242  const std::vector<TransportAddress>& sourceRoute,
243  RoutingType routingType)
244 {
245  // create route-message (common API)
246  KBRroute* routeMsg = new KBRroute();
247  routeMsg->setDestKey(key);
248 
249  if (!(sourceRoute.size() == 1 && sourceRoute[0].isUnspecified())) {
250  routeMsg->setSourceRouteArraySize(sourceRoute.size());
251  for (uint32_t i = 0; i < sourceRoute.size(); ++i) {
252  routeMsg->setSourceRoute(i, sourceRoute[i]);
253  }
254  }
255  routeMsg->encapsulate(msg);
256  routeMsg->setSrcComp(thisCompType);
257  routeMsg->setDestComp(thisCompType);
258  routeMsg->setRoutingType(routingType);
259 
260  routeMsg->setType(KBR_ROUTE);
261 
262  sendDirect(routeMsg, overlay->getCompRpcGate(OVERLAY_COMP));
263 
264  // debug message
265  if (debugOutput && !ev.isDisabled()) {
266  EV << "[BaseApp::callRoute() @ " << thisNode.getIp()
267  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
268  << " Sending " << *msg
269  << " to destination key " << key
270  << " with source route ";
271 
272  for (uint32_t i = 0; i < sourceRoute.size(); ++i) {
273  EV << sourceRoute[i] << " ";
274  }
275 
276  EV << endl;
277  }
278 
279  // count
280  RECORD_STATS(numOverlaySent++; bytesOverlaySent += msg->getByteLength());
281 }
282 
283 void BaseApp::deliver(OverlayKey& key, cMessage* msg)
284 {
285  // deliver...
286 
287  delete msg;
288 }
289 
290 void BaseApp::forward(OverlayKey* key, cPacket** msg, NodeHandle* nextHopNode)
291 {
292  // usually do nothing
293 }
294 
295 void BaseApp::forwardResponse(const OverlayKey& key, cPacket* msg,
296  const NodeHandle& nextHopNode)
297 {
298  OverlayCtrlInfo* ctrlInfo =
299  check_and_cast<OverlayCtrlInfo*>(msg->removeControlInfo());
300 
301  //create forwardResponse message (common API)
302  KBRforward* forwardMsg = new KBRforward();
303  forwardMsg->setDestKey(key);
304  forwardMsg->setNextHopNode(nextHopNode);
305  forwardMsg->setControlInfo(ctrlInfo);
306  forwardMsg->encapsulate(msg);
307 
308  forwardMsg->setType(KBR_FORWARD_RESPONSE);
309 
310  if (getThisCompType() == TIER1_COMP) {
311  send(forwardMsg, "to_lowerTier");
312  } else {
313  sendDirect(forwardMsg, overlay->getCompRpcGate(OVERLAY_COMP));
314  }
315 }
316 
317 void BaseApp::update(const NodeHandle& node, bool joined)
318 {
319  EV << "[BaseApp::update() @ " << thisNode.getIp()
320  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
321  << " " << node << (joined ? " joined " : " left ") << "siblings"
322  << endl;
323 
324  /*
325  std::cout << simTime() << " [BaseApp::update() @ " << thisNode.getIp()
326  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
327  << " " << node << (joined ? " joined " : " left ") << "siblings"
328  << std::endl;
329  */
330 }
331 
333 {
334  cPacket* tempMsg = commonAPIMsg->decapsulate();
335 
336  // process interface control information
337  OverlayCtrlInfo* overlayCtrlInfo =
338  dynamic_cast<OverlayCtrlInfo*>(commonAPIMsg->removeControlInfo());
339 
340  if (overlayCtrlInfo != NULL) {
341  tempMsg->setControlInfo(overlayCtrlInfo);
342  }
343 
344  switch (commonAPIMsg->getType()) {
345  case KBR_DELIVER:
346  {
347  KBRdeliver* apiMsg = dynamic_cast<KBRdeliver*>(commonAPIMsg);
348  OverlayKey key = apiMsg->getDestKey(); // TODO key in overlayCtrlInfo
349  NodeHandle nextHopNode = overlay->getThisNode();
350 
351  //first call forward, then deliver
352  forward(&key, &tempMsg, &nextHopNode);
353 
354  if(tempMsg != NULL) {
355  //if key or nextHopNode is changed send msg back to overlay
356  if ((!key.isUnspecified() && key != apiMsg->getDestKey()) ||
357  (!nextHopNode.isUnspecified()
358  && nextHopNode != overlay->getThisNode())) {
359  forwardResponse(key, tempMsg, nextHopNode);
360  }
361  else {
362  RECORD_STATS(numOverlayReceived++;
363  bytesOverlayReceived += tempMsg->getByteLength());
364 
365  assert(overlayCtrlInfo->getTransportType()
366  == ROUTE_TRANSPORT);
367 
368  // debug message
369  if (debugOutput) {
370  EV << "[BaseApp:handleCommonAPIMessage() @ "
371  << thisNode.getIp() << " ("
372  << overlay->getThisNode().getKey().toString(16) << ")]\n"
373  << " Received " << *tempMsg << " from "
374  << overlayCtrlInfo->getSrcRoute() << endl;
375  }
376 
377  //handle RPC first
378  BaseRpcMessage* rpcMessage
379  = dynamic_cast<BaseRpcMessage*>(tempMsg);
380  if (rpcMessage!=NULL) {
381  internalHandleRpcMessage(rpcMessage);
382  } else {
383  deliver(apiMsg->getDestKey(), tempMsg);
384  }
385  }
386  }
387  break;
388  }
389 
390  case KBR_FORWARD:
391  {
392  KBRforward* apiMsg = dynamic_cast<KBRforward*>(commonAPIMsg);
393  OverlayKey key = apiMsg->getDestKey();
394  NodeHandle nextHopNode = apiMsg->getNextHopNode();
395 
396  forward(&key, &tempMsg, &nextHopNode);
397 
398  //if message ist not deleted send it back
399  if(tempMsg != NULL) {
400  if(nextHopNode == apiMsg->getNextHopNode())
401  //do we need this?
402  nextHopNode = NodeHandle::UNSPECIFIED_NODE;
403  forwardResponse(key, tempMsg, nextHopNode);
404  }
405  break;
406  }
407 
408  case KBR_UPDATE:
409  {
410  KBRupdate* apiMsg = dynamic_cast<KBRupdate*>(commonAPIMsg);
411  update(apiMsg->getNode(), apiMsg->getJoined());
412 
413  break;
414  }
415 
416  default:
417  {
418  delete tempMsg;
419  break;
420  }
421  }
422  delete commonAPIMsg;
423 }
424 
425 void BaseApp::handleUpperMessage(cMessage* msg)
426 {
427  delete msg;
428 }
429 
430 void BaseApp::handleLowerMessage(cMessage* msg)
431 {
432  delete msg;
433 }
434 
435 void BaseApp::handleUDPMessage(cMessage *msg)
436 {
437  delete msg;
438 }
439 
441 {
442  delete msg;
443 }
444 
445 void BaseApp::handleTraceMessage(cMessage* msg)
446 {
447  throw cRuntimeError("This application cannot handle trace data. "
448  "You have to overwrite handleTraceMessage() in your "
449  "application to make trace files work");
450 }
451 
453 {
454  RECORD_STATS(numOverlaySent++; bytesOverlaySent += msg->getByteLength());
455 
456  send(msg, "to_lowerTier");
457 }
458 void BaseApp::sendReadyMessage(bool ready, const OverlayKey& nodeId)
459 {
460  CompReadyMessage* msg = new CompReadyMessage();
461  msg->setReady(ready);
462  msg->setComp(getThisCompType());
463  msg->setNodeId(nodeId);
464 
465  overlay->sendMessageToAllComp(msg, getThisCompType());
466 }
467 
469 {
470  // record scalar data
471  simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
472 
473  string baseAppName = string("BaseApp (") += string(this->getName())
474  += string("): ");
475 
476  if (time >= GlobalStatistics::MIN_MEASURED) {
477  globalStatistics->addStdDev(baseAppName + string("Sent Messages/s to "
478  "Overlay"),
479  numOverlaySent / time);
480  globalStatistics->addStdDev(baseAppName +
481  string("Received Messages/s from Overlay"),
482  numOverlayReceived / time);
483  globalStatistics->addStdDev(baseAppName + string("Sent Bytes/s to "
484  "Overlay"),
485  bytesOverlaySent / time);
486  globalStatistics->addStdDev(baseAppName + string("Received Bytes/s "
487  "from Overlay"),
488  bytesOverlayReceived / time);
489  globalStatistics->addStdDev(baseAppName + string("Sent Messages/s to "
490  "UDP"),
491  numUdpSent / time);
492  globalStatistics->addStdDev(baseAppName +
493  string("Received Messages/s from UDP"),
494  numUdpReceived / time);
495  globalStatistics->addStdDev(baseAppName + string("Sent Bytes/s to UDP"),
496  bytesUdpSent / time);
497  globalStatistics->addStdDev(baseAppName + string("Received Bytes/s "
498  "from UDP"),
499  bytesUdpReceived / time);
500 
501  }
502 
503  finishApp();
504 }
505 
507 {
508  // ...
509 }
510 
511 void BaseApp::bindToPort(int port)
512 {
513  EV << "[BaseApp::bindToPort() @ " << thisNode.getIp()
514  << ": Binding to UDP port " << port << endl;
515 
516  thisNode.setPort(port);
517 
518  cMessage *msg = new cMessage("UDP_C_BIND", UDP_C_BIND);
519  UDPControlInfo *ctrl = new UDPControlInfo();
520  ctrl->setSrcPort(port);
521  ctrl->setSockId(UDPSocket::generateSocketId());
522  msg->setControlInfo(ctrl);
523  send(msg, "udpOut");
524 }
525 
526 void BaseApp::sendMessageToUDP(const TransportAddress& destAddr, cPacket *msg,
527  simtime_t delay)
528 {
529  // send message to UDP, with the appropriate control info attached
530  msg->removeControlInfo();
531  msg->setKind(UDP_C_DATA);
532 
533  UDPControlInfo *ctrl = new UDPControlInfo();
534  ctrl->setSrcPort(thisNode.getPort());
535  ctrl->setSrcAddr(thisNode.getIp());
536  ctrl->setDestAddr(destAddr.getIp());
537  ctrl->setDestPort(destAddr.getPort());
538  msg->setControlInfo(ctrl);
539 
540  if (ev.isGUI()) {
541  BaseRpcMessage* rpc = dynamic_cast<BaseRpcMessage*>(msg);
542  if (rpc) rpc->setStatType(APP_DATA_STAT);
543  }
544 
545  // debug message
546  if (debugOutput) {
547  EV << "[BaseApp::sendMessageToUDP() @ " << thisNode.getIp()
548  << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
549  << " Sending " << *msg << " to " << destAddr.getIp()
550  << endl;
551  }
552 
553 
554  RECORD_STATS(numUdpSent++; bytesUdpSent += msg->getByteLength());
555  sendDelayed(msg, delay, "udpOut");
556 }
557 
558 //private
560 {
561  // if RPC was handled return true, else tell the parent class to handle it
562  return BaseRpc::internalHandleRpcCall(msg);
563 }
564 
566  cPolymorphic* context,
567  int rpcId, simtime_t rtt)
568 {
569  // if RPC was handled return true, else tell the parent class to handle it
570  BaseRpc::internalHandleRpcResponse(msg, context, rpcId, rtt);
571 }
572 
574  const OverlayKey& destKey,
575  const std::vector<TransportAddress>&
576  sourceRoute,
577  RoutingType routingType) {
578  callRoute(destKey, message, sourceRoute, routingType);
579 }
580 
582  BaseResponseMessage* response)
583 {
584  // default values for UDP transport
585  TransportType transportType = UDP_TRANSPORT;
586  CompType compType = INVALID_COMP;
587  const TransportAddress* destNode = &TransportAddress::UNSPECIFIED_NODE;//&(call->getSrcNode());
588  const OverlayKey* destKey = &OverlayKey::UNSPECIFIED_KEY;
589 
590  TransportAddress tempNode;
591 
592  OverlayCtrlInfo* overlayCtrlInfo =
593  dynamic_cast<OverlayCtrlInfo*>(call->getControlInfo());
594 
595  if (overlayCtrlInfo &&
596  overlayCtrlInfo->getTransportType() == ROUTE_TRANSPORT) {
597  //destNode = &(overlayCtrlInfo->getSrcNode());
598  if (overlayCtrlInfo->getSrcNode().isUnspecified())
599  destNode = &(overlayCtrlInfo->getLastHop());
600  else
601  destNode = &(overlayCtrlInfo->getSrcNode());
602  transportType = ROUTE_TRANSPORT;
603  compType = static_cast<CompType>(overlayCtrlInfo->getSrcComp());
604  if (static_cast<RoutingType>(overlayCtrlInfo->getRoutingType())
606  destKey = &(overlayCtrlInfo->getSrcNode().getKey());//&(call->getSrcNode().getKey());
607  destNode = &NodeHandle::UNSPECIFIED_NODE;
608  }
609  } else {
610  UDPControlInfo* udpCtrlInfo =
611  check_and_cast<UDPControlInfo*>(call->getControlInfo());
612 
613  tempNode = TransportAddress(udpCtrlInfo->getSrcAddr(), udpCtrlInfo->getSrcPort());
614  destNode = &tempNode;
615 
616  }
617 
618  sendRpcResponse(transportType, compType,
619  *destNode, *destKey, call, response);
620 }