OverSim
P2pns.cc
Go to the documentation of this file.
1 //
2 // Copyright (C) 2007 Institut fuer Telematik, Universitaet Karlsruhe (TH)
3 //
4 // This program is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU General Public License
6 // as published by the Free Software Foundation; either version 2
7 // of the License, or (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 //
18 
24 #include <XmlRpcInterface.h>
25 #include <IPAddressResolver.h>
26 #include <P2pnsMessage_m.h>
27 
28 #include "P2pns.h"
29 
31 
32 using namespace std;
33 
// Body of a definition whose header line is not visible in this listing.
// It only resets the cache pointer; the pointer is assigned to the real
// "p2pnsCache" submodule in initializeApp().
35 {
36  p2pnsCache = NULL;
37 }
38 
40 {
41 }
42 
43 void P2pns::initializeApp(int stage)
44 {
45  if (stage != MIN_STAGE_APP)
46  return;
47 
48  twoStageResolution = par("twoStageResolution");
49  keepaliveInterval = par("keepaliveInterval");
50  idCacheLifetime = par("idCacheLifetime");
51 
52  p2pnsCache = check_and_cast<P2pnsCache*> (getParentModule()->
53  getSubmodule("p2pnsCache"));
54 
55  xmlRpcInterface = dynamic_cast<XmlRpcInterface*>(overlay->
56  getCompModule(TIER3_COMP));
57 }
58 
60 {
61  if ((msg->getReady() == false) || (msg->getComp() != OVERLAY_COMP)) {
62  delete msg;
63  return;
64  }
65 
66  thisId = (overlay->getThisNode().getKey() >> (OverlayKey::getLength() - 100))
67  << (OverlayKey::getLength() - 100);
68  delete msg;
69 }
70 
71 void P2pns::tunnel(const OverlayKey& destKey, const BinaryValue& payload)
72 {
73  Enter_Method_Silent();
74 
75  P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(destKey);
76 
77  if (entry == NULL) {
78  // lookup destKey and create new entry
79 
80  EV << "[P2pns::tunnel()]\n"
81  << " Establishing new cache entry for key: " << destKey
82  << endl;
83 
84  LookupCall* lookupCall = new LookupCall();
85  lookupCall->setKey(destKey);
86  lookupCall->setNumSiblings(1);
87  sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
88  TUNNEL_LOOKUP);
89  p2pnsCache->addIdCacheEntry(destKey, &payload);
90  } else if (entry->state == CONNECTION_PENDING) {
91  // lookup not finished yet => append packet to queue
92  EV << "[P2pns::tunnel()]\n"
93  << " Queuing packet since lookup is still pending for key: "
94  << destKey << endl;
95 
96  entry->lastUsage = simTime();
97  entry->payloadQueue.push_back(payload);
98  } else {
99  entry->lastUsage = simTime();
100  sendTunnelMessage(entry->addr, payload);
101  }
102 }
103 
105 {
106  P2pnsIdCacheEntry* entry =
107  p2pnsCache->getIdCacheEntry(lookupResponse->getKey());
108 
109  if ((entry == NULL) || (entry->state == CONNECTION_ACTIVE)) {
110  // no matching entry in idCache or connection is already active
111  // => lookup result not needed anymore
112  return;
113  }
114 
115  if (lookupResponse->getIsValid()) {
116  // verify if nodeId of lookup's closest node matches requested id
117  if (lookupResponse->getKey().sharedPrefixLength(lookupResponse->
118  getSiblings(0).getKey()) < (OverlayKey::getLength() - 100)) {
119  EV << "[P2pns::handleTunnelLookupResponse()]\n"
120  << " Lookup response " << lookupResponse->getSiblings(0)
121  << " doesn't match requested id " << lookupResponse->getKey()
122  << endl;
123  // lookup failed => drop cache entry and all queued packets
124  p2pnsCache->removeIdCacheEntry(lookupResponse->getKey());
125 
126  return;
127  }
128 
129  // add transport address to cache entry
130  entry->addr = lookupResponse->getSiblings(0);
131  entry->state = CONNECTION_ACTIVE;
132 
133  // start periodic ping timer
134  P2pnsKeepaliveTimer* msg =
135  new P2pnsKeepaliveTimer("P2pnsKeepaliveTimer");
136  msg->setKey(lookupResponse->getKey());
137  scheduleAt(simTime() + keepaliveInterval, msg);
138 
139  // send all pending tunnel messages
140  while (!entry->payloadQueue.empty()) {
141  sendTunnelMessage(entry->addr, entry->payloadQueue.front());
142  entry->payloadQueue.pop_front();
143  }
144  } else {
145  // lookup failed => drop cache entry and all queued packets
146  p2pnsCache->removeIdCacheEntry(lookupResponse->getKey());
147  }
148 }
149 
151  const BinaryValue& payload)
152 {
153  EV << "[P2pns::sendTunnelMessage()]\n"
154  << " Sending TUNNEL message to " << addr << endl;
155 
156  P2pnsTunnelMessage* msg = new P2pnsTunnelMessage("P2pnsTunnelMsg");
157  msg->setPayload(payload);
158  msg->setSrcId(thisId);
159  msg->setBitLength(P2PNSTUNNELMESSAGE_L(msg));
160 
161  callRoute(OverlayKey::UNSPECIFIED_KEY, msg, addr);
162 }
163 
164 void P2pns::registerId(const std::string& addr)
165 {
166  Enter_Method_Silent();
167 
168  std::string name = par("registerName").stdstringValue();
169  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
170 
171  EV << "[P2pns::p2pnsRegisterRpc()]\n"
172  << " registerId(): name: " << name << " addr: " << addr
173  << endl;
174 
175  dhtPutMsg->setKey(OverlayKey::sha1(BinaryValue(name)));
176 
177  dhtPutMsg->setValue(BinaryValue(addr));
178  dhtPutMsg->setKind(28);
179  dhtPutMsg->setId(1);
180  dhtPutMsg->setTtl(60*60*24*7);
181  dhtPutMsg->setIsModifiable(true);
182 
183  sendInternalRpcCall(TIER1_COMP, dhtPutMsg);
184 }
185 
186 void P2pns::deliver(OverlayKey& key, cMessage* msg)
187 {
188  P2pnsTunnelMessage* tunnelMsg = check_and_cast<P2pnsTunnelMessage*>(msg);
189 
190  if (xmlRpcInterface) {
191  xmlRpcInterface->deliverTunneledMessage(tunnelMsg->getPayload());
192  }
193 
194  updateIdCacheWithNewTransport(msg);
195 
196  delete msg;
197 }
198 
199 void P2pns::pingRpcResponse(PingResponse* response, cPolymorphic* context,
200  int rpcId, simtime_t rtt)
201 {
202  delete context;
203 }
204 
206  cPolymorphic* context, int rpcId)
207 {
208  OverlayKeyObject* key = dynamic_cast<OverlayKeyObject*>(context);
209  P2pnsIdCacheEntry* entry = NULL;
210 
211  // lookup entry in id cache
212  if ((key != NULL) &&
213  (entry = p2pnsCache->getIdCacheEntry(*key))) {
214 
215  // remove entry if TransportAddress hasn't been updated in the meantime
216  if (!entry->addr.isUnspecified() && (entry->addr != dest)) {
217  EV << "[P2pns::pingTimeout()]\n"
218  << " Removing id " << key << " from idCache (ping timeout)"
219  << endl;
220  p2pnsCache->removeIdCacheEntry(*key);
221  }
222  }
223 
224  delete context;
225 }
226 
227 void P2pns::handleTimerEvent(cMessage* msg)
228 {
229  P2pnsKeepaliveTimer* timer = dynamic_cast<P2pnsKeepaliveTimer*>(msg);
230 
231  if (timer) {
232  P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(timer->getKey());
233 
234  if (entry == NULL) {
235  // no valid cache entry found
236  delete msg;
237  return;
238  }
239 
240  if ((entry->lastUsage + idCacheLifetime) < simTime()) {
241  // remove idle connections
242  EV << "[P2pns::handleTimerEvent()]\n"
243  << " Removing id " << timer->getKey()
244  << " from idCache (connection idle)"
245  << endl;
246  p2pnsCache->removeIdCacheEntry(timer->getKey());
247  delete msg;
248  return;
249  }
250 
251  if (!entry->addr.isUnspecified()) {
252  // ping 3 times with default timeout
253  pingNode(entry->addr, -1, 2, new OverlayKeyObject(timer->getKey()));
254  }
255 
256  // reschedule periodic keepalive timer
257  scheduleAt(simTime() + keepaliveInterval, msg);
258 
259  // exhaustive-iterative lookup to refresh siblings, if our ip has changed
260  LookupCall* lookupCall = new LookupCall();
261  lookupCall->setKey(overlay->getThisNode().getKey());
262  lookupCall->setNumSiblings(1);
264  sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
265  TUNNEL_LOOKUP);
266  }
267 }
268 
270 {
271  // update idCache with new TransportAddress of the ping originator
272  OverlayCtrlInfo* ctrlInfo =
273  dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
274 
275  OverlayKey srcId;
276 
277  if (!ctrlInfo) {
278  // can't update cache without knowing the originator id
279  EV << "[P2pns::updateCacheWithNewTransport()]\n"
280  << " Can't update cache without knowing the originator id"
281  << endl;
282  return;
283  }
284 
285  if (ctrlInfo->getSrcRoute().isUnspecified()) {
286  P2pnsTunnelMessage* tunnelMsg = dynamic_cast<P2pnsTunnelMessage*>(msg);
287  if (tunnelMsg) {
288  srcId = tunnelMsg->getSrcId();
289  } else {
290  // can't update cache without knowing the originator id
291  EV << "[P2pns::updateCacheWithNewTransport()]\n"
292  << " Can't update cache without knowing the originator id"
293  << endl;
294  return;
295  }
296  } else {
297  srcId = (ctrlInfo->getSrcRoute().getKey() >> (OverlayKey::getLength() - 100))
298  << (OverlayKey::getLength() - 100);
299  }
300 
301  P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(srcId);
302 
303  if (entry == NULL) {
304  EV << "[P2pns::updateCacheWithNewTransport()]\n"
305  << " Adding new cache entry for id " << srcId
306  << " with addr " << (const TransportAddress&)ctrlInfo->getSrcRoute()
307  << endl;
308  entry = p2pnsCache->addIdCacheEntry(srcId);
309  entry->addr = ctrlInfo->getSrcRoute();
310 
311  // start periodic ping timer
312  P2pnsKeepaliveTimer* msg =
313  new P2pnsKeepaliveTimer("P2pnsKeepaliveTimer");
314  msg->setKey(srcId);
315  scheduleAt(simTime() + keepaliveInterval, msg);
316  }
317 
318  // update transport address in idCache (node may have a new
319  // TransportAddress due to mobility)
320  if (entry->addr.isUnspecified() ||
321  (entry->addr != ctrlInfo->getSrcRoute())) {
322  EV << "[P2pns::handleRpcCall()]\n"
323  << " Ping with new transport address received: "
324  << " Changing from " << entry->addr << " to "
325  << static_cast<TransportAddress>(ctrlInfo->getSrcRoute())
326  << " for id " << srcId << endl;
327  entry->addr = ctrlInfo->getSrcRoute();
328  }
329 
330  entry->state = CONNECTION_ACTIVE;
331 }
332 
// Dispatches the RPC call held in msg and returns RPC_HANDLED unless the
// Ping branch returns first.
// NOTE(review): the embedded listing numbers jump from 342 to 344 below, so
// one line (presumably the macro closing RPC_SWITCH_START) is not visible
// here - confirm against the real file before editing this block.
334 {
335  // delegate messages
336  RPC_SWITCH_START(msg)
337  RPC_ON_CALL(Ping) {
 // a ping tells us the sender's current transport address, so the id cache
 // is refreshed first
338  updateIdCacheWithNewTransport(msg);
 // returns false instead of RPC_HANDLED - presumably so that the ping is
 // still answered by the default handling; confirm
339  return false;
340  }
 // register/resolve calls are passed on to p2pnsRegisterRpc() and
 // p2pnsResolveRpc()
341  RPC_DELEGATE( P2pnsRegister, p2pnsRegisterRpc );
342  RPC_DELEGATE( P2pnsResolve, p2pnsResolveRpc );
344 
345  return RPC_HANDLED;
346 }
347 
348 
350  cPolymorphic* context, int rpcId, simtime_t rtt)
351 {
 // Dispatches an RPC response by type: DHT put, DHT get and (last branch)
 // overlay lookup responses. Each branch logs the response and forwards it
 // to the matching handler together with the call's context object.
 // NOTE(review): the first line of the signature and the lines numbered 373
 // and 381 in the listing (presumably the opening of the Lookup branch and
 // the macro closing RPC_SWITCH_START) are not visible here - confirm
 // against the real file before editing this block.
352  RPC_SWITCH_START(msg)
353  RPC_ON_RESPONSE( DHTputCAPI ) {
354  EV << "[P2pns::handleRpcResponse()]\n"
355  << " DHTputCAPI RPC Response received: id=" << rpcId
356  << " msg=" << *_DHTputCAPIResponse << " rtt=" << rtt
357  << endl;
 // only puts that carry a P2pnsRegisterCall as context are answered;
 // registerId() sends its put without a context, so that response is
 // only logged
358  if (dynamic_cast<P2pnsRegisterCall*>(context)) {
359  handleDHTputCAPIResponse(_DHTputCAPIResponse,
360  check_and_cast<P2pnsRegisterCall*>(context));
361  }
362  break;
363  }
364  RPC_ON_RESPONSE( DHTgetCAPI ) {
365  EV << "[P2pns::handleRpcResponse()]\n"
366  << " DHTgetCAPI RPC Response received: id=" << rpcId
367  << " msg=" << *_DHTgetCAPIResponse << " rtt=" << rtt
368  << endl;
 // the context of a get must be the P2pnsResolveCall that triggered it;
 // check_and_cast raises an error otherwise
369  handleDHTgetCAPIResponse(_DHTgetCAPIResponse,
370  check_and_cast<P2pnsResolveCall*>(context));
371  break;
372  }
 // lookup responses: rpcId tells handleLookupResponse() which kind of
 // lookup (resolve, tunnel or refresh) has finished
374  EV << "[P2pns::handleRpcResponse()]\n"
375  << " Lookup RPC Response received: id=" << rpcId
376  << " msg=" << *_LookupResponse << " rtt=" << rtt
377  << endl;
378  handleLookupResponse(_LookupResponse, context, rpcId);
379  break;
380  }
382 }
383 
385 {
386  p2pnsCache->addData(registerCall->getP2pName(),
387  registerCall->getAddress());
388 
389  DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
390 
391  EV << "[P2pns::p2pnsRegisterRpc()]\n"
392  << " RegisterRpc: name: " << registerCall->getP2pName()
393  << " addr: " << registerCall->getAddress()
394  << endl;
395 
396  dhtPutMsg->setKey(OverlayKey::sha1(registerCall->getP2pName()));
397  if (twoStageResolution) {
398  dhtPutMsg->setValue(overlay->getThisNode().getKey().toString());
399  } else {
400  dhtPutMsg->setValue(registerCall->getAddress());
401  }
402 
403  dhtPutMsg->setKind(registerCall->getKind());
404  dhtPutMsg->setId(registerCall->getId());
405  dhtPutMsg->setTtl(registerCall->getTtl());
406  dhtPutMsg->setIsModifiable(true);
407 
408  sendInternalRpcCall(TIER1_COMP, dhtPutMsg, registerCall);
409 }
410 
412 {
413  DHTgetCAPICall* dhtGetMsg = new DHTgetCAPICall();
414 
415  EV << "[P2pns::p2pnsResolveRpc()]\n"
416  << " ResolveRpc: name: " << resolveCall->getP2pName()
417  << endl;
418 
419  dhtGetMsg->setKey(OverlayKey::sha1(resolveCall->getP2pName()));
420  dhtGetMsg->setKind(resolveCall->getKind());
421  dhtGetMsg->setId(resolveCall->getId());
422 
423  sendInternalRpcCall(TIER1_COMP, dhtGetMsg, resolveCall);
424 }
425 
427  P2pnsRegisterCall* registerCall)
428 {
429  P2pnsRegisterResponse* registerResponse = new P2pnsRegisterResponse();
430  registerResponse->setP2pName(registerCall->getP2pName());
431  registerResponse->setAddress(registerCall->getAddress());
432  registerResponse->setIsSuccess(putResponse->getIsSuccess());
433  sendRpcResponse(registerCall, registerResponse);
434 }
435 
437  P2pnsResolveCall* resolveCall)
438 {
439  if ((!getResponse->getIsSuccess())
440  || (getResponse->getResultArraySize() == 0)) { // || (valueStream.str().size() == 0)) {
441  P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
442  resolveResponse->setAddressArraySize(1);
443  resolveResponse->setKindArraySize(1);
444  resolveResponse->setIdArraySize(1);
445  resolveResponse->setP2pName(resolveCall->getP2pName());
446  resolveResponse->setAddress(0, BinaryValue(""));
447  resolveResponse->setKind(0, 0);
448  resolveResponse->setId(0, 0);
449  resolveResponse->setIsSuccess(false);
450  sendRpcResponse(resolveCall, resolveResponse);
451  return;
452  }
453 
454 // TODO: fix cache to support kind and id of data records
455 // p2pnsCache->addData(resolveCall->getP2pName(),
456 // getResponse->getValue());
457 
458  if (twoStageResolution) {
459  if (getResponse->getResultArraySize() != 1) {
460  throw cRuntimeError("P2pns::handleDHTgetCAPIResponse: "
461  "Two-stage name resolution currently only "
462  "works with unique keys!");
463  }
464  std::stringstream valueStream;
465  valueStream << getResponse->getResult(0).getValue();
466  OverlayKey key(valueStream.str(), 16);
467 
468  LookupCall* lookupCall = new LookupCall();
469 
470  lookupCall->setKey(key);
471  lookupCall->setNumSiblings(1);
472 
473  sendInternalRpcCall(OVERLAY_COMP, lookupCall, resolveCall, -1, 0,
474  RESOLVE_LOOKUP);
475 
476  return;
477  }
478 
479  EV << "[P2pns::handleDHTgetCAPIResponse()]\n"
480  << " ResolveRpcResponse: name: " << resolveCall->getP2pName();
481 
482  P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
483  resolveResponse->setP2pName(resolveCall->getP2pName());
484  resolveResponse->setIsSuccess(getResponse->getIsSuccess());
485  resolveResponse->setAddressArraySize(getResponse->getResultArraySize());
486  resolveResponse->setKindArraySize(getResponse->getResultArraySize());
487  resolveResponse->setIdArraySize(getResponse->getResultArraySize());
488 
489  for (uint i = 0; i < getResponse->getResultArraySize(); i++) {
490  EV << " addr: " << getResponse->getResult(i).getValue();
491  resolveResponse->setAddress(i, getResponse->getResult(i).getValue());
492  resolveResponse->setKind(i, getResponse->getResult(i).getKind());
493  resolveResponse->setId(i, getResponse->getResult(i).getId());
494  }
495 
496  EV << endl;
497  sendRpcResponse(resolveCall, resolveResponse);
498 }
499 
501  cObject* context,
502  int rpcId)
503 {
504  switch (rpcId) {
505  case RESOLVE_LOOKUP: {
506  P2pnsResolveCall* resolveCall =
507  check_and_cast<P2pnsResolveCall*>(context);
508 
509  stringstream sstream;
510  sstream << lookupResponse->getSiblings(0);
511 
512  P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
513  resolveResponse->setP2pName(resolveCall->getP2pName());
514  resolveResponse->setAddressArraySize(1);
515  resolveResponse->setKindArraySize(1);
516  resolveResponse->setIdArraySize(1);
517 
518  resolveResponse->setAddress(0, sstream.str());
519  resolveResponse->setKind(0, 0);
520  resolveResponse->setId(0, 0);
521  resolveResponse->setIsSuccess(lookupResponse->getIsValid());
522  sendRpcResponse(resolveCall, resolveResponse);
523  break;
524  }
525  case TUNNEL_LOOKUP:
526  handleTunnelLookupResponse(lookupResponse);
527  break;
528  case REFRESH_LOOKUP:
529  break;
530  default:
531  throw cRuntimeError("P2pns::handleLookupResponse(): invalid rpcId!");
532  }
533 }
534 
535 
537 {
538 }
539