mirror of
https://github.com/badaix/snapcast.git
synced 2025-04-29 10:17:16 +02:00
Add reponse handlers for requests
This commit is contained in:
parent
d32378bd2e
commit
5c28809b43
3 changed files with 170 additions and 115 deletions
|
@ -457,8 +457,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
|
|||
if (stream == nullptr)
|
||||
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
||||
|
||||
// Set metadata from request
|
||||
stream->control(request->params().get("command"), request->params().has("params") ? request->params().get("params") : json{});
|
||||
stream->control(*request, [](const jsonrpcpp::Response& response) {
|
||||
LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << response.id() << ", result: " << response.result()
|
||||
<< ", error: " << response.error().code() << "\n";
|
||||
});
|
||||
|
||||
// Setup response
|
||||
result["id"] = streamId;
|
||||
|
@ -477,7 +479,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
|
|||
if (stream == nullptr)
|
||||
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
||||
|
||||
stream->setProperty(request->params().get("property"), request->params().get("value"));
|
||||
stream->setProperty(*request, [](const jsonrpcpp::Response& response) {
|
||||
LOG(INFO, LOG_TAG) << "Received response for Stream.SetProperty, id: " << response.id() << ", result: " << response.result()
|
||||
<< ", error: " << response.error().code() << "\n";
|
||||
});
|
||||
|
||||
// Setup response
|
||||
// TODO: error handling
|
||||
|
|
|
@ -25,7 +25,6 @@
|
|||
#include "common/str_compat.hpp"
|
||||
#include "common/utils/string_utils.hpp"
|
||||
#include "encoder/encoder_factory.hpp"
|
||||
#include "jsonrpcpp.hpp"
|
||||
#include "pcm_stream.hpp"
|
||||
|
||||
|
||||
|
@ -49,9 +48,12 @@ CtrlScript::~CtrlScript()
|
|||
}
|
||||
|
||||
|
||||
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler)
|
||||
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler,
|
||||
const OnRequest& request_handler, const OnLog& log_handler)
|
||||
{
|
||||
receive_handler_ = receive_handler;
|
||||
notification_handler_ = notification_handler;
|
||||
request_handler_ = request_handler;
|
||||
log_handler_ = log_handler;
|
||||
pipe_stderr_ = bp::pipe();
|
||||
pipe_stdout_ = bp::pipe();
|
||||
stringstream params;
|
||||
|
@ -74,35 +76,17 @@ void CtrlScript::start(const std::string& stream_id, const ServerSettings& serve
|
|||
stderrReadLine();
|
||||
}
|
||||
|
||||
void CtrlScript::send(const std::string& msg)
|
||||
|
||||
void CtrlScript::send(const jsonrpcpp::Request& request, const OnResponse& response_handler)
|
||||
{
|
||||
if (response_handler)
|
||||
request_callbacks_[request.id()] = response_handler;
|
||||
|
||||
std::string msg = request.to_json().dump() + "\n";
|
||||
in_.write(msg.data(), msg.size());
|
||||
in_.flush();
|
||||
}
|
||||
|
||||
void CtrlScript::logScript(std::string line)
|
||||
{
|
||||
if (line.back() == '\r')
|
||||
line.resize(line.size() - 1);
|
||||
if (line.empty())
|
||||
return;
|
||||
auto tmp = utils::string::tolower_copy(line);
|
||||
AixLog::Severity severity = AixLog::Severity::info;
|
||||
if (tmp.find(" trace") != string::npos)
|
||||
severity = AixLog::Severity::trace;
|
||||
else if (tmp.find(" debug") != string::npos)
|
||||
severity = AixLog::Severity::debug;
|
||||
else if (tmp.find(" info") != string::npos)
|
||||
severity = AixLog::Severity::info;
|
||||
else if (tmp.find(" warning") != string::npos)
|
||||
severity = AixLog::Severity::warning;
|
||||
else if (tmp.find(" error") != string::npos)
|
||||
severity = AixLog::Severity::error;
|
||||
else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos))
|
||||
severity = AixLog::Severity::fatal;
|
||||
LOG(severity, SCRIPT_LOG_TAG) << line << "\n";
|
||||
}
|
||||
|
||||
|
||||
void CtrlScript::stderrReadLine()
|
||||
{
|
||||
|
@ -116,7 +100,7 @@ void CtrlScript::stderrReadLine()
|
|||
|
||||
// Extract up to the first delimiter.
|
||||
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
|
||||
logScript(std::move(line));
|
||||
log_handler_(std::move(line));
|
||||
streambuf_stderr_.consume(bytes_transferred);
|
||||
stderrReadLine();
|
||||
});
|
||||
|
@ -135,7 +119,54 @@ void CtrlScript::stdoutReadLine()
|
|||
|
||||
// Extract up to the first delimiter.
|
||||
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
|
||||
receive_handler_(std::move(line));
|
||||
|
||||
jsonrpcpp::entity_ptr entity(nullptr);
|
||||
try
|
||||
{
|
||||
entity = jsonrpcpp::Parser::do_parse(line);
|
||||
if (!entity)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
|
||||
}
|
||||
if (entity->is_notification())
|
||||
{
|
||||
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
|
||||
notification_handler_(*notification);
|
||||
}
|
||||
else if (entity->is_request())
|
||||
{
|
||||
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
|
||||
request_handler_(*request);
|
||||
}
|
||||
else if (entity->is_response())
|
||||
{
|
||||
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
|
||||
LOG(INFO, LOG_TAG) << "Response: " << response->to_json() << ", id: " << response->id() << "\n";
|
||||
auto iter = request_callbacks_.find(response->id());
|
||||
if (iter != request_callbacks_.end())
|
||||
{
|
||||
iter->second(*response);
|
||||
request_callbacks_.erase(iter);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response->id() << "\n";
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARNING, LOG_TAG) << "Not handling message: " << line << "\n";
|
||||
}
|
||||
}
|
||||
catch (const jsonrpcpp::ParseErrorException& e)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
|
||||
}
|
||||
|
||||
streambuf_stdout_.consume(bytes_transferred);
|
||||
stdoutReadLine();
|
||||
});
|
||||
|
@ -223,60 +254,51 @@ std::string PcmStream::getCodec() const
|
|||
}
|
||||
|
||||
|
||||
void PcmStream::onControlMsg(const std::string& msg)
|
||||
void PcmStream::onControlRequest(const jsonrpcpp::Request& request)
|
||||
{
|
||||
LOG(DEBUG, LOG_TAG) << "Received: " << msg << "\n";
|
||||
LOG(INFO, LOG_TAG) << "Request: " << request.method() << ", id: " << request.id() << ", params: " << request.params().to_json() << "\n";
|
||||
}
|
||||
|
||||
jsonrpcpp::entity_ptr entity(nullptr);
|
||||
try
|
||||
{
|
||||
entity = jsonrpcpp::Parser::do_parse(msg);
|
||||
if (!entity)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
|
||||
return;
|
||||
}
|
||||
}
|
||||
catch (const jsonrpcpp::ParseErrorException& e)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
|
||||
return;
|
||||
// return e.to_json().dump();
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
|
||||
return;
|
||||
// return jsonrpcpp::ParseErrorException(e.what()).to_json().dump();
|
||||
}
|
||||
|
||||
if (entity->is_notification())
|
||||
void PcmStream::onControlNotification(const jsonrpcpp::Notification& notification)
|
||||
{
|
||||
LOG(INFO, LOG_TAG) << "Notification method: " << notification.method() << ", params: " << notification.params().to_json() << "\n";
|
||||
if (notification.method() == "Player.Metadata")
|
||||
{
|
||||
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
|
||||
LOG(INFO, LOG_TAG) << "Notification method: " << notification->method() << ", params: " << notification->params().to_json() << "\n";
|
||||
if (notification->method() == "Player.Metadata")
|
||||
{
|
||||
LOG(DEBUG, LOG_TAG) << "Received metadata notification\n";
|
||||
setMeta(notification->params().to_json());
|
||||
}
|
||||
else if (notification->method() == "Player.Properties")
|
||||
{
|
||||
LOG(DEBUG, LOG_TAG) << "Received properties notification\n";
|
||||
setProperties(notification->params().to_json());
|
||||
}
|
||||
else
|
||||
LOG(WARNING, LOG_TAG) << "Received unknown notification method: '" << notification->method() << "'\n";
|
||||
LOG(DEBUG, LOG_TAG) << "Received metadata notification\n";
|
||||
setMeta(notification.params().to_json());
|
||||
}
|
||||
else if (entity->is_request())
|
||||
else if (notification.method() == "Player.Properties")
|
||||
{
|
||||
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
|
||||
LOG(INFO, LOG_TAG) << "Request: " << request->method() << ", id: " << request->id() << ", params: " << request->params().to_json() << "\n";
|
||||
}
|
||||
else if (entity->is_response())
|
||||
{
|
||||
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
|
||||
LOG(INFO, LOG_TAG) << "Response: " << response->result().dump() << ", id: " << response->id() << "\n";
|
||||
LOG(DEBUG, LOG_TAG) << "Received properties notification\n";
|
||||
setProperties(notification.params().to_json());
|
||||
}
|
||||
else
|
||||
LOG(WARNING, LOG_TAG) << "Received unknown notification method: '" << notification.method() << "'\n";
|
||||
}
|
||||
|
||||
|
||||
void PcmStream::onControlLog(std::string line)
|
||||
{
|
||||
if (line.back() == '\r')
|
||||
line.resize(line.size() - 1);
|
||||
if (line.empty())
|
||||
return;
|
||||
auto tmp = utils::string::tolower_copy(line);
|
||||
AixLog::Severity severity = AixLog::Severity::info;
|
||||
if (tmp.find(" trace") != string::npos)
|
||||
severity = AixLog::Severity::trace;
|
||||
else if (tmp.find(" debug") != string::npos)
|
||||
severity = AixLog::Severity::debug;
|
||||
else if (tmp.find(" info") != string::npos)
|
||||
severity = AixLog::Severity::info;
|
||||
else if (tmp.find(" warning") != string::npos)
|
||||
severity = AixLog::Severity::warning;
|
||||
else if (tmp.find(" error") != string::npos)
|
||||
severity = AixLog::Severity::error;
|
||||
else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos))
|
||||
severity = AixLog::Severity::fatal;
|
||||
LOG(severity, SCRIPT_LOG_TAG) << "Stream: " << getId() << ", message: " << line << "\n";
|
||||
}
|
||||
|
||||
|
||||
|
@ -289,7 +311,9 @@ void PcmStream::start()
|
|||
active_ = true;
|
||||
|
||||
if (ctrl_script_)
|
||||
ctrl_script_->start(getId(), server_settings_, [this](const std::string& msg) { onControlMsg(msg); });
|
||||
ctrl_script_->start(
|
||||
getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { onControlNotification(notification); },
|
||||
[this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); });
|
||||
}
|
||||
|
||||
|
||||
|
@ -399,54 +423,70 @@ std::shared_ptr<Properties> PcmStream::getProperties() const
|
|||
}
|
||||
|
||||
|
||||
void PcmStream::setProperty(const std::string& name, const json& value)
|
||||
void PcmStream::setProperty(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler)
|
||||
{
|
||||
auto name = request.params().get("property");
|
||||
auto value = request.params().get("value");
|
||||
LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' set property: " << name << " = " << value << "\n";
|
||||
// TODO: check validity
|
||||
bool valid = true;
|
||||
if (name == "loopStatus")
|
||||
;
|
||||
{
|
||||
auto val = value.get<std::string>();
|
||||
valid = ((val == "none") || (val == "track") || (val == "playlist"));
|
||||
}
|
||||
else if (name == "shuffle")
|
||||
;
|
||||
{
|
||||
valid = value.is_boolean();
|
||||
}
|
||||
else if (name == "volume")
|
||||
;
|
||||
{
|
||||
valid = value.is_number_integer();
|
||||
}
|
||||
else if (name == "rate")
|
||||
;
|
||||
{
|
||||
valid = value.is_number_float();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(ERROR, LOG_TAG) << "Property not supported: " << name << "\n";
|
||||
valid = false;
|
||||
}
|
||||
|
||||
if (!valid)
|
||||
{
|
||||
auto error = jsonrpcpp::InvalidParamsException(request);
|
||||
response_handler(error.to_json());
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: queue commands, send next on timeout or after reception of the last command's response
|
||||
if (ctrl_script_)
|
||||
{
|
||||
jsonrpcpp::Request request(++req_id_, "Player.SetProperty", {name, value});
|
||||
ctrl_script_->send(request.to_json().dump() + "\n"); //, params);
|
||||
}
|
||||
else // TODO: Will the ctr_script always loop back the new properties?
|
||||
{
|
||||
// properties_ = std::make_shared<Properties>(props);
|
||||
// // Trigger a stream update
|
||||
// for (auto* listener : pcmListeners_)
|
||||
// {
|
||||
// if (listener != nullptr)
|
||||
// listener->onPropertiesChanged(this);
|
||||
// }
|
||||
jsonrpcpp::Request req(++req_id_, "Player.SetProperty", {name, value});
|
||||
ctrl_script_->send(req, response_handler);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void PcmStream::control(const std::string& command, const json& params)
|
||||
void PcmStream::control(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler)
|
||||
{
|
||||
LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << params << "'\n";
|
||||
for (const auto& it : params.items())
|
||||
|
||||
std::string command = request.params().get("command");
|
||||
static std::set<std::string> supported_commands{"Next", "Previous", "Pause", "PlayPause", "Stop", "Play", "Seek", "SetPosition"};
|
||||
if ((supported_commands.find(command) == supported_commands.end()) ||
|
||||
((command == "SetPosition") &&
|
||||
(!request.params().has("params") || !request.params().get("params").contains("Position") || !request.params().get("params").contains("TrackId"))) ||
|
||||
((command == "Seek") && (!request.params().has("params") || !request.params().get("params").contains("Offset"))))
|
||||
{
|
||||
LOG(INFO, LOG_TAG) << "Stream " << getId() << " key: '" << it.key() << "', param: '" << it.value() << "'\n";
|
||||
auto error = jsonrpcpp::InvalidParamsException(request);
|
||||
response_handler(error.to_json());
|
||||
return;
|
||||
}
|
||||
// TODO: queue commands, send next on timeout or after reception of the last command's response
|
||||
|
||||
LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << request.params().to_json() << "'\n";
|
||||
if (ctrl_script_)
|
||||
{
|
||||
jsonrpcpp::Request request(++req_id_, "Player." + command, params);
|
||||
ctrl_script_->send(request.to_json().dump() + "\n"); //, params);
|
||||
jsonrpcpp::Request req(++req_id_, "Player." + command, request.params().has("params") ? request.params().get("params") : json{});
|
||||
ctrl_script_->send(req, response_handler);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@
|
|||
#include "encoder/encoder.hpp"
|
||||
#include "message/codec_header.hpp"
|
||||
// #include "message/stream_tags.hpp"
|
||||
#include "jsonrpcpp.hpp"
|
||||
#include "server_settings.hpp"
|
||||
#include "stream_uri.hpp"
|
||||
|
||||
|
@ -115,20 +116,23 @@ public:
|
|||
class CtrlScript
|
||||
{
|
||||
public:
|
||||
using OnReceive = std::function<void(std::string msg)>;
|
||||
using OnRequest = std::function<void(const jsonrpcpp::Request& response)>;
|
||||
using OnNotification = std::function<void(const jsonrpcpp::Notification& response)>;
|
||||
using OnResponse = std::function<void(const jsonrpcpp::Response& response)>;
|
||||
using OnLog = std::function<void(std::string message)>;
|
||||
|
||||
CtrlScript(boost::asio::io_context& ioc, const std::string& script);
|
||||
virtual ~CtrlScript();
|
||||
|
||||
void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler);
|
||||
void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler,
|
||||
const OnRequest& request_handler, const OnLog& log_handler);
|
||||
void stop();
|
||||
/// Send a message to stdin of the process
|
||||
void send(const std::string& msg);
|
||||
void send(const jsonrpcpp::Request& request, const OnResponse& response_handler);
|
||||
|
||||
private:
|
||||
void stderrReadLine();
|
||||
void stdoutReadLine();
|
||||
void logScript(std::string line);
|
||||
|
||||
bp::child process_;
|
||||
bp::pipe pipe_stdout_;
|
||||
|
@ -137,11 +141,15 @@ private:
|
|||
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_stderr_;
|
||||
boost::asio::streambuf streambuf_stdout_;
|
||||
boost::asio::streambuf streambuf_stderr_;
|
||||
OnReceive receive_handler_;
|
||||
OnRequest request_handler_;
|
||||
OnNotification notification_handler_;
|
||||
OnLog log_handler_;
|
||||
|
||||
boost::asio::io_context& ioc_;
|
||||
std::string script_;
|
||||
bp::opstream in_;
|
||||
|
||||
std::map<jsonrpcpp::Id, OnResponse> request_callbacks_;
|
||||
};
|
||||
|
||||
|
||||
|
@ -170,11 +178,10 @@ public:
|
|||
virtual std::string getCodec() const;
|
||||
|
||||
std::shared_ptr<Metatags> getMeta() const;
|
||||
|
||||
std::shared_ptr<Properties> getProperties() const;
|
||||
void setProperty(const std::string& name, const json& value);
|
||||
|
||||
virtual void control(const std::string& command, const json& params);
|
||||
virtual void setProperty(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler);
|
||||
virtual void control(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler);
|
||||
|
||||
virtual ReaderState getState() const;
|
||||
virtual json toJson() const;
|
||||
|
@ -184,7 +191,10 @@ public:
|
|||
protected:
|
||||
std::atomic<bool> active_;
|
||||
|
||||
void onControlMsg(const std::string& msg);
|
||||
void onControlRequest(const jsonrpcpp::Request& request);
|
||||
void onControlNotification(const jsonrpcpp::Notification& notification);
|
||||
void onControlLog(std::string line);
|
||||
|
||||
void setState(ReaderState newState);
|
||||
void chunkRead(const msg::PcmChunk& chunk);
|
||||
void resync(const std::chrono::nanoseconds& duration);
|
||||
|
|
Loading…
Add table
Reference in a new issue