From e9dbb6c3b661b4dc6dc53dba50a5a2f26367c57c Mon Sep 17 00:00:00 2001 From: badaix Date: Fri, 15 Nov 2019 17:45:49 +0100 Subject: [PATCH] fix crash during heavy websocket load --- server/control_server.cpp | 2 +- server/control_session_http.cpp | 55 ++++++++++++++++------ server/control_session_http.hpp | 7 ++- server/control_session_tcp.cpp | 81 ++++++++++++++++++++------------- server/control_session_tcp.hpp | 7 ++- 5 files changed, 103 insertions(+), 49 deletions(-) diff --git a/server/control_server.cpp b/server/control_server.cpp index 55117bc3..a782a784 100644 --- a/server/control_server.cpp +++ b/server/control_server.cpp @@ -117,7 +117,7 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args) setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); // socket->set_option(boost::asio::ip::tcp::no_delay(false)); SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; - shared_ptr session = make_shared(this, std::move(socket), std::forward(args)...); + shared_ptr session = make_shared(this, io_context_, std::move(socket), std::forward(args)...); { std::lock_guard mlock(session_mutex_); session->start(); diff --git a/server/control_session_http.cpp b/server/control_session_http.cpp index 77e2cffd..d30bddff 100644 --- a/server/control_session_http.cpp +++ b/server/control_session_http.cpp @@ -98,8 +98,9 @@ std::string path_cat(boost::beast::string_view base, boost::beast::string_view p } } // namespace -ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::HttpSettings& settings) - : ControlSession(receiver), socket_(std::move(socket)), settings_(settings) +ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, + const ServerSettings::HttpSettings& settings) + : ControlSession(receiver), socket_(std::move(socket)), settings_(settings), strand_(ioc) { LOG(DEBUG) << "ControlSessionHttp\n"; } @@ -115,7 +116,8 @@ ControlSessionHttp::~ControlSessionHttp() void ControlSessionHttp::start() { auto self = shared_from_this(); - http::async_read(socket_, buffer_, req_, [this, self](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); }); + http::async_read(socket_, buffer_, req_, + boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); })); } @@ -270,7 +272,9 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe // Write the response auto self = this->shared_from_this(); - http::async_write(this->socket_, *sp, [this, self, sp](beast::error_code ec, std::size_t bytes) { this->on_write(ec, bytes, sp->need_eof()); }); + http::async_write(this->socket_, *sp, boost::asio::bind_executor(strand_, [this, self, sp](beast::error_code ec, std::size_t bytes) { + this->on_write(ec, bytes, sp->need_eof()); + })); }); } @@ -297,7 +301,8 @@ void ControlSessionHttp::on_write(beast::error_code ec, std::size_t, bool close) req_ = {}; // Read another request - http::async_read(socket_, buffer_, req_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); }); + http::async_read(socket_, buffer_, req_, + boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); })); } @@ -305,25 +310,44 @@ void ControlSessionHttp::stop() { } - void ControlSessionHttp::sendAsync(const std::string& message) { if (!ws_) return; - auto self(shared_from_this()); - ws_->async_write(boost::asio::buffer(message), [self](std::error_code ec, std::size_t length) { - if (ec) + strand_.post([this, message]() { + messages_.emplace_back(message); + if (messages_.size() > 1) { - LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; - } - else - { - LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + LOG(DEBUG) << "HTTP session outstanding async_writes: " << messages_.size() << "\n"; + return; } + send_next(); }); } +void ControlSessionHttp::send_next() +{ + if (!ws_) + return; + + auto self(shared_from_this()); + auto message = messages_.front(); + ws_->async_write(boost::asio::buffer(message), boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR) << "Error while writing to web socket: " << ec.message() << "\n"; + } + else + { + LOG(DEBUG) << "Wrote " << length << " bytes to web socket\n"; + } + if (!messages_.empty()) + send_next(); + })); +} + bool ControlSessionHttp::send(const std::string& message) { @@ -351,7 +375,8 @@ void ControlSessionHttp::do_read_ws() { // Read a message into our buffer auto self(shared_from_this()); - ws_->async_read(buffer_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); }); + ws_->async_read( + buffer_, boost::asio::bind_executor(strand_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); })); } diff --git a/server/control_session_http.hpp b/server/control_session_http.hpp index 5c4ec478..ae9ace3d 100644 --- a/server/control_session_http.hpp +++ b/server/control_session_http.hpp @@ -22,6 +22,7 @@ #include "control_session.hpp" #include #include +#include namespace beast = boost::beast; // from namespace http = beast::http; // from @@ -39,7 +40,7 @@ class ControlSessionHttp : public ControlSession, public std::enable_shared_from { public: /// ctor. Received message from the client are passed to MessageReceiver - ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::HttpSettings& settings); + ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, const ServerSettings::HttpSettings& settings); ~ControlSessionHttp() override; void start() override; void stop() override; @@ -58,6 +59,8 @@ protected: template void handle_request(http::request>&& req, Send&& send); + void send_next(); + http::request req_; protected: @@ -72,6 +75,8 @@ protected: tcp::socket socket_; beast::flat_buffer buffer_; ServerSettings::HttpSettings settings_; + boost::asio::io_context::strand strand_; + std::deque messages_; }; diff --git a/server/control_session_tcp.cpp b/server/control_session_tcp.cpp index c5f8bff6..94518c65 100644 --- a/server/control_session_tcp.cpp +++ b/server/control_session_tcp.cpp @@ -24,7 +24,8 @@ using namespace std; -ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket) : ControlSession(receiver), socket_(std::move(socket)) +ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket) + : ControlSession(receiver), socket_(std::move(socket)), strand_(ioc) { } @@ -40,30 +41,31 @@ void ControlSessionTcp::do_read() { const std::string delimiter = "\n"; auto self(shared_from_this()); - boost::asio::async_read_until(socket_, streambuf_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { - if (ec) - { - LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n"; - return; - } - - // Extract up to the first delimiter. - std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()}; - if (!line.empty()) - { - if (line.back() == '\r') - line.resize(line.size() - 1); - // LOG(DEBUG) << "received: " << line << "\n"; - if ((message_receiver_ != nullptr) && !line.empty()) + boost::asio::async_read_until( + socket_, streambuf_, delimiter, boost::asio::bind_executor(strand_, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + if (ec) { - string response = message_receiver_->onMessageReceived(this, line); - if (!response.empty()) - sendAsync(response); + LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n"; + return; } - } - streambuf_.consume(bytes_transferred); - do_read(); - }); + + // Extract up to the first delimiter. + std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()}; + if (!line.empty()) + { + if (line.back() == '\r') + line.resize(line.size() - 1); + // LOG(DEBUG) << "received: " << line << "\n"; + if ((message_receiver_ != nullptr) && !line.empty()) + { + string response = message_receiver_->onMessageReceived(this, line); + if (!response.empty()) + sendAsync(response); + } + } + streambuf_.consume(bytes_transferred); + do_read(); + })); } @@ -89,19 +91,36 @@ void ControlSessionTcp::stop() void ControlSessionTcp::sendAsync(const std::string& message) { - auto self(shared_from_this()); - boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"), [self](std::error_code ec, std::size_t length) { - if (ec) + strand_.post([this, message]() { + messages_.emplace_back(message); + if (messages_.size() > 1) { - LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; - } - else - { - LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + LOG(DEBUG) << "TCP session outstanding async_writes: " << messages_.size() << "\n"; + return; } + send_next(); }); } +void ControlSessionTcp::send_next() +{ + auto self(shared_from_this()); + auto message = messages_.front(); + boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"), + boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; + } + else + { + LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + } + if (!messages_.empty()) + send_next(); + })); +} bool ControlSessionTcp::send(const std::string& message) { diff --git a/server/control_session_tcp.hpp b/server/control_session_tcp.hpp index efdb1862..4985351a 100644 --- a/server/control_session_tcp.hpp +++ b/server/control_session_tcp.hpp @@ -20,6 +20,7 @@ #define CONTROL_SESSION_TCP_HPP #include "control_session.hpp" +#include /// Endpoint for a connected control client. /** @@ -31,7 +32,7 @@ class ControlSessionTcp : public ControlSession, public std::enable_shared_from_ { public: /// ctor. Received message from the client are passed to MessageReceiver - ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket); + ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket); ~ControlSessionTcp() override; void start() override; void stop() override; @@ -44,8 +45,12 @@ public: protected: void do_read(); + void send_next(); + tcp::socket socket_; boost::asio::streambuf streambuf_; + boost::asio::io_context::strand strand_; + std::deque messages_; };