Communicate via stdin/stdout with ctrl script

This commit is contained in:
badaix 2021-05-26 23:57:59 +02:00
parent 91ea368121
commit 6e6b63ec26
5 changed files with 226 additions and 82 deletions

View file

@ -24,7 +24,6 @@
# - python-mpd2
# - musicbrainzngs
from configparser import ConfigParser
import os
import sys
import socket
@ -34,9 +33,9 @@ from dbus.mainloop.glib import DBusGMainLoop
import logging
import gettext
import time
import base64
import json
import musicbrainzngs
import requests
import fcntl
__version__ = "@version@"
__git_version__ = "@gitversion@"
@ -131,6 +130,11 @@ urlhandlers = ['http://']
downloaded_covers = ['~/.covers/%s-%s.jpg']
def send(json_msg):
print(json.dumps(json_msg))
sys.stdout.flush()
class MPDWrapper(object):
""" Wrapper of mpd.MPDClient to handle socket
errors and similar
@ -181,6 +185,7 @@ class MPDWrapper(object):
self._idling = False
self._can_idle = False
self._can_single = False
self._buffer = ''
self.client.connect(
self._params['mpd-host'], self._params['mpd-port'])
@ -229,6 +234,13 @@ class MPDWrapper(object):
self._watch_id = GLib.io_add_watch(self,
GLib.IO_IN | GLib.IO_HUP,
self.socket_callback)
flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL)
flags |= os.O_NONBLOCK
fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags)
GLib.io_add_watch(sys.stdin, GLib.IO_IN |
GLib.IO_HUP, self.io_callback)
# Reset error counter
self._errors = 0
@ -312,6 +324,67 @@ class MPDWrapper(object):
self.idle_enter()
return True
def control(self, cmd):
try:
request = json.loads(cmd)
cmd = request['method']
id = request['id']
success = True
if cmd == 'Next':
self.next()
elif cmd == 'Previous':
self.previous()
elif cmd == 'Play':
self.play()
elif cmd == 'Pause':
self.pause(1)
elif cmd == 'PlayPause':
if self.status()['state'] == 'play':
self.pause(1)
else:
self.play()
elif cmd == 'Stop':
self.stop()
elif cmd == 'SetPosition':
trackid = request['params']['TrackId']
trackid = trackid.rsplit('/', 1)[1]
position = request['params']['Position']
position = int(position) / 1000000
self.seekid(int(trackid), position)
elif cmd == 'Seek':
offset = request['params']['Offset']
offset = int(offset) / 1000000
strOffset = str(offset)
if offset >= 0:
strOffset = "+" + strOffset
self.seekcur(strOffset)
else:
send({"jsonrpc": "2.0", "error": {"code": -32601,
"message": "Method not found"}, "id": id})
success = False
if success:
send({"jsonrpc": "2.0", "result": "ok", "id": id})
except:
send({"jsonrpc": "2.0", "error": {
"code": -32700, "message": "Parse error"}, "id": id})
def io_callback(self, fd, event):
logger.debug("IO event %r on fd %r" % (event, fd))
if event & GLib.IO_HUP:
logger.debug("IO_HUP")
return True
elif event & GLib.IO_IN:
chunk = fd.read()
for char in chunk:
if char == '\n':
logger.info(f'Received: {self._buffer}')
self.control(self._buffer)
self._buffer = ''
else:
self._buffer += char
return True
def socket_callback(self, fd, event):
try:
logger.debug("Socket event %r on fd %r" % (event, fd))
@ -389,6 +462,9 @@ class MPDWrapper(object):
snapmeta['artist'] = [fields[0]]
snapmeta['title'] = fields[1]
send({"jsonrpc": "2.0", "method": "Stream.OnMetadata", "params": snapmeta})
snapmeta['artUrl'] = 'http://127.0.0.1:1780/launcher-icon.png'
album_key = 'musicbrainzAlbumId'
try:
if not album_key in snapmeta:
@ -424,8 +500,7 @@ class MPDWrapper(object):
f'Error while getting cover for {snapmeta[album_key]}: {e}')
logger.info(f'Snapmeta: {snapmeta}')
requests.post(f'http://{params["snapcast-host"]}:{params["snapcast-port"]}/jsonrpc', json={
"id": 4, "jsonrpc": "2.0", "method": "Stream.SetMeta", "params": {"id": params['stream'], "meta": snapmeta}})
send({"jsonrpc": "2.0", "method": "Stream.OnMetadata", "params": snapmeta})
def _update_properties(self, force=False):
old_status = self._status
@ -607,7 +682,7 @@ if __name__ == '__main__':
# Parse command line
try:
(opts, args) = getopt.getopt(sys.argv[1:], 'hdjv',
['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'command=', 'debug', 'use-journal', 'version'])
['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'debug', 'use-journal', 'version'])
except getopt.GetoptError as ex:
(msg, opt) = ex.args
print("%s: %s" % (sys.argv[0], msg), file=sys.stderr)
@ -629,8 +704,6 @@ if __name__ == '__main__':
params['snapcast-port'] = int(arg)
elif opt in ['--stream']:
params['stream'] = arg
elif opt in ['--command']:
params['command'] = arg
elif opt in ['-d', '--debug']:
log_level = logging.DEBUG
elif opt in ['-j', '--use-journal']:
@ -677,41 +750,6 @@ if __name__ == '__main__':
logger.debug(f'Parameters: {params}')
if 'command' in params:
try:
cmd = params['command']
if cmd not in ['next', 'previous', 'play', 'pause', 'playpause', 'stop']:
logger.error(f'Command not supported: {cmd}')
sys.exit(1)
client = mpd.MPDClient()
client.connect(params['mpd-host'], params['mpd-port'])
if params['mpd-password']:
client.password(params['mpd-password'])
if cmd == 'next':
client.next()
elif cmd == 'previous':
client.previous()
elif cmd == 'play':
client.play()
elif cmd == 'pause':
client.pause(1)
elif cmd == 'playpause':
if client.status()['state'] == 'play':
client.pause(1)
else:
client.play()
elif cmd == 'stop':
client.stop()
client.close()
client.disconnect()
except mpd.CommandError as e:
logger.error(e)
sys.exit(1)
sys.exit(0)
# Set up the main loop
if using_gi_glib:
logger.debug('Using GObject-Introspection main loop.')

View file

@ -308,6 +308,7 @@ class MPDWrapper(object):
self._metadata = {}
self._position = 0
self._time = 0
self._req_id = 0
self._bus = dbus.SessionBus()
if self._params['mmkeys']:
@ -458,10 +459,14 @@ class MPDWrapper(object):
# def last_currentsong(self):
# return self._currentsong.copy()
def control(self, command): # , param = ""):
logger.info(f'Control: {command}')
requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={
"id": 1, "jsonrpc": "2.0", "method": "Stream.Control", "params": {"id": "Pipe", "command": command}})
def control(self, command, params={}):
j = {"id": self._req_id, "jsonrpc": "2.0", "method": "Stream.Control",
"params": {"id": "Pipe", "command": command, "params": params}}
logger.info(f'Control: {command}, json: {j}')
url = f'http://{self._params["host"]}:{self._params["port"]}/jsonrpc'
logger.info(f'url: {url}')
self._req_id += 1
requests.post(url, json=j)
@property
def metadata(self):
@ -626,6 +631,7 @@ class MPDWrapper(object):
class NotifyWrapper(object):
def __init__(self, params):
self._last_notification = None
self._notification = None
self._enabled = True
@ -673,6 +679,11 @@ class NotifyWrapper(object):
if not self._enabled:
return
if self._last_notification == [title, body, uri]:
return
self._last_notification = [title, body, uri]
# If we did not yet manage to get a notification service,
# try again
if not self._notification:
@ -909,37 +920,38 @@ class MPRISInterface(dbus.service.Object):
# Player methods
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Next(self):
snapcast_wrapper.control("next")
snapcast_wrapper.control("Next")
return
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Previous(self):
snapcast_wrapper.control("previous")
snapcast_wrapper.control("Previous")
return
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Pause(self):
snapcast_wrapper.control("pause")
snapcast_wrapper.control("Pause")
return
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def PlayPause(self):
snapcast_wrapper.control("playpause")
snapcast_wrapper.control("PlayPause")
return
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Stop(self):
snapcast_wrapper.control("stop")
snapcast_wrapper.control("Stop")
return
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Play(self):
snapcast_wrapper.control("play")
snapcast_wrapper.control("Play")
return
@ dbus.service.method(__player_interface, in_signature='x', out_signature='')
def Seek(self, offset):
logger.info(f'Seek {offset}')
snapcast_wrapper.control("Seek", {"Offset": offset})
# status = mpd_wrapper.status()
# current, end = status['time'].split(':')
# current = int(current)
@ -955,7 +967,11 @@ class MPRISInterface(dbus.service.Object):
@ dbus.service.method(__player_interface, in_signature='ox', out_signature='')
def SetPosition(self, trackid, position):
logger.info(f'SetPosition trackid: {trackid}, position: {position}')
logger.info(f'SetPosition TrackId: {trackid}, Position: {position}')
snapcast_wrapper.control(
"SetPosition", {"TrackId": trackid, "Position": position})
self.Seeked(position)
# song = mpd_wrapper.last_currentsong()
# # FIXME: use real dbus objects
# if str(trackid) != '/org/mpris/MediaPlayer2/Track/%s' % song['id']:
@ -1018,6 +1034,9 @@ Report bugs to https://github.com/eonpatapon/mpDris2/issues""" % params)
if __name__ == '__main__':
DBusGMainLoop(set_as_default=True)
# TODO:
# -cleanup: remove mpd-ish stuff
# -stream id: keep track of the client's stream
gettext.bindtextdomain('mpDris2', '@datadir@/locale')
gettext.textdomain('mpDris2')

