diff --git a/server/server.cpp b/server/server.cpp index 4d5447d4..72f1d212 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -457,8 +457,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent if (stream == nullptr) throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); - // Set metadata from request - stream->control(request->params().get("command"), request->params().has("params") ? request->params().get("params") : json{}); + stream->control(*request, [](const jsonrpcpp::Response& response) { + LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << response.id() << ", result: " << response.result() + << ", error: " << response.error().code() << "\n"; + }); // Setup response result["id"] = streamId; @@ -477,7 +479,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent if (stream == nullptr) throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); - stream->setProperty(request->params().get("property"), request->params().get("value")); + stream->setProperty(*request, [](const jsonrpcpp::Response& response) { + LOG(INFO, LOG_TAG) << "Received response for Stream.SetProperty, id: " << response.id() << ", result: " << response.result() + << ", error: " << response.error().code() << "\n"; + }); // Setup response // TODO: error handling diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 379c6033..c8284634 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -25,7 +25,6 @@ #include "common/str_compat.hpp" #include "common/utils/string_utils.hpp" #include "encoder/encoder_factory.hpp" -#include "jsonrpcpp.hpp" #include "pcm_stream.hpp" @@ -49,9 +48,12 @@ CtrlScript::~CtrlScript() } -void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler) +void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler, + const OnRequest& request_handler, const OnLog& log_handler) { - receive_handler_ = receive_handler; + notification_handler_ = notification_handler; + request_handler_ = request_handler; + log_handler_ = log_handler; pipe_stderr_ = bp::pipe(); pipe_stdout_ = bp::pipe(); stringstream params; @@ -74,35 +76,17 @@ void CtrlScript::start(const std::string& stream_id, const ServerSettings& serve stderrReadLine(); } -void CtrlScript::send(const std::string& msg) + +void CtrlScript::send(const jsonrpcpp::Request& request, const OnResponse& response_handler) { + if (response_handler) + request_callbacks_[request.id()] = response_handler; + + std::string msg = request.to_json().dump() + "\n"; in_.write(msg.data(), msg.size()); in_.flush(); } -void CtrlScript::logScript(std::string line) -{ - if (line.back() == '\r') - line.resize(line.size() - 1); - if (line.empty()) - return; - auto tmp = utils::string::tolower_copy(line); - AixLog::Severity severity = AixLog::Severity::info; - if (tmp.find(" trace") != string::npos) - severity = AixLog::Severity::trace; - else if (tmp.find(" debug") != string::npos) - severity = AixLog::Severity::debug; - else if (tmp.find(" info") != string::npos) - severity = AixLog::Severity::info; - else if (tmp.find(" warning") != string::npos) - severity = AixLog::Severity::warning; - else if (tmp.find(" error") != string::npos) - severity = AixLog::Severity::error; - else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos)) - severity = AixLog::Severity::fatal; - LOG(severity, SCRIPT_LOG_TAG) << line << "\n"; -} - void CtrlScript::stderrReadLine() { @@ -116,7 +100,7 @@ void CtrlScript::stderrReadLine() // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; - logScript(std::move(line)); + log_handler_(std::move(line)); streambuf_stderr_.consume(bytes_transferred); stderrReadLine(); }); @@ -135,7 +119,54 @@ void CtrlScript::stdoutReadLine() // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()}; - receive_handler_(std::move(line)); + + jsonrpcpp::entity_ptr entity(nullptr); + try + { + entity = jsonrpcpp::Parser::do_parse(line); + if (!entity) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message\n"; + } + if (entity->is_notification()) + { + jsonrpcpp::notification_ptr notification = dynamic_pointer_cast(entity); + notification_handler_(*notification); + } + else if (entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); + request_handler_(*request); + } + else if (entity->is_response()) + { + jsonrpcpp::response_ptr response = dynamic_pointer_cast(entity); + LOG(INFO, LOG_TAG) << "Response: " << response->to_json() << ", id: " << response->id() << "\n"; + auto iter = request_callbacks_.find(response->id()); + if (iter != request_callbacks_.end()) + { + iter->second(*response); + request_callbacks_.erase(iter); + } + else + { + LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response->id() << "\n"; + } + } + else + { + LOG(WARNING, LOG_TAG) << "Not handling message: " << line << "\n"; + } + } + catch (const jsonrpcpp::ParseErrorException& e) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; + } + catch (const std::exception& e) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; + } + streambuf_stdout_.consume(bytes_transferred); stdoutReadLine(); }); @@ -223,60 +254,51 @@ std::string PcmStream::getCodec() const } -void PcmStream::onControlMsg(const std::string& msg) +void PcmStream::onControlRequest(const jsonrpcpp::Request& request) { - LOG(DEBUG, LOG_TAG) << "Received: " << msg << "\n"; + LOG(INFO, LOG_TAG) << "Request: " << request.method() << ", id: " << request.id() << ", params: " << request.params().to_json() << "\n"; +} - jsonrpcpp::entity_ptr entity(nullptr); - try - { - entity = jsonrpcpp::Parser::do_parse(msg); - if (!entity) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message\n"; - return; - } - } - catch (const jsonrpcpp::ParseErrorException& e) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; - return; - // return e.to_json().dump(); - } - catch (const std::exception& e) - { - LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; - return; - // return jsonrpcpp::ParseErrorException(e.what()).to_json().dump(); - } - if (entity->is_notification()) +void PcmStream::onControlNotification(const jsonrpcpp::Notification& notification) +{ + LOG(INFO, LOG_TAG) << "Notification method: " << notification.method() << ", params: " << notification.params().to_json() << "\n"; + if (notification.method() == "Player.Metadata") { - jsonrpcpp::notification_ptr notification = dynamic_pointer_cast(entity); - LOG(INFO, LOG_TAG) << "Notification method: " << notification->method() << ", params: " << notification->params().to_json() << "\n"; - if (notification->method() == "Player.Metadata") - { - LOG(DEBUG, LOG_TAG) << "Received metadata notification\n"; - setMeta(notification->params().to_json()); - } - else if (notification->method() == "Player.Properties") - { - LOG(DEBUG, LOG_TAG) << "Received properties notification\n"; - setProperties(notification->params().to_json()); - } - else - LOG(WARNING, LOG_TAG) << "Received unknown notification method: '" << notification->method() << "'\n"; + LOG(DEBUG, LOG_TAG) << "Received metadata notification\n"; + setMeta(notification.params().to_json()); } - else if (entity->is_request()) + else if (notification.method() == "Player.Properties") { - jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); - LOG(INFO, LOG_TAG) << "Request: " << request->method() << ", id: " << request->id() << ", params: " << request->params().to_json() << "\n"; - } - else if (entity->is_response()) - { - jsonrpcpp::response_ptr response = dynamic_pointer_cast(entity); - LOG(INFO, LOG_TAG) << "Response: " << response->result().dump() << ", id: " << response->id() << "\n"; + LOG(DEBUG, LOG_TAG) << "Received properties notification\n"; + setProperties(notification.params().to_json()); } + else + LOG(WARNING, LOG_TAG) << "Received unknown notification method: '" << notification.method() << "'\n"; +} + + +void PcmStream::onControlLog(std::string line) +{ + if (line.back() == '\r') + line.resize(line.size() - 1); + if (line.empty()) + return; + auto tmp = utils::string::tolower_copy(line); + AixLog::Severity severity = AixLog::Severity::info; + if (tmp.find(" trace") != string::npos) + severity = AixLog::Severity::trace; + else if (tmp.find(" debug") != string::npos) + severity = AixLog::Severity::debug; + else if (tmp.find(" info") != string::npos) + severity = AixLog::Severity::info; + else if (tmp.find(" warning") != string::npos) + severity = AixLog::Severity::warning; + else if (tmp.find(" error") != string::npos) + severity = AixLog::Severity::error; + else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos)) + severity = AixLog::Severity::fatal; + LOG(severity, SCRIPT_LOG_TAG) << "Stream: " << getId() << ", message: " << line << "\n"; } @@ -289,7 +311,9 @@ void PcmStream::start() active_ = true; if (ctrl_script_) - ctrl_script_->start(getId(), server_settings_, [this](const std::string& msg) { onControlMsg(msg); }); + ctrl_script_->start( + getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { onControlNotification(notification); }, + [this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); }); } @@ -399,54 +423,70 @@ std::shared_ptr PcmStream::getProperties() const } -void PcmStream::setProperty(const std::string& name, const json& value) +void PcmStream::setProperty(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler) { + auto name = request.params().get("property"); + auto value = request.params().get("value"); LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' set property: " << name << " = " << value << "\n"; // TODO: check validity + bool valid = true; if (name == "loopStatus") - ; + { + auto val = value.get(); + valid = ((val == "none") || (val == "track") || (val == "playlist")); + } else if (name == "shuffle") - ; + { + valid = value.is_boolean(); + } else if (name == "volume") - ; + { + valid = value.is_number_integer(); + } else if (name == "rate") - ; + { + valid = value.is_number_float(); + } else { - LOG(ERROR, LOG_TAG) << "Property not supported: " << name << "\n"; + valid = false; + } + + if (!valid) + { + auto error = jsonrpcpp::InvalidParamsException(request); + response_handler(error.to_json()); + return; } - // TODO: queue commands, send next on timeout or after reception of the last command's response if (ctrl_script_) { - jsonrpcpp::Request request(++req_id_, "Player.SetProperty", {name, value}); - ctrl_script_->send(request.to_json().dump() + "\n"); //, params); - } - else // TODO: Will the ctr_script always loop back the new properties? - { - // properties_ = std::make_shared(props); - // // Trigger a stream update - // for (auto* listener : pcmListeners_) - // { - // if (listener != nullptr) - // listener->onPropertiesChanged(this); - // } + jsonrpcpp::Request req(++req_id_, "Player.SetProperty", {name, value}); + ctrl_script_->send(req, response_handler); } } -void PcmStream::control(const std::string& command, const json& params) +void PcmStream::control(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler) { - LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << params << "'\n"; - for (const auto& it : params.items()) + + std::string command = request.params().get("command"); + static std::set supported_commands{"Next", "Previous", "Pause", "PlayPause", "Stop", "Play", "Seek", "SetPosition"}; + if ((supported_commands.find(command) == supported_commands.end()) || + ((command == "SetPosition") && + (!request.params().has("params") || !request.params().get("params").contains("Position") || !request.params().get("params").contains("TrackId"))) || + ((command == "Seek") && (!request.params().has("params") || !request.params().get("params").contains("Offset")))) { - LOG(INFO, LOG_TAG) << "Stream " << getId() << " key: '" << it.key() << "', param: '" << it.value() << "'\n"; + auto error = jsonrpcpp::InvalidParamsException(request); + response_handler(error.to_json()); + return; } - // TODO: queue commands, send next on timeout or after reception of the last command's response + + LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << request.params().to_json() << "'\n"; if (ctrl_script_) { - jsonrpcpp::Request request(++req_id_, "Player." + command, params); - ctrl_script_->send(request.to_json().dump() + "\n"); //, params); + jsonrpcpp::Request req(++req_id_, "Player." + command, request.params().has("params") ? request.params().get("params") : json{}); + ctrl_script_->send(req, response_handler); } } diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index 388be32e..b7e863b8 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -42,6 +42,7 @@ #include "encoder/encoder.hpp" #include "message/codec_header.hpp" // #include "message/stream_tags.hpp" +#include "jsonrpcpp.hpp" #include "server_settings.hpp" #include "stream_uri.hpp" @@ -115,20 +116,23 @@ public: class CtrlScript { public: - using OnReceive = std::function; + using OnRequest = std::function; + using OnNotification = std::function; + using OnResponse = std::function; + using OnLog = std::function; CtrlScript(boost::asio::io_context& ioc, const std::string& script); virtual ~CtrlScript(); - void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler); + void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler, + const OnRequest& request_handler, const OnLog& log_handler); void stop(); /// Send a message to stdin of the process - void send(const std::string& msg); + void send(const jsonrpcpp::Request& request, const OnResponse& response_handler); private: void stderrReadLine(); void stdoutReadLine(); - void logScript(std::string line); bp::child process_; bp::pipe pipe_stdout_; @@ -137,11 +141,15 @@ private: std::unique_ptr stream_stderr_; boost::asio::streambuf streambuf_stdout_; boost::asio::streambuf streambuf_stderr_; - OnReceive receive_handler_; + OnRequest request_handler_; + OnNotification notification_handler_; + OnLog log_handler_; boost::asio::io_context& ioc_; std::string script_; bp::opstream in_; + + std::map request_callbacks_; }; @@ -170,11 +178,10 @@ public: virtual std::string getCodec() const; std::shared_ptr getMeta() const; - std::shared_ptr getProperties() const; - void setProperty(const std::string& name, const json& value); - virtual void control(const std::string& command, const json& params); + virtual void setProperty(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler); + virtual void control(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler); virtual ReaderState getState() const; virtual json toJson() const; @@ -184,7 +191,10 @@ public: protected: std::atomic active_; - void onControlMsg(const std::string& msg); + void onControlRequest(const jsonrpcpp::Request& request); + void onControlNotification(const jsonrpcpp::Notification& notification); + void onControlLog(std::string line); + void setState(ReaderState newState); void chunkRead(const msg::PcmChunk& chunk); void resync(const std::chrono::nanoseconds& duration);