3namespace Cogs::Network {
10 MessageHub::HubList& allHubs() {
11 static MessageHub::HubList hubs;
18 Mutex& allHubsMutex() {
27 MessageHub* find(uint32_t hubid) {
28 LockGuard g(allHubsMutex());
29 MessageHub::HubList& hubs = allHubs();
31 for (
auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
32 if ((*i)->getID() == hubid) {
49 LockGuard g(allHubsMutex());
51 allHubs().push_back(
this);
59 LockGuard g(allHubsMutex());
60 HubList& hubs = allHubs();
62 for (
auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
68 disconnectFromAllSenders();
70 flushQueuedMessages();
82 for (
auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
89 listeners.push_back(hub);
111 return addListener(find(hubid), bidirectional);
118 return hub ? hub->
addListener(
this, bidirectional) :
false;
126 return listenTo(find(hubid), bidirectional);
134 bool removed =
false;
137 for (
auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
158 return removeListener(find(hubid));
168 std::swap(listeners, copy);
171 for (
auto i = copy.begin(), e = copy.end(); i != e; ++i) {
172 (*i)->removeSender(
this);
186 for (
auto i = copy.begin(), e = copy.end(); i != e; ++i) {
187 if ((*i)->getID() == hubid) {
188 (*i)->removeListener(
this);
201 std::swap(senders, copy);
204 for (
auto i = copy.begin(), e = copy.end(); i != e; ++i) {
205 (*i)->removeListener(
this);
215 LockGuard g(allHubsMutex());
216 HubList& hubs = allHubs();
218 for (
auto i = hubs.begin(), e = hubs.end(); i != e; ++i ) {
220 (*i)->queueMessage(message);
232 LockGuard g(hubsMutex);
234 for (
auto i = listeners.begin(), e = listeners.end(); i != e; ++i) {
235 (*i)->queueMessage(message);
249 size_t queuedMessages = getNoOfQueuedMessages();
250 bool ret = queuedMessages ? true :
false;
252 if (queuedMessages > (limit * 10)) {
256 for (Message::Ptr message = getNextMessage(); message; message = getNextMessage()) {
257 Message reader(message->getID(), *message);
259 processMessage(reader);
277 LockGuard g(messagesMutex);
279 messages.push_back(message);
287 LockGuard g(messagesMutex);
289 return messages.size();
296 LockGuard g(messagesMutex);
307 LockGuard g(messagesMutex);
309 if (!messages.empty()) {
310 Message::Ptr message = messages.front();
312 messages.pop_front();
325 LockGuard g(hubsMutex);
327 for (
auto i = senders.begin(), e = senders.end(); i != e; ++i) {
332 senders.push_back(sender);
339 LockGuard g(hubsMutex);
341 for (
auto i = senders.begin(), e = senders.end(); i != e; ++i) {
A MessageHub connects to other hubs to form a simple peer-to-peer application-internal networking system.
void disconnectFromAllSenders()
Disconnects this hub from all hubs to which it is listening.
bool removeListener(MessageHub *hub)
Removes the specified hub as a listener to this hub.
bool addListener(MessageHub *hub, bool bidirectional=false)
Adds the specified hub as a listener to this hub.
MessageHub(uint32_t ident=0)
Constructs a new MessageHub instance and adds it to the global hub list.
virtual void sendMessage(const Message::Ptr &message)
Sends the message to all hubs listening to this one.
bool listenTo(MessageHub *hub, bool bidirectional=false)
Sets this MessageHub up to listen for messages from the specified hub.
void flushQueuedMessages()
Flushes all pending messages from this hub without processing them.
size_t getNoOfQueuedMessages()
Retrieves the number of messages currently awaiting processing by this hub.
virtual ~MessageHub()
Removes this MessageHub from the global list and cleans up all connections to and from this hub.
virtual bool processMessages(size_t limit=noLimit)
Processes all queued messages for this hub.
void removeSender(const MessageHub *sender)
Removes the specified hub from this hub's list of senders.
void addSender(MessageHub *sender)
Adds the specified hub as a sender to which this hub is listening.
virtual void queueMessage(const Message::Ptr &message)
Queues a message for this hub to process.
void removeAllListeners()
Removes all listeners from this hub.
void disconnectFromSender(uint32_t hubid)
Disconnects this hub from the specified hub to which it is listening.
Message::Ptr getNextMessage()
Retrieves the next pending message.
virtual void broadcastMessage(const Message::Ptr &message)
Broadcasts the message to all hubs.