diff --git a/client/client_connection.cpp b/client/client_connection.cpp index c3903dc2..429d9857 100644 --- a/client/client_connection.cpp +++ b/client/client_connection.cpp @@ -35,7 +35,7 @@ using namespace std; static constexpr auto LOG_TAG = "Connection"; ClientConnection::ClientConnection(boost::asio::io_context& io_context, const ClientSettings::Server& server) - : io_context_(io_context), strand_(net::make_strand(io_context_.get_executor())), resolver_(strand_), socket_(strand_), reqId_(1), server_(server) + : io_context_(io_context), strand_(boost::asio::make_strand(io_context_.get_executor())), resolver_(strand_), socket_(strand_), reqId_(1), server_(server) { base_msg_size_ = base_message_.getSize(); buffer_.resize(base_msg_size_); @@ -144,7 +144,7 @@ void ClientConnection::sendNext() message.msg->serialize(stream); auto handler = message.handler; - net::async_write(socket_, streambuf, [this, handler](boost::system::error_code ec, std::size_t length) { + boost::asio::async_write(socket_, streambuf, [this, handler](boost::system::error_code ec, std::size_t length) { if (ec) LOG(ERROR, LOG_TAG) << "Failed to send message, error: " << ec.message() << "\n"; else @@ -162,7 +162,7 @@ void ClientConnection::sendNext() void ClientConnection::send(const msg::message_ptr& message, const ResultHandler& handler) { - net::post(strand_, [this, message, handler]() { + boost::asio::post(strand_, [this, message, handler]() { messages_.emplace_back(message, handler); if (messages_.size() > 1) { @@ -176,7 +176,7 @@ void ClientConnection::send(const msg::message_ptr& message, const ResultHandler void ClientConnection::sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler& handler) { - net::post(strand_, [this, message, timeout, handler]() { + boost::asio::post(strand_, [this, message, timeout, handler]() { pendingRequests_.erase( std::remove_if(pendingRequests_.begin(), pendingRequests_.end(), [](std::weak_ptr request) { return request.expired(); }), pendingRequests_.end()); @@ -197,7 +197,7 @@ void ClientConnection::sendRequest(const msg::message_ptr& message, const chrono void ClientConnection::getNextMessage(const MessageHandler& handler) { - net::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, handler](boost::system::error_code ec, std::size_t length) mutable { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, handler](boost::system::error_code ec, std::size_t length) mutable { if (ec) { LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; @@ -229,35 +229,36 @@ void ClientConnection::getNextMessage(const MessageHandler& ha if (base_message_.size > buffer_.size()) buffer_.resize(base_message_.size); - net::async_read(socket_, boost::asio::buffer(buffer_, base_message_.size), [this, handler](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; - if (handler) - handler(ec, nullptr); - return; - } + boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_message_.size), + [this, handler](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; + if (handler) + handler(ec, nullptr); + return; + } - auto response = msg::factory::createMessage(base_message_, buffer_.data()); - if (!response) - LOG(WARNING, LOG_TAG) << "Failed to deserialize message of type: " << base_message_.type << "\n"; - for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter) - { - auto request = *iter; - if (auto req = request.lock()) - { - if (req->id() == base_message_.refersTo) - { - req->setValue(std::move(response)); - pendingRequests_.erase(iter); - getNextMessage(handler); - return; - } - } - } + auto response = msg::factory::createMessage(base_message_, buffer_.data()); + if (!response) + LOG(WARNING, LOG_TAG) << "Failed to deserialize message of type: " << base_message_.type << "\n"; + for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter) + { + auto request = *iter; + if (auto req = request.lock()) + { + if (req->id() == base_message_.refersTo) + { + req->setValue(std::move(response)); + pendingRequests_.erase(iter); + getNextMessage(handler); + return; + } + } + } - if (handler) - handler(ec, std::move(response)); - }); + if (handler) + handler(ec, std::move(response)); + }); }); } diff --git a/client/client_connection.hpp b/client/client_connection.hpp index c9a527bc..30c5f35f 100644 --- a/client/client_connection.hpp +++ b/client/client_connection.hpp @@ -40,7 +40,6 @@ using boost::asio::ip::tcp; -namespace net = boost::asio; class ClientConnection; @@ -52,7 +51,7 @@ using MessageHandler = std::function { public: - PendingRequest(const net::strand& strand, uint16_t reqId, const MessageHandler& handler) + PendingRequest(const boost::asio::strand& strand, uint16_t reqId, const MessageHandler& handler) : id_(reqId), timer_(strand), strand_(strand), handler_(handler){}; virtual ~PendingRequest() @@ -65,7 +64,7 @@ public: /// @param value the response message void setValue(std::unique_ptr value) { - net::post(strand_, [this, self = shared_from_this(), val = std::move(value)]() mutable { + boost::asio::post(strand_, [this, self = shared_from_this(), val = std::move(value)]() mutable { timer_.cancel(); if (handler_) handler_({}, std::move(val)); @@ -111,7 +110,7 @@ public: private: uint16_t id_; boost::asio::steady_timer timer_; - net::strand strand_; + boost::asio::strand strand_; MessageHandler handler_; }; @@ -178,7 +177,7 @@ protected: size_t base_msg_size_; boost::asio::io_context& io_context_; - net::strand strand_; + boost::asio::strand strand_; tcp::resolver resolver_; tcp::socket socket_; std::vector> pendingRequests_; diff --git a/server/control_server.cpp b/server/control_server.cpp index 888a91c3..7c55ffb8 100644 --- a/server/control_server.cpp +++ b/server/control_server.cpp @@ -161,7 +161,7 @@ void ControlServer::start() try { LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n"; - acceptor_tcp_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + acceptor_tcp_.emplace_back(make_unique(boost::asio::make_strand(io_context_.get_executor()), tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port))); } catch (const boost::system::system_error& e) @@ -177,7 +177,7 @@ void ControlServer::start() try { LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n"; - acceptor_http_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + acceptor_http_.emplace_back(make_unique(boost::asio::make_strand(io_context_.get_executor()), tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port))); } catch (const boost::system::system_error& e) diff --git a/server/control_session_tcp.cpp b/server/control_session_tcp.cpp index 52a7b67d..b395d973 100644 --- a/server/control_session_tcp.cpp +++ b/server/control_session_tcp.cpp @@ -32,7 +32,7 @@ static constexpr auto LOG_TAG = "ControlSessionTCP"; ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, tcp::socket&& socket) - : ControlSession(receiver), socket_(std::move(socket)), strand_(net::make_strand(socket_.get_executor())) + : ControlSession(receiver), socket_(std::move(socket)), strand_(boost::asio::make_strand(socket_.get_executor())) { } @@ -98,7 +98,7 @@ void ControlSessionTcp::stop() void ControlSessionTcp::sendAsync(const std::string& message) { - net::post(strand_, [this, self = shared_from_this(), message]() { + boost::asio::post(strand_, [this, self = shared_from_this(), message]() { messages_.emplace_back(message + "\r\n"); if (messages_.size() > 1) { diff --git a/server/control_session_tcp.hpp b/server/control_session_tcp.hpp index 29e113ec..07af3691 100644 --- a/server/control_session_tcp.hpp +++ b/server/control_session_tcp.hpp @@ -29,7 +29,6 @@ #include using boost::asio::ip::tcp; -namespace net = boost::asio; /// Endpoint for a connected control client. /** @@ -55,7 +54,7 @@ protected: tcp::socket socket_; boost::asio::streambuf streambuf_; - net::strand strand_; + boost::asio::strand strand_; std::deque messages_; }; diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp index 8aeda904..905790f8 100644 --- a/server/control_session_ws.cpp +++ b/server/control_session_ws.cpp @@ -32,7 +32,7 @@ static constexpr auto LOG_TAG = "ControlSessionWS"; ControlSessionWebsocket::ControlSessionWebsocket(ControlMessageReceiver* receiver, websocket::stream&& socket) - : ControlSession(receiver), ws_(std::move(socket)), strand_(net::make_strand(ws_.get_executor())) + : ControlSession(receiver), ws_(std::move(socket)), strand_(boost::asio::make_strand(ws_.get_executor())) { LOG(DEBUG, LOG_TAG) << "ControlSessionWebsocket\n"; } @@ -66,7 +66,7 @@ void ControlSessionWebsocket::stop() void ControlSessionWebsocket::sendAsync(const std::string& message) { - net::post(strand_, [this, self = shared_from_this(), msg = message]() { + boost::asio::post(strand_, [this, self = shared_from_this(), msg = message]() { messages_.push_back(std::move(msg)); if (messages_.size() > 1) { diff --git a/server/control_session_ws.hpp b/server/control_session_ws.hpp index dcade5ee..86d155b8 100644 --- a/server/control_session_ws.hpp +++ b/server/control_session_ws.hpp @@ -29,9 +29,9 @@ // standard headers #include + namespace beast = boost::beast; // from namespace websocket = beast::websocket; // from -namespace net = boost::asio; // from /// Endpoint for a connected control client. @@ -62,7 +62,7 @@ protected: protected: beast::flat_buffer buffer_; - net::strand strand_; + boost::asio::strand strand_; std::deque messages_; }; diff --git a/server/stream_server.cpp b/server/stream_server.cpp index d2966d72..afbc1a32 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2021 Johannes Pohl + Copyright (C) 2014-2022 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -39,7 +39,7 @@ using json = nlohmann::json; static constexpr auto LOG_TAG = "StreamServer"; -StreamServer::StreamServer(net::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver) +StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver) : io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver) { } @@ -219,7 +219,7 @@ void StreamServer::start() try { LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; - acceptor_.emplace_back(make_unique(net::make_strand(io_context_.get_executor()), + acceptor_.emplace_back(make_unique(boost::asio::make_strand(io_context_.get_executor()), tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); } catch (const boost::system::system_error& e) diff --git a/server/stream_server.hpp b/server/stream_server.hpp index 1aa852c2..95a06449 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2021 Johannes Pohl + Copyright (C) 2014-2022 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -59,7 +59,7 @@ using session_ptr = std::shared_ptr; class StreamServer : public StreamMessageReceiver { public: - StreamServer(net::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver = nullptr); + StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver = nullptr); virtual ~StreamServer(); void start(); @@ -87,7 +87,7 @@ private: mutable std::recursive_mutex sessionsMutex_; // mutable std::recursive_mutex clientMutex_; std::vector> sessions_; - net::io_context& io_context_; + boost::asio::io_context& io_context_; std::vector acceptor_; boost::asio::steady_timer config_timer_; diff --git a/server/stream_session.cpp b/server/stream_session.cpp index adcfebde..e3d1d0d2 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -16,10 +16,16 @@ along with this program. If not, see . ***/ +// prototype/interface header file #include "stream_session.hpp" +// local headers #include "common/aixlog.hpp" #include "message/pcm_chunk.hpp" + +// 3rd party headers + +// standard headers #include using namespace std; @@ -28,8 +34,8 @@ using namespace streamreader; static constexpr auto LOG_TAG = "StreamSession"; -StreamSession::StreamSession(const net::any_io_executor& executor, StreamMessageReceiver* receiver) - : messageReceiver_(receiver), pcmStream_(nullptr), strand_(net::make_strand(executor)) +StreamSession::StreamSession(const boost::asio::any_io_executor& executor, StreamMessageReceiver* receiver) + : messageReceiver_(receiver), pcmStream_(nullptr), strand_(boost::asio::make_strand(executor)) { base_msg_size_ = baseMessage_.getSize(); buffer_.resize(base_msg_size_); @@ -54,7 +60,7 @@ void StreamSession::send_next() { auto& buffer = messages_.front(); buffer.on_air = true; - net::post(strand_, [this, self = shared_from_this(), buffer]() { + boost::asio::post(strand_, [this, self = shared_from_this(), buffer]() { sendAsync(buffer, [this](boost::system::error_code ec, std::size_t length) { messages_.pop_front(); if (ec) @@ -72,7 +78,7 @@ void StreamSession::send_next() void StreamSession::send(shared_const_buffer const_buf) { - net::post(strand_, [this, self = shared_from_this(), const_buf]() { + boost::asio::post(strand_, [this, self = shared_from_this(), const_buf]() { // delete PCM chunks that are older than the overall buffer duration messages_.erase(std::remove_if(messages_.begin(), messages_.end(), [this](const shared_const_buffer& buffer) { diff --git a/server/stream_session.hpp b/server/stream_session.hpp index a268854b..85f7a385 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -122,7 +122,7 @@ class StreamSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to StreamMessageReceiver - StreamSession(const net::any_io_executor& executor, StreamMessageReceiver* receiver); + StreamSession(const boost::asio::any_io_executor& executor, StreamMessageReceiver* receiver); virtual ~StreamSession() = default; virtual std::string getIP() = 0; @@ -162,7 +162,7 @@ protected: StreamMessageReceiver* messageReceiver_; size_t bufferMs_; streamreader::PcmStreamPtr pcmStream_; - net::strand strand_; + boost::asio::strand strand_; std::deque messages_; mutable std::mutex mutex_; }; diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 0f9211a9..a412274d 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2021 Johannes Pohl + Copyright (C) 2014-2022 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -45,7 +45,7 @@ static constexpr auto LOG_TAG = "PcmStream"; PcmStream::PcmStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) - : active_(false), strand_(net::make_strand(ioc.get_executor())), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), + : active_(false), strand_(boost::asio::make_strand(ioc.get_executor())), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), req_id_(0), property_timer_(strand_) { encoder::EncoderFactory encoderFactory; diff --git a/server/streamreader/stream_control.cpp b/server/streamreader/stream_control.cpp index d68e2700..772c123d 100644 --- a/server/streamreader/stream_control.cpp +++ b/server/streamreader/stream_control.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2021 Johannes Pohl + Copyright (C) 2014-2022 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -39,7 +39,7 @@ namespace streamreader static constexpr auto LOG_TAG = "Script"; -StreamControl::StreamControl(const net::any_io_executor& executor) : executor_(executor) +StreamControl::StreamControl(const boost::asio::any_io_executor& executor) : executor_(executor) { } @@ -64,7 +64,7 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler) { // use strand to serialize commands sent from different threads - net::post(executor_, [this, request, response_handler]() { + boost::asio::post(executor_, [this, request, response_handler]() { if (response_handler) request_callbacks_[request.id()] = response_handler; @@ -137,7 +137,7 @@ void StreamControl::onLog(std::string message) -ScriptStreamControl::ScriptStreamControl(const net::any_io_executor& executor, const std::string& script) : StreamControl(executor), script_(script) +ScriptStreamControl::ScriptStreamControl(const boost::asio::any_io_executor& executor, const std::string& script) : StreamControl(executor), script_(script) { namespace fs = utils::file; if (!fs::exists(script_)) @@ -194,7 +194,7 @@ void ScriptStreamControl::doCommand(const jsonrpcpp::Request& request) void ScriptStreamControl::stderrReadLine() { const std::string delimiter = "\n"; - net::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; @@ -214,7 +214,7 @@ void ScriptStreamControl::stderrReadLine() void ScriptStreamControl::stdoutReadLine() { const std::string delimiter = "\n"; - net::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; diff --git a/server/streamreader/stream_control.hpp b/server/streamreader/stream_control.hpp index 09cf730c..6e4e340f 100644 --- a/server/streamreader/stream_control.hpp +++ b/server/streamreader/stream_control.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2021 Johannes Pohl + Copyright (C) 2014-2022 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -58,7 +58,7 @@ public: using OnResponse = std::function; using OnLog = std::function; - StreamControl(const net::any_io_executor& executor); + StreamControl(const boost::asio::any_io_executor& executor); virtual ~StreamControl(); void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler, @@ -74,7 +74,7 @@ protected: void onReceive(const std::string& json); void onLog(std::string message); - net::any_io_executor executor_; + boost::asio::any_io_executor executor_; private: OnRequest request_handler_; @@ -88,7 +88,7 @@ private: class ScriptStreamControl : public StreamControl { public: - ScriptStreamControl(const net::any_io_executor& executor, const std::string& script); + ScriptStreamControl(const boost::asio::any_io_executor& executor, const std::string& script); virtual ~ScriptStreamControl() = default; void stop() override; diff --git a/server/streamreader/watchdog.cpp b/server/streamreader/watchdog.cpp index 227d6f24..1351615e 100644 --- a/server/streamreader/watchdog.cpp +++ b/server/streamreader/watchdog.cpp @@ -36,7 +36,7 @@ using namespace std; namespace streamreader { -Watchdog::Watchdog(const net::any_io_executor& executor, WatchdogListener* listener) : timer_(executor), listener_(listener) +Watchdog::Watchdog(const boost::asio::any_io_executor& executor, WatchdogListener* listener) : timer_(executor), listener_(listener) { } diff --git a/server/streamreader/watchdog.hpp b/server/streamreader/watchdog.hpp index 7f180afd..6c990ff3 100644 --- a/server/streamreader/watchdog.hpp +++ b/server/streamreader/watchdog.hpp @@ -44,7 +44,7 @@ public: class Watchdog { public: - Watchdog(const net::any_io_executor& executor, WatchdogListener* listener = nullptr); + Watchdog(const boost::asio::any_io_executor& executor, WatchdogListener* listener = nullptr); virtual ~Watchdog(); void start(const std::chrono::milliseconds& timeout);