Add RPC "Stream.Control" command

This commit is contained in:
badaix 2021-05-24 22:36:16 +02:00
parent 09cde776b1
commit 91ea368121
5 changed files with 163 additions and 90 deletions

View file

@ -584,6 +584,7 @@ Usage: %(progname)s [OPTION]...
--snapcast-host=ADDR Set the mpd server address
--snapcast-port=PORT Set the TCP port
--stream=ID Set the stream id
--command=CMD Issue a command to MPD and exit
-d, --debug Run in debug mode
-j, --use-journal Log to systemd journal instead of stderr
@ -606,7 +607,7 @@ if __name__ == '__main__':
# Parse command line
try:
(opts, args) = getopt.getopt(sys.argv[1:], 'hdjv',
['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'debug', 'use-journal', 'version'])
['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'command=', 'debug', 'use-journal', 'version'])
except getopt.GetoptError as ex:
(msg, opt) = ex.args
print("%s: %s" % (sys.argv[0], msg), file=sys.stderr)
@ -628,6 +629,8 @@ if __name__ == '__main__':
params['snapcast-port'] = int(arg)
elif opt in ['--stream']:
params['stream'] = arg
elif opt in ['--command']:
params['command'] = arg
elif opt in ['-d', '--debug']:
log_level = logging.DEBUG
elif opt in ['-j', '--use-journal']:
@ -674,6 +677,41 @@ if __name__ == '__main__':
logger.debug(f'Parameters: {params}')
if 'command' in params:
try:
cmd = params['command']
if cmd not in ['next', 'previous', 'play', 'pause', 'playpause', 'stop']:
logger.error(f'Command not supported: {cmd}')
sys.exit(1)
client = mpd.MPDClient()
client.connect(params['mpd-host'], params['mpd-port'])
if params['mpd-password']:
client.password(params['mpd-password'])
if cmd == 'next':
client.next()
elif cmd == 'previous':
client.previous()
elif cmd == 'play':
client.play()
elif cmd == 'pause':
client.pause(1)
elif cmd == 'playpause':
if client.status()['state'] == 'play':
client.pause(1)
else:
client.play()
elif cmd == 'stop':
client.stop()
client.close()
client.disconnect()
except mpd.CommandError as e:
logger.error(e)
sys.exit(1)
sys.exit(0)
# Set up the main loop
if using_gi_glib:
logger.debug('Using GObject-Introspection main loop.')

View file

@ -458,6 +458,11 @@ class MPDWrapper(object):
# def last_currentsong(self):
# return self._currentsong.copy()
def control(self, command): # , param = ""):
logger.info(f'Control: {command}')
requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={
"id": 1, "jsonrpc": "2.0", "method": "Stream.Control", "params": {"id": "Pipe", "command": command}})
@property
def metadata(self):
return self._metadata
@ -840,39 +845,37 @@ class MPRISInterface(dbus.service.Object):
"CanControl": (True, None),
}
__tracklist_interface = "org.mpris.MediaPlayer2.TrackList"
__prop_mapping = {
__player_interface: __player_props,
__root_interface: __root_props,
}
@dbus.service.method(__introspect_interface)
@ dbus.service.method(__introspect_interface)
def Introspect(self):
return MPRIS2_INTROSPECTION
@dbus.service.signal(__prop_interface, signature="sa{sv}as")
@ dbus.service.signal(__prop_interface, signature="sa{sv}as")
def PropertiesChanged(self, interface, changed_properties,
invalidated_properties):
pass
@dbus.service.method(__prop_interface,
in_signature="ss", out_signature="v")
@ dbus.service.method(__prop_interface,
in_signature="ss", out_signature="v")
def Get(self, interface, prop):
getter, setter = self.__prop_mapping[interface][prop]
if callable(getter):
return getter()
return getter
@dbus.service.method(__prop_interface,
in_signature="ssv", out_signature="")
@ dbus.service.method(__prop_interface,
in_signature="ssv", out_signature="")
def Set(self, interface, prop, value):
getter, setter = self.__prop_mapping[interface][prop]
if setter is not None:
setter(value)
@dbus.service.method(__prop_interface,
in_signature="s", out_signature="a{sv}")
@ dbus.service.method(__prop_interface,
in_signature="s", out_signature="a{sv}")
def GetAll(self, interface):
read_props = {}
props = self.__prop_mapping[interface]
@ -893,63 +896,48 @@ class MPRISInterface(dbus.service.Object):
return value
# Root methods
@dbus.service.method(__root_interface, in_signature='', out_signature='')
@ dbus.service.method(__root_interface, in_signature='', out_signature='')
def Raise(self):
logger.info('Raise')
return
@dbus.service.method(__root_interface, in_signature='', out_signature='')
@ dbus.service.method(__root_interface, in_signature='', out_signature='')
def Quit(self):
logger.info('Quit')
return
# Player methods
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Next(self):
logger.info('Next')
# mpd_wrapper.next()
snapcast_wrapper.control("next")
return
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Previous(self):
logger.info('Previous')
# mpd_wrapper.previous()
snapcast_wrapper.control("previous")
return
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Pause(self):
logger.info('Pause')
# mpd_wrapper.pause(1)
# mpd_wrapper.notify_about_state('pause')
snapcast_wrapper.control("pause")
return
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def PlayPause(self):
logger.info('PlayPause')
# status = mpd_wrapper.status()
# if status['state'] == 'play':
# mpd_wrapper.pause(1)
# mpd_wrapper.notify_about_state('pause')
# else:
# mpd_wrapper.play()
# mpd_wrapper.notify_about_state('play')
snapcast_wrapper.control("playpause")
return
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Stop(self):
logger.info('Stop')
# mpd_wrapper.stop()
# mpd_wrapper.notify_about_state('stop')
snapcast_wrapper.control("stop")
return
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def Play(self):
logger.info('Play')
# mpd_wrapper.play()
# mpd_wrapper.notify_about_state('play')
snapcast_wrapper.control("play")
return
@dbus.service.method(__player_interface, in_signature='x', out_signature='')
@ dbus.service.method(__player_interface, in_signature='x', out_signature='')
def Seek(self, offset):
logger.info(f'Seek {offset}')
# status = mpd_wrapper.status()
@ -965,7 +953,7 @@ class MPRISInterface(dbus.service.Object):
# self.Seeked(position * 1000000)
return
@dbus.service.method(__player_interface, in_signature='ox', out_signature='')
@ dbus.service.method(__player_interface, in_signature='ox', out_signature='')
def SetPosition(self, trackid, position):
logger.info(f'SetPosition trackid: {trackid}, position: {position}')
# song = mpd_wrapper.last_currentsong()
@ -979,17 +967,18 @@ class MPRISInterface(dbus.service.Object):
# self.Seeked(position * 1000000)
return
@dbus.service.signal(__player_interface, signature='x')
def Seeked(self, position):
logger.debug("Seeked to %i" % position)
return float(position)
@dbus.service.method(__player_interface, in_signature='', out_signature='')
@ dbus.service.method(__player_interface, in_signature='', out_signature='')
def OpenUri(self):
logger.info('OpenUri')
# TODO
return
# Player signals
@ dbus.service.signal(__player_interface, signature='x')
def Seeked(self, position):
logger.debug("Seeked to %i" % position)
return float(position)
def __get_client_from_server_status(status):
client = None
@ -998,7 +987,8 @@ def __get_client_from_server_status(status):
for client in group['clients']:
if client['host']['name'] == hostname:
active = client["connected"]
logger.info(f'Client with id "{client["id"]}" active: {active}')
logger.info(
f'Client with id "{client["id"]}" active: {active}')
client = client['id']
if active:
return client
@ -1006,6 +996,7 @@ def __get_client_from_server_status(status):
logger.error('Failed to parse server status')
return client
def usage(params):
print("""\
Usage: %(progname)s [OPTION]...
@ -1040,7 +1031,7 @@ if __name__ == '__main__':
try:
(opts, args) = getopt.getopt(sys.argv[1:], 'c:dh:jp:v',
['help', 'bus-name=', 'config=',
'debug', 'host=', 'client='
'debug', 'host=', 'client='
'use-journal', 'path=', 'port=',
'version'])
except getopt.GetoptError as ex:
@ -1130,10 +1121,13 @@ if __name__ == '__main__':
if params['client'] is None:
hostname = socket.gethostname()
logger.info(f'No client id specified, trying to find a client running on host "{hostname}"')
resp = requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"})
logger.info(
f'No client id specified, trying to find a client running on host "{hostname}"')
resp = requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={
"id": 1, "jsonrpc": "2.0", "method": "Server.GetStatus"})
if resp.ok:
params['client'] = __get_client_from_server_status(json.loads(resp.text))
params['client'] = __get_client_from_server_status(
json.loads(resp.text))
if params['client'] is None:
logger.error('Client not found or not configured')

View file

@ -410,10 +410,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
{
// clang-format off
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}}
// clang-format on
LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get<std::string>("id") << ")" << request->params().get("meta") << "\n";
LOG(INFO, LOG_TAG) << "Stream.SetMeta id: " << request->params().get<std::string>("id") << ", meta: " << request->params().get("meta") << "\n";
// Find stream
string streamId = request->params().get<std::string>("id");
@ -427,11 +427,36 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
// Setup response
result["id"] = streamId;
}
else if (request->method().find("Stream.Control") == 0)
{
// clang-format off
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.Control","params":{"id":"Spotify", "command": "next", params: {}}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}}
//
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.Control","params":{"id":"Spotify", "command": "seek", "param": "60000"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}}
// clang-format on
LOG(INFO, LOG_TAG) << "Stream.Control id: " << request->params().get<std::string>("id") << ", command: " << request->params().get("command")
<< "\n";
// Find stream
string streamId = request->params().get<std::string>("id");
PcmStreamPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr)
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
// Set metadata from request
stream->control(request->params().get("command"), request->params().has("param") ? request->params().get("param") : "");
// Setup response
result["id"] = streamId;
}
else if (request->method() == "Stream.AddStream")
{
// clang-format off
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}}
// clang-format on
LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")"
@ -450,7 +475,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
{
// clang-format off
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
// Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}}
// clang-format on
LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")"

View file

@ -48,7 +48,7 @@ CtrlScript::~CtrlScript()
}
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings)
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command, const std::string& param)
{
pipe_stderr_ = bp::pipe();
pipe_stdout_ = bp::pipe();
@ -56,7 +56,15 @@ void CtrlScript::start(const std::string& stream_id, const ServerSettings& serve
params << " \"--stream=" + stream_id + "\"";
if (server_setttings.http.enabled)
params << " --snapcast-port=" << server_setttings.http.port;
process_ = bp::child(script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_);
if (!command.empty())
params << " --command=" << command;
if (!param.empty())
params << " --param=" << param;
process_ = bp::child(
script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_,
bp::on_exit = [](int exit,
const std::error_code& ec_in) { LOG(INFO, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; },
ioc_);
stream_stdout_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stdout_.native_source());
stream_stderr_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stderr_.native_source());
stderrReadLine();
@ -93,44 +101,38 @@ void CtrlScript::logScript(const std::string& source, std::string line)
void CtrlScript::stderrReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(
*stream_stderr_, streambuf_stderr_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
return;
}
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
logScript("stderr", std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
logScript("stderr", std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
}
void CtrlScript::stdoutReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(
*stream_stdout_, streambuf_stdout_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
return;
}
LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
logScript("stdout", std::move(line));
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();
});
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
logScript("stdout", std::move(line));
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();
});
}
@ -162,7 +164,10 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n";
if (uri_.query.find(kControlScript) != uri_.query.end())
{
ctrl_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
command_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
}
if (uri_.query.find(kUriChunkMs) != uri_.query.end())
chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]);
@ -324,6 +329,14 @@ std::shared_ptr<msg::StreamTags> PcmStream::getMeta() const
}
void PcmStream::control(const std::string& command, const std::string& param)
{
LOG(INFO, LOG_TAG) << "Stream " << getId() << " control: '" << command << "', param: '" << param << "'\n";
if (command_script_)
command_script_->start(getId(), server_settings_, command, param);
}
void PcmStream::setMeta(const json& jtag)
{
meta_.reset(new msg::StreamTags(jtag));

View file

@ -114,7 +114,7 @@ public:
CtrlScript(boost::asio::io_context& ioc, const std::string& script);
virtual ~CtrlScript();
void start(const std::string& stream_id, const ServerSettings& server_setttings);
void start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command = "", const std::string& param = "");
void stop();
private:
@ -162,6 +162,8 @@ public:
std::shared_ptr<msg::StreamTags> getMeta() const;
void setMeta(const json& j);
void control(const std::string& command, const std::string& param);
virtual ReaderState getState() const;
virtual json toJson() const;
@ -187,6 +189,7 @@ protected:
boost::asio::io_context& ioc_;
ServerSettings server_settings_;
std::unique_ptr<CtrlScript> ctrl_script_;
std::unique_ptr<CtrlScript> command_script_;
};
} // namespace streamreader