Parse a trace file and schedule node joins/leaves according to trace data. More...
#include <GlobalTraceManager.h>
Public Member Functions | |
| GlobalTraceManager () | |
| ~GlobalTraceManager () | |
| void | handleMessage (cMessage *msg) |
| void | initialize (int stage) |
Protected Member Functions | |
| void | readNextBlock () |
| void | scheduleNextEvent (double time, int nodeId, char *buf, int line) |
| void | createNode (int nodeId) |
| void | deleteNode (int nodeId) |
| cGate * | getAppGateById (int nodeId) |
Protected Attributes | |
| UnderlayConfigurator * | underlayConfigurator |
| GlobalNodeList * | globalNodeList |
| pointer to GlobalNodeList | |
Private Attributes | |
| int | fd |
| int | filesize |
| int | chunksize |
| int | remain |
| int | marginsize |
| int | offset |
| char * | buf |
| char * | start |
| cMessage * | nextRead |
Static Private Attributes | |
| static const int | readPages = 32 |
Parse a trace file and schedule node joins/leaves according to trace data.
If trace includes user action, send actions to application
Definition at line 37 of file GlobalTraceManager.h.
| GlobalTraceManager::GlobalTraceManager | ( | ) |
Definition at line 80 of file GlobalTraceManager.cc.
{
nextRead = NULL;
}
| GlobalTraceManager::~GlobalTraceManager | ( | ) |
Definition at line 85 of file GlobalTraceManager.cc.
{
cancelAndDelete(nextRead);
}
| void GlobalTraceManager::createNode | ( | int | nodeId | ) | [protected] |
Definition at line 169 of file GlobalTraceManager.cc.
Referenced by handleMessage().
{
check_and_cast<TraceChurn*>(underlayConfigurator->getChurnGenerator(0))
->createNode(nodeId);
}
| void GlobalTraceManager::deleteNode | ( | int | nodeId | ) | [protected] |
Definition at line 175 of file GlobalTraceManager.cc.
Referenced by handleMessage().
{
// TODO: calculate proper deletion time with *.underlayConfigurator.gracefulLeaveDelay
check_and_cast<TraceChurn*>(underlayConfigurator->getChurnGenerator(0))
->deleteNode(nodeId);
}
| cGate * GlobalTraceManager::getAppGateById | ( | int | nodeId | ) | [protected] |
Definition at line 243 of file GlobalTraceManager.cc.
Referenced by handleMessage().
{
return check_and_cast<TraceChurn*>(underlayConfigurator->getChurnGenerator(0))
->getAppGateById(nodeId);
}
| void GlobalTraceManager::handleMessage | ( | cMessage * | msg | ) |
Definition at line 182 of file GlobalTraceManager.cc.
{
if (!msg->isSelfMessage()) {
delete msg;
return;
} else if ( msg == nextRead ) {
readNextBlock();
return;
}
GlobalTraceManagerMessage* traceMsg = check_and_cast<GlobalTraceManagerMessage*>(msg);
if (strstr(traceMsg->getName(), "JOIN") == traceMsg->getName()) {
createNode(traceMsg->getInternalNodeId());
} else if (strstr(traceMsg->getName(), "LEAVE") == traceMsg->getName()) {
deleteNode(traceMsg->getInternalNodeId());
} else if (strstr(traceMsg->getName(), "CONNECT_NODETYPES") == traceMsg->getName()) {
std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
if (strVec.size() != 3) {
throw cRuntimeError("GlobalTraceManager::"
"handleMessage(): Invalid command");
}
int firstNodeType = atoi(strVec[1].c_str());
int secondNodeType = atoi(strVec[2].c_str());
globalNodeList->connectNodeTypes(firstNodeType, secondNodeType);
} else if (strstr(traceMsg->getName(), "DISCONNECT_NODETYPES") == traceMsg->getName()) {
std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
if (strVec.size() != 3) {
throw cRuntimeError("GlobalTraceManager::"
"handleMessage(): Invalid command");
}
int firstNodeType = atoi(strVec[1].c_str());
int secondNodeType = atoi(strVec[2].c_str());
globalNodeList->disconnectNodeTypes(firstNodeType, secondNodeType);
} else if (strstr(traceMsg->getName(), "MERGE_BOOTSTRAPNODES") == traceMsg->getName()) {
std::vector<std::string> strVec = cStringTokenizer(msg->getName()).asVector();
if (strVec.size() != 4) {
throw cRuntimeError("GlobalTraceManager::"
"handleMessage(): Invalid command");
}
int toPartition = atoi(strVec[1].c_str());
int fromPartition = atoi(strVec[2].c_str());
int numNodes = atoi(strVec[3].c_str());
globalNodeList->mergeBootstrapNodes(toPartition, fromPartition, numNodes);
} else {
sendDirect(msg, getAppGateById(traceMsg->getInternalNodeId()));
return; // don't delete Message
}
delete msg;
}
| void GlobalTraceManager::initialize | ( | int | stage | ) |
Definition at line 41 of file GlobalTraceManager.cc.
{
Enter_Method_Silent();
// Nothing to do for us if there is no traceFile
if (strlen(par("traceFile")) == 0)
return;
underlayConfigurator = UnderlayConfiguratorAccess().get();
globalNodeList = GlobalNodeListAccess().get();
offset = 0;
// Open file and get size
fd = open(par("traceFile"), O_RDONLY);
if (!fd) {
throw cRuntimeError(("Can't open file " + par("traceFile").stdstringValue() +
std::string(strerror(errno))).c_str());
}
struct stat filestat;
if (fstat(fd, &filestat)) {
throw cRuntimeError(("Error calling stat: " + std::string(strerror(errno))).c_str());
}
filesize = filestat.st_size;
remain = filesize;
EV << "[GlobalTraceManager::initialize()]\n"
<< " Successfully opened trace file " << par("traceFile").stdstringValue()
<< ". Size: " << filesize
<< endl;
nextRead = new cMessage("NextRead");
scheduleAt(0, nextRead);
}
| void GlobalTraceManager::readNextBlock | ( | ) | [protected] |
Definition at line 90 of file GlobalTraceManager.cc.
Referenced by handleMessage().
{
#ifdef _WIN32
// TODO: port for MINGW
throw cRuntimeError("GlobalTraceManager::readNextBlock():"
"Not available on WIN32 yet!");
#else
double time = -1;
int nodeID;
char* bufend;
int line = 1;
if (remain > 0) {
// If rest of the file is bigger than maximal chunk size, set chunksize
// to max; the last mapped page is used as margin.
// Else map the whole remainder of the file at a time, no margin is needed
// in this case
chunksize = remain < getpagesize()*readPages ? remain : getpagesize()*readPages;
marginsize = remain == chunksize ? 0 : getpagesize();
start = (char*) mmap(0, chunksize, PROT_READ|PROT_WRITE, MAP_PRIVATE, fd, filesize - remain);
if (start == MAP_FAILED) {
throw cRuntimeError(("Error mapping file to memory:" +
std::string(strerror(errno))).c_str());
}
buf = start + offset;
// While the read pointer has not reached the margin, continue parsing
while( buf < start + chunksize - marginsize) { // FIXME: Assuming max line length of getpagesize()
time = strtod(buf, &bufend);
if (bufend==buf) {
throw cRuntimeError("Error parsing file: Expected time as double");
}
buf = bufend;
nodeID = strtol(buf, &bufend, 0);
if (bufend==buf) {
throw cRuntimeError("Error parsing file: Expected ID as long int");
}
buf = bufend;
while( isspace(buf[0]) ) buf++;
bufend = strchr(buf, '\n');
if (!bufend) {
throw cRuntimeError("Error parsing file: Missing command or no newline at end of line!");
}
bufend[0]='\0';
scheduleNextEvent(time, nodeID, buf, line++);
buf += strlen(buf)+1;
while( isspace(buf[0]) ) buf++;
}
// Compute offset for the next read
offset = (buf - start) - (chunksize - marginsize);
remain -= chunksize - marginsize;
munmap( start, chunksize );
}
if (time > 0) {
scheduleAt(time, nextRead);
} else {
// TODO: Schedule simulation end?
}
#endif
}
| void GlobalTraceManager::scheduleNextEvent | ( | double | time, | |
| int | nodeId, | |||
| char * | buf, | |||
| int | line | |||
| ) | [protected] |
Definition at line 160 of file GlobalTraceManager.cc.
Referenced by readNextBlock().
{
GlobalTraceManagerMessage* msg = new GlobalTraceManagerMessage(buf);
msg->setInternalNodeId(nodeId);
msg->setLineNumber(line);
scheduleAt(time, msg);
}
char* GlobalTraceManager::buf [private] |
Definition at line 56 of file GlobalTraceManager.h.
Referenced by readNextBlock().
int GlobalTraceManager::chunksize [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by readNextBlock().
int GlobalTraceManager::fd [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by initialize(), and readNextBlock().
int GlobalTraceManager::filesize [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by initialize(), and readNextBlock().
GlobalNodeList* GlobalTraceManager::globalNodeList [protected] |
pointer to GlobalNodeList
Definition at line 52 of file GlobalTraceManager.h.
Referenced by handleMessage(), and initialize().
int GlobalTraceManager::marginsize [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by readNextBlock().
cMessage* GlobalTraceManager::nextRead [private] |
Definition at line 60 of file GlobalTraceManager.h.
Referenced by GlobalTraceManager(), handleMessage(), initialize(), readNextBlock(), and ~GlobalTraceManager().
int GlobalTraceManager::offset [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by initialize(), and readNextBlock().
const int GlobalTraceManager::readPages = 32 [static, private] |
Definition at line 57 of file GlobalTraceManager.h.
Referenced by readNextBlock().
int GlobalTraceManager::remain [private] |
Definition at line 55 of file GlobalTraceManager.h.
Referenced by initialize(), and readNextBlock().
char * GlobalTraceManager::start [private] |
Definition at line 56 of file GlobalTraceManager.h.
Referenced by readNextBlock().
Definition at line 51 of file GlobalTraceManager.h.
Referenced by createNode(), deleteNode(), getAppGateById(), and initialize().
1.7.1