Fix crash during client disconnect

This commit is contained in:
badaix 2020-01-25 22:20:02 +01:00
parent 08de66fe70
commit 21d259d0af
6 changed files with 19 additions and 47 deletions

View file

@ -26,7 +26,6 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <mutex>
#include <set> #include <set>
#include <string> #include <string>

View file

@ -115,9 +115,8 @@ ControlSessionHttp::~ControlSessionHttp()
void ControlSessionHttp::start() void ControlSessionHttp::start()
{ {
auto self = shared_from_this(); http::async_read(socket_, buffer_, req_, boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](
http::async_read(socket_, buffer_, req_, boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); }));
boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); }));
} }
@ -256,8 +255,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
// Create a WebSocket session by transferring the socket // Create a WebSocket session by transferring the socket
// std::make_shared<websocket_session>(std::move(socket_), state_)->run(std::move(req_)); // std::make_shared<websocket_session>(std::move(socket_), state_)->run(std::move(req_));
ws_ = make_unique<websocket::stream<beast::tcp_stream>>(std::move(socket_)); ws_ = make_unique<websocket::stream<beast::tcp_stream>>(std::move(socket_));
auto self = shared_from_this(); ws_->async_accept(req_, [ this, self = shared_from_this() ](beast::error_code ec) { on_accept_ws(ec); });
ws_->async_accept(req_, [this, self](beast::error_code ec) { on_accept_ws(ec); });
LOG(DEBUG) << "websocket upgrade\n"; LOG(DEBUG) << "websocket upgrade\n";
return; return;
} }
@ -271,8 +269,8 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
auto sp = std::make_shared<response_type>(std::forward<decltype(response)>(response)); auto sp = std::make_shared<response_type>(std::forward<decltype(response)>(response));
// Write the response // Write the response
auto self = this->shared_from_this(); http::async_write(this->socket_, *sp,
http::async_write(this->socket_, *sp, boost::asio::bind_executor(strand_, [this, self, sp](beast::error_code ec, std::size_t bytes) { boost::asio::bind_executor(strand_, [ this, self = this->shared_from_this(), sp ](beast::error_code ec, std::size_t bytes) {
this->on_write(ec, bytes, sp->need_eof()); this->on_write(ec, bytes, sp->need_eof());
})); }));
}); });
@ -315,7 +313,7 @@ void ControlSessionHttp::sendAsync(const std::string& message)
if (!ws_) if (!ws_)
return; return;
strand_.post([this, message]() { strand_.post([ this, self = shared_from_this(), message ]() {
messages_.emplace_back(message); messages_.emplace_back(message);
if (messages_.size() > 1) if (messages_.size() > 1)
{ {
@ -331,9 +329,9 @@ void ControlSessionHttp::send_next()
if (!ws_) if (!ws_)
return; return;
auto self(shared_from_this());
auto message = messages_.front(); auto message = messages_.front();
ws_->async_write(boost::asio::buffer(message), boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) { ws_->async_write(boost::asio::buffer(message),
boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](std::error_code ec, std::size_t length) {
messages_.pop_front(); messages_.pop_front();
if (ec) if (ec)
{ {
@ -374,9 +372,9 @@ void ControlSessionHttp::on_accept_ws(beast::error_code ec)
void ControlSessionHttp::do_read_ws() void ControlSessionHttp::do_read_ws()
{ {
// Read a message into our buffer // Read a message into our buffer
auto self(shared_from_this()); ws_->async_read(buffer_, boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes_transferred) {
ws_->async_read( on_read_ws(ec, bytes_transferred);
buffer_, boost::asio::bind_executor(strand_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); })); }));
} }

View file

@ -41,9 +41,9 @@ ControlSessionTcp::~ControlSessionTcp()
void ControlSessionTcp::do_read() void ControlSessionTcp::do_read()
{ {
const std::string delimiter = "\n"; const std::string delimiter = "\n";
auto self(shared_from_this());
boost::asio::async_read_until( boost::asio::async_read_until(
socket_, streambuf_, delimiter, boost::asio::bind_executor(strand_, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { socket_, streambuf_, delimiter,
boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), delimiter ](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec) if (ec)
{ {
LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n"; LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n";
@ -92,7 +92,7 @@ void ControlSessionTcp::stop()
void ControlSessionTcp::sendAsync(const std::string& message) void ControlSessionTcp::sendAsync(const std::string& message)
{ {
strand_.post([this, message]() { strand_.post([ this, self = shared_from_this(), message ]() {
messages_.emplace_back(message); messages_.emplace_back(message);
if (messages_.size() > 1) if (messages_.size() > 1)
{ {
@ -105,10 +105,9 @@ void ControlSessionTcp::sendAsync(const std::string& message)
void ControlSessionTcp::send_next() void ControlSessionTcp::send_next()
{ {
auto self(shared_from_this());
auto message = messages_.front(); auto message = messages_.front();
boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"), boost::asio::async_write(socket_, boost::asio::buffer(message + "\r\n"),
boost::asio::bind_executor(strand_, [this, self](std::error_code ec, std::size_t length) { boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](std::error_code ec, std::size_t length) {
messages_.pop_front(); messages_.pop_front();
if (ec) if (ec)
{ {

View file

@ -45,19 +45,8 @@ StreamSession::~StreamSession()
void StreamSession::read_next() void StreamSession::read_next()
{ {
shared_ptr<StreamSession> self;
try
{
self = shared_from_this();
}
catch (const std::bad_weak_ptr& e)
{
LOG(ERROR, LOG_TAG) << "read_next: Error getting shared from this\n";
return;
}
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_),
boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](boost::system::error_code ec, std::size_t length) mutable {
if (ec) if (ec)
{ {
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
@ -139,20 +128,9 @@ void StreamSession::stop()
void StreamSession::send_next() void StreamSession::send_next()
{ {
shared_ptr<StreamSession> self;
try
{
self = shared_from_this();
}
catch (const std::bad_weak_ptr& e)
{
LOG(ERROR, LOG_TAG) << "send_next: Error getting shared from this\n";
return;
}
auto buffer = messages_.front(); auto buffer = messages_.front();
boost::asio::async_write(socket_, buffer,
boost::asio::async_write(socket_, buffer, boost::asio::bind_executor(strand_, [this, self, buffer](boost::system::error_code ec, std::size_t length) { boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), buffer ](boost::system::error_code ec, std::size_t length) {
messages_.pop_front(); messages_.pop_front();
if (ec) if (ec)
{ {
@ -168,7 +146,7 @@ void StreamSession::send_next()
void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now)
{ {
strand_.post([this, const_buf, send_now]() { strand_.post([ this, self = shared_from_this(), const_buf, send_now ]() {
if (send_now) if (send_now)
messages_.push_front(const_buf); messages_.push_front(const_buf);
else else

View file

@ -27,7 +27,6 @@
#include <condition_variable> #include <condition_variable>
#include <deque> #include <deque>
#include <memory> #include <memory>
#include <mutex>
#include <set> #include <set>
#include <sstream> #include <sstream>
#include <string> #include <string>

View file

@ -29,7 +29,6 @@
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <condition_variable> #include <condition_variable>
#include <map> #include <map>
#include <mutex>
#include <string> #include <string>