Cogs.Core
ConnectionHub.cpp
1#include "ConnectionHub.h"
2
3#if !defined( EMSCRIPTEN )
4
5#include "../Logging/Logger.h"
6
7#include <chrono>
8#include <thread>
9
10using namespace std::chrono_literals;
11
12namespace {
13 Cogs::Logging::Log logger = Cogs::Logging::getLogger("ConnectionHub");
14}
15
16bool Cogs::Network::ConnectionHub::processOutgoing() {
17 if (isConnected()) {
18 for (;;) {
19 if (!outgoingMessage) {
20 outgoingMessage = getNextMessage();
21
22 if (outgoingMessage) {
23 Header header = { outgoingMessage->getID(), outgoingMessage->size() };
24 char* ptr = reinterpret_cast<char*>(&header);
25 int retries = 1000;
26
27 for (uint64_t remaining = sizeof(header); remaining; ) {
28 uint64_t bytessent;
29
30 if (ConnectionTCP::send(ptr, remaining, bytessent)) {
31 if (bytessent) {
32 remaining -= bytessent;
33 ptr += bytessent;
34 }
35 else {
36 if (--retries <= 0) {
37 outgoingMessage = nullptr;
38 LOG_ERROR(logger, "Failed to send message header to remote host. Forcing disconnect.");
39 disconnect();
40 return false;
41 }
42 std::this_thread::sleep_for(1ms);
43 }
44 }
45 else {
46 outgoingMessage = nullptr;
47 return false;
48 }
49 }
50 }
51 else {
52 return true;
53 }
54 }
55 if (bytesSent < outgoingMessage->size()) {
56 uint64_t justSent;
57 const char* ptr = reinterpret_cast<const char*>(outgoingMessage->data()) + bytesSent;
58
59 if (ConnectionTCP::send(ptr, outgoingMessage->size() - bytesSent, justSent)) {
60 if (justSent) {
61 bytesSent += justSent;
62 }
63 else {
64 return true;
65 }
66 }
67 else {
68 outgoingMessage = nullptr;
69 bytesSent = 0;
70 return false;
71 }
72 }
73 if (bytesSent >= outgoingMessage->size()) {
74 outgoingMessage = nullptr;
75 bytesSent = 0;
76 }
77 }
78 }
79 outgoingMessage = nullptr;
80 bytesSent = 0;
81
82 while (getNextMessage()) {
83 }
84 return false;
85}
86
88 if (!incomingMessage) {
89 Header header;
90 char* dest = reinterpret_cast<char*>(&header);
91
92 for (uint64_t remaining = sizeof(header); remaining; ) {
93 uint64_t bytesReceived;
94
95 if (ConnectionTCP::receive(dest, remaining, bytesReceived)) {
96 remaining -= bytesReceived;
97 dest += bytesReceived;
98 }
99 else {
100 return false;
101 }
102 }
103 incomingMessage = Message::allocate(header.id);
104 incomingMessage->resize(header.size, false);
105 }
106
107 uint64_t remaining = incomingMessage->unreadSize();
108 char* dest = reinterpret_cast<char*>(incomingMessage->data()) + incomingMessage->pos();
109
110 while (remaining) {
111 uint64_t bytesReceived;
112
113 if (ConnectionTCP::receive(dest, remaining, bytesReceived)) {
114 if (bytesReceived) {
115 incomingMessage->seek(bytesReceived, Cogs::Memory::MemoryBuffer::Anchor::Current);
116 remaining -= bytesReceived;
117 dest += bytesReceived;
118 }
119 else {
120 break;
121 }
122 }
123 else {
124 return false;
125 }
126 }
127 if (!remaining) {
128 incomingMessage->seek(0, Cogs::Memory::MemoryBuffer::Anchor::Start);
129 handleReceivedMessage(incomingMessage);
130 incomingMessage = nullptr;
131 }
132 return true;
133}
134
135void Cogs::Network::ConnectionHub::handleReceivedMessage(const Message::Ptr& message) {
136 sendMessage(message);
137}
138
144 incomingMessage = nullptr;
145 outgoingMessage = nullptr;
146 bytesSent = 0;
147
149}
150
151#endif
Log implementation class.
Definition: LogManager.h:139
bool receive(void *buffer, uint64_t byteCount, uint64_t &bytesReceived)
Attempts to read the specified number of bytes from the network socket this connection represents.
Definition: Connection.cpp:141
bool send(const void *data, uint64_t byteCount, uint64_t &bytesSent)
Attempts to send as much of the specified data as possible.
Definition: Connection.cpp:105
virtual bool disconnect() override
Called in response to a problem with the network, or a request by the application to disconnect.
virtual bool processIncoming() override
Manages accepting incoming connections when this connection is a listener.
virtual bool disconnect() override
Disconnects this connection.
Definition: Connection.cpp:488
Message::Ptr getNextMessage()
Retrieves the next pending message.
Definition: MessageHub.cpp:306
static Ptr allocate(uint32_t id)
Retrieves an existing message from the pool, or creates a new one if the pool is empty.
Definition: Message.cpp:9
constexpr Log getLogger(const char(&name)[LEN]) noexcept
Definition: LogManager.h:180