Cogs.Core
Network.cpp
1#if !defined( EMSCRIPTEN )
2
3#include "Network.h"
4#include "Connection.h"
5
6#include "../Platform/Threads.h"
7#include "../Platform/Timer.h"
8
9#include <algorithm>
10#include <condition_variable>
11#include <vector>
12#include <chrono>
13
14#if !defined( _WIN32 )
15 #include <sys/select.h>
16#endif
17
18using namespace std::chrono;
19
20namespace Cogs::Network {
21 Thread* gThread = nullptr;
22 bool gExit = false;
23 std::mutex gMutex;
24 std::vector< ConnectionBase* > gNewConnections;
25 std::vector< ConnectionBase* > gConnections;
26
27 std::condition_variable gAddCV;
28 std::condition_variable gRemoveCV;
29
30 void ThreadFn();
31}
32
33void Cogs::Network::addConnection(ConnectionBase* connection) {
34 std::unique_lock lock(gMutex, std::defer_lock);
35 bool do_lock = !gThread || (std::this_thread::get_id() != gThread->get_id());
36 if(do_lock) lock.lock();
37
38 if ((std::find(gNewConnections.begin(), gNewConnections.end(), connection) == gNewConnections.end()) &&
39 (std::find(gConnections.begin(), gConnections.end(), connection) == gConnections.end())) {
40 gNewConnections.push_back(connection);
41
42 if (!gThread) {
43 gThread = new Thread(&ThreadFn);
44 Threads::setName(*gThread, "Network");
45 }
46 else if(gThread && (std::this_thread::get_id() != gThread->get_id())){
47 if(do_lock) lock.unlock();
48 gAddCV.notify_all();
49 }
50 }
51}
52
53void Cogs::Network::removeConnection(ConnectionBase* connection) {
54 std::unique_lock lock(gMutex, std::defer_lock);
55 bool do_lock = !gThread || (std::this_thread::get_id() != gThread->get_id());
56 if(do_lock) lock.lock();
57 auto ne = gNewConnections.end();
58 auto ni = std::find(gNewConnections.begin(), ne, connection);
59
60 if (ni != ne) {
61 gNewConnections.erase(ni);
62 }
63 else {
64 auto e = gConnections.end();
65 auto i = std::find(gConnections.begin(), e, connection);
66
67 if (i != e) {
68 if(do_lock) lock.unlock();
69 connection->remove();
70 if(do_lock) lock.lock();
71
72 if (gThread && (std::this_thread::get_id() != gThread->get_id())) {
73 for (;;) {
74 e = gConnections.end();
75 if (std::find(gConnections.begin(), e, connection) == e) {
76 break;
77 }
78 gRemoveCV.wait(lock);
79 }
80 }
81 }
82 }
83}
84
85void Cogs::Network::closedown() {
86 if (gThread) {
87 gExit = true;
88 gAddCV.notify_all();
89 gThread->join();
90
91 delete gThread;
92 gThread = nullptr;
93 gExit = false;
94 }
95}
96
101void Cogs::Network::ThreadFn() {
102 uint32_t current = 0;
103 std::unique_lock lock(gMutex);
104
105 while (!gExit) {
106 gConnections.insert(gConnections.end(), gNewConnections.begin(), gNewConnections.end());
107 gNewConnections.clear();
108 size_t connectionCount = gConnections.size();
109 lock.unlock();
110 gRemoveCV.notify_all();
111 lock.lock();
112
113 if (connectionCount) {
114 fd_set socketHandles;
115 Socket highest = 0;
116 timeval timeout;
117 int connectedCount = 0;
118
119 for (auto connection : gConnections) {
120 connection->processOutgoing();
121 }
122
123 FD_ZERO(&socketHandles);
124
125 if (connectionCount < FD_SETSIZE) {
126 for (auto connection : gConnections) {
127 if (connection->isConnected()) {
128 FD_SET(connection->getSocket(), &socketHandles);
129 highest = std::max(highest, connection->getSocket());
130 }
131 }
132 }
133 else {
134 if (current >= connectionCount) {
135 current = 0;
136 }
137 for (uint32_t i = 0; i < FD_SETSIZE; ++i) {
138 ConnectionBase* connection = gConnections[current];
139
140 if (connection->isConnected()) {
141 FD_SET(connection->getSocket(), &socketHandles);
142 highest = std::max(highest, connection->getSocket());
143 }
144 current = ++current % connectionCount;
145 }
146 }
147
148 timeout.tv_sec = 0;
149 timeout.tv_usec = 50000;
150
151 lock.unlock();
152 if (select(static_cast<int>(highest + 1), &socketHandles, nullptr, nullptr, &timeout) > 0) {
153 std::lock_guard select_lock(gMutex);
154
155 for (auto connection : gConnections) {
156 if (FD_ISSET(connection->getSocket(), &socketHandles)) {
157 connection->processIncoming();
158 }
159 }
160 }
161 lock.lock();
162
163 for (auto i = gConnections.begin(); i != gConnections.end(); ) {
164 ConnectionBase* connection = *i;
165
166 if (!connection->isConnected()) {
167 if (connection->shouldDelete()) {
168 delete connection;
169 i = gConnections.erase(i);
170 }
171 else if (connection->isAutoConnectEnabled()) {
172 if (connection->getReconnectTime() < Timer::currentTimeMilliseconds()) {
173 connection->reconnect();
174 }
175 ++i;
176 }
177 else {
178 i = gConnections.erase(i);
179 }
180 }
181 else if (connection->shouldRemove()) {
182 connection->disconnect();
183 if (connection->shouldDelete()) {
184 delete connection;
185 }
186 i = gConnections.erase(i);
187 }
188 else {
189 ++connectedCount;
190 ++i;
191 }
192 }
193 if (!connectedCount) {
194 gAddCV.wait_for(lock, 1ms);
195 }
196 }
197 else {
198 if(!gExit) {
199 gAddCV.wait(lock);
200 }
201 }
202 }
203 gNewConnections.clear();
204 gConnections.clear();
205}
206
207#endif