#include <MessageObserver.h>
Classes | |
| struct | MulticastGroup |
Public Member Functions | |
| MessageObserver () | |
| ~MessageObserver () | |
| void | initialize () |
| void | finish () |
| void | handleMessage (cMessage *msg) |
| void | joinedGroup (int moduleId, OverlayKey groupId) |
| Adds one to node count for group. | |
| void | leftGroup (int moduleId, OverlayKey groupId) |
| Subtracts one from node count for group. | |
| void | sentMessage (ALMTestTracedMessage *msg) |
| Counts n - 1 messages pending reception, where n is the size of the group to which the message is sent. | |
| void | receivedMessage (ALMTestTracedMessage *msg) |
| Counts one received message for group. | |
| void | nodeDead (int moduleId) |
| Notifies the observer that the node doesn't exist anymore. | |
Private Types | |
| typedef std::pair< int, OverlayKey > | NodeGroupPair |
| typedef std::pair< int, long > | NodeMessagePair |
Private Attributes | |
| simtime_t | creationTime |
| std::map< OverlayKey, MulticastGroup > | groups |
| std::map< NodeGroupPair, simtime_t > | joinedAt |
| std::map< NodeMessagePair, simtime_t > | receivedAt |
| cMessage * | gcTimer |
| double | gcInterval |
| double | cacheMaxAge |
| int | numLooped |
| GlobalStatistics * | globalStatistics |
Friends | |
| std::ostream & | operator<< (std::ostream &os, MessageObserver::MulticastGroup const &mg) |
| std::ostream & | operator<< (std::ostream &os, MessageObserver::NodeGroupPair const &ngp) |
Definition at line 33 of file MessageObserver.h.
typedef std::pair<int, OverlayKey> MessageObserver::NodeGroupPair [private] |
Definition at line 93 of file MessageObserver.h.
typedef std::pair<int, long> MessageObserver::NodeMessagePair [private] |
Definition at line 95 of file MessageObserver.h.
| MessageObserver::MessageObserver | ( | ) |
Definition at line 31 of file MessageObserver.cc.
{
gcTimer = new cMessage("garbage_collection");
gcInterval = 1.0;
cacheMaxAge = 10.0;
numLooped = 0;
}
| MessageObserver::~MessageObserver | ( | ) |
Definition at line 38 of file MessageObserver.cc.
{
cancelAndDelete(gcTimer);
}
| void MessageObserver::finish | ( | ) |
Definition at line 59 of file MessageObserver.cc.
{
uint64_t totalSent = 0;
uint64_t totalReceived = 0;
for (std::map<OverlayKey, MulticastGroup>::iterator i = groups.begin(); i != groups.end(); ++i) {
std::stringstream message;
message << "MessageObserver: Group " << i->first;
std::string name;
name = message.str() + " Sent Messages";
recordScalar(name.c_str(), (double)i->second.sent);
name = message.str() + " Received Messages";
recordScalar(name.c_str(), (double)i->second.received);
name = message.str() + " Delivered Percentage";
recordScalar(name.c_str(), ((double)i->second.received * 100.0) / (double)i->second.sent);
totalSent += i->second.sent;
totalReceived += i->second.received;
}
recordScalar("MessageObserver: Total Sent Messages", (double)totalSent);
recordScalar("MessageObserver: Total Received Messages", (double)totalReceived);
recordScalar("MessageObserver: Total Delivered Percentage", ((double)totalReceived * 100.0) / (double)totalSent);
simtime_t time = globalStatistics->calcMeasuredLifetime(creationTime);
if ( time >= GlobalStatistics::MIN_MEASURED ) {
globalStatistics->addStdDev("MessageObserver: Looped messages/s", (double)numLooped / time);
}
}
| void MessageObserver::handleMessage | ( | cMessage * | msg | ) |
Definition at line 90 of file MessageObserver.cc.
{
if (msg == gcTimer) {
simtime_t now = OPP::simTime();
std::map<NodeMessagePair, simtime_t>::iterator i, iPrev;
i = receivedAt.begin();
while (i != receivedAt.end()) {
if (now - i->second >= cacheMaxAge) {
iPrev = i;
++i;
receivedAt.erase(iPrev);
}
else {
++i;
}
}
scheduleAt(OPP::simTime() + gcInterval, gcTimer);
}
}
| void MessageObserver::initialize | ( | ) |
Definition at line 42 of file MessageObserver.cc.
{
WATCH_MAP(groups);
WATCH_MAP(joinedAt);
WATCH_MAP(receivedAt);
WATCH(numLooped);
numLooped = 0;
gcInterval = par("gcInterval");
cacheMaxAge = par("cacheMaxAge");
if (gcInterval > 0.0)
scheduleAt(OPP::simTime() + gcInterval, gcTimer);
creationTime = OPP::simTime();
globalStatistics = GlobalStatisticsAccess().get();
}
| void MessageObserver::joinedGroup | ( | int | moduleId, | |
| OverlayKey | groupId | |||
| ) |
Adds one to node count for group.
Definition at line 112 of file MessageObserver.cc.
Referenced by ALMTest::joinGroup().
{
groups[groupId].size += 1;
joinedAt[NodeGroupPair(moduleId, groupId)] = OPP::simTime();
}
| void MessageObserver::leftGroup | ( | int | moduleId, | |
| OverlayKey | groupId | |||
| ) |
Subtracts one from node count for group.
Definition at line 120 of file MessageObserver.cc.
Referenced by ALMTest::leaveGroup().
{
std::map<OverlayKey, MulticastGroup>::iterator iter = groups.find(groupId);
if (iter == groups.end()) {
EV << "Warning: MessageObserver asked to remove node from nonexistent group " << groupId.toString();
}
else if (iter->second.size == 0) {
EV << "Warning: MessageObserver asked to remove node from empty group " << groupId.toString();
}
else {
iter->second.size -= 1;
}
joinedAt.erase(NodeGroupPair(moduleId, groupId));
}
| void MessageObserver::nodeDead | ( | int | moduleId | ) |
Notifies the observer that the node doesn't exist anymore.
Definition at line 196 of file MessageObserver.cc.
Referenced by ALMTest::finishApp().
{
// For each group, if node has joined group, decrease group member count by one
// and clear joined info.
for (std::map<OverlayKey, MulticastGroup>::iterator ig = groups.begin(); ig != groups.end(); ++ig) {
NodeGroupPair ngp = NodeGroupPair(moduleId, ig->first);
if (joinedAt.find(ngp) != joinedAt.end()) {
ig->second.size--;
joinedAt.erase(ngp);
}
}
}
| void MessageObserver::receivedMessage | ( | ALMTestTracedMessage * | msg | ) |
Counts one received message for group.
Definition at line 159 of file MessageObserver.cc.
Referenced by ALMTest::handleMCast().
{
if (msg == NULL) {
error("%s called with null message.", __PRETTY_FUNCTION__);
}
std::map<OverlayKey, MulticastGroup>::iterator iGroup;
iGroup = groups.find(msg->getGroupId());
if (iGroup == groups.end()) {
EV << "Warning: MessageObserver notified of received message for nonexistent group " << msg->getGroupId().toString();
}
else if (iGroup->second.size == 0) {
EV << "Warning: MessageObserver notified of received message for empty group " << msg->getGroupId().toString();
}
else if (msg->getSenderId() != msg->getReceiverId()) {
// Only count if message was received after joining
std::map<NodeGroupPair, simtime_t>::iterator iJoinInfo;
iJoinInfo = joinedAt.find(NodeGroupPair(msg->getReceiverId(), msg->getGroupId()));
if (iJoinInfo != joinedAt.end() && iJoinInfo->second < msg->getTimestamp()) {
// Check if this message has not already been received
NodeMessagePair nmp = NodeMessagePair(msg->getReceiverId(), msg->getMcastId());
if (receivedAt.find(nmp) == receivedAt.end()) {
iGroup->second.received += 1;
receivedAt[nmp] = msg->getTimestamp();
}
}
}
else {
RECORD_STATS(++numLooped);
}
}
| void MessageObserver::sentMessage | ( | ALMTestTracedMessage * | msg | ) |
Counts n - 1 messages pending reception, where n is the size of the group to which the message is sent.
Counts n - 1 messages pending reception, where n is the size of the group.
Definition at line 138 of file MessageObserver.cc.
Referenced by ALMTest::sendDataToGroup().
{
if (msg == NULL) {
error("%s called with null message.", __PRETTY_FUNCTION__);
}
std::map<OverlayKey, MulticastGroup>::iterator iter;
iter = groups.find(msg->getGroupId());
if (iter == groups.end()) {
EV << "Warning: MessageObserver notified of sent message for nonexistent group " << msg->getGroupId().toString();
}
else if (iter->second.size == 0) {
EV << "Warning: MessageObserver notified of sent message for empty group " << msg->getGroupId().toString();
}
else {
iter->second.sent += iter->second.size - 1;
}
}
| std::ostream& operator<< | ( | std::ostream & | os, | |
| MessageObserver::MulticastGroup const & | mg | |||
| ) | [friend] |
Definition at line 208 of file MessageObserver.cc.
{
return os << "Nodes: " << mg.size << "; Messages Sent: " << mg.sent
<< ", Received: " << mg.received << ", Dropped: " << (mg.sent - mg.received);
}
| std::ostream& operator<< | ( | std::ostream & | os, | |
| MessageObserver::NodeGroupPair const & | ngp | |||
| ) | [friend] |
Definition at line 213 of file MessageObserver.cc.
{
cModule* module = OPP::cSimulation::getActiveSimulation()->getModule(ngp.first);
return os << "(" << (module != NULL ? module->getFullPath() : "Deleted node")
<< ", " << ngp.second << ")";
}
double MessageObserver::cacheMaxAge [private] |
Definition at line 113 of file MessageObserver.h.
Referenced by handleMessage(), initialize(), and MessageObserver().
simtime_t MessageObserver::creationTime [private] |
Definition at line 91 of file MessageObserver.h.
Referenced by finish(), and initialize().
double MessageObserver::gcInterval [private] |
Definition at line 110 of file MessageObserver.h.
Referenced by handleMessage(), initialize(), and MessageObserver().
cMessage* MessageObserver::gcTimer [private] |
Definition at line 107 of file MessageObserver.h.
Referenced by handleMessage(), initialize(), MessageObserver(), and ~MessageObserver().
Definition at line 118 of file MessageObserver.h.
Referenced by finish(), and initialize().
std::map<OverlayKey, MulticastGroup> MessageObserver::groups [private] |
Definition at line 98 of file MessageObserver.h.
Referenced by finish(), initialize(), joinedGroup(), leftGroup(), nodeDead(), receivedMessage(), and sentMessage().
std::map<NodeGroupPair, simtime_t> MessageObserver::joinedAt [private] |
Definition at line 101 of file MessageObserver.h.
Referenced by initialize(), joinedGroup(), leftGroup(), nodeDead(), and receivedMessage().
int MessageObserver::numLooped [private] |
Definition at line 116 of file MessageObserver.h.
Referenced by finish(), initialize(), MessageObserver(), and receivedMessage().
std::map<NodeMessagePair, simtime_t> MessageObserver::receivedAt [private] |
Definition at line 104 of file MessageObserver.h.
Referenced by handleMessage(), initialize(), and receivedMessage().
1.7.1