diff --git a/server/streamServer.cpp b/server/streamServer.cpp index b398fd6f..f80da338 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -45,12 +45,31 @@ StreamServer::~StreamServer() void StreamServer::send(const msg::BaseMessage* message) { std::unique_lock mlock(mutex_); + + for (auto it = sessions_.begin(); it != sessions_.end(); ) + { + if (!(*it)->active()) + { + logS(kLogErr) << "Session inactive. Removing\n"; + // don't block: remove ServerSession in a thread + onDisconnect(it->get()); + auto func = [](shared_ptr s)->void{s->stop();}; + std::thread t(func, *it); + t.detach(); + sessions_.erase(it++); + } + else + ++it; + } + + +/* for (auto it = sessions_.begin(); it != sessions_.end(); ) { if (!(*it)->active()) onDisconnect(it->get()); } - +*/ std::shared_ptr shared_message(message); for (auto s : sessions_) s->add(shared_message); @@ -73,12 +92,12 @@ void StreamServer::onResync(const PipeReader* pipeReader, double ms) void StreamServer::onDisconnect(StreamSession* streamSession) { logO << "onDisconnect: " << streamSession->macAddress << "\n"; - auto func = [](StreamSession* s)->void{s->stop();}; +/* auto func = [](StreamSession* s)->void{s->stop();}; std::thread t(func, streamSession); t.detach(); - +*/ ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress); - // don't block: remove StreamSession in a thread +/* // don't block: remove StreamSession in a thread for (auto it = sessions_.begin(); it != sessions_.end(); ) { if (it->get() == streamSession) @@ -88,7 +107,7 @@ void StreamServer::onDisconnect(StreamSession* streamSession) break; } } - +*/ // notify controllers if not yet done if (!clientInfo->connected) return; diff --git a/server/streamSession.cpp b/server/streamSession.cpp index beace3a6..c8605c2a 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -28,7 +28,7 @@ using namespace std; -StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr socket) : messageReceiver_(receiver) +StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr socket) : active_(true), messageReceiver_(receiver) { socket_ = socket; } @@ -42,7 +42,7 @@ StreamSession::~StreamSession() void StreamSession::start() { - active_ = true; + setActive(true); streamActive_ = false; readerThread_ = new thread(&StreamSession::reader, this); writerThread_ = new thread(&StreamSession::writer, this);