Move StreamCtrl into separate file

This commit is contained in:
badaix 2021-06-22 19:49:06 +02:00
parent f817602a50
commit 1f51befbad
5 changed files with 405 additions and 223 deletions

View file

@ -34,154 +34,6 @@ namespace streamreader
{
static constexpr auto LOG_TAG = "PcmStream";
static constexpr auto SCRIPT_LOG_TAG = "Script";
CtrlScript::CtrlScript(boost::asio::io_context& ioc, const std::string& script) : ioc_(ioc), script_(script)
{
}
CtrlScript::~CtrlScript()
{
stop();
}
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnNotification& notification_handler,
const OnRequest& request_handler, const OnLog& log_handler)
{
notification_handler_ = notification_handler;
request_handler_ = request_handler;
log_handler_ = log_handler;
pipe_stderr_ = bp::pipe();
pipe_stdout_ = bp::pipe();
stringstream params;
params << " \"--stream=" + stream_id + "\"";
if (server_setttings.http.enabled)
params << " --snapcast-port=" << server_setttings.http.port;
process_ = bp::child(
script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::std_in < in_,
bp::on_exit =
[](int exit, const std::error_code& ec_in) {
auto severity = AixLog::Severity::debug;
if (exit != 0)
severity = AixLog::Severity::error;
LOG(severity, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n";
},
ioc_);
stream_stdout_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stdout_.native_source());
stream_stderr_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stderr_.native_source());
stdoutReadLine();
stderrReadLine();
}
void CtrlScript::send(const jsonrpcpp::Request& request, const OnResponse& response_handler)
{
if (response_handler)
request_callbacks_[request.id()] = response_handler;
std::string msg = request.to_json().dump() + "\n";
LOG(INFO, SCRIPT_LOG_TAG) << "Sending request: " << msg;
in_.write(msg.data(), msg.size());
in_.flush();
}
void CtrlScript::stderrReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
log_handler_(std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
}
void CtrlScript::stdoutReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
jsonrpcpp::entity_ptr entity(nullptr);
try
{
entity = jsonrpcpp::Parser::do_parse(line);
if (!entity)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
}
if (entity->is_notification())
{
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
notification_handler_(*notification);
}
else if (entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
request_handler_(*request);
}
else if (entity->is_response())
{
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
LOG(INFO, LOG_TAG) << "Response: " << response->to_json() << ", id: " << response->id() << "\n";
auto iter = request_callbacks_.find(response->id());
if (iter != request_callbacks_.end())
{
iter->second(*response);
request_callbacks_.erase(iter);
}
else
{
LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response->id() << "\n";
}
}
else
{
LOG(WARNING, LOG_TAG) << "Not handling message: " << line << "\n";
}
}
catch (const jsonrpcpp::ParseErrorException& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();
});
}
void CtrlScript::stop()
{
if (process_.running())
{
::kill(-process_.native_handle(), SIGINT);
}
}
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
@ -203,7 +55,7 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
if (uri_.query.find(kControlScript) != uri_.query.end())
{
ctrl_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
stream_ctrl_ = std::make_unique<ScriptStreamControl>(ioc, uri_.query[kControlScript]);
}
if (uri_.query.find(kUriChunkMs) != uri_.query.end())
@ -277,12 +129,12 @@ void PcmStream::onControlNotification(const jsonrpcpp::Notification& notificatio
else if (notification.method() == "Plugin.Stream.Ready")
{
LOG(DEBUG, LOG_TAG) << "Plugin is ready\n";
ctrl_script_->send({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) {
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) {
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
setProperties(response.result());
});
ctrl_script_->send({++req_id_, "Plugin.Stream.Player.GetMetadata"}, [this](const jsonrpcpp::Response& response) {
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetMetadata"}, [this](const jsonrpcpp::Response& response) {
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetMetadata: " << response.to_json() << "\n";
if (response.error().code() == 0)
setMetadata(response.result());
@ -324,7 +176,7 @@ void PcmStream::onControlLog(std::string line)
severity = AixLog::Severity::error;
else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos))
severity = AixLog::Severity::fatal;
LOG(severity, SCRIPT_LOG_TAG) << "Stream: " << getId() << ", message: " << line << "\n";
LOG(severity, LOG_TAG) << "Stream: " << getId() << ", message: " << line << "\n";
}
@ -336,9 +188,9 @@ void PcmStream::start()
sampleFormat_);
active_ = true;
if (ctrl_script_)
if (stream_ctrl_)
{
ctrl_script_->start(
stream_ctrl_->start(
getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { onControlNotification(notification); },
[this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); });
}
@ -449,7 +301,7 @@ const Properties& PcmStream::getProperties() const
}
void PcmStream::setProperty(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler)
void PcmStream::setProperty(const jsonrpcpp::Request& request, const StreamControl::OnResponse& response_handler)
{
try
{
@ -488,10 +340,10 @@ void PcmStream::setProperty(const jsonrpcpp::Request& request, const CtrlScript:
if (!properties_.can_control)
throw SnapException("CanControl is false");
if (ctrl_script_)
if (stream_ctrl_)
{
jsonrpcpp::Request req(++req_id_, "Plugin.Stream.Player.SetProperty", {name, value});
ctrl_script_->send(req, response_handler);
stream_ctrl_->command(req, response_handler);
}
}
catch (const std::exception& e)
@ -503,7 +355,7 @@ void PcmStream::setProperty(const jsonrpcpp::Request& request, const CtrlScript:
}
void PcmStream::control(const jsonrpcpp::Request& request, const CtrlScript::OnResponse& response_handler)
void PcmStream::control(const jsonrpcpp::Request& request, const StreamControl::OnResponse& response_handler)
{
try
{
@ -511,27 +363,56 @@ void PcmStream::control(const jsonrpcpp::Request& request, const CtrlScript::OnR
throw SnapException("Parameter 'command' is missing");
std::string command = request.params().get("command");
static std::set<std::string> supported_commands{"Next", "Previous", "Pause", "PlayPause", "Stop", "Play", "Seek", "SetPosition"};
if (supported_commands.find(command) == supported_commands.end())
if (command == "SetPosition")
{
if (!request.params().has("params") || !request.params().get("params").contains("Position") || !request.params().get("params").contains("TrackId"))
throw SnapException("SetPosition requires parameters 'Position' and 'TrackId'");
if (!properties_.can_seek)
throw SnapException("CanSeek is false");
}
else if (command == "Seek")
{
if (!request.params().has("params") || !request.params().get("params").contains("Offset"))
throw SnapException("Seek requires parameter 'Offset'");
if (!properties_.can_seek)
throw SnapException("CanSeek is false");
}
else if (command == "Next")
{
if (!properties_.can_go_next)
throw SnapException("CanGoNext is false");
}
else if (command == "Previous")
{
if (!properties_.can_go_previous)
throw SnapException("CanGoPrevious is false");
}
else if ((command == "Pause") || (command == "PlayPause"))
{
if (!properties_.can_pause)
throw SnapException("CanPause is false");
}
else if (command == "Stop")
{
if (!properties_.can_control)
throw SnapException("CanControl is false");
}
else if (command == "Play")
{
if (!properties_.can_play)
throw SnapException("CanPlay is false");
}
else
throw SnapException("Command not supported");
if (((command == "SetPosition") || (command == "Seek")) && !request.params().has("params"))
throw SnapException("Parameter 'params' is missing");
if ((command == "SetPosition") && (!request.params().get("params").contains("Position") || !request.params().get("params").contains("TrackId")))
throw SnapException("SetPosition requires parameters 'Position' and 'TrackId'");
if ((command == "Seek") && !request.params().get("params").contains("Offset"))
throw SnapException("Seek requires parameter 'Offset'");
LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << request.params().to_json() << "'\n";
if (ctrl_script_)
if (stream_ctrl_)
{
jsonrpcpp::Parameter params{"command", command};
if (request.params().has("params"))
params.add("params", request.params().get("params"));
jsonrpcpp::Request req(++req_id_, "Plugin.Stream.Player.Control", params);
ctrl_script_->send(req, response_handler);
stream_ctrl_->command(req, response_handler);
}
}
catch (const std::exception& e)