diff --git a/server/control_server.cpp b/server/control_server.cpp index fcc75a7c..13b5fa1a 100644 --- a/server/control_server.cpp +++ b/server/control_server.cpp @@ -134,7 +134,7 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args) setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); // socket->set_option(boost::asio::ip::tcp::no_delay(false)); LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; - shared_ptr session = make_shared(this, io_context_, std::move(socket), std::forward(args)...); + shared_ptr session = make_shared(this, std::move(socket), std::forward(args)...); onNewSession(std::move(session)); } catch (const std::exception& e) @@ -155,8 +155,8 @@ void ControlServer::start() try { LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n"; - acceptor_tcp_.emplace_back( - make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port))); + acceptor_tcp_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port))); } catch (const boost::system::system_error& e) { @@ -171,8 +171,8 @@ void ControlServer::start() try { LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n"; - acceptor_http_.emplace_back( - make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port))); + acceptor_http_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port))); } catch (const boost::system::system_error& e) { diff --git a/server/control_session.hpp b/server/control_session.hpp index 0cc8d390..eebf3a0d 100644 --- a/server/control_session.hpp +++ b/server/control_session.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -30,6 +30,7 @@ #include using boost::asio::ip::tcp; +namespace net = boost::asio; class ControlSession; diff --git a/server/control_session_http.cpp b/server/control_session_http.cpp index 574885ed..afe38d5c 100644 --- a/server/control_session_http.cpp +++ b/server/control_session_http.cpp @@ -136,9 +136,8 @@ std::string path_cat(boost::beast::string_view base, boost::beast::string_view p } } // namespace -ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, - const ServerSettings::Http& settings) - : ControlSession(receiver), socket_(std::move(socket)), settings_(settings), strand_(ioc) +ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::Http& settings) + : ControlSession(receiver), socket_(std::move(socket)), settings_(settings) { LOG(DEBUG, LOG_TAG) << "ControlSessionHttp, Local IP: " << socket_.local_endpoint().address().to_string() << "\n"; } @@ -153,9 +152,7 @@ ControlSessionHttp::~ControlSessionHttp() void ControlSessionHttp::start() { - http::async_read( - socket_, buffer_, req_, - boost::asio::bind_executor(strand_, [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); })); + http::async_read(socket_, buffer_, req_, [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes) { on_read(ec, bytes); }); } @@ -349,7 +346,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe } else { - auto ws_session = make_shared(message_receiver_, strand_.context(), std::move(*ws)); + auto ws_session = make_shared(message_receiver_, std::move(*ws)); message_receiver_->onNewSession(std::move(ws_session)); } }); @@ -366,7 +363,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe } else { - auto ws_session = make_shared(strand_.context(), nullptr, std::move(*ws)); + auto ws_session = make_shared(nullptr, std::move(*ws)); message_receiver_->onNewSession(std::move(ws_session)); } }); @@ -384,9 +381,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe // Write the response http::async_write(this->socket_, *sp, - boost::asio::bind_executor(strand_, [this, self = this->shared_from_this(), sp](beast::error_code ec, std::size_t bytes) { - this->on_write(ec, bytes, sp->need_eof()); - })); + [this, self = this->shared_from_this(), sp](beast::error_code ec, std::size_t bytes) { this->on_write(ec, bytes, sp->need_eof()); }); }); } @@ -415,8 +410,7 @@ void ControlSessionHttp::on_write(beast::error_code ec, std::size_t bytes, bool req_ = {}; // Read another request - http::async_read(socket_, buffer_, req_, - boost::asio::bind_executor(strand_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); })); + http::async_read(socket_, buffer_, req_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes) { on_read(ec, bytes); }); } diff --git a/server/control_session_http.hpp b/server/control_session_http.hpp index 6d0ef47a..11bcfbdc 100644 --- a/server/control_session_http.hpp +++ b/server/control_session_http.hpp @@ -47,7 +47,7 @@ class ControlSessionHttp : public ControlSession { public: /// ctor. Received message from the client are passed to ControlMessageReceiver - ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, const ServerSettings::Http& settings); + ControlSessionHttp(ControlMessageReceiver* receiver, tcp::socket&& socket, const ServerSettings::Http& settings); ~ControlSessionHttp() override; void start() override; void stop() override; @@ -69,7 +69,6 @@ protected: tcp::socket socket_; beast::flat_buffer buffer_; ServerSettings::Http settings_; - boost::asio::io_context::strand strand_; std::deque messages_; }; diff --git a/server/control_session_tcp.cpp b/server/control_session_tcp.cpp index 26cfa31f..a53d848e 100644 --- a/server/control_session_tcp.cpp +++ b/server/control_session_tcp.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -27,8 +27,8 @@ static constexpr auto LOG_TAG = "ControlSessionTCP"; // https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls/7756894 -ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket) - : ControlSession(receiver), socket_(std::move(socket)), strand_(ioc) +ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket) + : ControlSession(receiver), socket_(std::move(socket)), strand_(net::make_strand(socket_.get_executor())) { } @@ -44,8 +44,7 @@ void ControlSessionTcp::do_read() { const std::string delimiter = "\n"; boost::asio::async_read_until( - socket_, streambuf_, delimiter, - boost::asio::bind_executor(strand_, [this, self = shared_from_this(), delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + socket_, streambuf_, delimiter, [this, self = shared_from_this(), delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n"; @@ -69,7 +68,7 @@ void ControlSessionTcp::do_read() } streambuf_.consume(bytes_transferred); do_read(); - })); + }); } @@ -95,7 +94,7 @@ void ControlSessionTcp::stop() void ControlSessionTcp::sendAsync(const std::string& message) { - strand_.post([this, self = shared_from_this(), message]() { + net::post(strand_, [this, self = shared_from_this(), message]() { messages_.emplace_back(message + "\r\n"); if (messages_.size() > 1) { @@ -108,18 +107,17 @@ void ControlSessionTcp::sendAsync(const std::string& message) void ControlSessionTcp::send_next() { - boost::asio::async_write(socket_, boost::asio::buffer(messages_.front()), - boost::asio::bind_executor(strand_, [this, self = shared_from_this()](std::error_code ec, std::size_t length) { - messages_.pop_front(); - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error while writing to control socket: " << ec.message() << "\n"; - } - else - { - LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to control socket\n"; - } - if (!messages_.empty()) - send_next(); - })); + boost::asio::async_write(socket_, boost::asio::buffer(messages_.front()), [this, self = shared_from_this()](std::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error while writing to control socket: " << ec.message() << "\n"; + } + else + { + LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to control socket\n"; + } + if (!messages_.empty()) + send_next(); + }); } diff --git a/server/control_session_tcp.hpp b/server/control_session_tcp.hpp index e04e3254..738532ed 100644 --- a/server/control_session_tcp.hpp +++ b/server/control_session_tcp.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -32,7 +32,7 @@ class ControlSessionTcp : public ControlSession { public: /// ctor. Received message from the client are passed to ControlMessageReceiver - ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket); + ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket); ~ControlSessionTcp() override; void start() override; void stop() override; @@ -46,7 +46,7 @@ protected: tcp::socket socket_; boost::asio::streambuf streambuf_; - boost::asio::io_context::strand strand_; + net::strand strand_; std::deque messages_; }; diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp index 3157294c..24ba2b56 100644 --- a/server/control_session_ws.cpp +++ b/server/control_session_ws.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,8 +26,8 @@ using namespace std; static constexpr auto LOG_TAG = "ControlSessionWS"; -ControlSessionWebsocket::ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream&& socket) - : ControlSession(receiver), ws_(std::move(socket)), strand_(ioc) +ControlSessionWebsocket::ControlSessionWebsocket(ControlMessageReceiver* receiver, websocket::stream&& socket) + : ControlSession(receiver), ws_(std::move(socket)), strand_(net::make_strand(ws_.get_executor())) { LOG(DEBUG, LOG_TAG) << "ControlSessionWebsocket\n"; } @@ -61,7 +61,7 @@ void ControlSessionWebsocket::stop() void ControlSessionWebsocket::sendAsync(const std::string& message) { - strand_.post([this, self = shared_from_this(), msg = message]() { + net::post(strand_, [this, self = shared_from_this(), msg = message]() { messages_.push_back(std::move(msg)); if (messages_.size() > 1) { @@ -76,29 +76,26 @@ void ControlSessionWebsocket::sendAsync(const std::string& message) void ControlSessionWebsocket::send_next() { const std::string& message = messages_.front(); - ws_.async_write(boost::asio::buffer(message), - boost::asio::bind_executor(strand_, [this, self = shared_from_this()](std::error_code ec, std::size_t length) { - messages_.pop_front(); - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error while writing to web socket: " << ec.message() << "\n"; - } - else - { - LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to web socket\n"; - } - if (!messages_.empty()) - send_next(); - })); + ws_.async_write(boost::asio::buffer(message), [this, self = shared_from_this()](std::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error while writing to web socket: " << ec.message() << "\n"; + } + else + { + LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to web socket\n"; + } + if (!messages_.empty()) + send_next(); + }); } void ControlSessionWebsocket::do_read_ws() { // Read a message into our buffer - ws_.async_read(buffer_, boost::asio::bind_executor(strand_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes_transferred) { - on_read_ws(ec, bytes_transferred); - })); + ws_.async_read(buffer_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); }); } diff --git a/server/control_session_ws.hpp b/server/control_session_ws.hpp index c5836b46..26381901 100644 --- a/server/control_session_ws.hpp +++ b/server/control_session_ws.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -40,7 +40,7 @@ class ControlSessionWebsocket : public ControlSession { public: /// ctor. Received message from the client are passed to ControlMessageReceiver - ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream&& socket); + ControlSessionWebsocket(ControlMessageReceiver* receiver, websocket::stream&& socket); ~ControlSessionWebsocket() override; void start() override; void stop() override; @@ -58,7 +58,7 @@ protected: protected: beast::flat_buffer buffer_; - boost::asio::io_context::strand strand_; + net::strand strand_; std::deque messages_; }; diff --git a/server/stream_server.cpp b/server/stream_server.cpp index b6601571..d2152c90 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -33,7 +33,7 @@ using json = nlohmann::json; static constexpr auto LOG_TAG = "StreamServer"; -StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver) +StreamServer::StreamServer(net::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver) : io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver) { } @@ -195,7 +195,7 @@ void StreamServer::handleAccept(tcp::socket socket) socket.set_option(tcp::no_delay(true)); LOG(NOTICE, LOG_TAG) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; - shared_ptr session = make_shared(io_context_, this, std::move(socket)); + shared_ptr session = make_shared(this, std::move(socket)); addSession(session); } catch (const std::exception& e) @@ -213,8 +213,8 @@ void StreamServer::start() try { LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; - acceptor_.emplace_back( - make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); + acceptor_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); } catch (const boost::system::system_error& e) { diff --git a/server/stream_server.hpp b/server/stream_server.hpp index 88e58de4..623a195f 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -54,7 +54,7 @@ using session_ptr = std::shared_ptr; class StreamServer : public StreamMessageReceiver { public: - StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver = nullptr); + StreamServer(net::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver = nullptr); virtual ~StreamServer(); void start(); @@ -82,7 +82,7 @@ private: mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex clientMutex_; std::vector> sessions_; - boost::asio::io_context& io_context_; + net::io_context& io_context_; std::vector acceptor_; boost::asio::steady_timer config_timer_; diff --git a/server/stream_session.cpp b/server/stream_session.cpp index cb6ae59e..c77eeb8e 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,11 +25,11 @@ using namespace std; using namespace streamreader; - static constexpr auto LOG_TAG = "StreamSession"; -StreamSession::StreamSession(boost::asio::io_context& ioc, StreamMessageReceiver* receiver) : messageReceiver_(receiver), pcmStream_(nullptr), strand_(ioc) +StreamSession::StreamSession(net::any_io_executor executor, StreamMessageReceiver* receiver) + : messageReceiver_(receiver), pcmStream_(nullptr), strand_(net::make_strand(executor)) { base_msg_size_ = baseMessage_.getSize(); buffer_.resize(base_msg_size_); @@ -52,7 +52,7 @@ void StreamSession::send_next() { auto& buffer = messages_.front(); buffer.on_air = true; - strand_.post([this, self = shared_from_this(), buffer]() { + net::post(strand_, [this, self = shared_from_this(), buffer]() { sendAsync(buffer, [this](boost::system::error_code ec, std::size_t length) { messages_.pop_front(); if (ec) @@ -70,7 +70,7 @@ void StreamSession::send_next() void StreamSession::send(shared_const_buffer const_buf) { - strand_.post([this, self = shared_from_this(), const_buf]() { + net::post(strand_, [this, self = shared_from_this(), const_buf]() { // delete PCM chunks that are older than the overall buffer duration messages_.erase(std::remove_if(messages_.begin(), messages_.end(), [this](const shared_const_buffer& buffer) { diff --git a/server/stream_session.hpp b/server/stream_session.hpp index cb1a1aaa..568b063f 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -34,6 +34,7 @@ using boost::asio::ip::tcp; +namespace net = boost::asio; class StreamSession; @@ -116,7 +117,7 @@ class StreamSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to StreamMessageReceiver - StreamSession(boost::asio::io_context& ioc, StreamMessageReceiver* receiver); + StreamSession(net::any_io_executor strand, StreamMessageReceiver* receiver); virtual ~StreamSession() = default; virtual std::string getIP() = 0; @@ -156,7 +157,7 @@ protected: StreamMessageReceiver* messageReceiver_; size_t bufferMs_; streamreader::PcmStreamPtr pcmStream_; - boost::asio::io_context::strand strand_; + net::strand strand_; std::deque messages_; }; diff --git a/server/stream_session_tcp.cpp b/server/stream_session_tcp.cpp index 69adbfc5..a7f9a2ab 100644 --- a/server/stream_session_tcp.cpp +++ b/server/stream_session_tcp.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -29,8 +29,8 @@ using namespace streamreader; static constexpr auto LOG_TAG = "StreamSessionTCP"; -StreamSessionTcp::StreamSessionTcp(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, tcp::socket&& socket) - : StreamSession(ioc, receiver), socket_(std::move(socket)) +StreamSessionTcp::StreamSessionTcp(StreamMessageReceiver* receiver, tcp::socket&& socket) + : StreamSession(socket.get_executor(), receiver), socket_(std::move(socket)) { } @@ -80,37 +80,36 @@ std::string StreamSessionTcp::getIP() void StreamSessionTcp::read_next() { - boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), - boost::asio::bind_executor(strand_, [this, self = shared_from_this()](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; - messageReceiver_->onDisconnect(this); - return; - } + boost::asio::async_read( + socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, self = shared_from_this()](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; + messageReceiver_->onDisconnect(this); + return; + } - baseMessage_.deserialize(buffer_.data()); - LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id - << ", refers: " << baseMessage_.refersTo << "\n"; - if (baseMessage_.type > message_type::kLast) - { - LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - else if (baseMessage_.size > msg::max_size) - { - LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; - messageReceiver_->onDisconnect(this); - return; - } + baseMessage_.deserialize(buffer_.data()); + LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id + << ", refers: " << baseMessage_.refersTo << "\n"; + if (baseMessage_.type > message_type::kLast) + { + LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + else if (baseMessage_.size > msg::max_size) + { + LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; + } - if (baseMessage_.size > buffer_.size()) - buffer_.resize(baseMessage_.size); + if (baseMessage_.size > buffer_.size()) + buffer_.resize(baseMessage_.size); - boost::asio::async_read( - socket_, boost::asio::buffer(buffer_, baseMessage_.size), - boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_, baseMessage_.size), + [this, self](boost::system::error_code ec, std::size_t length) mutable { if (ec) { LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; @@ -123,14 +122,13 @@ void StreamSessionTcp::read_next() if (messageReceiver_ != nullptr) messageReceiver_->onMessageReceived(this, baseMessage_, buffer_.data()); read_next(); - })); - })); + }); + }); } void StreamSessionTcp::sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) { boost::asio::async_write(socket_, buffer, - boost::asio::bind_executor(strand_, [self = shared_from_this(), buffer, handler](boost::system::error_code ec, - std::size_t length) { handler(ec, length); })); + [self = shared_from_this(), buffer, handler](boost::system::error_code ec, std::size_t length) { handler(ec, length); }); } diff --git a/server/stream_session_tcp.hpp b/server/stream_session_tcp.hpp index 76d2cb99..7952b72c 100644 --- a/server/stream_session_tcp.hpp +++ b/server/stream_session_tcp.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -34,7 +34,7 @@ class StreamSessionTcp : public StreamSession { public: /// ctor. Received message from the client are passed to StreamMessageReceiver - StreamSessionTcp(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, tcp::socket&& socket); + StreamSessionTcp(StreamMessageReceiver* receiver, tcp::socket&& socket); ~StreamSessionTcp() override; void start() override; void stop() override; diff --git a/server/stream_session_ws.cpp b/server/stream_session_ws.cpp index 81a469be..903754e9 100644 --- a/server/stream_session_ws.cpp +++ b/server/stream_session_ws.cpp @@ -26,8 +26,8 @@ using namespace std; static constexpr auto LOG_TAG = "StreamSessionWS"; -StreamSessionWebsocket::StreamSessionWebsocket(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, websocket::stream&& socket) - : StreamSession(ioc, receiver), ws_(std::move(socket)) +StreamSessionWebsocket::StreamSessionWebsocket(StreamMessageReceiver* receiver, websocket::stream&& socket) + : StreamSession(socket.get_executor(), receiver), ws_(std::move(socket)) { LOG(DEBUG, LOG_TAG) << "StreamSessionWS\n"; } @@ -77,18 +77,14 @@ std::string StreamSessionWebsocket::getIP() void StreamSessionWebsocket::sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) { LOG(TRACE, LOG_TAG) << "sendAsync: " << buffer.message().type << "\n"; - ws_.async_write(buffer, boost::asio::bind_executor(strand_, [self = shared_from_this(), buffer, handler](boost::system::error_code ec, std::size_t length) { - handler(ec, length); - })); + ws_.async_write(buffer, [self = shared_from_this(), buffer, handler](boost::system::error_code ec, std::size_t length) { handler(ec, length); }); } void StreamSessionWebsocket::do_read_ws() { // Read a message into our buffer - ws_.async_read(buffer_, boost::asio::bind_executor(strand_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes_transferred) { - on_read_ws(ec, bytes_transferred); - })); + ws_.async_read(buffer_, [this, self = shared_from_this()](beast::error_code ec, std::size_t bytes_transferred) { on_read_ws(ec, bytes_transferred); }); } diff --git a/server/stream_session_ws.hpp b/server/stream_session_ws.hpp index 201ae848..76a16732 100644 --- a/server/stream_session_ws.hpp +++ b/server/stream_session_ws.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + Copyright (C) 2014-2021 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -40,7 +40,7 @@ class StreamSessionWebsocket : public StreamSession { public: /// ctor. Received message from the client are passed to StreamMessageReceiver - StreamSessionWebsocket(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, websocket::stream&& socket); + StreamSessionWebsocket(StreamMessageReceiver* receiver, websocket::stream&& socket); ~StreamSessionWebsocket() override; void start() override; void stop() override; diff --git a/server/streamreader/stream_control.cpp b/server/streamreader/stream_control.cpp index 902d09ec..6033b9d4 100644 --- a/server/streamreader/stream_control.cpp +++ b/server/streamreader/stream_control.cpp @@ -36,7 +36,7 @@ namespace streamreader static constexpr auto LOG_TAG = "Script"; -StreamControl::StreamControl(boost::asio::io_context& ioc) : ioc_(ioc), strand_(ioc) +StreamControl::StreamControl(net::io_context& ioc) : ioc_(ioc), strand_(net::make_strand(ioc.get_executor())) { } @@ -61,7 +61,7 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler) { // use strand to serialize commands sent from different threads - boost::asio::post(strand_, [this, request, response_handler]() { + net::post(strand_, [this, request, response_handler]() { if (response_handler) request_callbacks_[request.id()] = response_handler; @@ -134,7 +134,7 @@ void StreamControl::onLog(std::string message) -ScriptStreamControl::ScriptStreamControl(boost::asio::io_context& ioc, const std::string& script) : StreamControl(ioc), script_(script) +ScriptStreamControl::ScriptStreamControl(net::io_context& ioc, const std::string& script) : StreamControl(ioc), script_(script) { // auto fileExists = [](const std::string& filename) { // struct stat buffer; @@ -191,7 +191,7 @@ void ScriptStreamControl::doCommand(const jsonrpcpp::Request& request) void ScriptStreamControl::stderrReadLine() { const std::string delimiter = "\n"; - boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + net::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; @@ -211,7 +211,7 @@ void ScriptStreamControl::stderrReadLine() void ScriptStreamControl::stdoutReadLine() { const std::string delimiter = "\n"; - boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + net::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; diff --git a/server/streamreader/stream_control.hpp b/server/streamreader/stream_control.hpp index b3b3658f..89d7eb5b 100644 --- a/server/streamreader/stream_control.hpp +++ b/server/streamreader/stream_control.hpp @@ -37,6 +37,8 @@ namespace bp = boost::process; +namespace net = boost::asio; + using json = nlohmann::json; @@ -52,7 +54,7 @@ public: using OnResponse = std::function; using OnLog = std::function; - StreamControl(boost::asio::io_context& ioc); + StreamControl(net::io_context& ioc); virtual ~StreamControl(); void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler, @@ -68,7 +70,7 @@ protected: void onReceive(const std::string& json); void onLog(std::string message); - boost::asio::io_context& ioc_; + net::io_context& ioc_; private: OnRequest request_handler_; @@ -76,14 +78,14 @@ private: OnLog log_handler_; std::map request_callbacks_; - boost::asio::io_context::strand strand_; + net::strand strand_; }; class ScriptStreamControl : public StreamControl { public: - ScriptStreamControl(boost::asio::io_context& ioc, const std::string& script); + ScriptStreamControl(net::io_context& ioc, const std::string& script); virtual ~ScriptStreamControl() = default; void stop() override;