fix crash during heavy websocket load

This commit is contained in:
badaix 2019-11-15 17:45:49 +01:00
parent 5d7bac11ba
commit e9dbb6c3b6
5 changed files with 103 additions and 49 deletions

View file

@ -117,7 +117,7 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args)
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<SessionType> session = make_shared<SessionType>(this, std::move(socket), std::forward<Args>(args)...);
shared_ptr<SessionType> session = make_shared<SessionType>(this, io_context_, std::move(socket), std::forward<Args>(args)...);
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();

View file

@ -98,8 +98,9 @@ std::string path_cat(boost::beast::string_view base, boost::beast::string_view p
}
} // namespace
ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::HttpSettings& settings)
: ControlSession(receiver), socket_(std::move(socket)), settings_(settings)
ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket,
const ServerSettings::HttpSettings& settings)
: ControlSession(receiver), socket_(std::move(socket)), settings_(settings), strand_(ioc)
{
LOG(DEBUG) << "ControlSessionHttp\n";
}
@ -115,7 +116,8 @@ ControlSessionHttp::~ControlSessionHttp()
void ControlSessionHttp::start()
{
auto self = shared_from_this();
http::async_read(socket_, buffer_, req_, [this, self](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); });
http::async_read(socket_, buffer_, req_,
boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); }));
}
@ -270,7 +272,9 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
// Write the response
auto self = this->shared_from_this();
http::async_write(this->socket_, *sp, [this, self, sp](beast::error_code ec, std::size_t bytes) { this->on_write(ec, bytes, sp->need_eof()); });
http::async_write(this->socket_, *sp, boost::asio::bind_executor(strand_, [this, self, sp](beast::error_code ec, std::size_t bytes) {
this->on_write(ec, bytes, sp->need_eof());
}));
});
}
@ -297,7 +301,8 @@ void ControlSessionHttp::on_write(beast::error_code ec, std::size_t, bool close)
req_ = {};
// Read another request
http::async_read(socket_, buffer_, req_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); });
http::async_read(socket_, buffer_, req_,
boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); }));
}
@ -305,25 +310,44 @@ void ControlSessionHttp::stop()
{
}
void ControlSessionHttp::sendAsync(const std::string& message)
{
if (!ws_)
return;
auto self(shared_from_this());
ws_->async_write(boost::asio::buffer(message), [self](std::error_code ec, std::size_t length) {
if (ec)
strand_.post([this, message]() {
messages_.emplace_back(message);
if (messages_.size() > 1)
{
LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n";
LOG(DEBUG) << "HTTP session outstanding async_writes: " << messages_.size() << "\n";
return;
}
send_next();
});
}
void ControlSessionHttp::send_next()
{
if (!ws_)
return;
auto self(shared_from_this());
auto message = messages_.front();
ws_->async_write(boost::asio::buffer(message), boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) {
messages_.pop_front();
if (ec)
{
LOG(ERROR) << "Error while writing to web socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to web socket\n";
}
if (!messages_.empty())
send_next();
}));
}
bool ControlSessionHttp::send(const std::string& message)
{
@ -351,7 +375,8 @@ void ControlSessionHttp::do_read_ws()
{
// Read a message into our buffer
auto self(shared_from_this());
ws_->async_read(buffer_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); });
ws_->async_read(
buffer_, boost::asio::bind_executor(strand_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); }));
}

View file

@ -22,6 +22,7 @@
#include "control_session.hpp"
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <deque>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
@ -39,7 +40,7 @@ class ControlSessionHttp : public ControlSession, public std::enable_shared_from
{
public:
/// ctor. Received message from the client are passed to MessageReceiver
ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::HttpSettings& settings);
ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, const ServerSettings::HttpSettings& settings);
~ControlSessionHttp() override;
void start() override;
void stop() override;
@ -58,6 +59,8 @@ protected:
template <class Body, class Allocator, class Send>
void handle_request(http::request<Body, http::basic_fields<Allocator>>&& req, Send&& send);
void send_next();
http::request<http::string_body> req_;
protected:
@ -72,6 +75,8 @@ protected:
tcp::socket socket_;
beast::flat_buffer buffer_;
ServerSettings::HttpSettings settings_;
boost::asio::io_context::strand strand_;
std::deque<std::string> messages_;
};

View file

@ -24,7 +24,8 @@ using namespace std;
ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket) : ControlSession(receiver), socket_(std::move(socket))
ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket)
: ControlSession(receiver), socket_(std::move(socket)), strand_(ioc)
{
}
@ -40,30 +41,31 @@ void ControlSessionTcp::do_read()
{
const std::string delimiter = "\n";
auto self(shared_from_this());
boost::asio::async_read_until(socket_, streambuf_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
// LOG(DEBUG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
boost::asio::async_read_until(
socket_, streambuf_, delimiter, boost::asio::bind_executor(strand_, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
string response = message_receiver_->onMessageReceived(this, line);
if (!response.empty())
sendAsync(response);
LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
}
streambuf_.consume(bytes_transferred);
do_read();
});
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
// LOG(DEBUG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
string response = message_receiver_->onMessageReceived(this, line);
if (!response.empty())
sendAsync(response);
}
}
streambuf_.consume(bytes_transferred);
do_read();
}));
}
@ -89,19 +91,36 @@ void ControlSessionTcp::stop()
void ControlSessionTcp::sendAsync(const std::string& message)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"), [self](std::error_code ec, std::size_t length) {
if (ec)
strand_.post([this, message]() {
messages_.emplace_back(message);
if (messages_.size() > 1)
{
LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n";
LOG(DEBUG) << "TCP session outstanding async_writes: " << messages_.size() << "\n";
return;
}
send_next();
});
}
void ControlSessionTcp::send_next()
{
auto self(shared_from_this());
auto message = messages_.front();
boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"),
boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) {
messages_.pop_front();
if (ec)
{
LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n";
}
if (!messages_.empty())
send_next();
}));
}
bool ControlSessionTcp::send(const std::string& message)
{

View file

@ -20,6 +20,7 @@
#define CONTROL_SESSION_TCP_HPP
#include "control_session.hpp"
#include <deque>
/// Endpoint for a connected control client.
/**
@ -31,7 +32,7 @@ class ControlSessionTcp : public ControlSession, public std::enable_shared_from_
{
public:
/// ctor. Received message from the client are passed to MessageReceiver
ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket);
ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket);
~ControlSessionTcp() override;
void start() override;
void stop() override;
@ -44,8 +45,12 @@ public:
protected:
void do_read();
void send_next();
tcp::socket socket_;
boost::asio::streambuf streambuf_;
boost::asio::io_context::strand strand_;
std::deque<std::string> messages_;
};