1#if !defined( EMSCRIPTEN )
6#include "../Platform/Threads.h"
7#include "../Platform/Timer.h"
10#include <condition_variable>
15 #include <sys/select.h>
18using namespace std::chrono;
20namespace Cogs::Network {
21 Thread* gThread =
nullptr;
24 std::vector< ConnectionBase* > gNewConnections;
25 std::vector< ConnectionBase* > gConnections;
27 std::condition_variable gAddCV;
28 std::condition_variable gRemoveCV;
33void Cogs::Network::addConnection(ConnectionBase* connection) {
34 std::unique_lock lock(gMutex, std::defer_lock);
35 bool do_lock = !gThread || (std::this_thread::get_id() != gThread->get_id());
36 if(do_lock) lock.lock();
38 if ((std::find(gNewConnections.begin(), gNewConnections.end(), connection) == gNewConnections.end()) &&
39 (std::find(gConnections.begin(), gConnections.end(), connection) == gConnections.end())) {
40 gNewConnections.push_back(connection);
43 gThread =
new Thread(&ThreadFn);
44 Threads::setName(*gThread,
"Network");
46 else if(gThread && (std::this_thread::get_id() != gThread->get_id())){
47 if(do_lock) lock.unlock();
53void Cogs::Network::removeConnection(ConnectionBase* connection) {
54 std::unique_lock lock(gMutex, std::defer_lock);
55 bool do_lock = !gThread || (std::this_thread::get_id() != gThread->get_id());
56 if(do_lock) lock.lock();
57 auto ne = gNewConnections.end();
58 auto ni = std::find(gNewConnections.begin(), ne, connection);
61 gNewConnections.erase(ni);
64 auto e = gConnections.end();
65 auto i = std::find(gConnections.begin(), e, connection);
68 if(do_lock) lock.unlock();
70 if(do_lock) lock.lock();
72 if (gThread && (std::this_thread::get_id() != gThread->get_id())) {
74 e = gConnections.end();
75 if (std::find(gConnections.begin(), e, connection) == e) {
85void Cogs::Network::closedown() {
101void Cogs::Network::ThreadFn() {
102 uint32_t current = 0;
103 std::unique_lock lock(gMutex);
106 gConnections.insert(gConnections.end(), gNewConnections.begin(), gNewConnections.end());
107 gNewConnections.clear();
108 size_t connectionCount = gConnections.size();
110 gRemoveCV.notify_all();
113 if (connectionCount) {
114 fd_set socketHandles;
117 int connectedCount = 0;
119 for (
auto connection : gConnections) {
120 connection->processOutgoing();
123 FD_ZERO(&socketHandles);
125 if (connectionCount < FD_SETSIZE) {
126 for (
auto connection : gConnections) {
127 if (connection->isConnected()) {
128 FD_SET(connection->getSocket(), &socketHandles);
129 highest = std::max(highest, connection->getSocket());
134 if (current >= connectionCount) {
137 for (uint32_t i = 0; i < FD_SETSIZE; ++i) {
138 ConnectionBase* connection = gConnections[current];
140 if (connection->isConnected()) {
141 FD_SET(connection->getSocket(), &socketHandles);
142 highest = std::max(highest, connection->getSocket());
144 current = ++current % connectionCount;
149 timeout.tv_usec = 50000;
152 if (select(
static_cast<int>(highest + 1), &socketHandles,
nullptr,
nullptr, &timeout) > 0) {
153 std::lock_guard select_lock(gMutex);
155 for (
auto connection : gConnections) {
156 if (FD_ISSET(connection->getSocket(), &socketHandles)) {
157 connection->processIncoming();
163 for (
auto i = gConnections.begin(); i != gConnections.end(); ) {
164 ConnectionBase* connection = *i;
166 if (!connection->isConnected()) {
167 if (connection->shouldDelete()) {
169 i = gConnections.erase(i);
171 else if (connection->isAutoConnectEnabled()) {
172 if (connection->getReconnectTime() < Timer::currentTimeMilliseconds()) {
173 connection->reconnect();
178 i = gConnections.erase(i);
181 else if (connection->shouldRemove()) {
182 connection->disconnect();
183 if (connection->shouldDelete()) {
186 i = gConnections.erase(i);
193 if (!connectedCount) {
194 gAddCV.wait_for(lock, 1ms);
203 gNewConnections.clear();
204 gConnections.clear();