Implementation of "P2PNS: A distributed name service for P2PSIP". More...
#include <P2pns.h>
Classes | |
| class | OverlayKeyObject |
Public Member Functions | |
| P2pns () | |
| virtual | ~P2pns () |
| void | tunnel (const OverlayKey &destKey, const BinaryValue &payload) |
| void | registerId (const std::string &addr) |
| void | handleReadyMessage (CompReadyMessage *msg) |
| method to handle ready messages from the overlay | |
Private Types | |
| enum | LookupRpcId { RESOLVE_LOOKUP = 0, TUNNEL_LOOKUP = 1, REFRESH_LOOKUP = 2 } |
Private Member Functions | |
| void | initializeApp (int stage) |
| initializes derived class-attributes | |
| void | finishApp () |
| collects statistical data of derived app | |
| void | handleTimerEvent (cMessage *msg) |
| void | deliver (OverlayKey &key, cMessage *msg) |
| Common API function: handles delivered messages from overlay. | |
| void | sendTunnelMessage (const TransportAddress &addr, const BinaryValue &payload) |
| void | updateIdCacheWithNewTransport (cMessage *msg) |
| void | handleTunnelLookupResponse (LookupResponse *lookupResponse) |
| bool | handleRpcCall (BaseCallMessage *msg) |
| void | handleRpcResponse (BaseResponseMessage *msg, cPolymorphic *context, int rpcId, simtime_t rtt) |
| void | pingRpcResponse (PingResponse *response, cPolymorphic *context, int rpcId, simtime_t rtt) |
| void | pingTimeout (PingCall *call, const TransportAddress &dest, cPolymorphic *context, int rpcId) |
| void | p2pnsRegisterRpc (P2pnsRegisterCall *registerCall) |
| void | p2pnsResolveRpc (P2pnsResolveCall *registerCall) |
| void | handleDHTputCAPIResponse (DHTputCAPIResponse *putResponse, P2pnsRegisterCall *registerCall) |
| void | handleDHTgetCAPIResponse (DHTgetCAPIResponse *gettResponse, P2pnsResolveCall *resolveCall) |
| void | handleLookupResponse (LookupResponse *lookupResponse, cObject *context, int rpcId) |
Private Attributes | |
| P2pnsCache * | p2pnsCache |
| pointer to the name cache module | |
| XmlRpcInterface * | xmlRpcInterface |
| pointer to the XmlRpcInterface module | |
| bool | twoStageResolution |
| Use the two stage name resolution (KBR/DHt). | |
| simtime_t | keepaliveInterval |
| interval between two keeaplive pings for active connections | |
| simtime_t | idCacheLifetime |
| idle connections in the idCache get deleted after this time | |
| OverlayKey | thisId |
| the 100 most significant bit of this node's nodeId | |
Implementation of "P2PNS: A distributed name service for P2PSIP".
Implementation of "P2PNS: A distributed name service for P2PSIP"
Definition at line 45 of file P2pns.h.
enum P2pns::LookupRpcId [private] |
Definition at line 57 of file P2pns.h.
{
RESOLVE_LOOKUP = 0,
TUNNEL_LOOKUP = 1,
REFRESH_LOOKUP = 2
};
| P2pns::P2pns | ( | ) |
Definition at line 34 of file P2pns.cc.
{
p2pnsCache = NULL;
}
| void P2pns::deliver | ( | OverlayKey & | key, | |
| cMessage * | msg | |||
| ) | [private, virtual] |
Common API function: handles delivered messages from overlay.
method to handle decapsulated KBRdeliver messages from overlay module, should be overwritten in derived application
| key | destination key | |
| msg | delivered message |
Reimplemented from BaseApp.
Definition at line 186 of file P2pns.cc.
{
P2pnsTunnelMessage* tunnelMsg = check_and_cast<P2pnsTunnelMessage*>(msg);
if (xmlRpcInterface) {
xmlRpcInterface->deliverTunneledMessage(tunnelMsg->getPayload());
}
updateIdCacheWithNewTransport(msg);
delete msg;
}
| void P2pns::finishApp | ( | ) | [private, virtual] |
| void P2pns::handleDHTgetCAPIResponse | ( | DHTgetCAPIResponse * | gettResponse, | |
| P2pnsResolveCall * | resolveCall | |||
| ) | [private] |
Definition at line 436 of file P2pns.cc.
Referenced by handleRpcResponse().
{
if ((!getResponse->getIsSuccess())
|| (getResponse->getResultArraySize() == 0)) { // || (valueStream.str().size() == 0)) {
P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
resolveResponse->setAddressArraySize(1);
resolveResponse->setKindArraySize(1);
resolveResponse->setIdArraySize(1);
resolveResponse->setP2pName(resolveCall->getP2pName());
resolveResponse->setAddress(0, BinaryValue(""));
resolveResponse->setKind(0, 0);
resolveResponse->setId(0, 0);
resolveResponse->setIsSuccess(false);
sendRpcResponse(resolveCall, resolveResponse);
return;
}
// TODO: fix cache to support kind and id of data records
// p2pnsCache->addData(resolveCall->getP2pName(),
// getResponse->getValue());
if (twoStageResolution) {
if (getResponse->getResultArraySize() != 1) {
throw cRuntimeError("P2pns::handleDHTgetCAPIResponse: "
"Two-stage name resolution currently only "
"works with unique keys!");
}
std::stringstream valueStream;
valueStream << getResponse->getResult(0).getValue();
OverlayKey key(valueStream.str(), 16);
LookupCall* lookupCall = new LookupCall();
lookupCall->setKey(key);
lookupCall->setNumSiblings(1);
sendInternalRpcCall(OVERLAY_COMP, lookupCall, resolveCall, -1, 0,
RESOLVE_LOOKUP);
return;
}
EV << "[P2pns::handleDHTgetCAPIResponse()]\n"
<< " ResolveRpcResponse: name: " << resolveCall->getP2pName();
P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
resolveResponse->setP2pName(resolveCall->getP2pName());
resolveResponse->setIsSuccess(getResponse->getIsSuccess());
resolveResponse->setAddressArraySize(getResponse->getResultArraySize());
resolveResponse->setKindArraySize(getResponse->getResultArraySize());
resolveResponse->setIdArraySize(getResponse->getResultArraySize());
for (uint i = 0; i < getResponse->getResultArraySize(); i++) {
EV << " addr: " << getResponse->getResult(i).getValue();
resolveResponse->setAddress(i, getResponse->getResult(i).getValue());
resolveResponse->setKind(i, getResponse->getResult(i).getKind());
resolveResponse->setId(i, getResponse->getResult(i).getId());
}
EV << endl;
sendRpcResponse(resolveCall, resolveResponse);
}
| void P2pns::handleDHTputCAPIResponse | ( | DHTputCAPIResponse * | putResponse, | |
| P2pnsRegisterCall * | registerCall | |||
| ) | [private] |
Definition at line 426 of file P2pns.cc.
Referenced by handleRpcResponse().
{
P2pnsRegisterResponse* registerResponse = new P2pnsRegisterResponse();
registerResponse->setP2pName(registerCall->getP2pName());
registerResponse->setAddress(registerCall->getAddress());
registerResponse->setIsSuccess(putResponse->getIsSuccess());
sendRpcResponse(registerCall, registerResponse);
}
| void P2pns::handleLookupResponse | ( | LookupResponse * | lookupResponse, | |
| cObject * | context, | |||
| int | rpcId | |||
| ) | [private] |
Definition at line 500 of file P2pns.cc.
Referenced by handleRpcResponse().
{
switch (rpcId) {
case RESOLVE_LOOKUP: {
P2pnsResolveCall* resolveCall =
check_and_cast<P2pnsResolveCall*>(context);
stringstream sstream;
sstream << lookupResponse->getSiblings(0);
P2pnsResolveResponse* resolveResponse = new P2pnsResolveResponse();
resolveResponse->setP2pName(resolveCall->getP2pName());
resolveResponse->setAddressArraySize(1);
resolveResponse->setKindArraySize(1);
resolveResponse->setIdArraySize(1);
resolveResponse->setAddress(0, sstream.str());
resolveResponse->setKind(0, 0);
resolveResponse->setId(0, 0);
resolveResponse->setIsSuccess(lookupResponse->getIsValid());
sendRpcResponse(resolveCall, resolveResponse);
break;
}
case TUNNEL_LOOKUP:
handleTunnelLookupResponse(lookupResponse);
break;
case REFRESH_LOOKUP:
break;
default:
throw cRuntimeError("P2pns::handleLookupResponse(): invalid rpcId!");
}
}
| void P2pns::handleReadyMessage | ( | CompReadyMessage * | msg | ) | [virtual] |
method to handle ready messages from the overlay
| msg | message to handle |
Reimplemented from BaseApp.
Definition at line 59 of file P2pns.cc.
{
if ((msg->getReady() == false) || (msg->getComp() != OVERLAY_COMP)) {
delete msg;
return;
}
thisId = (overlay->getThisNode().getKey() >> (OverlayKey::getLength() - 100))
<< (OverlayKey::getLength() - 100);
delete msg;
}
| bool P2pns::handleRpcCall | ( | BaseCallMessage * | msg | ) | [private] |
Definition at line 333 of file P2pns.cc.
{
// delegate messages
RPC_SWITCH_START(msg)
RPC_ON_CALL(Ping) {
updateIdCacheWithNewTransport(msg);
return false;
}
RPC_DELEGATE( P2pnsRegister, p2pnsRegisterRpc );
RPC_DELEGATE( P2pnsResolve, p2pnsResolveRpc );
RPC_SWITCH_END()
return RPC_HANDLED;
}
| void P2pns::handleRpcResponse | ( | BaseResponseMessage * | msg, | |
| cPolymorphic * | context, | |||
| int | rpcId, | |||
| simtime_t | rtt | |||
| ) | [private] |
Definition at line 349 of file P2pns.cc.
{
RPC_SWITCH_START(msg)
RPC_ON_RESPONSE( DHTputCAPI ) {
EV << "[P2pns::handleRpcResponse()]\n"
<< " DHTputCAPI RPC Response received: id=" << rpcId
<< " msg=" << *_DHTputCAPIResponse << " rtt=" << rtt
<< endl;
if (dynamic_cast<P2pnsRegisterCall*>(context)) {
handleDHTputCAPIResponse(_DHTputCAPIResponse,
check_and_cast<P2pnsRegisterCall*>(context));
}
break;
}
RPC_ON_RESPONSE( DHTgetCAPI ) {
EV << "[P2pns::handleRpcResponse()]\n"
<< " DHTgetCAPI RPC Response received: id=" << rpcId
<< " msg=" << *_DHTgetCAPIResponse << " rtt=" << rtt
<< endl;
handleDHTgetCAPIResponse(_DHTgetCAPIResponse,
check_and_cast<P2pnsResolveCall*>(context));
break;
}
RPC_ON_RESPONSE( Lookup ) {
EV << "[P2pns::handleRpcResponse()]\n"
<< " Lookup RPC Response received: id=" << rpcId
<< " msg=" << *_LookupResponse << " rtt=" << rtt
<< endl;
handleLookupResponse(_LookupResponse, context, rpcId);
break;
}
RPC_SWITCH_END()
}
| void P2pns::handleTimerEvent | ( | cMessage * | msg | ) | [private] |
Definition at line 227 of file P2pns.cc.
{
P2pnsKeepaliveTimer* timer = dynamic_cast<P2pnsKeepaliveTimer*>(msg);
if (timer) {
P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(timer->getKey());
if (entry == NULL) {
// no valid cache entry found
delete msg;
return;
}
if ((entry->lastUsage + idCacheLifetime) < simTime()) {
// remove idle connections
EV << "[P2pns::handleTimerEvent()]\n"
<< " Removing id " << timer->getKey()
<< " from idCache (connection idle)"
<< endl;
p2pnsCache->removeIdCacheEntry(timer->getKey());
delete msg;
return;
}
if (!entry->addr.isUnspecified()) {
// ping 3 times with default timeout
pingNode(entry->addr, -1, 2, new OverlayKeyObject(timer->getKey()));
}
// reschedule periodic keepalive timer
scheduleAt(simTime() + keepaliveInterval, msg);
// exhaustive-iterative lookup to refresh siblings, if our ip has changed
LookupCall* lookupCall = new LookupCall();
lookupCall->setKey(overlay->getThisNode().getKey());
lookupCall->setNumSiblings(1);
lookupCall->setRoutingType(EXHAUSTIVE_ITERATIVE_ROUTING);
sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
TUNNEL_LOOKUP);
}
}
| void P2pns::handleTunnelLookupResponse | ( | LookupResponse * | lookupResponse | ) | [private] |
Definition at line 104 of file P2pns.cc.
Referenced by handleLookupResponse().
{
P2pnsIdCacheEntry* entry =
p2pnsCache->getIdCacheEntry(lookupResponse->getKey());
if ((entry == NULL) || (entry->state == CONNECTION_ACTIVE)) {
// no matching entry in idCache or connection is already active
// => lookup result not needed anymore
return;
}
if (lookupResponse->getIsValid()) {
// verify if nodeId of lookup's closest node matches requested id
if (lookupResponse->getKey().sharedPrefixLength(lookupResponse->
getSiblings(0).getKey()) < (OverlayKey::getLength() - 100)) {
EV << "[P2pns::handleTunnelLookupResponse()]\n"
<< " Lookup response " << lookupResponse->getSiblings(0)
<< " doesn't match requested id " << lookupResponse->getKey()
<< endl;
// lookup failed => drop cache entry and all queued packets
p2pnsCache->removeIdCacheEntry(lookupResponse->getKey());
return;
}
// add transport address to cache entry
entry->addr = lookupResponse->getSiblings(0);
entry->state = CONNECTION_ACTIVE;
// start periodic ping timer
P2pnsKeepaliveTimer* msg =
new P2pnsKeepaliveTimer("P2pnsKeepaliveTimer");
msg->setKey(lookupResponse->getKey());
scheduleAt(simTime() + keepaliveInterval, msg);
// send all pending tunnel messages
while (!entry->payloadQueue.empty()) {
sendTunnelMessage(entry->addr, entry->payloadQueue.front());
entry->payloadQueue.pop_front();
}
} else {
// lookup failed => drop cache entry and all queued packets
p2pnsCache->removeIdCacheEntry(lookupResponse->getKey());
}
}
| void P2pns::initializeApp | ( | int | stage | ) | [private, virtual] |
initializes derived class-attributes
| stage | the init stage |
Reimplemented from BaseApp.
Definition at line 43 of file P2pns.cc.
{
if (stage != MIN_STAGE_APP)
return;
twoStageResolution = par("twoStageResolution");
keepaliveInterval = par("keepaliveInterval");
idCacheLifetime = par("idCacheLifetime");
p2pnsCache = check_and_cast<P2pnsCache*> (getParentModule()->
getSubmodule("p2pnsCache"));
xmlRpcInterface = dynamic_cast<XmlRpcInterface*>(overlay->
getCompModule(TIER3_COMP));
}
| void P2pns::p2pnsRegisterRpc | ( | P2pnsRegisterCall * | registerCall | ) | [private] |
Definition at line 384 of file P2pns.cc.
Referenced by handleRpcCall().
{
p2pnsCache->addData(registerCall->getP2pName(),
registerCall->getAddress());
DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
EV << "[P2pns::p2pnsRegisterRpc()]\n"
<< " RegisterRpc: name: " << registerCall->getP2pName()
<< " addr: " << registerCall->getAddress()
<< endl;
dhtPutMsg->setKey(OverlayKey::sha1(registerCall->getP2pName()));
if (twoStageResolution) {
dhtPutMsg->setValue(overlay->getThisNode().getKey().toString());
} else {
dhtPutMsg->setValue(registerCall->getAddress());
}
dhtPutMsg->setKind(registerCall->getKind());
dhtPutMsg->setId(registerCall->getId());
dhtPutMsg->setTtl(registerCall->getTtl());
dhtPutMsg->setIsModifiable(true);
sendInternalRpcCall(TIER1_COMP, dhtPutMsg, registerCall);
}
| void P2pns::p2pnsResolveRpc | ( | P2pnsResolveCall * | registerCall | ) | [private] |
Definition at line 411 of file P2pns.cc.
Referenced by handleRpcCall().
{
DHTgetCAPICall* dhtGetMsg = new DHTgetCAPICall();
EV << "[P2pns::p2pnsResolveRpc()]\n"
<< " ResolveRpc: name: " << resolveCall->getP2pName()
<< endl;
dhtGetMsg->setKey(OverlayKey::sha1(resolveCall->getP2pName()));
dhtGetMsg->setKind(resolveCall->getKind());
dhtGetMsg->setId(resolveCall->getId());
sendInternalRpcCall(TIER1_COMP, dhtGetMsg, resolveCall);
}
| void P2pns::pingRpcResponse | ( | PingResponse * | response, | |
| cPolymorphic * | context, | |||
| int | rpcId, | |||
| simtime_t | rtt | |||
| ) | [private] |
| void P2pns::pingTimeout | ( | PingCall * | call, | |
| const TransportAddress & | dest, | |||
| cPolymorphic * | context, | |||
| int | rpcId | |||
| ) | [private] |
Definition at line 205 of file P2pns.cc.
{
OverlayKeyObject* key = dynamic_cast<OverlayKeyObject*>(context);
P2pnsIdCacheEntry* entry = NULL;
// lookup entry in id cache
if ((key != NULL) &&
(entry = p2pnsCache->getIdCacheEntry(*key))) {
// remove entry if TransportAddress hasn't been updated in the meantime
if (!entry->addr.isUnspecified() && (entry->addr != dest)) {
EV << "[P2pns::pingTimeout()]\n"
<< " Removing id " << key << " from idCache (ping timeout)"
<< endl;
p2pnsCache->removeIdCacheEntry(*key);
}
}
delete context;
}
| void P2pns::registerId | ( | const std::string & | addr | ) |
Definition at line 164 of file P2pns.cc.
Referenced by XmlRpcInterface::handleReadyMessage().
{
Enter_Method_Silent();
std::string name = par("registerName").stdstringValue();
DHTputCAPICall* dhtPutMsg = new DHTputCAPICall();
EV << "[P2pns::p2pnsRegisterRpc()]\n"
<< " registerId(): name: " << name << " addr: " << addr
<< endl;
dhtPutMsg->setKey(OverlayKey::sha1(BinaryValue(name)));
dhtPutMsg->setValue(BinaryValue(addr));
dhtPutMsg->setKind(28);
dhtPutMsg->setId(1);
dhtPutMsg->setTtl(60*60*24*7);
dhtPutMsg->setIsModifiable(true);
sendInternalRpcCall(TIER1_COMP, dhtPutMsg);
}
| void P2pns::sendTunnelMessage | ( | const TransportAddress & | addr, | |
| const BinaryValue & | payload | |||
| ) | [private] |
Definition at line 150 of file P2pns.cc.
Referenced by handleTunnelLookupResponse(), and tunnel().
{
EV << "[P2pns::sendTunnelMessage()]\n"
<< " Sending TUNNEL message to " << addr << endl;
P2pnsTunnelMessage* msg = new P2pnsTunnelMessage("P2pnsTunnelMsg");
msg->setPayload(payload);
msg->setSrcId(thisId);
msg->setBitLength(P2PNSTUNNELMESSAGE_L(msg));
callRoute(OverlayKey::UNSPECIFIED_KEY, msg, addr);
}
| void P2pns::tunnel | ( | const OverlayKey & | destKey, | |
| const BinaryValue & | payload | |||
| ) |
Definition at line 71 of file P2pns.cc.
Referenced by XmlRpcInterface::handleAppTunPacket().
{
Enter_Method_Silent();
P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(destKey);
if (entry == NULL) {
// lookup destKey and create new entry
EV << "[P2pns::tunnel()]\n"
<< " Establishing new cache entry for key: " << destKey
<< endl;
LookupCall* lookupCall = new LookupCall();
lookupCall->setKey(destKey);
lookupCall->setNumSiblings(1);
sendInternalRpcCall(OVERLAY_COMP, lookupCall, NULL, -1, 0,
TUNNEL_LOOKUP);
p2pnsCache->addIdCacheEntry(destKey, &payload);
} else if (entry->state == CONNECTION_PENDING) {
// lookup not finished yet => append packet to queue
EV << "[P2pns::tunnel()]\n"
<< " Queuing packet since lookup is still pending for key: "
<< destKey << endl;
entry->lastUsage = simTime();
entry->payloadQueue.push_back(payload);
} else {
entry->lastUsage = simTime();
sendTunnelMessage(entry->addr, payload);
}
}
| void P2pns::updateIdCacheWithNewTransport | ( | cMessage * | msg | ) | [private] |
Definition at line 269 of file P2pns.cc.
Referenced by deliver(), and handleRpcCall().
{
// update idCache with new TransportAddress of the ping originator
OverlayCtrlInfo* ctrlInfo =
dynamic_cast<OverlayCtrlInfo*>(msg->getControlInfo());
OverlayKey srcId;
if (!ctrlInfo) {
// can't update cache without knowing the originator id
EV << "[P2pns::updateCacheWithNewTransport()]\n"
<< " Can't update cache without knowing the originator id"
<< endl;
return;
}
if (ctrlInfo->getSrcRoute().isUnspecified()) {
P2pnsTunnelMessage* tunnelMsg = dynamic_cast<P2pnsTunnelMessage*>(msg);
if (tunnelMsg) {
srcId = tunnelMsg->getSrcId();
} else {
// can't update cache without knowing the originator id
EV << "[P2pns::updateCacheWithNewTransport()]\n"
<< " Can't update cache without knowing the originator id"
<< endl;
return;
}
} else {
srcId = (ctrlInfo->getSrcRoute().getKey() >> (OverlayKey::getLength() - 100))
<< (OverlayKey::getLength() - 100);
}
P2pnsIdCacheEntry* entry = p2pnsCache->getIdCacheEntry(srcId);
if (entry == NULL) {
EV << "[P2pns::updateCacheWithNewTransport()]\n"
<< " Adding new cache entry for id " << srcId
<< " with addr " << (const TransportAddress&)ctrlInfo->getSrcRoute()
<< endl;
entry = p2pnsCache->addIdCacheEntry(srcId);
entry->addr = ctrlInfo->getSrcRoute();
// start periodic ping timer
P2pnsKeepaliveTimer* msg =
new P2pnsKeepaliveTimer("P2pnsKeepaliveTimer");
msg->setKey(srcId);
scheduleAt(simTime() + keepaliveInterval, msg);
}
// update transport address in idCache (node may have a new
// TransportAddress due to mobility)
if (entry->addr.isUnspecified() ||
(entry->addr != ctrlInfo->getSrcRoute())) {
EV << "[P2pns::handleRpcCall()]\n"
<< " Ping with new transport address received: "
<< " Changing from " << entry->addr << " to "
<< static_cast<TransportAddress>(ctrlInfo->getSrcRoute())
<< " for id " << srcId << endl;
entry->addr = ctrlInfo->getSrcRoute();
}
entry->state = CONNECTION_ACTIVE;
}
simtime_t P2pns::idCacheLifetime [private] |
idle connections in the idCache get deleted after this time
Definition at line 103 of file P2pns.h.
Referenced by handleTimerEvent(), and initializeApp().
simtime_t P2pns::keepaliveInterval [private] |
interval between two keeaplive pings for active connections
Definition at line 102 of file P2pns.h.
Referenced by handleTimerEvent(), handleTunnelLookupResponse(), initializeApp(), and updateIdCacheWithNewTransport().
P2pnsCache* P2pns::p2pnsCache [private] |
pointer to the name cache module
Definition at line 99 of file P2pns.h.
Referenced by handleTimerEvent(), handleTunnelLookupResponse(), initializeApp(), P2pns(), p2pnsRegisterRpc(), pingTimeout(), tunnel(), and updateIdCacheWithNewTransport().
OverlayKey P2pns::thisId [private] |
the 100 most significant bit of this node's nodeId
Definition at line 104 of file P2pns.h.
Referenced by handleReadyMessage(), and sendTunnelMessage().
bool P2pns::twoStageResolution [private] |
Use the two stage name resolution (KBR/DHt).
Definition at line 101 of file P2pns.h.
Referenced by handleDHTgetCAPIResponse(), initializeApp(), and p2pnsRegisterRpc().
XmlRpcInterface* P2pns::xmlRpcInterface [private] |
pointer to the XmlRpcInterface module
Definition at line 100 of file P2pns.h.
Referenced by deliver(), and initializeApp().
1.7.1