diff --git a/control/meta_mpd.py b/control/meta_mpd.py index 37eb6484..e96efe54 100755 --- a/control/meta_mpd.py +++ b/control/meta_mpd.py @@ -24,7 +24,6 @@ # - python-mpd2 # - musicbrainzngs -from configparser import ConfigParser import os import sys import socket @@ -34,9 +33,9 @@ from dbus.mainloop.glib import DBusGMainLoop import logging import gettext import time -import base64 +import json import musicbrainzngs -import requests +import fcntl __version__ = "@version@" __git_version__ = "@gitversion@" @@ -131,6 +130,11 @@ urlhandlers = ['http://'] downloaded_covers = ['~/.covers/%s-%s.jpg'] +def send(json_msg): + print(json.dumps(json_msg)) + sys.stdout.flush() + + class MPDWrapper(object): """ Wrapper of mpd.MPDClient to handle socket errors and similar @@ -181,6 +185,7 @@ class MPDWrapper(object): self._idling = False self._can_idle = False self._can_single = False + self._buffer = '' self.client.connect( self._params['mpd-host'], self._params['mpd-port']) @@ -229,6 +234,13 @@ class MPDWrapper(object): self._watch_id = GLib.io_add_watch(self, GLib.IO_IN | GLib.IO_HUP, self.socket_callback) + + flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) + flags |= os.O_NONBLOCK + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags) + GLib.io_add_watch(sys.stdin, GLib.IO_IN | + GLib.IO_HUP, self.io_callback) + # Reset error counter self._errors = 0 @@ -312,6 +324,67 @@ class MPDWrapper(object): self.idle_enter() return True + def control(self, cmd): + try: + request = json.loads(cmd) + cmd = request['method'] + id = request['id'] + success = True + if cmd == 'Next': + self.next() + elif cmd == 'Previous': + self.previous() + elif cmd == 'Play': + self.play() + elif cmd == 'Pause': + self.pause(1) + elif cmd == 'PlayPause': + if self.status()['state'] == 'play': + self.pause(1) + else: + self.play() + elif cmd == 'Stop': + self.stop() + elif cmd == 'SetPosition': + trackid = request['params']['TrackId'] + trackid = trackid.rsplit('/', 1)[1] + position = request['params']['Position'] + position = int(position) / 1000000 + self.seekid(int(trackid), position) + elif cmd == 'Seek': + offset = request['params']['Offset'] + offset = int(offset) / 1000000 + strOffset = str(offset) + if offset >= 0: + strOffset = "+" + strOffset + self.seekcur(strOffset) + else: + send({"jsonrpc": "2.0", "error": {"code": -32601, + "message": "Method not found"}, "id": id}) + success = False + if success: + send({"jsonrpc": "2.0", "result": "ok", "id": id}) + except: + send({"jsonrpc": "2.0", "error": { + "code": -32700, "message": "Parse error"}, "id": id}) + + def io_callback(self, fd, event): + logger.debug("IO event %r on fd %r" % (event, fd)) + if event & GLib.IO_HUP: + logger.debug("IO_HUP") + return True + elif event & GLib.IO_IN: + chunk = fd.read() + for char in chunk: + if char == '\n': + logger.info(f'Received: {self._buffer}') + + self.control(self._buffer) + self._buffer = '' + else: + self._buffer += char + return True + def socket_callback(self, fd, event): try: logger.debug("Socket event %r on fd %r" % (event, fd)) @@ -389,6 +462,9 @@ class MPDWrapper(object): snapmeta['artist'] = [fields[0]] snapmeta['title'] = fields[1] + send({"jsonrpc": "2.0", "method": "Stream.OnMetadata", "params": snapmeta}) + + snapmeta['artUrl'] = 'http://127.0.0.1:1780/launcher-icon.png' album_key = 'musicbrainzAlbumId' try: if not album_key in snapmeta: @@ -424,8 +500,7 @@ class MPDWrapper(object): f'Error while getting cover for {snapmeta[album_key]}: {e}') logger.info(f'Snapmeta: {snapmeta}') - requests.post(f'http://{params["snapcast-host"]}:{params["snapcast-port"]}/jsonrpc', json={ - "id": 4, "jsonrpc": "2.0", "method": "Stream.SetMeta", "params": {"id": params['stream'], "meta": snapmeta}}) + send({"jsonrpc": "2.0", "method": "Stream.OnMetadata", "params": snapmeta}) def _update_properties(self, force=False): old_status = self._status @@ -607,7 +682,7 @@ if __name__ == '__main__': # Parse command line try: (opts, args) = getopt.getopt(sys.argv[1:], 'hdjv', - ['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'command=', 'debug', 'use-journal', 'version']) + ['help', 'mpd-host=', 'mpd-port=', 'snapcast-host=', 'snapcast-port=', 'stream=', 'debug', 'use-journal', 'version']) except getopt.GetoptError as ex: (msg, opt) = ex.args print("%s: %s" % (sys.argv[0], msg), file=sys.stderr) @@ -629,8 +704,6 @@ if __name__ == '__main__': params['snapcast-port'] = int(arg) elif opt in ['--stream']: params['stream'] = arg - elif opt in ['--command']: - params['command'] = arg elif opt in ['-d', '--debug']: log_level = logging.DEBUG elif opt in ['-j', '--use-journal']: @@ -677,41 +750,6 @@ if __name__ == '__main__': logger.debug(f'Parameters: {params}') - if 'command' in params: - try: - cmd = params['command'] - if cmd not in ['next', 'previous', 'play', 'pause', 'playpause', 'stop']: - logger.error(f'Command not supported: {cmd}') - sys.exit(1) - - client = mpd.MPDClient() - client.connect(params['mpd-host'], params['mpd-port']) - if params['mpd-password']: - client.password(params['mpd-password']) - - if cmd == 'next': - client.next() - elif cmd == 'previous': - client.previous() - elif cmd == 'play': - client.play() - elif cmd == 'pause': - client.pause(1) - elif cmd == 'playpause': - if client.status()['state'] == 'play': - client.pause(1) - else: - client.play() - elif cmd == 'stop': - client.stop() - - client.close() - client.disconnect() - except mpd.CommandError as e: - logger.error(e) - sys.exit(1) - sys.exit(0) - # Set up the main loop if using_gi_glib: logger.debug('Using GObject-Introspection main loop.') diff --git a/control/snapcast_mpris.py b/control/snapcast_mpris.py index b5d2c5d4..244f87fa 100755 --- a/control/snapcast_mpris.py +++ b/control/snapcast_mpris.py @@ -308,6 +308,7 @@ class MPDWrapper(object): self._metadata = {} self._position = 0 self._time = 0 + self._req_id = 0 self._bus = dbus.SessionBus() if self._params['mmkeys']: @@ -458,10 +459,14 @@ class MPDWrapper(object): # def last_currentsong(self): # return self._currentsong.copy() - def control(self, command): # , param = ""): - logger.info(f'Control: {command}') - requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={ - "id": 1, "jsonrpc": "2.0", "method": "Stream.Control", "params": {"id": "Pipe", "command": command}}) + def control(self, command, params={}): + j = {"id": self._req_id, "jsonrpc": "2.0", "method": "Stream.Control", + "params": {"id": "Pipe", "command": command, "params": params}} + logger.info(f'Control: {command}, json: {j}') + url = f'http://{self._params["host"]}:{self._params["port"]}/jsonrpc' + logger.info(f'url: {url}') + self._req_id += 1 + requests.post(url, json=j) @property def metadata(self): @@ -626,6 +631,7 @@ class MPDWrapper(object): class NotifyWrapper(object): def __init__(self, params): + self._last_notification = None self._notification = None self._enabled = True @@ -673,6 +679,11 @@ class NotifyWrapper(object): if not self._enabled: return + if self._last_notification == [title, body, uri]: + return + + self._last_notification = [title, body, uri] + # If we did not yet manage to get a notification service, # try again if not self._notification: @@ -909,37 +920,38 @@ class MPRISInterface(dbus.service.Object): # Player methods @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Next(self): - snapcast_wrapper.control("next") + snapcast_wrapper.control("Next") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Previous(self): - snapcast_wrapper.control("previous") + snapcast_wrapper.control("Previous") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Pause(self): - snapcast_wrapper.control("pause") + snapcast_wrapper.control("Pause") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def PlayPause(self): - snapcast_wrapper.control("playpause") + snapcast_wrapper.control("PlayPause") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Stop(self): - snapcast_wrapper.control("stop") + snapcast_wrapper.control("Stop") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Play(self): - snapcast_wrapper.control("play") + snapcast_wrapper.control("Play") return @ dbus.service.method(__player_interface, in_signature='x', out_signature='') def Seek(self, offset): logger.info(f'Seek {offset}') + snapcast_wrapper.control("Seek", {"Offset": offset}) # status = mpd_wrapper.status() # current, end = status['time'].split(':') # current = int(current) @@ -955,7 +967,11 @@ class MPRISInterface(dbus.service.Object): @ dbus.service.method(__player_interface, in_signature='ox', out_signature='') def SetPosition(self, trackid, position): - logger.info(f'SetPosition trackid: {trackid}, position: {position}') + logger.info(f'SetPosition TrackId: {trackid}, Position: {position}') + snapcast_wrapper.control( + "SetPosition", {"TrackId": trackid, "Position": position}) + self.Seeked(position) + # song = mpd_wrapper.last_currentsong() # # FIXME: use real dbus objects # if str(trackid) != '/org/mpris/MediaPlayer2/Track/%s' % song['id']: @@ -1018,6 +1034,9 @@ Report bugs to https://github.com/eonpatapon/mpDris2/issues""" % params) if __name__ == '__main__': DBusGMainLoop(set_as_default=True) + # TODO: + # -cleanup: remove mpd-ish stuff + # -stream id: keep track of the client's stream gettext.bindtextdomain('mpDris2', '@datadir@/locale') gettext.textdomain('mpDris2') diff --git a/server/server.cpp b/server/server.cpp index 77eb765f..de5e9842 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -438,7 +438,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent // clang-format on LOG(INFO, LOG_TAG) << "Stream.Control id: " << request->params().get("id") << ", command: " << request->params().get("command") - << "\n"; + << ", params: " << (request->params().has("params") ? request->params().get("params") : "") << "\n"; // Find stream string streamId = request->params().get("id"); @@ -447,7 +447,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); // Set metadata from request - stream->control(request->params().get("command"), request->params().has("param") ? request->params().get("param") : ""); + stream->control(request->params().get("command"), request->params().has("params") ? request->params().get("params") : json{}); // Setup response result["id"] = streamId; diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 210cf0e4..dc7f2969 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -25,6 +25,7 @@ #include "common/str_compat.hpp" #include "common/utils/string_utils.hpp" #include "encoder/encoder_factory.hpp" +#include "jsonrpcpp.hpp" #include "pcm_stream.hpp" @@ -48,36 +49,42 @@ CtrlScript::~CtrlScript() } -void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command, const std::string& param) +void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler) { + receive_handler_ = receive_handler; pipe_stderr_ = bp::pipe(); pipe_stdout_ = bp::pipe(); stringstream params; params << " \"--stream=" + stream_id + "\""; if (server_setttings.http.enabled) params << " --snapcast-port=" << server_setttings.http.port; - if (!command.empty()) - params << " --command=" << command; - if (!param.empty()) - params << " --param=" << param; process_ = bp::child( - script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, - bp::on_exit = [](int exit, - const std::error_code& ec_in) { LOG(INFO, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; }, + script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::std_in < in_, + bp::on_exit = + [](int exit, const std::error_code& ec_in) { + auto severity = AixLog::Severity::debug; + if (exit != 0) + severity = AixLog::Severity::error; + LOG(severity, SCRIPT_LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n"; + }, ioc_); stream_stdout_ = make_unique(ioc_, pipe_stdout_.native_source()); stream_stderr_ = make_unique(ioc_, pipe_stderr_.native_source()); - stderrReadLine(); stdoutReadLine(); + stderrReadLine(); } +void CtrlScript::send(const std::string& msg) +{ + in_.write(msg.data(), msg.size()); + in_.flush(); +} -void CtrlScript::logScript(const std::string& source, std::string line) +void CtrlScript::logScript(std::string line) { if (line.empty()) return; - std::ignore = source; if (line.back() == '\r') line.resize(line.size() - 1); auto tmp = utils::string::tolower_copy(line); @@ -104,13 +111,13 @@ void CtrlScript::stderrReadLine() boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { - LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; + LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n"; return; } // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; - logScript("stderr", std::move(line)); + logScript(std::move(line)); streambuf_stderr_.consume(bytes_transferred); stderrReadLine(); }); @@ -123,13 +130,13 @@ void CtrlScript::stdoutReadLine() boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { - LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; + LOG(ERROR, SCRIPT_LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n"; return; } // Extract up to the first delimiter. std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()}; - logScript("stdout", std::move(line)); + receive_handler_(std::move(line)); streambuf_stdout_.consume(bytes_transferred); stdoutReadLine(); }); @@ -147,7 +154,7 @@ void CtrlScript::stop() PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) - : active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings) + : active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), req_id_(0) { encoder::EncoderFactory encoderFactory; if (uri_.query.find(kUriCodec) == uri_.query.end()) @@ -166,7 +173,6 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con if (uri_.query.find(kControlScript) != uri_.query.end()) { ctrl_script_ = std::make_unique(ioc, uri_.query[kControlScript]); - command_script_ = std::make_unique(ioc, uri_.query[kControlScript]); } if (uri_.query.find(kUriChunkMs) != uri_.query.end()) @@ -218,6 +224,74 @@ std::string PcmStream::getCodec() const } +void PcmStream::onControlMsg(const std::string& msg) +{ + LOG(DEBUG, LOG_TAG) << "Received: " << msg << "\n"; + + jsonrpcpp::entity_ptr entity(nullptr); + try + { + entity = jsonrpcpp::Parser::do_parse(msg); + if (!entity) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message\n"; + return; + } + } + catch (const jsonrpcpp::ParseErrorException& e) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; + return; + // return e.to_json().dump(); + } + catch (const std::exception& e) + { + LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n"; + return; + // return jsonrpcpp::ParseErrorException(e.what()).to_json().dump(); + } + + if (entity->is_notification()) + { + jsonrpcpp::notification_ptr notification = dynamic_pointer_cast(entity); + LOG(INFO, LOG_TAG) << "Notification method: " << notification->method() << ", params: " << notification->params().to_json() << "\n"; + if (notification->method() == "Stream.OnMetadata") + { + setMeta(notification->params().to_json()); + } + } + else if (entity->is_request()) + { + LOG(INFO, LOG_TAG) << "Request\n"; + // jsonrpcpp::entity_ptr response(nullptr); + // jsonrpcpp::notification_ptr notification(nullptr); + // jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); + // processRequest(request, response, notification); + // saveConfig(); + // ////cout << "Request: " << request->to_json().dump() << "\n"; + // if (notification) + // { + // ////cout << "Notification: " << notification->to_json().dump() << "\n"; + // controlServer_->send(notification->to_json().dump(), controlSession); + // } + // if (response) + // { + // ////cout << "Response: " << response->to_json().dump() << "\n"; + // return response->to_json().dump(); + // } + // return ""; + } + else if (entity->is_response()) + { + jsonrpcpp::response_ptr response = dynamic_pointer_cast(entity); + LOG(INFO, LOG_TAG) << "Response: " << response->result().dump() << ", id: " << response->id() << "\n"; + } + + // json j = json::parse(msg); + // setMeta(j["params"]["meta"]); +} + + void PcmStream::start() { LOG(DEBUG, LOG_TAG) << "Start: " << name_ << ", type: " << uri_.scheme << ", sampleformat: " << sampleFormat_.toString() << ", codec: " << getCodec() @@ -227,7 +301,7 @@ void PcmStream::start() active_ = true; if (ctrl_script_) - ctrl_script_->start(getId(), server_settings_); + ctrl_script_->start(getId(), server_settings_, [this](const std::string& msg) { onControlMsg(msg); }); } @@ -329,11 +403,16 @@ std::shared_ptr PcmStream::getMeta() const } -void PcmStream::control(const std::string& command, const std::string& param) +void PcmStream::control(const std::string& command, const json& params) { - LOG(INFO, LOG_TAG) << "Stream " << getId() << " control: '" << command << "', param: '" << param << "'\n"; - if (command_script_) - command_script_->start(getId(), server_settings_, command, param); + LOG(INFO, LOG_TAG) << "Stream '" << getId() << "' received command: '" << command << "', params: '" << params << "'\n"; + for (const auto& it : params.items()) + { + LOG(INFO, LOG_TAG) << "Stream " << getId() << " key: '" << it.key() << "', param: '" << it.value() << "'\n"; + } + jsonrpcpp::Request request(++req_id_, command, params); + if (ctrl_script_) + ctrl_script_->send(request.to_json().dump() + "\n"); //, params); } diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index fab92d39..5bd40fa8 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -45,6 +45,7 @@ namespace bp = boost::process; +using json = nlohmann::json; namespace streamreader @@ -111,16 +112,20 @@ public: class CtrlScript { public: + using OnReceive = std::function; + CtrlScript(boost::asio::io_context& ioc, const std::string& script); virtual ~CtrlScript(); - void start(const std::string& stream_id, const ServerSettings& server_setttings, const std::string& command = "", const std::string& param = ""); + void start(const std::string& stream_id, const ServerSettings& server_setttings, const OnReceive& receive_handler); void stop(); + /// Send a message to stdin of the process + void send(const std::string& msg); private: void stderrReadLine(); void stdoutReadLine(); - void logScript(const std::string& source, std::string line); + void logScript(std::string line); bp::child process_; bp::pipe pipe_stdout_; @@ -129,9 +134,11 @@ private: std::unique_ptr stream_stderr_; boost::asio::streambuf streambuf_stdout_; boost::asio::streambuf streambuf_stderr_; + OnReceive receive_handler_; boost::asio::io_context& ioc_; std::string script_; + bp::opstream in_; }; @@ -162,7 +169,7 @@ public: std::shared_ptr getMeta() const; void setMeta(const json& j); - void control(const std::string& command, const std::string& param); + virtual void control(const std::string& command, const json& params); virtual ReaderState getState() const; virtual json toJson() const; @@ -172,6 +179,7 @@ public: protected: std::atomic active_; + void onControlMsg(const std::string& msg); void setState(ReaderState newState); void chunkRead(const msg::PcmChunk& chunk); void resync(const std::chrono::nanoseconds& duration); @@ -189,7 +197,7 @@ protected: boost::asio::io_context& ioc_; ServerSettings server_settings_; std::unique_ptr ctrl_script_; - std::unique_ptr command_script_; + size_t req_id_; }; } // namespace streamreader