diff --git a/control/meta_mpd.py b/control/meta_mpd.py index 9c6d5b0a..37eb6484 100755 --- a/control/meta_mpd.py +++ b/control/meta_mpd.py @@ -584,6 +584,7 @@ Usage: %(progname)s [OPTION]... --snapcast-host=ADDR Set the mpd server address --snapcast-port=PORT Set the TCP port --stream=ID Set the stream id + --command=CMD Issue a command to MPD and exit -d, --debug Run in debug mode -j, --use-journal Log to systemd journal instead of stderr @@ -606,7 +607,7 @@ if __name__ == '__main__': # Parse command line try: (opts, args) = getopt.getopt(sys.argv[1:], 'hdjv', - ['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'debug', 'use-journal', 'version']) + ['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'command=', 'debug', 'use-journal', 'version']) except getopt.GetoptError as ex: (msg, opt) = ex.args print("%s: %s" % (sys.argv[0], msg), file=sys.stderr) @@ -628,6 +629,8 @@ if __name__ == '__main__': params['snapcast-port'] = int(arg) elif opt in ['--stream']: params['stream'] = arg + elif opt in ['--command']: + params['command'] = arg elif opt in ['-d', '--debug']: log_level = logging.DEBUG elif opt in ['-j', '--use-journal']: @@ -674,6 +677,41 @@ if __name__ == '__main__': logger.debug(f'Parameters: {params}') + if 'command' in params: + try: + cmd = params['command'] + if cmd not in ['next', 'previous', 'play', 'pause', 'playpause', 'stop']: + logger.error(f'Command not supported: {cmd}') + sys.exit(1) + + client = mpd.MPDClient() + client.connect(params['mpd-host'], params['mpd-port']) + if params['mpd-password']: + client.password(params['mpd-password']) + + if cmd == 'next': + client.next() + elif cmd == 'previous': + client.previous() + elif cmd == 'play': + client.play() + elif cmd == 'pause': + client.pause(1) + elif cmd == 'playpause': + if client.status()['state'] == 'play': + client.pause(1) + else: + client.play() + elif cmd == 'stop': + client.stop() + + client.close() + client.disconnect() + except mpd.CommandError as e: + logger.error(e) + sys.exit(1) + sys.exit(0) + # Set up the main loop if using_gi_glib: logger.debug('Using GObject-Introspection main loop.') diff --git a/control/snapcast_mpris.py b/control/snapcast_mpris.py index a593b655..b5d2c5d4 100755 --- a/control/snapcast_mpris.py +++ b/control/snapcast_mpris.py @@ -458,6 +458,11 @@ class MPDWrapper(object): # def last_currentsong(self): # return self._currentsong.copy() + def control(self, command): # , param = ""): + logger.info(f'Control: {command}') + requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={ + "id": 1, "jsonrpc": "2.0", "method": "Stream.Control", "params": {"id": "Pipe", "command": command}}) + @property def metadata(self): return self._metadata @@ -840,39 +845,37 @@ class MPRISInterface(dbus.service.Object): "CanControl": (True, None), } - __tracklist_interface = "org.mpris.MediaPlayer2.TrackList" - __prop_mapping = { __player_interface: __player_props, __root_interface: __root_props, } - @dbus.service.method(__introspect_interface) + @ dbus.service.method(__introspect_interface) def Introspect(self): return MPRIS2_INTROSPECTION - @dbus.service.signal(__prop_interface, signature="sa{sv}as") + @ dbus.service.signal(__prop_interface, signature="sa{sv}as") def PropertiesChanged(self, interface, changed_properties, invalidated_properties): pass - @dbus.service.method(__prop_interface, - in_signature="ss", out_signature="v") + @ dbus.service.method(__prop_interface, + in_signature="ss", out_signature="v") def Get(self, interface, prop): getter, setter = self.__prop_mapping[interface][prop] if callable(getter): return getter() return getter - @dbus.service.method(__prop_interface, - in_signature="ssv", out_signature="") + @ dbus.service.method(__prop_interface, + in_signature="ssv", out_signature="") def Set(self, interface, prop, value): getter, setter = self.__prop_mapping[interface][prop] if setter is not None: setter(value) - @dbus.service.method(__prop_interface, - in_signature="s", out_signature="a{sv}") + @ dbus.service.method(__prop_interface, + in_signature="s", out_signature="a{sv}") def GetAll(self, interface): read_props = {} props = self.__prop_mapping[interface] @@ -893,63 +896,48 @@ class MPRISInterface(dbus.service.Object): return value # Root methods - @dbus.service.method(__root_interface, in_signature='', out_signature='') + @ dbus.service.method(__root_interface, in_signature='', out_signature='') def Raise(self): logger.info('Raise') return - @dbus.service.method(__root_interface, in_signature='', out_signature='') + @ dbus.service.method(__root_interface, in_signature='', out_signature='') def Quit(self): logger.info('Quit') return # Player methods - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Next(self): - logger.info('Next') - # mpd_wrapper.next() + snapcast_wrapper.control("next") return - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Previous(self): - logger.info('Previous') - # mpd_wrapper.previous() + snapcast_wrapper.control("previous") return - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Pause(self): - logger.info('Pause') - # mpd_wrapper.pause(1) - # mpd_wrapper.notify_about_state('pause') + snapcast_wrapper.control("pause") return - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def PlayPause(self): - logger.info('PlayPause') - # status = mpd_wrapper.status() - # if status['state'] == 'play': - # mpd_wrapper.pause(1) - # mpd_wrapper.notify_about_state('pause') - # else: - # mpd_wrapper.play() - # mpd_wrapper.notify_about_state('play') + snapcast_wrapper.control("playpause") return - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Stop(self): - logger.info('Stop') - # mpd_wrapper.stop() - # mpd_wrapper.notify_about_state('stop') + snapcast_wrapper.control("stop") return - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Play(self): - logger.info('Play') - # mpd_wrapper.play() - # mpd_wrapper.notify_about_state('play') + snapcast_wrapper.control("play") return - @dbus.service.method(__player_interface, in_signature='x', out_signature='') + @ dbus.service.method(__player_interface, in_signature='x', out_signature='') def Seek(self, offset): logger.info(f'Seek {offset}') # status = mpd_wrapper.status() @@ -965,7 +953,7 @@ class MPRISInterface(dbus.service.Object): # self.Seeked(position * 1000000) return - @dbus.service.method(__player_interface, in_signature='ox', out_signature='') + @ dbus.service.method(__player_interface, in_signature='ox', out_signature='') def SetPosition(self, trackid, position): logger.info(f'SetPosition trackid: {trackid}, position: {position}') # song = mpd_wrapper.last_currentsong() @@ -979,17 +967,18 @@ class MPRISInterface(dbus.service.Object): # self.Seeked(position * 1000000) return - @dbus.service.signal(__player_interface, signature='x') - def Seeked(self, position): - logger.debug("Seeked to %i" % position) - return float(position) - - @dbus.service.method(__player_interface, in_signature='', out_signature='') + @ dbus.service.method(__player_interface, in_signature='', out_signature='') def OpenUri(self): logger.info('OpenUri') # TODO return + # Player signals + @ dbus.service.signal(__player_interface, signature='x') + def Seeked(self, position): + logger.debug("Seeked to %i" % position) + return float(position) + def __get_client_from_server_status(status): client = None @@ -998,14 +987,16 @@ def __get_client_from_server_status(status): for client in group['clients']: if client['host']['name'] == hostname: active = client["connected"] - logger.info(f'Client with id "{client["id"]}" active: {active}') + logger.info( + f'Client with id "{client["id"]}" active: {active}') client = client['id'] if active: - return client + return client except: logger.error('Failed to parse server status') return client + def usage(params): print("""\ Usage: %(progname)s [OPTION]... @@ -1040,7 +1031,7 @@ if __name__ == '__main__': try: (opts, args) = getopt.getopt(sys.argv[1:], 'c:dh:jp:v', ['help', 'bus-name=', 'config=', - 'debug', 'host=', 'client=' + 'debug', 'host=', 'client=' 'use-journal', 'path=', 'port=', 'version']) except getopt.GetoptError as ex: @@ -1130,11 +1121,14 @@ if __name__ == '__main__': if params['client'] is None: hostname = socket.gethostname() - logger.info(f'No client id specified, trying to find a client running on host "{hostname}"') - resp = requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"}) + logger.info( + f'No client id specified, trying to find a client running on host "{hostname}"') + resp = requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={ + "id": 1, "jsonrpc": "2.0", "method": "Server.GetStatus"}) if resp.ok: - params['client'] = __get_client_from_server_status(json.loads(resp.text)) - + params['client'] = __get_client_from_server_status( + json.loads(resp.text)) + if params['client'] is None: logger.error('Client not found or not configured') diff --git a/server/server.cpp b/server/server.cpp index 6e47e185..77eb765f 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -410,10 +410,10 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent { // clang-format off // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}} // clang-format on - LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get("id") << ")" << request->params().get("meta") << "\n"; + LOG(INFO, LOG_TAG) << "Stream.SetMeta id: " << request->params().get("id") << ", meta: " << request->params().get("meta") << "\n"; // Find stream string streamId = request->params().get("id"); @@ -427,11 +427,36 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent // Setup response result["id"] = streamId; } + else if (request->method().find("Stream.Control") == 0) + { + // clang-format off + // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.Control","params":{"id":"Spotify", "command": "next", params: {}}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}} + // + // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.Control","params":{"id":"Spotify", "command": "seek", "param": "60000"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}} + // clang-format on + + LOG(INFO, LOG_TAG) << "Stream.Control id: " << request->params().get("id") << ", command: " << request->params().get("command") + << "\n"; + + // Find stream + string streamId = request->params().get("id"); + PcmStreamPtr stream = streamManager_->getStream(streamId); + if (stream == nullptr) + throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); + + // Set metadata from request + stream->control(request->params().get("command"), request->params().has("param") ? request->params().get("param") : ""); + + // Setup response + result["id"] = streamId; + } else if (request->method() == "Stream.AddStream") { // clang-format off // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}} // clang-format on LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")" @@ -450,7 +475,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent { // clang-format off // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"id":"Spotify"}} // clang-format on LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")" diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index e47ebfae..210cf0e4 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -48,7 +48,7 @@ CtrlScript::~CtrlScript() } -void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings) +void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command, const std::string& param) { pipe_stderr_ = bp::pipe(); pipe_stdout_ = bp::pipe(); @@ -56,7 +56,15 @@ void CtrlScript::start(const std::string& stream_id, const ServerSettings& serve params << " \"--stream=" + stream_id + "\""; if (server_setttings.http.enabled) params << " --snapcast-port=" << server_setttings.http.port; - process_ = bp::child(script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_); + if (!command.empty()) + params << " --command=" << command; + if (!param.empty()) + params << " --param=" << param; + process_ = bp::child( + script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, + bp::on_exit = [](int exit, + const std::error_code& ec_in) { LOG(INFO, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; }, + ioc_); stream_stdout_ = make_unique(ioc_, pipe_stdout_.native_source()); stream_stderr_ = make_unique(ioc_, pipe_stderr_.native_source()); stderrReadLine(); @@ -93,44 +101,38 @@ void CtrlScript::logScript(const std::string& source, std::string line) void CtrlScript::stderrReadLine() { const std::string delimiter = "\n"; - boost::asio::async_read_until( - *stream_stderr_, streambuf_stderr_, delimiter, - [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) + boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + if (ec) { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; - return; - } + LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; + return; + } - // Extract up to the first delimiter. - std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; - logScript("stderr", std::move(line)); - streambuf_stderr_.consume(bytes_transferred); - stderrReadLine(); - }); + // Extract up to the first delimiter. + std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; + logScript("stderr", std::move(line)); + streambuf_stderr_.consume(bytes_transferred); + stderrReadLine(); + }); } void CtrlScript::stdoutReadLine() { const std::string delimiter = "\n"; - boost::asio::async_read_until( - *stream_stdout_, streambuf_stdout_, delimiter, - [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) + boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + if (ec) { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; - return; - } + LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; + return; + } - // Extract up to the first delimiter. - std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()}; - logScript("stdout", std::move(line)); - streambuf_stdout_.consume(bytes_transferred); - stdoutReadLine(); - }); + // Extract up to the first delimiter. + std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()}; + logScript("stdout", std::move(line)); + streambuf_stdout_.consume(bytes_transferred); + stdoutReadLine(); + }); } @@ -162,7 +164,10 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n"; if (uri_.query.find(kControlScript) != uri_.query.end()) + { ctrl_script_ = std::make_unique(ioc, uri_.query[kControlScript]); + command_script_ = std::make_unique(ioc, uri_.query[kControlScript]); + } if (uri_.query.find(kUriChunkMs) != uri_.query.end()) chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]); @@ -324,6 +329,14 @@ std::shared_ptr PcmStream::getMeta() const } +void PcmStream::control(const std::string& command, const std::string& param) +{ + LOG(INFO, LOG_TAG) << "Stream " << getId() << " control: '" << command << "', param: '" << param << "'\n"; + if (command_script_) + command_script_->start(getId(), server_settings_, command, param); +} + + void PcmStream::setMeta(const json& jtag) { meta_.reset(new msg::StreamTags(jtag)); diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index 2b6047ae..fab92d39 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -114,7 +114,7 @@ public: CtrlScript(boost::asio::io_context& ioc, const std::string& script); virtual ~CtrlScript(); - void start(const std::string& stream_id, const ServerSettings& server_setttings); + void start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command = "", const std::string& param = ""); void stop(); private: @@ -162,6 +162,8 @@ public: std::shared_ptr getMeta() const; void setMeta(const json& j); + void control(const std::string& command, const std::string& param); + virtual ReaderState getState() const; virtual json toJson() const; @@ -187,6 +189,7 @@ protected: boost::asio::io_context& ioc_; ServerSettings server_settings_; std::unique_ptr ctrl_script_; + std::unique_ptr command_script_; }; } // namespace streamreader