#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Authors: Jean-Philippe Braun , # Mantas Mikulėnas # Based on mpDris from: Erik Karlsson # Some bits taken from quodlibet mpris plugin by from time import sleep from systemd.journal import _valid_field_name import websocket import logging import threading import json import webbrowser import os import sys import getopt import time import socket import dbus import dbus.service from dbus.mainloop.glib import DBusGMainLoop import logging import gettext import requests __version__ = "@version@" __git_version__ = "@gitversion@" try: import mutagen except ImportError: mutagen = None try: import gi gi.require_version('Notify', '0.7') except (ImportError, ValueError): pass using_gi_glib = False try: from gi.repository import GLib using_gi_glib = True except ImportError: import glib as GLib using_gi_notify = False using_old_notify = False try: from gi.repository import Notify using_gi_notify = True except ImportError: try: import pynotify using_old_notify = True except ImportError: pass _ = gettext.gettext identity = "Snapcast" params = { 'progname': sys.argv[0], # Connection 'host': None, 'port': None, 'password': None, 'bus_name': None, 'client': None, # Bling 'mmkeys': True, 'notify': (using_gi_notify or using_old_notify), 'notify_urgency': 0, } defaults = { # Connection 'host': 'localhost', 'port': 1780, 'client': None, 'password': None, 'bus_name': None, } notification = None # python dbus bindings don't include annotations and properties MPRIS2_INTROSPECTION = """ """ tag_mapping = { 'trackId': ['mpris:trackid', lambda val: dbus.ObjectPath(f'/org/mpris/MediaPlayer2/Track/{val}')], 'artUrl': ['mpris:artUrl', str], 'duration': ['mpris:length', lambda val: int(val * 1000000)], 'album': ['xesam:album', str], 'albumArtist': ['xesam:albumArtist', list], 'artist': ['xesam:artist', list], 'asText': ['xesam:asText', str], 'audioBPM': ['xesam:audioBPM', int], 'autoRating': ['xesam:autoRating', float], 'comment': ['xesam:comment', list], 'composer': ['xesam:composer', list], 'contentCreated': ['xesam:contentCreated', str], 'discNumber': ['xesam:discNumber', int], 'firstUsed': ['xesam:firstUsed', str], 'genre': ['xesam:genre', list], 'lastUsed': ['xesam:lastUsed', str], 'lyricist': ['xesam:lyricist', str], 'title': ['xesam:title', str], 'trackNumber': ['xesam:trackNumber', int], 'url': ['xesam:url', str], 'useCount': ['xesam:useCount', int], 'userRating': ['xesam:userRating', float] } property_mapping = { 'playbackStatus': 'PlaybackStatus', 'loopStatus': 'LoopStatus', 'shuffle': 'Shuffle', 'volume': 'Volume', 'canGoNext': 'CanGoNext', 'canGoPrevious': 'CanGoPrevious', 'canPlay': 'CanPlay', 'canPause': 'CanPause', 'canSeek': 'CanSeek', 'canControl': 'CanControl' } class SnapcastWrapper(object): """ Wrapper of mpd.MPDClient to handle socket errors and similar """ def __init__(self, params): self._dbus = dbus self._params = params self._dbus_service = None self._metadata = {} self._properties = {} self._req_id = 0 self._stream_id = '' self._request_map = {} self._bus = dbus.SessionBus() if self._params['mmkeys']: self.setup_mediakeys() self.websocket = websocket.WebSocketApp("ws://" + self._params['host'] + ":" + str(self._params['port']) + "/jsonrpc", on_message=self.on_ws_message, on_error=self.on_ws_error, on_open=self.on_ws_open, on_close=self.on_ws_close) self.websocket_thread = threading.Thread( target=self.websocket_loop, args=()) self.websocket_thread.name = "SnapcastRpcWebsocketWrapper" self.websocket_thread.start() def websocket_loop(self): logger.info("Started SnapcastRpcWebsocketWrapper loop") while True: self.websocket.run_forever() sleep(1) logger.info("Ending SnapcastRpcWebsocketWrapper loop") def __get_stream_id_from_server_status(self, status, client_id): try: for group in status['server']['groups']: for client in group['clients']: if client['id'] == client_id: return group['stream_id'] for group in status['server']['groups']: for client in group['clients']: if client['name'] == client_id: return group['stream_id'] except: logger.error('Failed to parse server status') logger.error(f'Failed to get stream id for client {client_id}') return None def __update_metadata(self, meta): try: if meta is None: meta = {} logger.info(f'Meta: "{meta}"') self._metadata = {} self._metadata['xesam:artist'] = ['Unknown Artist'] self._metadata['xesam:title'] = 'Unknown Title' for key, value in meta.items(): if key in tag_mapping: try: self._metadata[tag_mapping[key][0] ] = tag_mapping[key][1](value) except KeyError: logger.warning(f'tag "{key}" not supported') except (ValueError, TypeError): logger.warning( f"Can't cast value {value} to {tag_mapping[key][1]}") if not 'mpris:artUrl' in self._metadata: self._metadata['mpris:artUrl'] = f'http://{self._params["host"]}:{self._params["port"]}/launcher-icon.png' logger.info(f'mpris meta: {self._metadata}') self.notify_about_track(self._metadata) new_meta = self._dbus_service.update_property('org.mpris.MediaPlayer2.Player', 'Metadata') logger.info(f'new meta {new_meta}') except Exception as e: logger.error(f'Error in update_metadata: {str(e)}') def __update_properties(self, props): try: if props is None: props = {} logger.info(f'Properties: "{props}"') # store the last receive time stamp for better position estimation if 'position' in props: props['_received'] = time.time() # ignore "internal" properties, starting with "_" changed_properties = {} for key, value in props.items(): if key.startswith('_'): continue if not key in self._properties: changed_properties[key] = [None, value] elif value != self._properties[key]: changed_properties[key] = [self._properties[key], value] for key, value in self._properties.items(): if key.startswith('_'): continue if not key in props: changed_properties[key] = [value, None] self._properties = props logger.info(f'Changed properties: "{changed_properties}"') for key, value in changed_properties.items(): if key in property_mapping: self._dbus_service.update_property( 'org.mpris.MediaPlayer2.Player', property_mapping[key]) except Exception as e: logger.error(f'Error in update_properties: {str(e)}') def on_ws_message(self, ws, message): logger.info(f'Snapcast RPC websocket message received: {message}') jmsg = json.loads(message) if 'id' in jmsg: id = jmsg['id'] if id in self._request_map: request = self._request_map[id] del self._request_map[id] logger.info(f'Received response to {request}') if request == 'Server.GetStatus': self._stream_id = self.__get_stream_id_from_server_status( jmsg['result'], self._params['client']) logger.info(f'Stream id: {self._stream_id}') for stream in jmsg['result']['server']['streams']: if stream['id'] == self._stream_id: if 'meta' in stream: self.__update_metadata(stream['meta']) if 'properties' in stream: self.__update_properties(stream['properties']) break elif jmsg['method'] == "Server.OnUpdate": self._stream_id = self.__get_stream_id_from_server_status( jmsg['params'], self._params['client']) logger.info(f'Stream id: {self._stream_id}') elif jmsg['method'] == "Group.OnStreamChanged": self.send_request("Server.GetStatus") elif jmsg["method"] == "Stream.OnMetadata": stream_id = jmsg["params"]["id"] logger.info(f'Stream meta changed for "{stream_id}"') if self._stream_id != stream_id: return meta = jmsg["params"]["meta"] self.__update_metadata(meta) elif jmsg["method"] == "Stream.OnProperties": stream_id = jmsg["params"]["id"] logger.info( f'Stream properties changed for "{stream_id}"') if self._stream_id != stream_id: return props = jmsg["params"]["properties"] self.__update_properties(props) def on_ws_error(self, ws, error): logger.error("Snapcast RPC websocket error") logger.error(error) def on_ws_open(self, ws): logger.info("Snapcast RPC websocket opened") # Export our DBUS service if not self._dbus_service: self._dbus_service = MPRISInterface(self._params) else: # Add our service to the session bus # self._dbus_service.add_to_connection(dbus.SessionBus(), # '/org/mpris/MediaPlayer2') self._dbus_service.acquire_name() self.send_request("Server.GetStatus") def on_ws_close(self, ws): logger.info("Snapcast RPC websocket closed") if self._dbus_service is not None: self._dbus_service.release_name() # self._dbus_service.remove_from_connection() def send_request(self, method, params=None): j = {"id": self._req_id, "jsonrpc": "2.0", "method": str(method)} if not params is None: j["params"] = params logger.info(f'send_request: {j}') result = self._req_id self._request_map[result] = str(method) self._req_id += 1 self.websocket.send(json.dumps(j)) return result def stop(self): self.websocket.keep_running = False logger.info("Waiting for websocket thread to exit") # self.websocket_thread.join() def mediakey_callback(self, appname, key): """ GNOME media key handler """ logger.debug('Got GNOME mmkey "%s" for "%s"' % (key, appname)) # if key == 'Play': # if self._status['state'] == 'play': # self.client.pause(1) # self.notify_about_state('pause') # else: # self.play() # self.notify_about_state('play') # elif key == 'Next': # self.client.next() # elif key == 'Previous': # self.client.previous() # elif key == 'Stop': # self.client.stop() # self.notify_about_state('stop') # def last_currentsong(self): # return self._currentsong.copy() def control(self, command, params={}): self.send_request("Stream.Control", { "id": self._stream_id, "command": command, "params": params}) def set_property(self, property, value): logger.info(f'set_property {property} = {value}') self.send_request("Stream.SetProperty", { "id": self._stream_id, "property": property, "value": value}) @property def metadata(self): return self._metadata @property def properties(self): return self._properties def position(self): logger.debug( f'Position props: {self._properties}, meta: {self._metadata}') if not 'position' in self._properties: return 0 if not 'mpris:length' in self._metadata: return 0 position = self._properties['position'] if '_received' in self._properties: position += (time.time() - self._properties['_received']) return position * 1000000 def property(self, name, default): return self._properties.get(name, default) def notify_about_track(self, meta, state='play'): uri = 'sound' # if 'mpris:artUrl' in meta: # uri = meta['mpris:artUrl'] title = 'Unknown Title' if 'xesam:title' in meta: title = meta['xesam:title'] elif 'xesam:url' in meta: title = meta['xesam:url'].split('/')[-1] artist = ['Unknown Artist'] if 'xesam:artist' in meta: artist = ", ".join(meta['xesam:artist']) body = _('by %s') % artist if state == 'pause': uri = 'media-playback-pause-symbolic' body += ' (%s)' % _('Paused') notification.notify(title, body, uri) def notify_about_state(self, state): if state == 'stop': notification.notify(identity, _('Stopped'), 'media-playback-stop-symbolic') else: self.notify_about_track(self.metadata, state) # if 'elapsed' in new_status: # new_position = float(new_status['elapsed']) # elif 'time' in new_status: # new_position = int(new_status['time'].split(':')[0]) # else: # new_position = 0 # self._position = new_position # if not force: # if new_status['state'] == 'play': # expected_position = old_position + (new_time - old_time) # else: # expected_position = old_position # if abs(new_position - expected_position) > 0.6: # logger.debug("Expected pos %r, actual %r, diff %r" % ( # expected_position, new_position, new_position - expected_position)) # logger.debug("Old position was %r at %r (%r seconds ago)" % ( # old_position, old_time, new_time - old_time)) # self._dbus_service.Seeked(new_position * 1000000) # Media keys def setup_mediakeys(self): self.register_mediakeys() self._dbus_obj = self._bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") self._dbus_obj.connect_to_signal("NameOwnerChanged", self.gsd_name_owner_changed_callback, arg0="org.gnome.SettingsDaemon") def register_mediakeys(self): try: try: gsd_object = self._bus.get_object("org.gnome.SettingsDaemon.MediaKeys", "/org/gnome/SettingsDaemon/MediaKeys") except: # Try older name. gsd_object = self._bus.get_object("org.gnome.SettingsDaemon", "/org/gnome/SettingsDaemon/MediaKeys") gsd_object.GrabMediaPlayerKeys("mpDris2", 0, dbus_interface="org.gnome.SettingsDaemon.MediaKeys") except: logger.warning( "Failed to connect to GNOME Settings Daemon. Media keys won't work.") else: self._bus.remove_signal_receiver(self.mediakey_callback) gsd_object.connect_to_signal( "MediaPlayerKeyPressed", self.mediakey_callback) def gsd_name_owner_changed_callback(self, bus_name, old_owner, new_owner): if bus_name == "org.gnome.SettingsDaemon" and new_owner != "": def reregister(): logger.debug( "Re-registering with GNOME Settings Daemon (owner %s)" % new_owner) self.register_mediakeys() return False # Timeout is necessary since g-s-d takes some time to load all plugins. GLib.timeout_add(600, reregister) # Compatibility functions class NotifyWrapper(object): def __init__(self, params): self._last_notification = None self._notification = None self._enabled = True if params["notify"]: self._notification = self._bootstrap_notifications() if not self._notification: logger.error( "No notification service provider could be found; disabling notifications") else: self._enabled = False def _bootstrap_notifications(self): # Check if someone is providing the notification service bus = dbus.SessionBus() try: bus.get_name_owner("org.freedesktop.Notifications") except dbus.exceptions.DBusException: return None notif = None # Bootstrap whatever notifications system we are using if using_gi_notify: logger.debug("Initializing GObject.Notify") if Notify.init(identity): notif = Notify.Notification() notif.set_hint("desktop-entry", GLib.Variant("s", "mpdris2")) notif.set_hint("transient", GLib.Variant("b", True)) else: logger.error( "Failed to init libnotify; disabling notifications") elif using_old_notify: logger.debug("Initializing old pynotify") if pynotify.init(identity): notif = pynotify.Notification("", "", "") notif.set_hint("desktop-entry", "mpdris2") notif.set_hint("transient", True) else: logger.error( "Failed to init libnotify; disabling notifications") return notif def notify(self, title, body, uri=''): if not self._enabled: return if self._last_notification == [title, body, uri]: return self._last_notification = [title, body, uri] # If we did not yet manage to get a notification service, # try again if not self._notification: logger.info( 'Retrying to acquire a notification service provider...') self._notification = self._bootstrap_notifications() if self._notification: logger.info('Notification service provider acquired!') if self._notification: try: self._notification.set_urgency(params['notify_urgency']) self._notification.update(title, body, uri) self._notification.show() except GLib.GError as err: logger.error("Failed to show notification: %s" % err) class MPRISInterface(dbus.service.Object): ''' The base object of an MPRIS player ''' __path = "/org/mpris/MediaPlayer2" __introspect_interface = "org.freedesktop.DBus.Introspectable" __prop_interface = dbus.PROPERTIES_IFACE def __init__(self, params): dbus.service.Object.__init__(self, dbus.SessionBus(), MPRISInterface.__path) self._params = params or {} self._name = self._params["bus_name"] or "org.mpris.MediaPlayer2.snapcast" if not self._name.startswith("org.mpris.MediaPlayer2."): logger.warn( "Configured bus name %r is outside MPRIS2 namespace" % self._name) self._bus = dbus.SessionBus() self._uname = self._bus.get_unique_name() self._dbus_obj = self._bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") self._dbus_obj.connect_to_signal("NameOwnerChanged", self._name_owner_changed_callback, arg0=self._name) self.acquire_name() def _name_owner_changed_callback(self, name, old_owner, new_owner): if name == self._name and old_owner == self._uname and new_owner != "": try: pid = self._dbus_obj.GetConnectionUnixProcessID(new_owner) except: pid = None logger.info("Replaced by %s (PID %s)" % (new_owner, pid or "unknown")) loop.quit() def acquire_name(self): self._bus_name = dbus.service.BusName(self._name, bus=self._bus, allow_replacement=True, replace_existing=True) def release_name(self): if hasattr(self, "_bus_name"): del self._bus_name __root_interface = "org.mpris.MediaPlayer2" __root_props = { "CanQuit": (False, None), "CanRaise": (True, None), "DesktopEntry": ("mpdris2", None), "HasTrackList": (False, None), "Identity": (identity, None), "SupportedUriSchemes": (dbus.Array(signature="s"), None), "SupportedMimeTypes": (dbus.Array(signature="s"), None) } def __get_playback_status(): status = snapcast_wrapper.property('playbackStatus', 'stopped') logger.debug(f'get_playback_status "{status}"') return {'playing': 'Playing', 'paused': 'Paused', 'stopped': 'Stopped'}[status] def __set_loop_status(value): logger.debug(f'set_loop_status "{value}"') snapcast_wrapper.set_property( 'loopStatus', {'None': 'none', 'Track': 'track', 'Playlist': 'playlist'}[value]) return def __get_loop_status(): status = snapcast_wrapper.property('loopStatus', 'none') logger.debug(f'get_loop_status "{status}"') return {'none': 'None', 'track': 'Track', 'playlist': 'Playlist'}[status] def __set_shuffle(value): logger.debug(f'set_shuffle "{value}"') snapcast_wrapper.set_property('shuffle', bool(value)) return def __get_shuffle(): shuffle = snapcast_wrapper.property('shuffle', False) logger.debug(f'get_shuffle "{shuffle}"') return shuffle def __get_metadata(): logger.debug(f'get_metadata: {snapcast_wrapper.metadata}') return dbus.Dictionary(snapcast_wrapper.metadata, signature='sv') def __get_volume(): volume = snapcast_wrapper.property('volume', 100) logger.debug(f'get_volume "{volume}"') if volume > 0: return volume / 100.0 else: return 0.0 def __set_volume(value): logger.debug(f'set_voume: {value}') if value >= 0 and value <= 1: snapcast_wrapper.set_property('volume', int(value * 100)) return def __get_position(): position = snapcast_wrapper.position() logger.debug(f'get_position: {position}') return dbus.Int64(position) def __get_can_go_next(): can_go_next = snapcast_wrapper.property('canGoNext', False) logger.debug(f'get_can_go_next "{can_go_next}"') return can_go_next def __get_can_go_previous(): can_go_previous = snapcast_wrapper.property('canGoPrevious', False) logger.debug(f'get_can_go_previous "{can_go_previous}"') return can_go_previous def __get_can_play(): can_play = snapcast_wrapper.property('canPlay', False) logger.debug(f'get_can_play "{can_play}"') return can_play def __get_can_pause(): can_pause = snapcast_wrapper.property('canPause', False) logger.debug(f'get_can_pause "{can_pause}"') return can_pause def __get_can_seek(): can_seek = snapcast_wrapper.property('canSeek', False) logger.debug(f'get_can_seek "{can_seek}"') return can_seek def __get_can_control(): can_control = snapcast_wrapper.property('canControl', False) logger.debug(f'get_can_control "{can_control}"') return can_control __player_interface = "org.mpris.MediaPlayer2.Player" __player_props = { "PlaybackStatus": (__get_playback_status, None), "LoopStatus": (__get_loop_status, __set_loop_status), "Rate": (1.0, None), "Shuffle": (__get_shuffle, __set_shuffle), "Metadata": (__get_metadata, None), "Volume": (__get_volume, __set_volume), "Position": (__get_position, None), "MinimumRate": (1.0, None), "MaximumRate": (1.0, None), "CanGoNext": (__get_can_go_next, None), "CanGoPrevious": (__get_can_go_previous, None), "CanPlay": (__get_can_play, None), "CanPause": (__get_can_pause, None), "CanSeek": (__get_can_seek, None), "CanControl": (__get_can_control, None), } __prop_mapping = { __player_interface: __player_props, __root_interface: __root_props, } @ dbus.service.method(__introspect_interface) def Introspect(self): return MPRIS2_INTROSPECTION @ dbus.service.signal(__prop_interface, signature="sa{sv}as") def PropertiesChanged(self, interface, changed_properties, invalidated_properties): pass @ dbus.service.method(__prop_interface, in_signature="ss", out_signature="v") def Get(self, interface, prop): getter, setter = self.__prop_mapping[interface][prop] if callable(getter): return getter() return getter @ dbus.service.method(__prop_interface, in_signature="ssv", out_signature="") def Set(self, interface, prop, value): getter, setter = self.__prop_mapping[interface][prop] if setter is not None: setter(value) @ dbus.service.method(__prop_interface, in_signature="s", out_signature="a{sv}") def GetAll(self, interface): read_props = {} props = self.__prop_mapping[interface] for key, (getter, setter) in props.items(): if callable(getter): getter = getter() read_props[key] = getter return read_props def update_property(self, interface, prop): getter, setter = self.__prop_mapping[interface][prop] if callable(getter): value = getter() else: value = getter logger.debug('Updated property: %s = %s' % (prop, value)) self.PropertiesChanged(interface, {prop: value}, []) return value # Root methods @ dbus.service.method(__root_interface, in_signature='', out_signature='') def Raise(self): logger.debug('Raise') webbrowser.open(url=f'http://{params["host"]}:{params["port"]}', new=1) return @ dbus.service.method(__root_interface, in_signature='', out_signature='') def Quit(self): logger.debug('Quit') return # Player methods @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Next(self): snapcast_wrapper.control("Next") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Previous(self): snapcast_wrapper.control("Previous") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Pause(self): snapcast_wrapper.control("Pause") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def PlayPause(self): snapcast_wrapper.control("PlayPause") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Stop(self): snapcast_wrapper.control("Stop") return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def Play(self): snapcast_wrapper.control("Play") return @ dbus.service.method(__player_interface, in_signature='x', out_signature='') def Seek(self, offset): logger.debug(f'Seek {offset}') snapcast_wrapper.control("Seek", {"Offset": offset}) # status = mpd_wrapper.status() # current, end = status['time'].split(':') # current = int(current) # end = int(end) # offset = int(offset) / 1000000 # if current + offset <= end: # position = current + offset # if position < 0: # position = 0 # mpd_wrapper.seekid(int(status['songid']), position) # self.Seeked(position * 1000000) return @ dbus.service.method(__player_interface, in_signature='ox', out_signature='') def SetPosition(self, trackid, position): logger.debug(f'SetPosition TrackId: {trackid}, Position: {position}') snapcast_wrapper.control( "SetPosition", {"TrackId": trackid, "Position": position}) self.Seeked(position) # song = mpd_wrapper.last_currentsong() # # FIXME: use real dbus objects # if str(trackid) != '/org/mpris/MediaPlayer2/Track/%s' % song['id']: # return # # Convert position to seconds # position = int(position) / 1000000 # if position <= int(song['time']): # mpd_wrapper.seekid(int(song['id']), position) # self.Seeked(position * 1000000) return @ dbus.service.method(__player_interface, in_signature='', out_signature='') def OpenUri(self): logger.debug('OpenUri') # TODO return # Player signals @ dbus.service.signal(__player_interface, signature='x') def Seeked(self, position): logger.debug(f'Seeked to {position}') snapcast_wrapper.properties['position'] = float(position) / 1000000 return float(position) def __get_client_from_server_status(status): client_id = None last_seen = 0 try: for group in status['result']['server']['groups']: for client in group['clients']: if client['host']['name'] == hostname: active = client["connected"] logger.info( f'Client with id "{client["id"]}" active: {active}') if int(client["lastSeen"]["sec"]) > last_seen: client_id = client['id'] last_seen = int(client["lastSeen"]["sec"]) if active: return client_id except: logger.error('Failed to parse server status') return client_id def usage(params): print("""\ Usage: %(progname)s [OPTION]... -c, --config=PATH Read a custom configuration file -h, --host=ADDR Set the mpd server address --port=PORT Set the TCP port --client=ID Set the client id -d, --debug Run in debug mode -j, --use-journal Log to systemd journal instead of stderr -v, --version mpDris2 version Environment variables MPD_HOST and MPD_PORT can be used. Report bugs to https://github.com/eonpatapon/mpDris2/issues""" % params) if __name__ == '__main__': DBusGMainLoop(set_as_default=True) # TODO: # -cleanup: remove mpd-ish stuff # -stream id: keep track of the client's stream gettext.bindtextdomain('mpDris2', '@datadir@/locale') gettext.textdomain('mpDris2') log_format_stderr = '%(asctime)s %(module)s %(levelname)s: %(message)s' log_journal = False log_level = logging.INFO config_file = None # Parse command line try: (opts, args) = getopt.getopt(sys.argv[1:], 'c:dh:jp:v', ['help', 'bus-name=', 'config=', 'debug', 'host=', 'client=' 'use-journal', 'path=', 'port=', 'version']) except getopt.GetoptError as ex: (msg, opt) = ex.args print("%s: %s" % (sys.argv[0], msg), file=sys.stderr) print(file=sys.stderr) usage(params) sys.exit(2) for (opt, arg) in opts: if opt in ['--help']: usage(params) sys.exit() elif opt in ['--bus-name']: params['bus_name'] = arg elif opt in ['-c', '--config']: config_file = arg elif opt in ['-d', '--debug']: log_level = logging.DEBUG elif opt in ['-h', '--host']: params['host'] = arg elif opt in ['-j', '--use-journal']: log_journal = True elif opt in ['--port']: params['port'] = int(arg) elif opt in ['--client']: params['client'] = int(arg) elif opt in ['-v', '--version']: v = __version__ if __git_version__: v = __git_version__ print("mpDris2 version %s" % v) sys.exit() if len(args) > 2: usage(params) sys.exit() logger = logging.getLogger('snapcast_mpris') logger.propagate = False logger.setLevel(log_level) # Attempt to configure systemd journal logging, if enabled if log_journal: try: from systemd.journal import JournalHandler log_handler = JournalHandler(SYSLOG_IDENTIFIER='snapcast_mpris') except ImportError: log_journal = False # Log to stderr if journal logging was not enabled, or if setup failed if not log_journal: log_handler = logging.StreamHandler() log_handler.setFormatter(logging.Formatter(log_format_stderr)) logger.addHandler(log_handler) # Pick up the server address (argv -> environment -> config) for arg in args[:2]: if arg.isdigit(): params['port'] = arg else: params['host'] = arg for p in ['host', 'port', 'password', 'bus_name', 'client']: if not params[p]: params[p] = defaults[p] params['host'] = os.path.expanduser(params['host']) # for p in ['mmkeys', 'notify']: # if config.has_option('Bling', p): # params[p] = config.getboolean('Bling', p) # if config.has_option('Bling', 'notify_urgency'): # params['notify_urgency'] = int(config.get('Bling', 'notify_urgency')) if params['client'] is None: hostname = socket.gethostname() logger.info( f'No client id specified, trying to find a client running on host "{hostname}"') resp = requests.post(f'http://{params["host"]}:{params["port"]}/jsonrpc', json={ "id": 1, "jsonrpc": "2.0", "method": "Server.GetStatus"}) if resp.ok: params['client'] = __get_client_from_server_status( json.loads(resp.text)) if params['client'] is None: logger.error('Client not found or not configured') sys.exit() logger.info(f'Client: {params["client"]}') logger.debug(f'Parameters: {params}') if mutagen: logger.info('Using Mutagen to read covers from music files.') else: logger.info( 'Mutagen not available, covers in music files will be ignored.') # Set up the main loop if using_gi_glib: logger.debug('Using GObject-Introspection main loop.') else: logger.debug('Using legacy pygobject2 main loop.') loop = GLib.MainLoop() # Wrapper to send notifications notification = NotifyWrapper(params) # Create wrapper to handle connection failures with MPD more gracefully # wrapper = SnapcastRpcWebsocketWrapper("127.0.0.1", 1780, "id", any) snapcast_wrapper = SnapcastWrapper(params) # mpd_wrapper.run() # Run idle loop try: loop.run() except KeyboardInterrupt: logger.debug('Caught SIGINT, exiting.') # Clean up # try: # mpd_wrapper.client.close() # mpd_wrapper.client.disconnect() # logger.debug('Exiting') # except mpd.ConnectionError: # logger.error('Failed to disconnect properly')