diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 9c505eba..ff67dbf3 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -41,9 +41,9 @@ ControlServer::~ControlServer() } -void ControlServer::send(const std::string& message, const ControlSession* excludeSession) +void ControlServer::cleanup() { - std::unique_lock mlock(mutex_); + std::lock_guard mlock(mutex_); for (auto it = sessions_.begin(); it != sessions_.end(); ) { if (!(*it)->active()) @@ -53,12 +53,18 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu auto func = [](shared_ptr s)->void{s->stop();}; std::thread t(func, *it); t.detach(); + //(*it)->stop(); sessions_.erase(it++); } else ++it; } +} + +void ControlServer::send(const std::string& message, const ControlSession* excludeSession) +{ + cleanup(); for (auto s : sessions_) { if (s.get() != excludeSession) @@ -69,6 +75,7 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message) { + std::lock_guard mlock(mutex_); logD << "received: \"" << message << "\"\n"; if ((message == "quit") || (message == "exit") || (message == "bye")) { @@ -108,9 +115,10 @@ void ControlServer::handleAccept(socket_ptr socket) logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(this, socket); { - std::unique_lock mlock(mutex_); + std::lock_guard mlock(mutex_); session->start(); sessions_.insert(session); + cleanup(); } startAccept(); } @@ -127,7 +135,7 @@ void ControlServer::stop() { if (acceptor_) acceptor_->cancel(); - std::unique_lock mlock(mutex_); + std::lock_guard mlock(mutex_); for (auto s: sessions_) s->stop(); } diff --git a/server/controlServer.h b/server/controlServer.h index a7019947..5120a678 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -61,8 +61,9 @@ public: private: void startAccept(); void handleAccept(socket_ptr socket); + void cleanup(); // void acceptor(); - mutable std::mutex mutex_; + mutable std::recursive_mutex mutex_; std::set> sessions_; std::shared_ptr acceptor_; diff --git a/server/controlSession.cpp b/server/controlSession.cpp index 62d96329..730a7d20 100644 --- a/server/controlSession.cpp +++ b/server/controlSession.cpp @@ -42,7 +42,7 @@ ControlSession::~ControlSession() void ControlSession::start() { { - std::lock_guard activeLock(activeMutex_); + std::lock_guard activeLock(activeMutex_); active_ = true; } readerThread_ = new thread(&ControlSession::reader, this); @@ -52,20 +52,15 @@ void ControlSession::start() void ControlSession::stop() { - { - std::lock_guard activeLock(activeMutex_); - if (!active_) - return; - - active_ = false; - } - + logD << "ControlSession::stop\n"; + std::lock_guard activeLock(activeMutex_); + active_ = false; try { std::error_code ec; if (socket_) { - std::lock_guard socketLock(socketMutex_); + std::lock_guard socketLock(socketMutex_); socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec); if (ec) logE << "Error in socket shutdown: " << ec.message() << "\n"; socket_->close(ec); @@ -105,9 +100,9 @@ void ControlSession::sendAsync(const std::string& message) bool ControlSession::send(const std::string& message) const { //logO << "send: " << message << ", size: " << message.length() << "\n"; - std::lock_guard socketLock(socketMutex_); + std::lock_guard socketLock(socketMutex_); { - std::lock_guard activeLock(activeMutex_); + std::lock_guard activeLock(activeMutex_); if (!socket_ || !active_) return false; } diff --git a/server/controlSession.h b/server/controlSession.h index a6d8eb9f..bc3204d8 100644 --- a/server/controlSession.h +++ b/server/controlSession.h @@ -76,8 +76,8 @@ protected: void writer(); std::atomic active_; - mutable std::mutex activeMutex_; - mutable std::mutex socketMutex_; + mutable std::recursive_mutex activeMutex_; + mutable std::recursive_mutex socketMutex_; std::thread* readerThread_; std::thread* writerThread_; std::shared_ptr socket_;