This class implements a path lookup. More...
#include <IterativeLookup.h>
Protected Member Functions | |
| bool | accepts (int rpcId) |
| void | handleResponse (FindNodeResponse *msg) |
| void | handleTimeout (BaseCallMessage *msg, const TransportAddress &dest, int rpcId) |
| void | handleFailedNodeResponse (const NodeHandle &src, cPacket *findNodeExt, bool retry) |
| IterativePathLookup (IterativeLookup *lookup) | |
| virtual | ~IterativePathLookup () |
| int | add (const NodeHandle &handle, const NodeHandle &source=NodeHandle::UNSPECIFIED_NODE) |
| Adds a NodeHandle to next hops. | |
Protected Attributes | |
| IterativeLookup * | lookup |
| BaseOverlay * | overlay |
| int | hops |
| int | step |
| int | pendingRpcs |
| bool | finished |
| bool | success |
| LookupVector | nextHops |
| std::map< TransportAddress, NodeHandle > | oldNextHops |
Private Member Functions | |
| void | sendRpc (int num, cPacket *FindNodeExt=NULL) |
| void | sendNewRpcAfterTimeout (cPacket *findNodeExt) |
Friends | |
| class | IterativeLookup |
This class implements a path lookup.
Definition at line 287 of file IterativeLookup.h.
| IterativePathLookup::IterativePathLookup | ( | IterativeLookup * | lookup | ) | [protected] |
Definition at line 719 of file IterativeLookup.cc.
{
    // Bind this path to the lookup that owns it and reuse that lookup's
    // overlay pointer.
    this->lookup = lookup;
    this->overlay = lookup->overlay;

    // A new path starts unfinished, unsuccessful and with all counters at 0.
    this->finished = false;
    this->success = false;
    this->pendingRpcs = 0;
    this->step = 0;
    this->hops = 0;

    // Exhaustive-iterative routing needs extra space for backup nodes, in
    // case failed nodes have to be removed from the nextHops vector; every
    // other routing type gets exactly config.redundantNodes slots.
    const bool exhaustive =
        (lookup->routingType == EXHAUSTIVE_ITERATIVE_ROUTING);
    this->nextHops = LookupVector(
        exhaustive ? 2*(lookup->config.redundantNodes)
                   : (lookup->config.redundantNodes),
        lookup);
}
| IterativePathLookup::~IterativePathLookup | ( | ) | [protected, virtual] |
Definition at line 740 of file IterativeLookup.cc.
// Intentionally empty: the destructor performs no explicit cleanup.
{}
| bool IterativePathLookup::accepts | ( | int | rpcId | ) | [protected] |
Definition at line 743 of file IterativeLookup.cc.
Referenced by IterativeLookup::handleRpcResponse().
| int IterativePathLookup::add | ( | const NodeHandle & | handle, | |
| const NodeHandle & | source = NodeHandle::UNSPECIFIED_NODE | |||
| ) | [protected] |
Adds a NodeHandle to next hops.
Definition at line 1128 of file IterativeLookup.cc.
Referenced by handleResponse(), and IterativeLookup::start().
{
    // Every candidate is wrapped in a LookupEntry that records the node that
    // told us about it (source) and starts out as not yet used.

    // Replace mode: append unconditionally and report the index of the
    // element that was just appended.
    if (!lookup->config.merge) {
        nextHops.push_back(LookupEntry(handle, source, false));
        return (nextHops.size() - 1);
    }

    // Merge mode: let the vector place the entry itself and hand back
    // whatever position LookupVector::add() reports.
    // NOTE(review): handleResponse() tests the result with "pos >= 0", so
    // add() presumably returns a negative value for rejected entries - confirm.
    return nextHops.add(LookupEntry(handle, source, false));
}
| void IterativePathLookup::handleFailedNodeResponse | ( | const NodeHandle & | src, | |
| cPacket * | findNodeExt, | |||
| bool | retry | |||
| ) | [protected] |
Definition at line 981 of file IterativeLookup.cc.
{
    // A finished path no longer reacts to failed-node responses.
    if (finished) {
        return;
    }

    typedef std::map<TransportAddress, NodeHandle>::iterator HopIterator;

    // Search the recorded RPCs for an entry whose stored source node is the
    // node this response came from; without such an entry there is nothing
    // to do.
    HopIterator failedPos = oldNextHops.begin();
    while (failedPos != oldNextHops.end()) {
        if (!failedPos->second.isUnspecified() && (failedPos->second == src)) {
            break;
        }
        ++failedPos;
    }
    if (failedPos == oldNextHops.end()) {
        return;
    }

    // If src itself still has an entry, reuse the source stored for it;
    // otherwise fall back to the unspecified node.
    HopIterator srcPos = oldNextHops.find(src);
    const NodeHandle* srcOrigin = (srcPos != oldNextHops.end())
        ? &(srcPos->second)
        : &NodeHandle::UNSPECIFIED_NODE;

    if (retry) {
        // FIXME: This is needed for a node to be asked again when detecting
        // a failed node. It could pose problems when parallel lookups and
        // failed node recovery are both needed at the same time!
        lookup->setVisited(src, false);
        nextHops.add(LookupEntry(src, *srcOrigin, false));
    }

    // The matched entry has been dealt with: drop it, then decide whether
    // replacement RPCs have to be sent.
    oldNextHops.erase(failedPos);
    sendNewRpcAfterTimeout(findNodeExt);
}
| void IterativePathLookup::handleResponse | ( | FindNodeResponse * | msg | ) | [protected] |
Definition at line 760 of file IterativeLookup.cc.
Referenced by IterativeLookup::handleRpcResponse().
{
// Processes a FindNodeResponse for this path: updates the bookkeeping
// (oldNextHops, hops, step, pendingRpcs), feeds the returned nodes into
// nextHops, checks the termination conditions and finally calls sendRpc().
// A path that is already finished ignores the response.
if (finished)
return;
// Abort the path (unsuccessfully) once more than LOOKUP_TIMEOUT has elapsed
// since lookup->startTime.
if (simTime() > (lookup->startTime + LOOKUP_TIMEOUT)) {
EV << "[IterativePathLookup::handleResponse()]\n"
<< " Iterative lookup path timed out!"
<< endl;
finished = true;
success = false;
return;
}
const NodeHandle& source = msg->getSrcNode();
// The responder answered, so remove the entry stored for it in oldNextHops
// (if there is one).
std::map<TransportAddress, NodeHandle>::iterator oldPos;
oldPos = oldNextHops.find(source);
if (oldPos != oldNextHops.end()) oldNextHops.erase(oldPos);
// don't count local hops
if (lookup->overlay->getThisNode() != source) {
hops++;
}
// mark the responding node as visited in the owning lookup
lookup->setVisited(source);
// if (source.getKey() == lookup->key) {
// cout << "received response from destination for key " << lookup->key
// << " with isSibling = " << msg->getSiblings() << endl;
// }
// every processed response advances the step counter
step++;
// decrease pending rpcs
pendingRpcs--;
if (msg->getClosestNodesArraySize() != 0) {
// mode: merge or replace
// (in replace mode a non-empty response discards the current candidates;
// an empty response never clears them)
if (!lookup->config.merge) {
nextHops.clear();
}
} else {
//cout << "findNode() returned 0 nodes!" << endl;
}
// number of returned nodes that ended up within the first
// config.redundantNodes positions of nextHops
int numNewRpcs = 0;
// add new next hops
for (uint32_t i=0; i < msg->getClosestNodesArraySize(); i++) {
const NodeHandle& handle = msg->getClosestNodes(i);
// add NodeHandle to next hops and siblings
int pos = add(handle, source);
// only send new rpcs if we've learned about new nodes
if ((pos >= 0) && (pos < lookup->config.redundantNodes)) {
numNewRpcs++;
}
// check if node was found
// (numSiblings == 0: the lookup is satisfied by a node whose key equals
// the lookup key; the remaining returned nodes are not processed)
if ((lookup->numSiblings == 0) && (handle.getKey() == lookup->key)) {
lookup->addSibling(handle);
// TODO: how do we resume, if the potential sibling doesn't authenticate?
finished = true;
success = true;
return;
} else {
// sibling lookup, non-exhaustive routing: when the response is flagged
// via getSiblings(), every returned node is recorded as a sibling
if (lookup->numSiblings != 0 &&
(lookup->routingType != EXHAUSTIVE_ITERATIVE_ROUTING) &&
msg->getSiblings()) {
//cout << "adding sibling " << handle << endl;
lookup->addSibling(handle);
}
}
}
#if 0
cout << "nextHops.size " << nextHops.size()
<< " find node response " << msg->getClosestNodesArraySize()
<< " config " << lookup->config.redundantNodes << endl;
cout << "looking for " << lookup->key << endl;
for (uint32_t i=0; i < msg->getClosestNodesArraySize(); i++) {
cout << "find node " << msg->getClosestNodes(i) << endl;
}
cout << "next Hops " << nextHops << endl;
#endif
// check if sibling lookup is finished
// (a non-empty response flagged via getSiblings() ends a non-exhaustive
// sibling lookup successfully)
if ((lookup->routingType != EXHAUSTIVE_ITERATIVE_ROUTING)
&& msg->getSiblings()
&& msg->getClosestNodesArraySize() != 0 &&
lookup->numSiblings != 0) {
finished = true;
success = true;
return;
}
// extract find node extension object
// (it is detached from msg here and deleted at the end of this function;
// the early returns above leave it attached to msg)
cPacket* findNodeExt = NULL;
if (msg->hasObject("findNodeExt")) {
findNodeExt = (cPacket*)msg->removeObject("findNodeExt");
}
// If config.newRpcOnEveryResponse is true, send a new RPC
// even if there was no lookup progress
if ((numNewRpcs == 0) && lookup->config.newRpcOnEveryResponse) {
numNewRpcs = 1;
}
// send next rpcs
// (never more than config.parallelRpcs per response)
sendRpc(min(numNewRpcs, lookup->config.parallelRpcs), findNodeExt);
// NOTE(review): deleting here assumes sendRpc() does not keep the pointer
// - confirm against IterativeLookup::createFindNodeCall()
delete findNodeExt;
}
| void IterativePathLookup::handleTimeout | ( | BaseCallMessage * | msg, | |
| const TransportAddress & | dest, | |||
| int | rpcId | |||
| ) | [protected] |
Definition at line 894 of file IterativeLookup.cc.
Referenced by IterativeLookup::handleRpcResponse(), and IterativeLookup::handleRpcTimeout().
{
// Reacts to a timed-out RPC of this path: updates the bookkeeping and then
// either sends replacement RPCs, repairs the local routing state, or reports
// the failed node to the node that is stored as its source in oldNextHops.
// A path that is already finished ignores the timeout.
if (finished)
return;
EV << "[IterativePathLookup::handleTimeout()]\n"
<< " Timeout of RPC " << rpcId
<< endl;
//std::cout << lookup->overlay->getThisNode() << ": Path timeout for node"
// << dest << endl;
// For exhaustive-iterative remove dead nodes from nextHops vector
// (which is also our results vector)
// NOTE(review): dynamic_cast to a reference type throws std::bad_cast when
// dest is not actually a NodeHandle - confirm that callers always pass one
// on this code path.
if ((lookup->routingType == EXHAUSTIVE_ITERATIVE_ROUTING)
&& lookup->getDead(dest)) {
LookupVector::iterator it = nextHops.findIterator(
(dynamic_cast<const NodeHandle&>(dest)).getKey());
if (it != nextHops.end()) {
nextHops.erase(it);
}
}
// look up the source node stored for dest when the RPC was sent
std::map<TransportAddress, NodeHandle>::iterator oldPos;
oldPos = oldNextHops.find(dest);
// decrease pending rpcs
pendingRpcs--;
// detach the find node extension object from the timed-out call (msg may
// be NULL); from here on this function is responsible for it
cPacket* findNodeExt = NULL;
if (msg && msg->hasObject("findNodeExt")) {
findNodeExt = static_cast<cPacket*>(
msg->removeObject("findNodeExt"));
}
// Abort the path (unsuccessfully) once more than LOOKUP_TIMEOUT has elapsed
// since lookup->startTime.
if (simTime() > (lookup->startTime + LOOKUP_TIMEOUT)) {
EV << "[IterativePathLookup::handleTimeout()]\n"
<< " Iterative lookup path timed out!"
<< endl;
delete findNodeExt;
finished = true;
success = false;
return;
}
// No entry for dest, or failed-node RPCs are disabled: only decide
// whether replacement RPCs have to be sent.
if (oldPos == oldNextHops.end() || (!lookup->config.failedNodeRpcs)) {
sendNewRpcAfterTimeout(findNodeExt);
delete findNodeExt;
} else {
// The stored source is unspecified, i.e. no remote node is recorded as
// having suggested dest.
if (oldPos->second.isUnspecified()) {
// TODO: handleFailedNode should always be called for local
// nodes, independent of config.failedNodeRpcs
// Attention: currently this method is also called,
// if a node responded and the path doesn't accept a message
FindNodeCall* findNodeCall = dynamic_cast<FindNodeCall*>(msg);
// answer was from local findNode()
// (if the overlay's handleFailedNode() returns true, the local findNode()
// is run again and its results are added as fresh candidates)
if (findNodeCall && lookup->overlay->handleFailedNode(dest)) {
NodeVector* retry = lookup->overlay->findNode(
findNodeCall->getLookupKey(), -1, lookup->numSiblings, msg);
for (NodeVector::iterator i = retry->begin(); i != retry->end(); i++) {
nextHops.add(LookupEntry(*i, NodeHandle::UNSPECIFIED_NODE, false));
}
delete retry;
}
sendNewRpcAfterTimeout(findNodeExt);
delete findNodeExt;
} else {
// A remote node is stored as the source of dest: send it a FailedNodeCall
// naming dest. No replacement RPC is sent from this branch.
FailedNodeCall* call = new FailedNodeCall("FailedNodeCall");
call->setFailedNode(dest);
call->setBitLength(FAILEDNODECALL_L(call));
// NOTE(review): findNodeExt is not deleted in this branch; presumably
// addObject() hands ownership to the call - confirm.
if (findNodeExt) {
call->addObject(findNodeExt);
call->addBitLength(findNodeExt->getBitLength());
}
lookup->overlay->countFailedNodeCall(call);
lookup->overlay->sendUdpRpcCall(oldPos->second, call, NULL,
-1, 0, -1, lookup);
}
}
}
| void IterativePathLookup::sendNewRpcAfterTimeout | ( | cPacket * | findNodeExt | ) | [private] |
Definition at line 882 of file IterativeLookup.cc.
Referenced by handleFailedNodeResponse(), and handleTimeout().
{
    // There are two alternative ways to react to a timeout.

    // Alternative 1: every single timeout is answered with exactly one new
    // RPC.
    if (lookup->config.newRpcOnEveryTimeout) {
        sendRpc(1, findNodeExt);
        return;
    }

    // Alternative 2: stay idle while RPCs are still pending ...
    if (pendingRpcs != 0) {
        return;
    }

    // ... and once all of them have timed out, send alpha
    // (config.parallelRpcs) new RPCs.
    sendRpc(lookup->config.parallelRpcs, findNodeExt);
}
| void IterativePathLookup::sendRpc | ( | int | num, | |
| cPacket * | FindNodeExt = NULL | |||
| ) | [private] |
Definition at line 1020 of file IterativeLookup.cc.
Referenced by handleResponse(), sendNewRpcAfterTimeout(), and IterativeLookup::start().
{
    // Sends up to `num` FindNodeCall RPCs to candidates taken from the first
    // config.redundantNodes entries of nextHops, and finishes the path when
    // no RPC is pending afterwards.
    //
    // num         - requested number of new RPCs; may be reduced (strict
    //               parallelism) or raised (retry of all remaining nodes)
    //               below
    // findNodeExt - optional extension object that is passed to
    //               lookup->createFindNodeCall() for every call that is built
    //               NOTE(review): callers delete it after this function
    //               returns, so createFindNodeCall() presumably does not keep
    //               the pointer - confirm.

    // path finished? yes -> quit
    if (finished)
        return;

    // check for maximum hop count (a hopCountMax of 0 disables the check)
    if (lookup->hopCountMax && (hops >= lookup->hopCountMax)) {
        EV << "[IterativePathLookup::sendRpc()]\n"
           << " Max hop count exceeded - lookup failed!"
           << endl;

        finished = true;
        success = false;
        return;
    }

    // if strictParallelRpcs is true, limit concurrent in-flight requests
    // to config.parallelRpcs
    if (lookup->config.strictParallelRpcs) {
        num = min(num, lookup->config.parallelRpcs - pendingRpcs);
    }

    // try all remaining nodes: nothing was requested and nothing is in
    // flight, so (unless finishOnFirstUnchanged is set) fall back to a full
    // batch of config.parallelRpcs
    if ((num == 0) && (pendingRpcs == 0)
        && !lookup->config.finishOnFirstUnchanged) {
        num = lookup->config.parallelRpcs;
    }

    // send rpc messages; only the first config.redundantNodes entries of
    // nextHops are considered
    int i = 0;
    for (LookupVector::iterator it = nextHops.begin();
         ((num > 0) && (i < lookup->config.redundantNodes)
          && (it != nextHops.end())); it++, i++) {

        // ignore nodes to which we've already sent an RPC, and dead nodes
        if (it->alreadyUsed || lookup->getDead(it->handle)) continue;

        // check if node has already been visited? no ->
        // TODO: doesn't work with Broose
        if ((!lookup->config.visitOnlyOnce) || (!lookup->getVisited(it->handle))) {
            // send rpc to node, increase pending rpcs
            pendingRpcs++;
            num--;
            FindNodeCall* call = lookup->createFindNodeCall(findNodeExt);
            lookup->sendRpc(it->handle, call, this, step);

            // remember which node told us about this hop; the timeout and
            // failed-node handlers look the entry up again
            oldNextHops[it->handle] = it->source;

            // mark node as already used
            it->alreadyUsed = true;
        }
        // otherwise the node was already visited and is skipped
    }

    // no rpc sent, no pending rpcs?
    // -> failed for normal lookups
    // -> exhaustive lookups are always successful
    if (pendingRpcs == 0) {
        if (lookup->routingType == EXHAUSTIVE_ITERATIVE_ROUTING) {
            // report the first config.redundantNodes entries of nextHops
            // as the result set
            int reported = 0;
            for (LookupVector::iterator sib = nextHops.begin();
                 ((reported < lookup->config.redundantNodes)
                  && (sib != nextHops.end())); sib++, reported++) {
                lookup->addSibling(sib->handle);
            }

            success = true;
        } else {
            success = false;

            EV << "[IterativePathLookup::sendRpc() @ " << overlay->getThisNode().getIp()
               << " (" << overlay->getThisNode().getKey().toString(16) << ")]\n"
               << " Path failed (no further nodes to query) "
               << endl;
        }

        finished = true;
    }
}
friend class IterativeLookup [friend] |
Definition at line 289 of file IterativeLookup.h.
bool IterativePathLookup::finished [protected] |
Definition at line 299 of file IterativeLookup.h.
Referenced by accepts(), handleFailedNodeResponse(), handleResponse(), IterativeLookup::handleRpcResponse(), IterativeLookup::handleRpcTimeout(), handleTimeout(), IterativePathLookup(), and sendRpc().
int IterativePathLookup::hops [protected] |
Definition at line 296 of file IterativeLookup.h.
Referenced by handleResponse(), IterativeLookup::handleRpcResponse(), IterativeLookup::handleRpcTimeout(), IterativePathLookup(), and sendRpc().
IterativeLookup* IterativePathLookup::lookup [protected] |
Definition at line 292 of file IterativeLookup.h.
Referenced by accepts(), add(), handleFailedNodeResponse(), handleResponse(), handleTimeout(), sendNewRpcAfterTimeout(), and sendRpc().
LookupVector IterativePathLookup::nextHops [protected] |
Definition at line 301 of file IterativeLookup.h.
Referenced by add(), handleFailedNodeResponse(), handleResponse(), handleTimeout(), IterativePathLookup(), and sendRpc().
std::map<TransportAddress, NodeHandle> IterativePathLookup::oldNextHops [protected] |
Definition at line 302 of file IterativeLookup.h.
Referenced by handleFailedNodeResponse(), handleResponse(), handleTimeout(), and sendRpc().
BaseOverlay* IterativePathLookup::overlay [protected] |
Definition at line 293 of file IterativeLookup.h.
Referenced by IterativePathLookup(), and sendRpc().
int IterativePathLookup::pendingRpcs [protected] |
Definition at line 298 of file IterativeLookup.h.
Referenced by handleResponse(), handleTimeout(), IterativePathLookup(), sendNewRpcAfterTimeout(), and sendRpc().
int IterativePathLookup::step [protected] |
Definition at line 297 of file IterativeLookup.h.
Referenced by accepts(), handleResponse(), IterativePathLookup(), and sendRpc().
bool IterativePathLookup::success [protected] |
Definition at line 300 of file IterativeLookup.h.
Referenced by handleResponse(), IterativeLookup::handleRpcResponse(), IterativeLookup::handleRpcTimeout(), handleTimeout(), IterativePathLookup(), and sendRpc().
1.7.1