proper shutdown of the ControlSession

This commit is contained in:
Johannes Pohl 2017-03-12 11:22:01 +01:00
parent 9e540c4855
commit df5f9bdeab
4 changed files with 23 additions and 19 deletions

View file

@ -41,9 +41,9 @@ ControlServer::~ControlServer()
} }
void ControlServer::send(const std::string& message, const ControlSession* excludeSession) void ControlServer::cleanup()
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ) for (auto it = sessions_.begin(); it != sessions_.end(); )
{ {
if (!(*it)->active()) if (!(*it)->active())
@ -53,12 +53,18 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu
auto func = [](shared_ptr<ControlSession> s)->void{s->stop();}; auto func = [](shared_ptr<ControlSession> s)->void{s->stop();};
std::thread t(func, *it); std::thread t(func, *it);
t.detach(); t.detach();
//(*it)->stop();
sessions_.erase(it++); sessions_.erase(it++);
} }
else else
++it; ++it;
} }
}
void ControlServer::send(const std::string& message, const ControlSession* excludeSession)
{
cleanup();
for (auto s : sessions_) for (auto s : sessions_)
{ {
if (s.get() != excludeSession) if (s.get() != excludeSession)
@ -69,6 +75,7 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu
void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message) void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message)
{ {
std::lock_guard<std::recursive_mutex> mlock(mutex_);
logD << "received: \"" << message << "\"\n"; logD << "received: \"" << message << "\"\n";
if ((message == "quit") || (message == "exit") || (message == "bye")) if ((message == "quit") || (message == "exit") || (message == "bye"))
{ {
@ -108,9 +115,10 @@ void ControlServer::handleAccept(socket_ptr socket)
logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<ControlSession> session = make_shared<ControlSession>(this, socket); shared_ptr<ControlSession> session = make_shared<ControlSession>(this, socket);
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(mutex_);
session->start(); session->start();
sessions_.insert(session); sessions_.insert(session);
cleanup();
} }
startAccept(); startAccept();
} }
@ -127,7 +135,7 @@ void ControlServer::stop()
{ {
if (acceptor_) if (acceptor_)
acceptor_->cancel(); acceptor_->cancel();
std::unique_lock<std::mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(mutex_);
for (auto s: sessions_) for (auto s: sessions_)
s->stop(); s->stop();
} }

View file

@ -61,8 +61,9 @@ public:
private: private:
void startAccept(); void startAccept();
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
void cleanup();
// void acceptor(); // void acceptor();
mutable std::mutex mutex_; mutable std::recursive_mutex mutex_;
std::set<std::shared_ptr<ControlSession>> sessions_; std::set<std::shared_ptr<ControlSession>> sessions_;
std::shared_ptr<tcp::acceptor> acceptor_; std::shared_ptr<tcp::acceptor> acceptor_;

View file

@ -42,7 +42,7 @@ ControlSession::~ControlSession()
void ControlSession::start() void ControlSession::start()
{ {
{ {
std::lock_guard<std::mutex> activeLock(activeMutex_); std::lock_guard<std::recursive_mutex> activeLock(activeMutex_);
active_ = true; active_ = true;
} }
readerThread_ = new thread(&ControlSession::reader, this); readerThread_ = new thread(&ControlSession::reader, this);
@ -52,20 +52,15 @@ void ControlSession::start()
void ControlSession::stop() void ControlSession::stop()
{ {
{ logD << "ControlSession::stop\n";
std::lock_guard<std::mutex> activeLock(activeMutex_); std::lock_guard<std::recursive_mutex> activeLock(activeMutex_);
if (!active_) active_ = false;
return;
active_ = false;
}
try try
{ {
std::error_code ec; std::error_code ec;
if (socket_) if (socket_)
{ {
std::lock_guard<std::mutex> socketLock(socketMutex_); std::lock_guard<std::recursive_mutex> socketLock(socketMutex_);
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec); socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec.message() << "\n"; if (ec) logE << "Error in socket shutdown: " << ec.message() << "\n";
socket_->close(ec); socket_->close(ec);
@ -105,9 +100,9 @@ void ControlSession::sendAsync(const std::string& message)
bool ControlSession::send(const std::string& message) const bool ControlSession::send(const std::string& message) const
{ {
//logO << "send: " << message << ", size: " << message.length() << "\n"; //logO << "send: " << message << ", size: " << message.length() << "\n";
std::lock_guard<std::mutex> socketLock(socketMutex_); std::lock_guard<std::recursive_mutex> socketLock(socketMutex_);
{ {
std::lock_guard<std::mutex> activeLock(activeMutex_); std::lock_guard<std::recursive_mutex> activeLock(activeMutex_);
if (!socket_ || !active_) if (!socket_ || !active_)
return false; return false;
} }

View file

@ -76,8 +76,8 @@ protected:
void writer(); void writer();
std::atomic<bool> active_; std::atomic<bool> active_;
mutable std::mutex activeMutex_; mutable std::recursive_mutex activeMutex_;
mutable std::mutex socketMutex_; mutable std::recursive_mutex socketMutex_;
std::thread* readerThread_; std::thread* readerThread_;
std::thread* writerThread_; std::thread* writerThread_;
std::shared_ptr<tcp::socket> socket_; std::shared_ptr<tcp::socket> socket_;