diff --git a/server/server.cpp b/server/server.cpp index 08bc1925..eeaf7d2a 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -49,13 +49,12 @@ void Server::onNewSession(std::shared_ptr session) } -void Server::onMetadataChanged(const PcmStream* pcmStream) +void Server::onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) { // clang-format off // Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "metadata": {"album": "some album", "artist": "some artist", "track": "some track"...}}} // clang-format on - const auto metadata = pcmStream->getMetadata(); LOG(DEBUG, LOG_TAG) << "Metadata changed, stream: " << pcmStream->getName() << ", meta: " << metadata.toJson().dump(3) << "\n"; // streamServer_->onMetadataChanged(pcmStream, meta); @@ -67,13 +66,13 @@ void Server::onMetadataChanged(const PcmStream* pcmStream) } -void Server::onPropertiesChanged(const PcmStream* pcmStream) +void Server::onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) { - const auto props = pcmStream->getProperties(); - LOG(DEBUG, LOG_TAG) << "Properties changed, stream: " << pcmStream->getName() << ", properties: " << props.toJson().dump(3) << "\n"; + LOG(DEBUG, LOG_TAG) << "Properties changed, stream: " << pcmStream->getName() << ", properties: " << properties.toJson().dump(3) << "\n"; // Send propeties to all connected control clients - json notification = jsonrpcpp::Notification("Stream.OnProperties", jsonrpcpp::Parameter("id", pcmStream->getId(), "properties", props.toJson())).to_json(); + json notification = + jsonrpcpp::Notification("Stream.OnProperties", jsonrpcpp::Parameter("id", pcmStream->getId(), "properties", properties.toJson())).to_json(); controlServer_->send(notification.dump(), nullptr); } diff --git a/server/server.hpp b/server/server.hpp index 0db35625..226f6a1e 100644 --- a/server/server.hpp +++ b/server/server.hpp @@ -78,8 +78,8 @@ private: void onNewSession(std::shared_ptr session) override; /// Implementation of PcmListener - void onMetadataChanged(const PcmStream* pcmStream) override; - void onPropertiesChanged(const PcmStream* pcmStream) override; + void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) override; + void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override; void onStateChanged(const PcmStream* pcmStream, ReaderState state) override; void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override; void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; diff --git a/server/streamreader/librespot_stream.cpp b/server/streamreader/librespot_stream.cpp index a88f4b5e..d794b117 100644 --- a/server/streamreader/librespot_stream.cpp +++ b/server/streamreader/librespot_stream.cpp @@ -170,10 +170,10 @@ void LibrespotStream::onStderrMsg(const std::string& line) meta.title = string(m[1]); meta.duration = cpt::stod(m[2]) / 1000.; setMetadata(meta); - Properties props; - // props.can_seek = true; - // props.can_control = true; - setProperties(props); + Properties properties; + // properties.can_seek = true; + // properties.can_control = true; + setProperties(properties); } } diff --git a/server/streamreader/meta_stream.cpp b/server/streamreader/meta_stream.cpp index a8e3bb92..cd477b09 100644 --- a/server/streamreader/meta_stream.cpp +++ b/server/streamreader/meta_stream.cpp @@ -84,8 +84,9 @@ void MetaStream::stop() } -void MetaStream::onMetadataChanged(const PcmStream* pcmStream) +void MetaStream::onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) { + std::ignore = metadata; LOG(DEBUG, LOG_TAG) << "onMetadataChanged: " << pcmStream->getName() << "\n"; std::lock_guard lock(mutex_); if (pcmStream != active_stream_.get()) @@ -93,8 +94,9 @@ void MetaStream::onMetadataChanged(const PcmStream* pcmStream) } -void MetaStream::onPropertiesChanged(const PcmStream* pcmStream) +void MetaStream::onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) { + std::ignore = properties; LOG(DEBUG, LOG_TAG) << "onPropertiesChanged: " << pcmStream->getName() << "\n"; } diff --git a/server/streamreader/meta_stream.hpp b/server/streamreader/meta_stream.hpp index 422ff604..cebaca8c 100644 --- a/server/streamreader/meta_stream.hpp +++ b/server/streamreader/meta_stream.hpp @@ -46,8 +46,8 @@ public: protected: /// Implementation of PcmListener - void onMetadataChanged(const PcmStream* pcmStream) override; - void onPropertiesChanged(const PcmStream* pcmStream) override; + void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) override; + void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override; void onStateChanged(const PcmStream* pcmStream, ReaderState state) override; void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override; void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index c9d1f772..0ffc4638 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -37,7 +37,8 @@ static constexpr auto LOG_TAG = "PcmStream"; PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) - : active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), req_id_(0) + : active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), + req_id_(0), property_timer_(ioc) { encoder::EncoderFactory encoderFactory; if (uri_.query.find(kUriCodec) == uri_.query.end()) @@ -66,6 +67,7 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con PcmStream::~PcmStream() { stop(); + property_timer_.cancel(); } @@ -111,6 +113,23 @@ void PcmStream::onControlRequest(const jsonrpcpp::Request& request) } +void PcmStream::pollProperties() +{ + property_timer_.expires_after(10s); + property_timer_.async_wait([this](const boost::system::error_code& ec) { + if (!ec) + { + stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) { + LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n"; + if (response.error().code() == 0) + setProperties(response.result()); + }); + pollProperties(); + } + }); +} + + void PcmStream::onControlNotification(const jsonrpcpp::Notification& notification) { try @@ -139,6 +158,7 @@ void PcmStream::onControlNotification(const jsonrpcpp::Notification& notificatio if (response.error().code() == 0) setMetadata(response.result()); }); + pollProperties(); } else if (notification.method() == "Plugin.Stream.Log") { @@ -291,12 +311,14 @@ void PcmStream::addListener(PcmListener* pcmListener) const Metatags& PcmStream::getMetadata() const { + std::lock_guard lock(mutex_); return metadata_; } const Properties& PcmStream::getProperties() const { + std::lock_guard lock(mutex_); return properties_; } @@ -426,6 +448,7 @@ void PcmStream::control(const jsonrpcpp::Request& request, const StreamControl:: void PcmStream::setMetadata(const Metatags& metadata) { + std::lock_guard lock(mutex_); if (metadata == metadata_) { LOG(DEBUG, LOG_TAG) << "setMetadata: Metadata did not change\n"; @@ -439,27 +462,28 @@ void PcmStream::setMetadata(const Metatags& metadata) for (auto* listener : pcmListeners_) { if (listener != nullptr) - listener->onMetadataChanged(this); + listener->onMetadataChanged(this, metadata_); } } -void PcmStream::setProperties(const Properties& props) +void PcmStream::setProperties(const Properties& properties) { - if (props == properties_) + std::lock_guard lock(mutex_); + if (properties == properties_) { LOG(DEBUG, LOG_TAG) << "setProperties: Properties did not change\n"; return; } - properties_ = props; + properties_ = properties; LOG(INFO, LOG_TAG) << "setProperties, stream: " << getId() << ", properties: " << properties_.toJson() << "\n"; // Trigger a stream update for (auto* listener : pcmListeners_) { if (listener != nullptr) - listener->onPropertiesChanged(this); + listener->onPropertiesChanged(this, properties); } } diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index ff629f64..aff5b97b 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -20,11 +20,13 @@ #define PCM_STREAM_HPP #include +#include #include #include #include #include +#include #include #include "common/json.hpp" @@ -96,8 +98,8 @@ static constexpr auto kControlScript = "controlscript"; class PcmListener { public: - virtual void onMetadataChanged(const PcmStream* pcmStream) = 0; - virtual void onPropertiesChanged(const PcmStream* pcmStream) = 0; + virtual void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) = 0; + virtual void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) = 0; virtual void onStateChanged(const PcmStream* pcmStream, ReaderState state) = 0; virtual void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) = 0; virtual void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) = 0; @@ -154,7 +156,9 @@ protected: void chunkEncoded(const encoder::Encoder& encoder, std::shared_ptr chunk, double duration); void setMetadata(const Metatags& metadata); - void setProperties(const Properties& props); + void setProperties(const Properties& properties); + + void pollProperties(); std::chrono::time_point tvEncodedChunk_; std::vector pcmListeners_; @@ -169,7 +173,9 @@ protected: boost::asio::io_context& ioc_; ServerSettings server_settings_; std::unique_ptr stream_ctrl_; - int req_id_; + std::atomic req_id_; + boost::asio::steady_timer property_timer_; + mutable std::mutex mutex_; }; } // namespace streamreader diff --git a/server/streamreader/stream_control.cpp b/server/streamreader/stream_control.cpp index 53edee7b..c8175500 100644 --- a/server/streamreader/stream_control.cpp +++ b/server/streamreader/stream_control.cpp @@ -36,7 +36,7 @@ namespace streamreader static constexpr auto LOG_TAG = "Script"; -StreamControl::StreamControl(boost::asio::io_context& ioc) : ioc_(ioc) +StreamControl::StreamControl(boost::asio::io_context& ioc) : ioc_(ioc), strand_(ioc) { } @@ -60,10 +60,13 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler) { - if (response_handler) - request_callbacks_[request.id()] = response_handler; + // use strand to serialize commands sent from different threads + boost::asio::post(strand_, [this, request, response_handler]() { + if (response_handler) + request_callbacks_[request.id()] = response_handler; - doCommand(request); + doCommand(request); + }); } @@ -72,31 +75,54 @@ void StreamControl::stop() } -void StreamControl::onNotification(const jsonrpcpp::Notification& notification) +void StreamControl::onReceive(const std::string& json) { - notification_handler_(notification); -} - - -void StreamControl::onRequest(const jsonrpcpp::Request& request) -{ - request_handler_(request); -} - - -void StreamControl::onResponse(const jsonrpcpp::Response& response) -{ - LOG(INFO, LOG_TAG) << "Response: " << response.to_json() << ", id: " << response.id() << "\n"; - // TODO: call request_callbacks_ on timeout with error - auto iter = request_callbacks_.find(response.id()); - if (iter != request_callbacks_.end()) + jsonrpcpp::entity_ptr entity(nullptr); + try { - iter->second(response); - request_callbacks_.erase(iter); + entity = jsonrpcpp::Parser::do_parse(json); + if (!entity) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message\n"; + } + else if (entity->is_notification()) + { + jsonrpcpp::notification_ptr notification = dynamic_pointer_cast(entity); + notification_handler_(*notification); + } + else if (entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); + request_handler_(*request); + } + else if (entity->is_response()) + { + jsonrpcpp::response_ptr response = dynamic_pointer_cast(entity); + LOG(INFO, LOG_TAG) << "Response: " << response->to_json() << ", id: " << response->id() << "\n"; + // TODO: call request_callbacks_ on timeout with error + auto iter = request_callbacks_.find(response->id()); + if (iter != request_callbacks_.end()) + { + iter->second(*response); + request_callbacks_.erase(iter); + } + else + { + LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response->id() << "\n"; + } + } + else + { + LOG(WARNING, LOG_TAG) << "Not handling message: " << json << "\n"; + } } - else + catch (const jsonrpcpp::ParseErrorException& e) { - LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response.id() << "\n"; + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; + } + catch (const std::exception& e) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; } } @@ -160,6 +186,7 @@ void ScriptStreamControl::stderrReadLine() // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; onLog(std::move(line)); + streambuf_stderr_.consume(bytes_transferred); stderrReadLine(); }); @@ -178,43 +205,7 @@ void ScriptStreamControl::stdoutReadLine() // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()}; - - jsonrpcpp::entity_ptr entity(nullptr); - try - { - entity = jsonrpcpp::Parser::do_parse(line); - if (!entity) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message\n"; - } - if (entity->is_notification()) - { - jsonrpcpp::notification_ptr notification = dynamic_pointer_cast(entity); - onNotification(*notification); - } - else if (entity->is_request()) - { - jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); - onRequest(*request); - } - else if (entity->is_response()) - { - jsonrpcpp::response_ptr response = dynamic_pointer_cast(entity); - onResponse(*response); - } - else - { - LOG(WARNING, LOG_TAG) << "Not handling message: " << line << "\n"; - } - } - catch (const jsonrpcpp::ParseErrorException& e) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; - } - catch (const std::exception& e) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; - } + onReceive(line); streambuf_stdout_.consume(bytes_transferred); stdoutReadLine(); diff --git a/server/streamreader/stream_control.hpp b/server/streamreader/stream_control.hpp index 04bfdd40..b3b3658f 100644 --- a/server/streamreader/stream_control.hpp +++ b/server/streamreader/stream_control.hpp @@ -30,6 +30,7 @@ #include #include +#include #include "jsonrpcpp.hpp" #include "server_settings.hpp" @@ -64,17 +65,18 @@ protected: virtual void doCommand(const jsonrpcpp::Request& request) = 0; virtual void doStart(const std::string& stream_id, const ServerSettings& server_setttings) = 0; - void onNotification(const jsonrpcpp::Notification& notification); - void onRequest(const jsonrpcpp::Request& request); - void onResponse(const jsonrpcpp::Response& response); + void onReceive(const std::string& json); void onLog(std::string message); + boost::asio::io_context& ioc_; + +private: OnRequest request_handler_; OnNotification notification_handler_; OnLog log_handler_; - boost::asio::io_context& ioc_; std::map request_callbacks_; + boost::asio::io_context::strand strand_; }; diff --git a/test/test_main.cpp b/test/test_main.cpp index 2a897db8..d2d547df 100644 --- a/test/test_main.cpp +++ b/test/test_main.cpp @@ -182,12 +182,12 @@ TEST_CASE("Properties") )"); // std::cout << in_json.dump(4) << "\n"; - Properties props(in_json); - std::cout << props.toJson().dump(4) << "\n"; + Properties properties(in_json); + std::cout << properties.toJson().dump(4) << "\n"; - REQUIRE(props.loop_status.has_value()); + REQUIRE(properties.loop_status.has_value()); - auto out_json = props.toJson(); + auto out_json = properties.toJson(); // std::cout << out_json.dump(4) << "\n"; REQUIRE(in_json == out_json); @@ -198,12 +198,12 @@ TEST_CASE("Properties") )"); // std::cout << in_json.dump(4) << "\n"; - props.fromJson(in_json); - // std::cout << props.toJson().dump(4) << "\n"; + properties.fromJson(in_json); + // std::cout << properties.toJson().dump(4) << "\n"; - REQUIRE(!props.loop_status.has_value()); + REQUIRE(!properties.loop_status.has_value()); - out_json = props.toJson(); + out_json = properties.toJson(); // std::cout << out_json.dump(4) << "\n"; REQUIRE(in_json == out_json); }