View file

@ -438,7 +438,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
// clang-format on
LOG(INFO, LOG_TAG) << "Stream.Control id: " << request->params().get<std::string>("id") << ", command: " << request->params().get("command")
<< "\n";
<< ", params: " << (request->params().has("params") ? request->params().get("params") : "") << "\n";
// Find stream
string streamId = request->params().get<std::string>("id");
@ -447,7 +447,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
// Set metadata from request
stream->control(request->params().get("command"), request->params().has("param") ? request->params().get("param") : "");
stream->control(request->params().get("command"), request->params().has("params") ? request->params().get("params") : json{});
// Setup response
result["id"] = streamId;

View file

@ -25,6 +25,7 @@
#include "common/str_compat.hpp"
#include "common/utils/string_utils.hpp"
#include "encoder/encoder_factory.hpp"
#include "jsonrpcpp.hpp"
#include "pcm_stream.hpp"
@ -48,36 +49,42 @@ CtrlScript::~CtrlScript()
}
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command, const std::string& param)
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler)
{
receive_handler_ = receive_handler;
pipe_stderr_ = bp::pipe();
pipe_stdout_ = bp::pipe();
stringstream params;
params << " \"--stream=" + stream_id + "\"";
if (server_setttings.http.enabled)
params << " --snapcast-port=" << server_setttings.http.port;
if (!command.empty())
params << " --command=" << command;
if (!param.empty())
params << " --param=" << param;
process_ = bp::child(
script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_,
bp::on_exit = [](int exit,
const std::error_code& ec_in) { LOG(INFO, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; },
script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::std_in < in_,
bp::on_exit =
[](int exit, const std::error_code& ec_in) {
auto severity = AixLog::Severity::debug;
if (exit != 0)
severity = AixLog::Severity::error;
LOG(severity, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n";
},
ioc_);
stream_stdout_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stdout_.native_source());
stream_stderr_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stderr_.native_source());
stderrReadLine();
stdoutReadLine();
stderrReadLine();
}
void CtrlScript::send(const std::string& msg)
{
in_.write(msg.data(), msg.size());
in_.flush();
}
void CtrlScript::logScript(const std::string& source, std::string line)
void CtrlScript::logScript(std::string line)
{
if (line.empty())
return;
std::ignore = source;
if (line.back() == '\r')
line.resize(line.size() - 1);
auto tmp = utils::string::tolower_copy(line);
@ -104,13 +111,13 @@ void CtrlScript::stderrReadLine()
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
logScript("stderr", std::move(line));
logScript(std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
@ -123,13 +130,13 @@ void CtrlScript::stdoutReadLine()
boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
logScript("stdout", std::move(line));
receive_handler_(std::move(line));
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();
});
@ -147,7 +154,7 @@ void CtrlScript::stop()
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), req_id_(0)
{
encoder::EncoderFactory encoderFactory;
if (uri_.query.find(kUriCodec) == uri_.query.end())
@ -166,7 +173,6 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
if (uri_.query.find(kControlScript) != uri_.query.end())
{
ctrl_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
command_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
}
if (uri_.query.find(kUriChunkMs) != uri_.query.end())
@ -218,6 +224,74 @@ std::string PcmStream::getCodec() const
}
void PcmStream::onControlMsg(const std::string& msg)
{
LOG(DEBUG, LOG_TAG) << "Received: " << msg << "\n";
jsonrpcpp::entity_ptr entity(nullptr);
try
{
entity = jsonrpcpp::Parser::do_parse(msg);
if (!entity)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
return;
}
}
catch (const jsonrpcpp::ParseErrorException& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
return;
// return e.to_json().dump();
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
return;
// return jsonrpcpp::ParseErrorException(e.what()).to_json().dump();
}
if (entity->is_notification())
{
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
LOG(INFO, LOG_TAG) << "Notification method: " << notification->method() << ", params: " << notification->params().to_json() << "\n";
if (notification->method() == "Stream.OnMetadata")
{
setMeta(notification->params().to_json());
}
}
else if (entity->is_request())
{
LOG(INFO, LOG_TAG) << "Request\n";
// jsonrpcpp::entity_ptr response(nullptr);
// jsonrpcpp::notification_ptr notification(nullptr);
// jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
// processRequest(request, response, notification);
// saveConfig();
// ////cout << "Request: " << request->to_json().dump() << "\n";
// if (notification)
// {
// ////cout << "Notification: " << notification->to_json().dump() << "\n";
// controlServer_->send(notification->to_json().dump(), controlSession);
// }
// if (response)
// {
// ////cout << "Response: " << response->to_json().dump() << "\n";
// return response->to_json().dump();
// }
// return "";
}
else if (entity->is_response())
{
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
LOG(INFO, LOG_TAG) << "Response: " << response->result().dump() << ", id: " << response->id() << "\n";
}
// json j = json::parse(msg);
// setMeta(j["params"]["meta"]);
}
void PcmStream::start()
{
LOG(DEBUG, LOG_TAG) << "Start: " << name_ << ", type: " << uri_.scheme << ", sampleformat: " << sampleFormat_.toString() << ", codec: " << getCodec()
@ -227,7 +301,7 @@ void PcmStream::start()
active_ = true;
if (ctrl_script_)
ctrl_script_->start(getId(), server_settings_);
ctrl_script_->start(getId(), server_settings_, [this](const std::string& msg) { onControlMsg(msg); });
}
@ -329,11 +403,16 @@ std::shared_ptr<msg::StreamTags> PcmStream::getMeta() const
}
void PcmStream::control(const std::string& command, const std::string& param)
void PcmStream::control(const std::string& command, const json& params)
{
LOG(INFO, LOG_TAG) << "Stream " << getId() << " control: '" << command << "', param: '" << param << "'\n";
if (command_script_)
command_script_->start(getId(), server_settings_, command, param);
LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << params << "'\n";
for (const auto& it : params.items())
{
LOG(INFO, LOG_TAG) << "Stream " << getId() << " key: '" << it.key() << "', param: '" << it.value() << "'\n";
}
jsonrpcpp::Request request(++req_id_, command, params);
if (ctrl_script_)
ctrl_script_->send(request.to_json().dump() + "\n"); //, params);
}

