Cogs.Core
MessageHub.cpp
1#include "MessageHub.h"
2
3namespace Cogs::Network {
4
10 MessageHub::HubList& allHubs() {
11 static MessageHub::HubList hubs;
12 return hubs;
13 }
14
18 Mutex& allHubsMutex() {
19 static Mutex mutex;
20 return mutex;
21 }
22
27 MessageHub* find(uint32_t hubid) {
28 LockGuard g(allHubsMutex());
29 MessageHub::HubList& hubs = allHubs();
30
31 for (auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
32 if ((*i)->getID() == hubid) {
33 return *i;
34 }
35 }
36 return nullptr;
37 }
38}
39
47 id = ident;
48
49 LockGuard g(allHubsMutex());
50
51 allHubs().push_back(this);
52}
53
59 LockGuard g(allHubsMutex());
60 HubList& hubs = allHubs();
61
62 for (auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
63 if (*i == this) {
64 hubs.erase(i);
65 break;
66 }
67 }
68 disconnectFromAllSenders();
69 removeAllListeners();
70 flushQueuedMessages();
71}
72
77bool Cogs::Network::MessageHub::addListener(MessageHub* hub, bool bidirectional) {
78 if (hub) {
79 bool exists = false;
80
81 hubsMutex.lock();
82 for (auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
83 if (*i == hub) {
84 exists = true;
85 break;
86 }
87 }
88 if (!exists) {
89 listeners.push_back(hub);
90 }
91 hubsMutex.unlock();
92
93 if (bidirectional) {
94 hub->addListener(this, false);
95 }
96
97 if (!exists) {
98 hub->addSender(this);
99 return true;
100 }
101 }
102 return false;
103}
104
110bool Cogs::Network::MessageHub::addListener(uint32_t hubid, bool bidirectional) {
111 return addListener(find(hubid), bidirectional);
112}
113
117bool Cogs::Network::MessageHub::listenTo(MessageHub* hub, bool bidirectional) {
118 return hub ? hub->addListener(this, bidirectional) : false;
119}
120
125bool Cogs::Network::MessageHub::listenTo(uint32_t hubid, bool bidirectional) {
126 return listenTo(find(hubid), bidirectional);
127}
128
133 if (hub) {
134 bool removed = false;
135
136 hubsMutex.lock();
137 for (auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
138 if (*i == hub) {
139 listeners.erase(i);
140 removed = true;
141 break;
142 }
143 }
144 hubsMutex.unlock();
145
146 if (removed) {
147 hub->removeSender(this);
148 return true;
149 }
150 }
151 return false;
152}
153
158 return removeListener(find(hubid));
159}
160
165 HubList copy;
166
167 hubsMutex.lock();
168 std::swap(listeners, copy);
169 hubsMutex.unlock();
170
171 for (auto i = copy.begin(), e = copy.end(); i != e; ++i) {
172 (*i)->removeSender(this);
173 }
174}
175
180 HubList copy;
181
182 hubsMutex.lock();
183 copy = senders;
184 hubsMutex.unlock();
185
186 for (auto i = copy.begin(), e = copy.end(); i != e; ++i) {
187 if ((*i)->getID() == hubid) {
188 (*i)->removeListener(this);
189 break;
190 }
191 }
192}
193
198 HubList copy;
199
200 hubsMutex.lock();
201 std::swap(senders, copy);
202 hubsMutex.unlock();
203
204 for (auto i = copy.begin(), e = copy.end(); i != e; ++i) {
205 (*i)->removeListener(this);
206 }
207}
208
214void Cogs::Network::MessageHub::broadcastMessage(const Message::Ptr& message) {
215 LockGuard g(allHubsMutex());
216 HubList& hubs = allHubs();
217
218 for (auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
219 if (*i != this) {
220 (*i)->queueMessage(message);
221 }
222 }
223}
224
231void Cogs::Network::MessageHub::sendMessage(const Message::Ptr& message) {
232 LockGuard g(hubsMutex);
233
234 for (auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
235 (*i)->queueMessage(message);
236 }
237}
238
249 size_t queuedMessages = getNoOfQueuedMessages();
250 bool ret = queuedMessages ? true : false;
251
252 if (queuedMessages > (limit * 10)) {
253 limit = noLimit;
254 }
255
256 for (Message::Ptr message = getNextMessage(); message; message = getNextMessage()) {
257 Message reader(message->getID(), *message);
258
259 processMessage(reader);
260
261 if (!--limit) {
262 break;
263 }
264 }
265 return ret;
266}
267
275void Cogs::Network::MessageHub::queueMessage(const Message::Ptr& message) {
276 if (!isSendOnly()) {
277 LockGuard g(messagesMutex);
278
279 messages.push_back(message);
280 }
281}
282
287 LockGuard g(messagesMutex);
288
289 return messages.size();
290}
291
296 LockGuard g(messagesMutex);
297
298 messages.clear();
299}
300
306Cogs::Network::Message::Ptr Cogs::Network::MessageHub::getNextMessage() {
307 LockGuard g(messagesMutex);
308
309 if (!messages.empty()) {
310 Message::Ptr message = messages.front();
311
312 messages.pop_front();
313 return message;
314 }
315 return nullptr;
316}
317
325 LockGuard g(hubsMutex);
326
327 for (auto i = senders.begin(), e = senders.end(); i != e; ++i) {
328 if (*i == sender) {
329 return;
330 }
331 }
332 senders.push_back(sender);
333}
334
339 LockGuard g(hubsMutex);
340
341 for (auto i = senders.begin(), e = senders.end(); i != e; ++i) {
342 if (*i == sender) {
343 senders.erase(i);
344 break;
345 }
346 }
347}
A MessageHub connects to other hubs to form a simple peer-to-peer application-internal networking system.
Definition: MessageHub.h:30
void disconnectFromAllSenders()
Disconnects this hub from all hubs to which it is listening.
Definition: MessageHub.cpp:197
bool removeListener(MessageHub *hub)
Removes the specified hub as a listener to this hub.
Definition: MessageHub.cpp:132
bool addListener(MessageHub *hub, bool bidirectional=false)
Adds the specified hub as a listener to this hub.
Definition: MessageHub.cpp:77
MessageHub(uint32_t ident=0)
Constructs a new MessageHub instance and adds it to the global hub list.
Definition: MessageHub.cpp:46
virtual void sendMessage(const Message::Ptr &message)
Send message to all hubs listening to this one.
Definition: MessageHub.cpp:231
bool listenTo(MessageHub *hub, bool bidirectional=false)
Sets this MessageHub up to listen for messages from the specified hub.
Definition: MessageHub.cpp:117
void flushQueuedMessages()
Flushes all pending messages from this hub without processing them.
Definition: MessageHub.cpp:295
size_t getNoOfQueuedMessages()
Retrieves the number of messages currently awaiting processing by this hub.
Definition: MessageHub.cpp:286
virtual ~MessageHub()
Removes this MessageHub from the global list and cleans up all connections to and from this hub.
Definition: MessageHub.cpp:58
virtual bool processMessages(size_t limit=noLimit)
Process all queued messages for this hub.
Definition: MessageHub.cpp:248
void removeSender(const MessageHub *sender)
Removes the specified hub from this hub's list of senders.
Definition: MessageHub.cpp:338
void addSender(MessageHub *sender)
Adds the specified hub as a sender to which this hub is listening.
Definition: MessageHub.cpp:324
virtual void queueMessage(const Message::Ptr &message)
Queue a message for this hub to process.
Definition: MessageHub.cpp:275
void removeAllListeners()
Removes all listeners from this hub.
Definition: MessageHub.cpp:164
void disconnectFromSender(uint32_t hubid)
Disconnects this hub from the specified hub to which it is listening.
Definition: MessageHub.cpp:179
Message::Ptr getNextMessage()
Retrieves the next pending message.
Definition: MessageHub.cpp:306
virtual void broadcastMessage(const Message::Ptr &message)
Broadcast message to all hubs.
Definition: MessageHub.cpp:214