diff --git a/server/stream_session.cpp b/server/stream_session.cpp index eb8b4338..adcfebde 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -28,7 +28,7 @@ using namespace streamreader; static constexpr auto LOG_TAG = "StreamSession"; -StreamSession::StreamSession(net::any_io_executor executor, StreamMessageReceiver* receiver) +StreamSession::StreamSession(const net::any_io_executor& executor, StreamMessageReceiver* receiver) : messageReceiver_(receiver), pcmStream_(nullptr), strand_(net::make_strand(executor)) { base_msg_size_ = baseMessage_.getSize(); diff --git a/server/stream_session.hpp b/server/stream_session.hpp index b4965b88..b5693ae5 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -120,7 +120,7 @@ class StreamSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to StreamMessageReceiver - StreamSession(net::any_io_executor strand, StreamMessageReceiver* receiver); + StreamSession(const net::any_io_executor& executor, StreamMessageReceiver* receiver); virtual ~StreamSession() = default; virtual std::string getIP() = 0; diff --git a/server/streamreader/airplay_stream.cpp b/server/streamreader/airplay_stream.cpp index 7aa9e1b3..edc07d7a 100644 --- a/server/streamreader/airplay_stream.cpp +++ b/server/streamreader/airplay_stream.cpp @@ -229,7 +229,7 @@ void AirplayStream::pipeReadLine() try { int fd = open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK); - pipe_fd_ = std::make_unique(ioc_, fd); + pipe_fd_ = std::make_unique(strand_, fd); LOG(INFO, LOG_TAG) << "Metadata pipe opened: " << pipePath_ << "\n"; } catch (const std::exception& e) diff --git a/server/streamreader/alsa_stream.cpp b/server/streamreader/alsa_stream.cpp index 47a0e542..0aa7bff5 100644 --- a/server/streamreader/alsa_stream.cpp +++ b/server/streamreader/alsa_stream.cpp @@ -66,7 +66,7 @@ void wait(boost::asio::steady_timer& timer, const std::chrono::duration::wait(Timer& timer, const std::chrono::duration AsioStream::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) - : PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(ioc), state_timer_(ioc) + : PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(strand_), state_timer_(strand_) { chunk_ = std::make_unique(sampleFormat_, chunk_ms_); LOG(DEBUG, "AsioStream") << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize diff --git a/server/streamreader/file_stream.cpp b/server/streamreader/file_stream.cpp index a2f55c0e..627ca44d 100644 --- a/server/streamreader/file_stream.cpp +++ b/server/streamreader/file_stream.cpp @@ -53,7 +53,7 @@ void FileStream::do_connect() { LOG(DEBUG, LOG_TAG) << "connect\n"; int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); - stream_ = std::make_unique(ioc_, fd); + stream_ = std::make_unique(strand_, fd); on_connect(); } diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 0672d326..aa2bdbc2 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -42,8 +42,8 @@ static constexpr auto LOG_TAG = "PcmStream"; PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) - : active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), - req_id_(0), property_timer_(ioc) + : active_(false), strand_(net::make_strand(ioc.get_executor())), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), + ioc_(ioc), server_settings_(server_settings), req_id_(0), property_timer_(strand_) { encoder::EncoderFactory encoderFactory; if (uri_.query.find(kUriCodec) == uri_.query.end()) @@ -61,7 +61,7 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con if (uri_.query.find(kControlScript) != uri_.query.end()) { - stream_ctrl_ = std::make_unique(ioc, uri_.query[kControlScript]); + stream_ctrl_ = std::make_unique(strand_, uri_.query[kControlScript]); } if (uri_.query.find(kUriChunkMs) != uri_.query.end()) diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index 21b18f68..da0be45d 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -41,6 +41,8 @@ namespace bp = boost::process; +namespace net = boost::asio; + using json = nlohmann::json; @@ -178,6 +180,7 @@ protected: /// Send request to stream control script void sendRequest(const std::string& method, const jsonrpcpp::Parameter& params, ResultHandler handler); + net::strand strand_; std::chrono::time_point tvEncodedChunk_; std::vector pcmListeners_; StreamUri uri_; @@ -185,7 +188,7 @@ protected: size_t chunk_ms_; std::unique_ptr encoder_; std::string name_; - ReaderState state_; + std::atomic state_; Metatags metadata_; Properties properties_; boost::asio::io_context& ioc_; diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index d8995ef3..89c0c8a3 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -62,7 +62,7 @@ void PipeStream::do_connect() pipe_size = fcntl(fd, F_GETPIPE_SZ); #endif LOG(TRACE, LOG_TAG) << "Stream: " << name_ << ", connect to pipe: " << uri_.path << ", fd: " << fd << ", pipe size: " << pipe_size << "\n"; - stream_ = std::make_unique(ioc_, fd); + stream_ = std::make_unique(strand_, fd); on_connect(); } diff --git a/server/streamreader/process_stream.cpp b/server/streamreader/process_stream.cpp index 0b051a51..00e76b9f 100644 --- a/server/streamreader/process_stream.cpp +++ b/server/streamreader/process_stream.cpp @@ -113,12 +113,12 @@ void ProcessStream::do_connect() fcntl(pipe_stdout_.native_source(), F_SETFL, flags | O_NONBLOCK); process_ = bp::child(path_ + exe_ + " " + params_, bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::start_dir = path_); - stream_ = make_unique(ioc_, pipe_stdout_.native_source()); - stream_stderr_ = make_unique(ioc_, pipe_stderr_.native_source()); + stream_ = make_unique(strand_, pipe_stdout_.native_source()); + stream_stderr_ = make_unique(strand_, pipe_stderr_.native_source()); on_connect(); if (wd_timeout_sec_ > 0) { - watchdog_ = make_unique(ioc_, this); + watchdog_ = make_unique(strand_, this); watchdog_->start(std::chrono::seconds(wd_timeout_sec_)); } else diff --git a/server/streamreader/stream_control.cpp b/server/streamreader/stream_control.cpp index 6033b9d4..4fe62597 100644 --- a/server/streamreader/stream_control.cpp +++ b/server/streamreader/stream_control.cpp @@ -36,7 +36,7 @@ namespace streamreader static constexpr auto LOG_TAG = "Script"; -StreamControl::StreamControl(net::io_context& ioc) : ioc_(ioc), strand_(net::make_strand(ioc.get_executor())) +StreamControl::StreamControl(const net::any_io_executor& executor) : executor_(executor) { } @@ -61,7 +61,7 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler) { // use strand to serialize commands sent from different threads - net::post(strand_, [this, request, response_handler]() { + net::post(executor_, [this, request, response_handler]() { if (response_handler) request_callbacks_[request.id()] = response_handler; @@ -134,7 +134,7 @@ void StreamControl::onLog(std::string message) -ScriptStreamControl::ScriptStreamControl(net::io_context& ioc, const std::string& script) : StreamControl(ioc), script_(script) +ScriptStreamControl::ScriptStreamControl(const net::any_io_executor& executor, const std::string& script) : StreamControl(executor), script_(script) { // auto fileExists = [](const std::string& filename) { // struct stat buffer; @@ -158,22 +158,20 @@ void ScriptStreamControl::doStart(const std::string& stream_id, const ServerSett { process_ = bp::child( script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::std_in < in_, - bp::on_exit = - [](int exit, const std::error_code& ec_in) { - auto severity = AixLog::Severity::debug; - if (exit != 0) - severity = AixLog::Severity::error; - LOG(severity, LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; - }, - ioc_); + bp::on_exit = [](int exit, const std::error_code& ec_in) { + auto severity = AixLog::Severity::debug; + if (exit != 0) + severity = AixLog::Severity::error; + LOG(severity, LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; + }); } catch (const std::exception& e) { throw SnapException("Failed to start control script: '" + script_ + "', exception: " + e.what()); } - stream_stdout_ = make_unique(ioc_, pipe_stdout_.native_source()); - stream_stderr_ = make_unique(ioc_, pipe_stderr_.native_source()); + stream_stdout_ = make_unique(executor_, pipe_stdout_.native_source()); + stream_stderr_ = make_unique(executor_, pipe_stderr_.native_source()); stdoutReadLine(); stderrReadLine(); } diff --git a/server/streamreader/stream_control.hpp b/server/streamreader/stream_control.hpp index 89d7eb5b..42ea6c57 100644 --- a/server/streamreader/stream_control.hpp +++ b/server/streamreader/stream_control.hpp @@ -28,9 +28,7 @@ #include #include -#include -#include -#include +#include #include "jsonrpcpp.hpp" #include "server_settings.hpp" @@ -54,7 +52,7 @@ public: using OnResponse = std::function; using OnLog = std::function; - StreamControl(net::io_context& ioc); + StreamControl(const net::any_io_executor& executor); virtual ~StreamControl(); void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler, @@ -70,7 +68,7 @@ protected: void onReceive(const std::string& json); void onLog(std::string message); - net::io_context& ioc_; + net::any_io_executor executor_; private: OnRequest request_handler_; @@ -78,14 +76,13 @@ private: OnLog log_handler_; std::map request_callbacks_; - net::strand strand_; }; class ScriptStreamControl : public StreamControl { public: - ScriptStreamControl(net::io_context& ioc, const std::string& script); + ScriptStreamControl(const net::any_io_executor& executor, const std::string& script); virtual ~ScriptStreamControl() = default; void stop() override; diff --git a/server/streamreader/tcp_stream.cpp b/server/streamreader/tcp_stream.cpp index 844282c5..da5687df 100644 --- a/server/streamreader/tcp_stream.cpp +++ b/server/streamreader/tcp_stream.cpp @@ -61,7 +61,7 @@ TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con LOG(INFO, LOG_TAG) << "TcpStream host: " << host_ << ", port: " << port_ << ", is server: " << is_server_ << "\n"; if (is_server_) - acceptor_ = make_unique(ioc_, tcp::endpoint(boost::asio::ip::address::from_string(host_), port_)); + acceptor_ = make_unique(strand_, tcp::endpoint(boost::asio::ip::address::from_string(host_), port_)); } @@ -87,7 +87,7 @@ void TcpStream::do_connect() } else { - stream_ = make_unique(ioc_); + stream_ = make_unique(strand_); boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host_), port_); stream_->async_connect(endpoint, [this](const boost::system::error_code& ec) { if (!ec) diff --git a/server/streamreader/watchdog.cpp b/server/streamreader/watchdog.cpp index 48717676..e9ec333e 100644 --- a/server/streamreader/watchdog.cpp +++ b/server/streamreader/watchdog.cpp @@ -29,7 +29,7 @@ using namespace std; namespace streamreader { -Watchdog::Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener) : timer_(ioc), listener_(listener) +Watchdog::Watchdog(const net::any_io_executor& executor, WatchdogListener* listener) : timer_(executor), listener_(listener) { } diff --git a/server/streamreader/watchdog.hpp b/server/streamreader/watchdog.hpp index 09b15aa5..c4e9df68 100644 --- a/server/streamreader/watchdog.hpp +++ b/server/streamreader/watchdog.hpp @@ -22,6 +22,8 @@ #include #include +namespace net = boost::asio; + namespace streamreader { @@ -39,7 +41,7 @@ public: class Watchdog { public: - Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener = nullptr); + Watchdog(const net::any_io_executor& executor, WatchdogListener* listener = nullptr); virtual ~Watchdog(); void start(const std::chrono::milliseconds& timeout);