OverSim
Bamboo.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2012 Institute of Telematics, Karlsruhe Institute of Technology (KIT)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <cassert>
25 
26 #include <IPAddressResolver.h>
27 #include <IPvXAddress.h>
28 #include <IInterfaceTable.h>
29 #include <IPv4InterfaceData.h>
30 #include <RpcMacros.h>
31 #include <InitStages.h>
32 #include <GlobalStatistics.h>
33 #include <LookupListener.h>
34 #include <AbstractLookup.h>
35 
36 #include "Bamboo.h"
37 
38 
40 
42 {
43  // destroy self timer messages
44  cancelAndDelete(localTuningTimer);
45  cancelAndDelete(leafsetMaintenanceTimer);
46  cancelAndDelete(globalTuningTimer);
47 }
48 
49 
51 {
52  if ( stage != MIN_STAGE_OVERLAY )
53  return;
54 
55  // Bamboo provides KBR services
56  kbr = true;
57 
58  rowToAsk = 0;
59 
60  baseInit();
61 
62  localTuningInterval = par("localTuningInterval");
63  leafsetMaintenanceInterval = par("leafsetMaintenanceInterval");
64  globalTuningInterval = par("globalTuningInterval");
65 
66  localTuningTimer = new cMessage("repairTaskTimer");
67  leafsetMaintenanceTimer = new cMessage("leafsetMaintenanceTimer");
68  globalTuningTimer = new cMessage("globalTuningTimer");
69 }
70 
71 
// NOTE(review): this listing has lost the signature of this function as well as
// several statement lines (the embedded listing numbers jump 73 -> 75 -> 77 -> 79 and
// 80 -> 82). Only the braces and the two original comments survive, so neither the
// branch condition nor the action taken in each branch can be determined from here.
// Restore the missing lines from the upstream file before editing this function.
73 {
75 
// Comment of the "if" branch; the line holding the condition itself is missing.
77  // no existing pastry network -> first node of a new one
79  } else {
80  // join existing pastry network
82  }
83 }
84 
85 
// Switches the overlay to state `toState`.
//
// The generic part of the transition is delegated to baseChangeState(); afterwards a
// state-specific action is performed:
//   - INIT / DISCOVERY: nothing further.
//   - JOIN: logs the join and builds a RequestLeafSetCall ("JOIN Call") carrying the
//     current simulation time as timestamp and an encapsulated leaf-set state message.
//   - READY: (re)schedules the three maintenance timers.
//
// NOTE(review): listing lines 105, 109 and 112 are missing from this copy. The
// statement that actually sends `call`, and the opening part of the statement that
// ends in "leafsetReqBytesSent += call->getByteLength());", are therefore not visible.
86 void Bamboo::changeState(int toState)
87 {
88  baseChangeState(toState);
89 
90  switch (toState) {
91  case INIT:
92  break;
93 
94  case DISCOVERY:
95  break;
96 
97  case JOIN: {
98  EV << "[Bamboo::changeState()]\n"
99  << " sending join message via " << bootstrapNode
100  << " to " << thisNode.getKey()
101  << endl;
102 
103  RequestLeafSetCall* call = new RequestLeafSetCall("JOIN Call");
104  call->setTimestamp(simTime());
106 
// Size the call via the PASTRYREQUESTLEAFSETCALL_L macro before attaching the
// leaf-set state message as payload.
107  call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
108  call->encapsulate(createStateMessage(PASTRY_STATE_LEAFSET));
// NOTE(review): the unmatched ")" shows that the start of this statement sat on the
// missing listing line 109 -- presumably a statistics-recording wrapper; confirm.
110  leafsetReqBytesSent += call->getByteLength());
111 
113  }
114 
115  break;
116 
117  case READY:
// Leaf-set maintenance is scheduled for the current simulation time, i.e. it fires
// immediately; any previously scheduled occurrence is cancelled first.
118  cancelEvent(leafsetMaintenanceTimer);
119  scheduleAt(simTime(), leafsetMaintenanceTimer);
120 
121  // schedule routing table maintenance task
122  cancelEvent(localTuningTimer);
123  scheduleAt(simTime() + localTuningInterval, localTuningTimer);
124 
// Global tuning starts one full interval from now.
125  cancelEvent(globalTuningTimer);
126  scheduleAt(simTime() + globalTuningInterval, globalTuningTimer);
127 
128  break;
129  }
130 }
131 
132 
// Dispatches an expired self-timer to its maintenance task.
//
// `msg` is compared by pointer against the three timer messages owned by this module.
// For the local and global tuning timers the task is run and the timer is re-armed
// one interval into the future. Messages matching none of the timers are ignored.
//
// NOTE(review): listing lines 148 and 150 are missing from this copy. In the leaf-set
// branch the call that performs the maintenance and the second argument of the
// scheduleAt() call are therefore not visible.
133 void Bamboo::handleTimerEvent(cMessage* msg)
134 {
135  if (msg == localTuningTimer) {
136  EV << "[Bamboo::handleTimerEvent() @ " << thisNode.getIp()
137  << " (" << thisNode.getKey().toString(16) << ")]\n"
138  << " starting local tuning "
139  << "(aka neighbor's neighbors / routing table maintenance)"
140  << endl;
141  doLocalTuning();
142  scheduleAt(simTime() + localTuningInterval, localTuningTimer);
143  } else if (msg == leafsetMaintenanceTimer) {
144  EV << "[Bamboo::handleTimerEvent() @ " << thisNode.getIp()
145  << " (" << thisNode.getKey().toString(16) << ")]\n"
146  << " starting leafset maintenance"
147  << endl;
// NOTE(review): the statement below is cut off -- its continuation line is missing.
149  scheduleAt(simTime() + leafsetMaintenanceInterval,
151  } else if (msg == globalTuningTimer) {
152  EV << "[Bamboo::handleTimerEvent() @ " << thisNode.getIp()
153  << " (" << thisNode.getKey().toString(16) << ")]\n"
154  << " starting global tuning"
155  << endl;
156  doGlobalTuning();
157  scheduleAt(simTime() + globalTuningInterval, globalTuningTimer);
158  }
159 }
160 
161 
// NOTE(review): the first line of this signature (return type, function name and the
// `msg` parameter) is missing from the listing; the name "Bamboo::handleRpcResponse"
// is taken from the log string below.
//
// Handles an RPC response: the response is first passed to
// BasePastry::handleRpcResponse(), then RequestLeafSet responses are additionally
// logged and forwarded to handleRequestLeafSetResponse().
163  cPolymorphic* context, int rpcId,
164  simtime_t rtt)
165 {
166  BasePastry::handleRpcResponse(msg, context, rpcId, rtt);
167 
// Dispatch on the response type via the RPC_* macros; `_RequestLeafSetResponse` is
// not declared in this function, so it is presumably provided by RPC_ON_RESPONSE.
168  RPC_SWITCH_START( msg )
169  RPC_ON_RESPONSE( RequestLeafSet ) {
170  EV << "[Bamboo::handleRpcResponse() @ " << thisNode.getIp()
171  << " (" << thisNode.getKey().toString(16) << ")]\n"
172  << " Received a RequestLeafSet RPC Response (PUSH): id="
173  << rpcId << "\n"
174  << " msg=" << *_RequestLeafSetResponse << " rtt=" << rtt
175  << endl;
176  handleRequestLeafSetResponse(_RequestLeafSetResponse);
177  break;
178  }
179  RPC_SWITCH_END( )
180 }
181 
182 
184 {
185  EV << "[Bamboo::handleRequestLeafSetResponse() @ " << thisNode.getIp()
186  << " (" << thisNode.getKey().toString(16) << ")]"
187  << endl;
188 
189  if (state == JOIN) {
190  PastryStateMessage* stateMsg =
191  check_and_cast<PastryStateMessage*>(response->decapsulate());
192 
193  stateMsg->setLeafSetArraySize(stateMsg->getLeafSetArraySize() + 1);
194  stateMsg->setLeafSet(stateMsg->getLeafSetArraySize() - 1,
195  stateMsg->getSender());
196 
197  handleStateMessage(stateMsg);
198  }
199 }
200 
201 
// NOTE(review): the signature line of this function is missing from the listing; the
// name "Bamboo::doLeafsetMaintenance" is taken from the log string below. Listing
// lines 212 and 216 are missing as well.
//
// Picks a random leaf-set node and, if one is available, sends it a
// RequestLeafSetCall ("LeafSet PULL") that encapsulates a leaf-set state message.
203 {
204  const TransportAddress& ask = leafSet->getRandomNode();
// An unspecified address means there is nobody to ask; nothing is sent in that case.
205  if (!ask.isUnspecified()) {
206  EV << "[Bamboo::doLeafsetMaintenance()]\n"
207  << " leafset maintenance: pulling leafset from "
208  << ask << endl;
209 
210  RequestLeafSetCall* call = new RequestLeafSetCall("LeafSet PULL");
211  call->setTimestamp(simTime());
213 
214  call->setBitLength(PASTRYREQUESTLEAFSETCALL_L(call));
215  call->encapsulate(createStateMessage(PASTRY_STATE_LEAFSET));
// NOTE(review): the unmatched ")" shows that the start of this statement sat on the
// missing listing line 216 -- presumably a statistics-recording wrapper; confirm.
217  leafsetReqBytesSent += call->getByteLength());
218 
219  sendUdpRpcCall(ask, call);
220  }
221 }
222 
223 
225 {
226  int digit = 0;
227  int lastRow = routingTable->getLastRow();
228 
229  int* choices = new int[lastRow + 1];
230  int sum = 0;
231 
232  for (int i = 0; i < lastRow; ++i) {
233  sum += (choices[i] = lastRow - i);
234  }
235 
236  int rval = intuniform(0, sum);
237 
238  while (true) {
239  rval -= choices [digit];
240  if (rval <= 0)
241  break;
242  ++digit;
243  }
244  delete[] choices;
245 
246  return digit;
247 }
248 
249 
// NOTE(review): the signature line of this function is missing from the listing; the
// name "Bamboo::doLocalTuning" is taken from the log string below. Listing lines 252,
// 254, 264 and 267 are missing too, so the definition of `ask4row` (and any update of
// `rowToAsk`) is not visible here.
//
// If `ask4row` names a node other than this one, sends it a RequestRoutingRowCall
// asking for routing-table row `rowToAsk + 1`.
251 {
253 
255 
// Only ask when a target exists and it is not this node itself.
256  if ((!ask4row.isUnspecified()) && (ask4row != thisNode)) {
257  EV << "[Bamboo::doLocalTuning() @ " << thisNode.getIp()
258  << " (" << thisNode.getKey().toString(16) << ")]\n"
259  << " Sending Message to Node in Row" << rowToAsk
260  << endl;
261 
262  RequestRoutingRowCall* call =
263  new RequestRoutingRowCall("REQUEST ROUTINGROW Call (Local Tuning)");
// The requested row is rowToAsk + 1, while the log line above prints rowToAsk.
265  call->setRow(rowToAsk + 1);
266  call->setBitLength(PASTRYREQUESTROUTINGROWCALL_L(call));
// NOTE(review): the unmatched ")" shows that the start of this statement sat on the
// missing listing line 267 -- presumably a statistics-recording wrapper; confirm.
268  routingTableRowReqBytesSent += call->getByteLength());
269 
270  sendUdpRpcCall(ask4row, call);
271  }
272 }
273 
274 
276 {
277  int digit = getNextRowToMaintain();
278 
279  // would be a better alternative
280  //OverlayKey OverlayKey::randomSuffix(uint pos) const;
281 
282  uint32_t maxDigitIndex = OverlayKey::getLength() - bitsPerDigit;
283  uint32_t maxKeyIndex = OverlayKey::getLength() - 1;
284  OverlayKey newKey = OverlayKey::random();
285  while (newKey.getBitRange(maxDigitIndex - digit * bitsPerDigit, bitsPerDigit) ==
286  thisNode.getKey().getBitRange(maxDigitIndex - digit * bitsPerDigit, bitsPerDigit)) {
287  newKey = OverlayKey::random();
288  }
289 
290  assert(digit * bitsPerDigit < OverlayKey::getLength());
291  for (uint16_t i = 0; i < digit * bitsPerDigit; ++i) {
292  newKey[maxKeyIndex - i] = thisNode.getKey().getBit(maxKeyIndex - i);
293  }
294 
295  createLookup()->lookup(newKey, 1, 0, 0, new BambooLookupListener(this));
296 }
297 
298 
300 {
301  if (state != READY) return false;
302 
303  if (failed.isUnspecified()) {
304  throw cRuntimeError("Bamboo::handleFailedNode(): failed is unspecified!");
305  }
306 
307  const TransportAddress& lsAsk = leafSet->failedNode(failed);
308  routingTable->failedNode(failed);
309  neighborhoodSet->failedNode(failed);
310 
311  if (lsAsk.isUnspecified() && (! leafSet->isValid())) {
312  EV << "[Bamboo::handleFailedNode()]\n"
313  << " lost connection to the network, trying to re-join."
314  << endl;
315  join();
316  return false;
317  }
318 
319  return true;
320 }
321 
322 
// NOTE(review): the signature line of this function is missing from the listing; the
// name "Bamboo::checkProxCache" is taken from the log string below. Listing lines
// 331, 333, 335, 341, 345, 359 and 368 are missing as well, which leaves the find()
// expressions cut off and the braces in this copy unbalanced. The comments added
// here describe only what the surviving lines show; restore the missing lines from
// the upstream file before changing any logic.
//
// Visible behaviour: works on the cached state message `stateCache`, returns early
// from inside the "entries not yet determined" check, records `lastStateChange`,
// frees the cached message and proximity data, refreshes the tooltip and then moves
// on to the next queued state message, if any.
324 {
325  // no cached STATE message?
326  if (stateCache.msg) {
327  simtime_t now = simTime();
328  if (stateCache.prox) {
329  // some entries not yet determined?
// The three find() calls scan the pr_rt, pr_ls and pr_ns containers of the cached
// proximity data; the value searched for is on the missing continuation lines.
330  if ((find(stateCache.prox->pr_rt.begin(), stateCache.prox->pr_rt.end(),
332  (find(stateCache.prox->pr_ls.begin(), stateCache.prox->pr_ls.end(),
334  (find(stateCache.prox->pr_ns.begin(), stateCache.prox->pr_ns.end(),
336  return;
337  }
338  }
339  // merge info in own state tables
340  // except leafset (was already handled in handleStateMessage)
// NOTE(review): the merge calls guarding the two assignments below are missing.
342  lastStateChange = now;
343  }
344 
346  lastStateChange = now;
347  EV << "[Bamboo::checkProxCache()]\n"
348  << " Merged nodes into routing table."
349  << endl;
350  }
351  }
352 
// Release the processed cache entry and mark the cache as empty.
353  delete stateCache.msg;
354  stateCache.msg = NULL;
355  delete stateCache.prox;
356  stateCache.prox = NULL;
357 
// NOTE(review): the body of this JOIN branch is on a missing line.
358  if (state == JOIN) {
360  }
361 
362  updateTooltip();
363 
364  // process next queued message:
365  if (! stateCacheQueue.empty()) {
366  stateCache = stateCacheQueue.front();
367  stateCacheQueue.pop();
// NOTE(review): the condition choosing between pingNodes() and the recursive
// checkProxCache() call is on the missing listing line 368.
369  pingNodes();
370  } else {
371  checkProxCache();
372  }
373  }
374 }
375 
376 
// NOTE(review): the signature line of this function is missing from the listing; the
// name "Bamboo::handleStateMessage" is taken from the log strings below and the body
// reads a pointer named `msg`. Listing lines 409, 431 and 446 are missing as well;
// each of them apparently held the "if" whose "else" branch calls checkProxCache().
//
// Visible behaviour:
//   - INIT state: the message is logged as unusable, deleted, and nothing else
//     happens.
//   - JOIN state: the message is merged into the leaf set immediately and becomes
//     the current `stateCache` entry.
//   - otherwise: the message is merged into the leaf set; it then either becomes the
//     current `stateCache` entry (if none is being processed) or is appended to
//     `stateCacheQueue`, whose oldest entry is dropped once it holds more than 15.
378 {
379  if (debugOutput) {
380  EV << "[Bamboo::handleStateMessage() @ " << thisNode.getIp()
381  << " (" << thisNode.getKey().toString(16) << ")]\n"
382  << " new STATE message to process "
383  << static_cast<void*>(msg) << " in state "
384  << ((state == READY)?"READY":((state == JOIN)?"JOIN":"INIT"))
385  << endl;
386  }
387 
388  if (state == INIT) {
389  EV << "[Bamboo::handleStateMessage() @ " << thisNode.getIp()
390  << " (" << thisNode.getKey().toString(16) << ")]\n"
391  << " can't handle state messages until at least reaching JOIN state."
392  << endl;
393  delete msg;
394  return;
395  }
396 
// Wrap the message in the handle type that stateCache and stateCacheQueue store.
397  PastryStateMsgHandle handle(msg);
398 
399  if (state == JOIN) {
400  determineAliveTable(msg);
401  leafSet->mergeState(msg, &aliveTable);
402  // merged state into leafset right now
403  lastStateChange = simTime();
404  newLeafs();
405  updateTooltip();
406 
407  // no state message is processed right now, start immediately:
408  stateCache = handle;
// NOTE(review): the condition selecting pingNodes() vs. checkProxCache() is on the
// missing listing line 409.
410  pingNodes();
411  } else {
412  checkProxCache();
413  }
414 
415  return;
416  }
417 
418  // determine aliveTable to prevent leafSet from merging nodes that are
419  // known to be dead:
420  determineAliveTable(msg);
// Unlike the JOIN branch above, the bookkeeping here runs only when mergeState()
// returns true.
421  if (leafSet->mergeState(msg, &aliveTable)) {
422  // merged state into leafset right now
423  lastStateChange = simTime();
424  newLeafs();
425  updateTooltip();
426  }
427  // in READY state, only ping nodes to get proximity metric:
428  if (!stateCache.msg) {
429  // no state message is processed right now, start immediately:
430  stateCache = handle;
// NOTE(review): condition on the missing listing line 431.
432  pingNodes();
433  } else {
434  checkProxCache();
435  }
436  } else {
437  // enqueue message for later processing:
438  stateCacheQueue.push(handle);
// Cap the backlog: once more than 15 handles are queued, the oldest one is dropped.
// NOTE(review): only the handle's `msg` is deleted here -- confirm that queued
// handles never own a `prox` object.
439  if (stateCacheQueue.size() > 15) {
440  delete stateCacheQueue.front().msg;
441  stateCacheQueue.pop();
442  EV << "[Bamboo::handleStateMessage() @ " << thisNode.getIp()
443  << " (" << thisNode.getKey().toString(16) << ")]\n"
444  << " stateCacheQueue full -> pop()" << endl;
445  }
// NOTE(review): condition on the missing listing line 446.
447  prePing(msg);
448  } else {
449  checkProxCache();
450  }
451  }
452 }
453 
454 
// NOTE(review): the signature line of this function is missing from the listing; the
// name "Bamboo::lookupFinished" is taken from the log string below and the body reads
// a pointer named `lookup`. Listing lines 465, 468 and 469 are missing as well, so
// the condition of the inner if/else and the middle arguments of getProx() are not
// visible, and the braces in this copy do not pair up.
//
// Visible behaviour: for a valid lookup whose first result is not this node, the
// result is merged into the routing table -- with its proximity when the neighbor
// cache already knows it, or with -1.0 followed by checkProxCache() in the branch
// whose condition is missing. A failed lookup is only logged.
456 {
457  EV << "[Bamboo::lookupFinished() @ " << thisNode.getIp()
458  << " (" << thisNode.getKey().toString(16) << ")]\n"
459  << endl;
460 
461  if (lookup->isValid()) {
462  EV << " Lookup successful" << endl;
463  const NodeVector& result = lookup->getResult();
// NOTE(review): result[0] is read without a visible emptiness check -- presumably a
// valid lookup always yields at least one node; confirm.
464  if (result[0] != thisNode) {
466  // Global Tuning PING
467  Prox prox = neighborCache->getProx(result[0],
470  this, NULL);
// Merge only when a proximity value is already available (neither unknown nor
// still waiting).
471  if (prox != Prox::PROX_UNKNOWN && prox != Prox::PROX_WAITING) {
472  routingTable->mergeNode(result[0], prox.proximity);
473  }
474  } else {
475  routingTable->mergeNode(result[0], -1.0);
476  checkProxCache();
477  }
478  }
479  } else {
480  EV << " Lookup failed" << endl;
481  }
482 }