View file

@ -45,6 +45,7 @@
namespace bp = boost::process;
using json = nlohmann::json;
namespace streamreader
@ -111,16 +112,20 @@ public:
class CtrlScript
{
public:
using OnReceive = std::function<void(std::string msg)>;
CtrlScript(boost::asio::io_context& ioc, const std::string& script);
virtual ~CtrlScript();
void start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command = "", const std::string& param = "");
void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler);
void stop();
/// Send a message to stdin of the process
void send(const std::string& msg);
private:
void stderrReadLine();
void stdoutReadLine();
void logScript(const std::string& source, std::string line);
void logScript(std::string line);
bp::child process_;
bp::pipe pipe_stdout_;
@ -129,9 +134,11 @@ private:
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_stderr_;
boost::asio::streambuf streambuf_stdout_;
boost::asio::streambuf streambuf_stderr_;
OnReceive receive_handler_;
boost::asio::io_context& ioc_;
std::string script_;
bp::opstream in_;
};
@ -162,7 +169,7 @@ public:
std::shared_ptr<msg::StreamTags> getMeta() const;
void setMeta(const json& j);
void control(const std::string& command, const std::string& param);
virtual void control(const std::string& command, const json& params);
virtual ReaderState getState() const;
virtual json toJson() const;
@ -172,6 +179,7 @@ public:
protected:
std::atomic<bool> active_;
void onControlMsg(const std::string& msg);
void setState(ReaderState newState);
void chunkRead(const msg::PcmChunk& chunk);
void resync(const std::chrono::nanoseconds& duration);
@ -189,7 +197,7 @@ protected:
boost::asio::io_context& ioc_;
ServerSettings server_settings_;
std::unique_ptr<CtrlScript> ctrl_script_;
std::unique_ptr<CtrlScript> command_script_;
size_t req_id_;
};
} // namespace streamreader