From c4a61dff47cbb6ac6793436de13fd75d6fcc92ef Mon Sep 17 00:00:00 2001 From: badaix Date: Sat, 28 Sep 2019 10:43:11 +0200 Subject: [PATCH] async control server --- server/controlServer.cpp | 66 ++++++++-------- server/controlServer.h | 7 +- server/controlSession.cpp | 161 ++++++++++++-------------------------- server/controlSession.h | 17 +--- 4 files changed, 92 insertions(+), 159 deletions(-) diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 24acb5cb..d8d44c9f 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -44,17 +44,12 @@ ControlServer::~ControlServer() void ControlServer::cleanup() { - std::lock_guard mlock(mutex_); + std::lock_guard mlock(session_mutex_); for (auto it = sessions_.begin(); it != sessions_.end();) { - if (!(*it)->active()) + if (it->expired()) { SLOG(ERROR) << "Session inactive. Removing\n"; - // don't block: remove ClientSession in a thread - auto func = [](shared_ptr s) -> void { s->stop(); }; - std::thread t(func, *it); - t.detach(); - //(*it)->stop(); sessions_.erase(it++); } else @@ -68,35 +63,41 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu cleanup(); for (auto s : sessions_) { - if (s.get() != excludeSession) - s->sendAsync(message); + if (auto session = s.lock()) + { + if (session.get() != excludeSession) + session->sendAsync(message); + } } } void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message) { - std::lock_guard mlock(mutex_); + std::lock_guard mlock(session_mutex_); LOG(DEBUG) << "received: \"" << message << "\"\n"; - if ((message == "quit") || (message == "exit") || (message == "bye")) - { - for (auto it = sessions_.begin(); it != sessions_.end(); ++it) - { - if (it->get() == connection) - { - /// delete in a thread to avoid deadlock - auto func = [&](std::shared_ptr s) -> void { sessions_.erase(s); }; - std::thread t(func, *it); - t.detach(); - break; - } - } - } - else - { + // if ((message == "quit") || (message == "exit") || (message == "bye")) + // { + // for (auto it = sessions_.begin(); it != sessions_.end(); ++it) + // { + // auto session = it->lock(); + // if (!session) + // continue; + // if (session.get() == connection) + // { + // /// delete in a thread to avoid deadlock + // auto func = [&](std::shared_ptr s) -> void { sessions_.erase(s); }; + // std::thread t(func, *it); + // t.detach(); + // break; + // } + // } + // } + // else + // { if (controlMessageReceiver_ != nullptr) controlMessageReceiver_->onMessageReceived(connection, message); - } + // } } @@ -129,9 +130,9 @@ void ControlServer::handleAccept(socket_ptr socket) SLOG(NOTICE) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(this, socket); { - std::lock_guard mlock(mutex_); + std::lock_guard mlock(session_mutex_); session->start(); - sessions_.insert(session); + sessions_.emplace_back(session); cleanup(); } } @@ -191,7 +192,10 @@ void ControlServer::stop() acceptor_v6_->cancel(); acceptor_v6_ = nullptr; } - std::lock_guard mlock(mutex_); + std::lock_guard mlock(session_mutex_); for (auto s : sessions_) - s->stop(); + { + if (auto session = s.lock()) + session->stop(); + } } diff --git a/server/controlServer.h b/server/controlServer.h index 6459b4b2..83679d19 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -62,13 +62,12 @@ private: void startAccept(); void handleAccept(socket_ptr socket); void cleanup(); - // void acceptor(); - mutable std::recursive_mutex mutex_; - std::set> sessions_; + + mutable std::recursive_mutex session_mutex_; + std::vector> sessions_; std::shared_ptr acceptor_v4_; std::shared_ptr acceptor_v6_; - Queue> messages_; asio::io_service* io_service_; size_t port_; ControlMessageReceiver* controlMessageReceiver_; diff --git a/server/controlSession.cpp b/server/controlSession.cpp index 21e549f2..5eb8c623 100644 --- a/server/controlSession.cpp +++ b/server/controlSession.cpp @@ -26,7 +26,7 @@ using namespace std; -ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr socket) : active_(false), messageReceiver_(receiver) +ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr socket) : messageReceiver_(receiver) { socket_ = socket; } @@ -34,53 +34,55 @@ ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr ControlSession::~ControlSession() { + LOG(DEBUG) << "ControlSession::~ControlSession()\n"; stop(); } +void ControlSession::do_read() +{ + const std::string delimiter = "\n"; + auto self(shared_from_this()); + asio::async_read_until(*socket_, streambuf_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + if (ec) + { + LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n"; + return; + } + + // Extract up to the first delimiter. + std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()}; + if (!line.empty()) + { + if (line.back() == '\r') + line.resize(line.size() - 1); + LOG(INFO) << "received: " << line << "\n"; + if ((messageReceiver_ != nullptr) && !line.empty()) + messageReceiver_->onMessageReceived(this, line); + } + streambuf_.consume(bytes_transferred); + do_read(); + }); +} + void ControlSession::start() { - { - std::lock_guard activeLock(activeMutex_); - active_ = true; - } - readerThread_ = thread(&ControlSession::reader, this); - writerThread_ = thread(&ControlSession::writer, this); + do_read(); } void ControlSession::stop() { LOG(DEBUG) << "ControlSession::stop\n"; - std::lock_guard activeLock(activeMutex_); - active_ = false; - try - { - std::error_code ec; - if (socket_) - { - std::lock_guard socketLock(socketMutex_); - socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec); - if (ec) - LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n"; - socket_->close(ec); - if (ec) - LOG(ERROR) << "Error in socket close: " << ec.message() << "\n"; - } - if (readerThread_.joinable()) - { - LOG(DEBUG) << "ControlSession joining readerThread\n"; - readerThread_.join(); - } - if (writerThread_.joinable()) - { - LOG(DEBUG) << "ControlSession joining writerThread\n"; - messages_.abort_wait(); - writerThread_.join(); - } - } - catch (...) + std::error_code ec; + if (socket_) { + socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec); + if (ec) + LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n"; + socket_->close(ec); + if (ec) + LOG(ERROR) << "Error in socket close: " << ec.message() << "\n"; } socket_ = nullptr; LOG(DEBUG) << "ControlSession ControlSession stopped\n"; @@ -90,84 +92,23 @@ void ControlSession::stop() void ControlSession::sendAsync(const std::string& message) { - messages_.push(message); + auto self(shared_from_this()); + asio::async_write(*socket_, asio::buffer(message + "\r\n"), [this, self](std::error_code ec, std::size_t length) { + if (ec) + { + LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; + } + else + { + LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + } + }); } bool ControlSession::send(const std::string& message) const { - // LOG(INFO) << "send: " << message << ", size: " << message.length() << "\n"; - std::lock_guard socketLock(socketMutex_); - { - std::lock_guard activeLock(activeMutex_); - if (!socket_ || !active_) - return false; - } - asio::streambuf streambuf; - std::ostream request_stream(&streambuf); - request_stream << message << "\r\n"; - asio::write(*socket_.get(), streambuf); - // LOG(INFO) << "done\n"; - return true; -} - - -void ControlSession::reader() -{ - try - { - std::stringstream message; - while (active_) - { - asio::streambuf response; - asio::read_until(*socket_, response, "\n"); - - std::string s((istreambuf_iterator(&response)), istreambuf_iterator()); - message << s; - if (s.empty() || (s.back() != '\n')) - continue; - - string line; - while (std::getline(message, line, '\n')) - { - if (line.empty()) - continue; - - size_t len = line.length() - 1; - if ((len >= 2) && line[len - 2] == '\r') - --len; - line.resize(len); - if ((messageReceiver_ != nullptr) && !line.empty()) - messageReceiver_->onMessageReceived(this, line); - } - message.str(""); - message.clear(); - } - } - catch (const std::exception& e) - { - SLOG(ERROR) << "Exception in ControlSession::reader(): " << e.what() << endl; - } - active_ = false; -} - - -void ControlSession::writer() -{ - try - { - asio::streambuf streambuf; - std::ostream stream(&streambuf); - string message; - while (active_) - { - if (messages_.try_pop(message, std::chrono::milliseconds(500))) - send(message); - } - } - catch (const std::exception& e) - { - SLOG(ERROR) << "Exception in ControlSession::writer(): " << e.what() << endl; - } - active_ = false; + error_code ec; + asio::write(*socket_, asio::buffer(message + "\r\n"), ec); + return !ec; } diff --git a/server/controlSession.h b/server/controlSession.h index 499a9e76..c96c47c0 100644 --- a/server/controlSession.h +++ b/server/controlSession.h @@ -51,7 +51,7 @@ public: * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSession +class ControlSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to MessageReceiver @@ -66,23 +66,12 @@ public: /// Sends a message to the client (asynchronous) void sendAsync(const std::string& message); - bool active() const - { - return active_; - } - protected: - void reader(); - void writer(); + void do_read(); - std::atomic active_; - mutable std::recursive_mutex activeMutex_; - mutable std::recursive_mutex socketMutex_; - std::thread readerThread_; - std::thread writerThread_; std::shared_ptr socket_; ControlMessageReceiver* messageReceiver_; - Queue messages_; + asio::streambuf streambuf_; };