OverSim
I3BaseApp.cc
Go to the documentation of this file.
1 // Copyright (C) 2006 Institut fuer Telematik, Universitaet Karlsruhe (TH)
2 //
3 // This program is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU General Public License
5 // as published by the Free Software Foundation; either version 2
6 // of the License, or (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 
23 #include <omnetpp.h>
24 #include <IPAddressResolver.h>
25 #include <GlobalNodeListAccess.h>
26 #include <InitStages.h>
27 #include <NotificationBoard.h>
28 #include <UnderlayConfigurator.h>
29 #include <UDPControlInfo_m.h>
30 #include <NodeHandle.h>
31 #include <BootstrapList.h>
32 
33 #include "I3Trigger.h"
34 #include "I3IdentifierStack.h"
35 #include "I3Message.h"
36 #include "I3BaseApp.h"
37 
38 #include <iostream>
39 #include <sstream>
40 #include <cfloat>
41 
42 using namespace std;
43 
44 
46  lastReply(0),
47  roundTripTime(MAXTIME)
48 {
49 }
50 
51 std::ostream &operator<<(std::ostream &os, const I3BaseApp::I3CachedServer &server) {
52  os << server.address << " rt=" << server.roundTripTime;
53  return os;
54 }
55 
56 
57 
59 {
60 }
61 
63 {
64 }
65 
67 {
68  return MAX_STAGE_APP + 1;
69 }
70 
71 void I3BaseApp::initialize(int stage)
72 {
73  if (stage != MIN_STAGE_APP) return;
74 
75  nodeIPAddress = IPAddressResolver().addressOf(getParentModule());
76 
77  bindToPort(par("clientPort"));
78  /* NotificationBoardAccess().get()->subscribe(this, NF_HOSTPOSITION_BEFOREUPDATE);
79  NotificationBoardAccess().get()->subscribe(this, NF_HOSTPOSITION_UPDATED);*/
80 
81  getDisplayString().setTagArg("i", 0, "i3c");
82  getParentModule()->getDisplayString().removeTag("i2");
83 
84  if (int(par("bootstrapTime")) >= int(par("initTime"))) {
85  opp_error("Parameter bootstrapTime must be smaller than initTime");
86  }
87 
88  bootstrapTimer = new cMessage();
89  scheduleAt(simTime() + int(par("bootstrapTime")), bootstrapTimer);
90 
91  initializeTimer = new cMessage();
92  scheduleAt(simTime() + int(par("initTime")), initializeTimer);
93 
94  numSent = 0;
95  sentBytes = 0;
96  numReceived = 0;
97  receivedBytes = 0;
98  numIsolations = 0;
99  mobilityInStages = false;
100 
101  WATCH(nodeIPAddress);
102  WATCH(numSent);
103  WATCH(sentBytes);
104  WATCH(numReceived);
105  WATCH(receivedBytes);
106  WATCH(numIsolations);
107 
108 
109  WATCH_SET(insertedTriggers);
110  WATCH(gateway);
111  WATCH_MAP(samplingCache);
112  WATCH_MAP(identifierCache);
113 
114  initializeApp(stage);
115 }
116 
118 {
119 }
120 
122 {
123  I3IPAddress myAddress(nodeIPAddress, par("clientPort"));
124 
125  // TODO: use BootstrapList instead of GlobalNodeList
126  const NodeHandle handle = GlobalNodeListAccess().get()->getBootstrapNode();
127  gateway.address = I3IPAddress(handle.getIp(), par("serverPort"));
128 
129  int cacheSize = par("cacheSize");
130  for (int i = 0; i < cacheSize; i++) {
131  I3Identifier id;
132 
133  id.createRandomKey();
134 
135  ostringstream os;
136  os << myAddress << " sample" << i;
137  id.setName(os.str());
138 
139  samplingCache[id] = I3CachedServer(); // placeholder
140 
141  insertTrigger(id, false);
142  }
143 
144  refreshTriggersTimer = new cMessage();
145  refreshTriggersTime = par("triggerRefreshTime");
146  scheduleAt(simTime() + truncnormal(refreshTriggersTime, refreshTriggersTime / 10),
148 
149  refreshSamplesTimer = new cMessage();
150  refreshSamplesTime = par("sampleRefreshTime");
151  scheduleAt(simTime() + truncnormal(refreshSamplesTime, refreshSamplesTime / 10),
153 }
154 
156 {
157 
158 }
159 
160 void I3BaseApp::handleMessage(cMessage *msg)
161 {
162  if (msg->isSelfMessage()) {
163  if (msg == bootstrapTimer) {
164  bootstrapI3();
165  delete msg;
166  bootstrapTimer = 0;
167  } else if (msg == initializeTimer) {
168  initializeI3();
169  delete msg;
170  initializeTimer = 0;
171  } else if (msg == refreshTriggersTimer) {
172  refreshTriggers();
173  scheduleAt(simTime() + truncnormal(refreshTriggersTime, refreshTriggersTime / 10),
175  } else if (msg == refreshSamplesTimer) {
176  refreshSamples();
177  scheduleAt(simTime() + truncnormal(refreshSamplesTime, refreshSamplesTime / 10),
179  } else {
180  handleTimerEvent(msg);
181  }
182  } else if (msg->arrivedOn("udpIn")) {
183  handleUDPMessage(msg);
184  } else {
185  delete msg;
186  }
187 }
188 
189 void I3BaseApp::deliver(I3Trigger &matchingTrigger, I3IdentifierStack &stack, cPacket *msg)
190 {
191  delete msg;
192 }
193 
194 void I3BaseApp::handleTimerEvent(cMessage *msg)
195 {
196  delete msg;
197 }
198 
199 void I3BaseApp::handleUDPMessage(cMessage *msg)
200 {
201  I3Message *i3msg;
202 
203  i3msg = dynamic_cast<I3Message*>(msg);
204  if (i3msg) {
205  switch (i3msg->getType()) {
206  case SEND_PACKET:
207  {
208  I3SendPacketMessage *smsg;
209 
210  smsg = check_and_cast<I3SendPacketMessage*>(msg);
211  numReceived++;
212  receivedBytes += smsg->getByteLength();
213 
214  /* deliver to app */
215  cPacket *newMessage = smsg->decapsulate();
216  deliver(smsg->getMatchedTrigger(), smsg->getIdentifierStack(), newMessage);
217 
218  break;
219  }
220  case QUERY_REPLY:
221  {
222  I3QueryReplyMessage *pmsg;
223  pmsg = check_and_cast<I3QueryReplyMessage*>(msg);
224  I3Identifier &id = pmsg->getIdentifier();
225 
226  identifierCache[id].address = pmsg->getSource();
227  identifierCache[id].lastReply = simTime();
228  identifierCache[id].roundTripTime = simTime() - pmsg->getSendingTime();
229 
230  if (samplingCache.count(id) != 0) {
231  samplingCache[id] = identifierCache[id];
232  }
233  break;
234  }
235  default:
236  /* shouldn't get here */
237  break;
238  }
239  }
240  delete msg;
241 }
242 
244 {
246 }
247 
248 void I3BaseApp::sendThroughUDP(cMessage *msg, const I3IPAddress &add)
249 {
250  msg->removeControlInfo();
251  msg->setKind(UDP_C_DATA);
252 
253  UDPControlInfo* udpControlInfo = new UDPControlInfo();
254  udpControlInfo->setSrcAddr(nodeIPAddress);
255  udpControlInfo->setSrcPort(par("clientPort"));
256 
257  udpControlInfo->setDestAddr(add.getIp());
258  udpControlInfo->setDestPort(add.getPort());
259 
260  msg->setControlInfo(udpControlInfo);
261  send(msg, "udpOut");
262 }
263 
265 {
266  I3IPAddress myAddress(nodeIPAddress, par("clientPort"));
267  map<I3Identifier, I3CachedServer>::iterator mit;
268 
269 
270  // pick fastest I3 server as gateway
271  int serverTimeout = par("serverTimeout");
272  gateway.roundTripTime = serverTimeout;
273  I3Identifier gatewayId;
274  for (mit = samplingCache.begin(); mit != samplingCache.end(); mit++) {
275  if (gateway.roundTripTime > mit->second.roundTripTime) {
276  gatewayId = mit->first;
277  gateway = mit->second;
278  }
279  }
280 
281  // check if gateway has timeout'ed
282  if (simTime() - gateway.lastReply >= serverTimeout) {
283  // We have a small problem here: if the fastest server has timeout,
284  // that means the previous gateway had stopped responding some time before and no triggers were refreshed.
285  // Since all servers have timeout'ed by now and we can't trust return times, pick a random server and hope that one is alive.
286  int random = intrand(samplingCache.size()), i;
287 
288  EV << "I3BaseApp::refreshTriggers()]\n"
289  << " Gateway timeout at " << nodeIPAddress
290  << ", time " << simTime()
291  << "; expired gateway is " << gateway << "(" << gatewayId << ") "
292  << " with last reply at " << gateway.lastReply
293  << endl;
294 
295  for (i = 0, mit = samplingCache.begin(); i < random; i++, mit++);
296  gateway = mit->second;
297  EV << "I3BaseApp::refreshTriggers()]\n"
298  << " New gateway for " << nodeIPAddress << " is " << gateway
299  << endl;
300 
301  if (gateway.roundTripTime > 2 * serverTimeout) {
302  EV << "I3BaseApp::refreshTriggers()]\n"
303  << " New gateway's (" << gateway << ") rtt for " << nodeIPAddress
304  << " too high... marking as isolated!"
305  << endl;
306  numIsolations++;
307  const NodeHandle handle = GlobalNodeListAccess().get()->getBootstrapNode();
308  gateway.address = I3IPAddress(handle.getIp(), par("serverPort"));
309  }
310  }
311 
312  /* ping gateway */
313  insertTrigger(gatewayId, false);
314 // cout << "Client " << nodeIPAddress << " pings " << gatewayId << endl;
315 
316  /* reinsert stored triggers */
317  set<I3Trigger>::iterator it;
318  for (it = insertedTriggers.begin(); it != insertedTriggers.end(); it++) {
319  insertTrigger(*it, false);
320  }
321 
322  /* now that we are refreshing stuff, might as well erase old identifier cache entries */
323  int idStoreTime = par("idStoreTime");
324  for (mit = identifierCache.begin(); mit != identifierCache.end(); mit++) {
325  if (mit->second.lastReply - simTime() > idStoreTime) {
326  identifierCache.erase(mit);
327  }
328  }
329 
330 }
331 
333  map<I3Identifier, I3CachedServer>::iterator mit;
334 
335  EV << "I3BaseApp::refreshSamples()]\n"
336  << " Refresh samples!"
337  << endl;
338  /* reinsert sample triggers */
339  for (mit = samplingCache.begin(); mit != samplingCache.end(); mit++) {
340  insertTrigger(mit->first, false);
341  }
342 }
343 
345 {
346  simtime_t time;
347  I3Identifier id;
348  map<I3Identifier, I3CachedServer>::iterator mit;
349  I3IPAddress myAddress(nodeIPAddress, par("clientPort"));
350 
351  time = MAXTIME;
352  for (mit = samplingCache.begin(); mit != samplingCache.end(); mit++) {
353  if (time > mit->second.roundTripTime) {
354  time = mit->second.roundTripTime;
355  id = mit->first;
356  }
357  }
358  samplingCache.erase(id);
359 
360  I3Identifier rid;
361  rid.createRandomKey();
362 
363  ostringstream os;
364  os << myAddress << " sample";
365  rid.setName(os.str());
366 
367  samplingCache[rid] = I3CachedServer(); // placeholder
368  insertTrigger(rid, false);
369 
370  return id;
371 }
372 
373 void I3BaseApp::sendPacket(const I3Identifier &id, cPacket *msg, bool useHint)
374 {
375  I3IdentifierStack stack;
376 
377  stack.push(id);
378  sendPacket(stack, msg, useHint);
379 }
380 
381 void I3BaseApp::sendPacket(const I3IdentifierStack &stack, cPacket *msg, bool useHint)
382 {
383  I3SendPacketMessage *smsg;
384 
385  smsg = new I3SendPacketMessage();
386  smsg->setBitLength(SEND_PACKET_L(smsg));
387  smsg->encapsulate(msg);
388  smsg->setIdentifierStack(stack);
389 
390  smsg->setSendReply(useHint);
391  if (useHint) {
392  I3IPAddress add(nodeIPAddress, par("clientPort"));
393  smsg->setSource(add);
394  }
395 
396  numSent++;
397  sentBytes += smsg->getByteLength();
398 
399  I3SubIdentifier subid = stack.peek(); // first check where the packet should go
400  if (subid.getType() == I3SubIdentifier::IPAddress) { // if it's an IP address
401  smsg->getIdentifierStack().pop(); // pop it
402  sendThroughUDP(smsg, subid.getIPAddress()); // and send directly to host
403  } else { // else if it's an identifier
404  // check if we have the I3 server cached
405  I3IPAddress address = (useHint && identifierCache.count(subid.getIdentifier()) != 0) ?
406  identifierCache[subid.getIdentifier()].address :
408  sendThroughUDP(smsg, address); // send it directly
409  }
410 }
411 
412 void I3BaseApp::insertTrigger(const I3Identifier &identifier, bool store)
413 {
414  I3Trigger trigger;
415  I3IPAddress add(nodeIPAddress, par("clientPort"));;
416 
417  trigger.getIdentifierStack().push(add);
418  trigger.setIdentifier(identifier);
419  insertTrigger(trigger, store);
420 }
421 
422 void I3BaseApp::insertTrigger(const I3Identifier &identifier, const I3IdentifierStack &stack, bool store)
423 {
424  I3Trigger trigger;
425 
426  trigger.setIdentifier(identifier);
427  trigger.getIdentifierStack() = stack;
428  insertTrigger(trigger, store);
429 }
430 
431 void I3BaseApp::insertTrigger(const I3Trigger &t, bool store) {
432 
433  if (store) {
434  if (insertedTriggers.count(t) != 0) return;
435  insertedTriggers.insert(t);
436  }
437 
439  I3IPAddress myAddress(nodeIPAddress, par("clientPort"));
440 
441  msg->setTrigger(t);
442  msg->setSendReply(true);
443  msg->setSource(myAddress);
444  msg->setBitLength(INSERT_TRIGGER_L(msg));
445 
447 }
448 
449 void I3BaseApp::removeTrigger(const I3Identifier &identifier)
450 {
451  I3Trigger dummy;
452  dummy.setIdentifier(identifier);
453 
454  set<I3Trigger>::iterator it = insertedTriggers.lower_bound(dummy);
455  if (it == insertedTriggers.end()) return; /* no matches */
456 
457  for (; it != insertedTriggers.end() && it->getIdentifier() == identifier; it++) {
458  removeTrigger(*it);
459  }
460 }
461 
463 {
465  msg->setTrigger(t);
466  msg->setBitLength(REMOVE_TRIGGER_L(msg));
468 
469  insertedTriggers.erase(t);
470 }
471 
473 {
474  return insertedTriggers;
475 }
476 
477 void I3BaseApp::receiveChangeNotification (int category, const cPolymorphic *details)
478 {
479  Enter_Method_Silent();
480 
481  /* Mobility is happening (the only event we are subscribed to). We have two things to do:
482  * 1) Insert triggers with new IP
483  * 2) Delete triggers with old IP
484  * If it's one staged mobility, we just get told the IP after it's changed, and we need to make sure
485  * step 1 and 2 are done. If it's two staged mobility, we need to make sure we do step 1 first and then
486  * step 2. */
487 
488 // if (!mobilityInStages) { /* if the flag isn't set, mobility isn't done in stages or this is stage 1 */
489 // if (category == NF_HOSTPOSITION_BEFOREUPDATE) {
490 // mobilityInStages = true; /* set the flag so we don't land here in stage 2 again */
491 // }
492 // /* do part 1! */
493 // cMessage *msg = check_and_cast<cMessage*>(details);
494 // IPvXAddress *ipAddr = (IPvXAddress*)msg->getContextPointer();
495 //
496 // ostringstream os;
497 // os << "Mobility first stage - actual IP is " << nodeIPAddress << ", future IP is " << *ipAddr << endl;
498 // getParentModule()->bubble(os.str().c_str());
499 //
500 // std::cout << "In advance from " << nodeIPAddress << " to " << *ipAddr << endl;
501 // I3IPAddress oldAddress(nodeIPAddress, par("clientPort"));
502 // I3IPAddress newAddress(*ipAddr, par("clientPort"));
503 //
504 // delete ipAddr;
505 // delete msg;
506 //
507 // for (set<I3Trigger>::iterator it = insertedTriggers.begin(); it != insertedTriggers.end(); it++) {
508 // I3Trigger trigger(*it); /* create copy */
509 // trigger.getIdentifierStack().replaceAddress(oldAddress, newAddress); /* replace old address with new */
510 // insertTrigger(trigger, false); /* insert trigger in I3, but don't store it in our list yet - that's done in part 2 */
511 // }
512 //
513 // doMobilityEvent(I3_MOBILITY_BEFORE_UPDATE);
514 // }
515 // if (category == NF_HOSTPOSITION_UPDATED) { /* part 2: both for 1-stage and stage 2 of 2-stage mobility */
516 // I3IPAddress oldAddress(nodeIPAddress, par("clientPort"));
517 // nodeIPAddress = IPAddressResolver().addressOf(getParentModule()).get4();
518 // I3IPAddress newAddress(nodeIPAddress, par("clientPort"));
519 //
520 // cout << "After from " << oldAddress << " to " << newAddress << endl;
521 //
522 // ostringstream os;
523 // os << "Mobility second stage - setting IP as " << newAddress << endl;
524 // getParentModule()->bubble(os.str().c_str());
525 //
526 // set<I3Trigger> newSet; /* list of new triggers (that we already inserted in I3 in stage 1) */
527 //
528 // for (set<I3Trigger>::iterator it = insertedTriggers.begin(); it != insertedTriggers.end(); it++) {
529 // I3Trigger trigger(*it); /* create copy */
530 //
531 // trigger.getIdentifierStack().replaceAddress(oldAddress, newAddress); /* replace old address with new */
532 // newSet.insert(trigger); /* insert in new list */
533 //
534 // removeTrigger(*it); /* remove trigger from I3 and out list */
535 //
536 // }
537 // insertedTriggers = newSet; /* replace old list with updated one */
538 //
539 // mobilityInStages = false; /* reset variable */
540 // refreshTriggers(); /* to get new trigger round-trip times, new cache list */
541 // refreshSamples();
542 //
543 // doMobilityEvent(I3_MOBILITY_UPDATED);
544 // }
545 }
546 
548 {
549 }