diff --git a/server/snapServer.cpp b/server/snapServer.cpp index a8d76da0..a0c67be6 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "common/chunk.h" #include "common/timeUtils.h" #include "common/queue.h" @@ -50,7 +51,7 @@ std::string return_current_time_and_date() class Session { public: - Session(socket_ptr sock) : socket_(sock) + Session(socket_ptr sock) : active_(false), socket_(sock) { } @@ -72,21 +73,31 @@ public: catch (std::exception& e) { std::cerr << "Exception in thread: " << e.what() << "\n"; + active_ = false; } } void start() { + active_ = true; senderThread = new thread(&Session::sender, this); // readerThread.join(); } void send(shared_ptr chunk) { + while (chunks.size() * WIRE_CHUNK_MS > 10000) + chunks.pop(); chunks.push(chunk); } + bool isActive() const + { + return active_; + } + private: + bool active_; socket_ptr socket_; thread* senderThread; Queue> chunks; @@ -110,17 +121,22 @@ public: cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; Session* session = new Session(sock); session->start(); - sessions.push_back(session); + sessions.insert(shared_ptr(session)); } } void send(shared_ptr chunk) { - for (size_t n=0; n>::iterator it = sessions.begin(); it != sessions.end(); ) { - if (sessions[n] != 0) - sessions[n]->send(chunk); - } + if (!(*it)->isActive()) + sessions.erase(it++); + else + ++it; + } + + for (auto s : sessions) + s->send(chunk); } void start() @@ -134,7 +150,7 @@ public: } private: - vector sessions; + set> sessions; boost::asio::io_service io_service_; unsigned short port_; thread* acceptThread;