diff --git a/CMakeLists.txt b/CMakeLists.txt index 3aad38ce..96d8b424 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.2) -project(snapcast LANGUAGES CXX VERSION 0.20.0) +project(snapcast LANGUAGES CXX VERSION 0.21.0) set(PROJECT_DESCRIPTION "Multiroom client-server audio player") set(PROJECT_URL "https://github.com/badaix/snapcast") diff --git a/changelog.md b/changelog.md index 64d9a646..b2d212ea 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,17 @@ # Snapcast changelog +## Version 0.21.0 + +### Features + +- Server: Support for WebSocket streaming clients + +### Bugfixes + +### General + +_Johannes Pohl Sat, 13 Jun 2020 00:13:37 +0200_ + ## Version 0.20.0 ### Features diff --git a/client/Makefile b/client/Makefile index 416fb343..16ed0a6f 100644 --- a/client/Makefile +++ b/client/Makefile @@ -14,7 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -VERSION = 0.20.0 +VERSION = 0.21.0 BIN = snapclient ifeq ($(TARGET), FREEBSD) diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 77f200f8..24b214b4 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -5,6 +5,7 @@ set(SERVER_SOURCES control_session_http.cpp control_session_ws.cpp snapserver.cpp + server.cpp stream_server.cpp stream_session.cpp stream_session_tcp.cpp diff --git a/server/Makefile b/server/Makefile index e9c9d5cb..1e4ab6e8 100644 --- a/server/Makefile +++ b/server/Makefile @@ -14,7 +14,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -VERSION = 0.20.0 +VERSION = 0.21.0 BIN = snapserver ifeq ($(TARGET), FREEBSD) @@ -44,7 +44,7 @@ endif CXXFLAGS += $(ADD_CFLAGS) -std=c++14 -Wall -Wextra -Wpedantic -Wno-unused-function -DBOOST_ERROR_CODE_HEADER_ONLY -DHAS_FLAC -DHAS_OGG -DHAS_VORBIS -DHAS_VORBIS_ENC -DHAS_OPUS -DVERSION=\"$(VERSION)\" -I. -I.. -I../common LDFLAGS += $(ADD_LDFLAGS) -lvorbis -lvorbisenc -logg -lFLAC -lopus -OBJ = snapserver.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o +OBJ = snapserver.o server.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o ifneq (,$(TARGET)) CXXFLAGS += -D$(TARGET) diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp index a5835c76..43534a80 100644 --- a/server/control_session_ws.cpp +++ b/server/control_session_ws.cpp @@ -49,13 +49,13 @@ void ControlSessionWebsocket::start() void ControlSessionWebsocket::stop() { - if (ws_.is_open()) - { - boost::beast::error_code ec; - ws_.close(beast::websocket::close_code::normal, ec); - if (ec) - LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; - } + // if (ws_.is_open()) + // { + // boost::beast::error_code ec; + // ws_.close(beast::websocket::close_code::normal, ec); + // if (ec) + // LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + // } } diff --git a/server/server.cpp b/server/server.cpp new file mode 100644 index 00000000..ca09a9e7 --- /dev/null +++ b/server/server.cpp @@ -0,0 +1,734 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include "server.hpp" +#include "common/aixlog.hpp" +#include "config.hpp" +#include "message/client_info.hpp" +#include "message/hello.hpp" +#include "message/stream_tags.hpp" +#include "message/time.hpp" +#include "stream_session_tcp.hpp" +#include + +using namespace std; +using namespace streamreader; + +using json = nlohmann::json; + +static constexpr auto LOG_TAG = "Server"; + +Server::Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings) + : io_context_(io_context), config_timer_(io_context), settings_(serverSettings) +{ +} + + +Server::~Server() = default; + + +void Server::onNewSession(const std::shared_ptr& session) +{ + LOG(INFO, LOG_TAG) << "onNewSession\n"; + streamServer_->addSession(session); +} + + +void Server::onMetaChanged(const PcmStream* pcmStream) +{ + // clang-format off + // Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}} + // clang-format on + + const auto meta = pcmStream->getMeta(); + LOG(DEBUG, LOG_TAG) << "metadata = " << meta->msg.dump(3) << "\n"; + LOG(INFO, LOG_TAG) << "onMetaChanged (" << pcmStream->getName() << ")\n"; + + streamServer_->onMetaChanged(pcmStream, meta); + + // Send meta to all connected clients + json notification = jsonrpcpp::Notification("Stream.OnMetadata", jsonrpcpp::Parameter("id", pcmStream->getId(), "meta", meta->msg)).to_json(); + controlServer_->send(notification.dump(), nullptr); + // cout << "Notification: " << notification.dump() << "\n"; +} + + +void Server::onStateChanged(const PcmStream* pcmStream, const ReaderState& state) +{ + // clang-format off + // Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}} + // clang-format on + LOG(INFO, LOG_TAG) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast(state) << "\n"; + // LOG(INFO, LOG_TAG) << pcmStream->toJson().dump(4); + json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json(); + controlServer_->send(notification.dump(), nullptr); + // cout << "Notification: " << notification.dump() << "\n"; +} + + +void Server::onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) +{ + streamServer_->onNewChunk(pcmStream, pcmStream == streamManager_->getDefaultStream().get(), chunk, duration); +} + + +void Server::onResync(const PcmStream* pcmStream, double ms) +{ + LOG(INFO, LOG_TAG) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n"; +} + + +void Server::onDisconnect(StreamSession* streamSession) +{ + // notify controllers if not yet done + ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId); + if (!clientInfo || !clientInfo->connected) + return; + + clientInfo->connected = false; + chronos::systemtimeofday(&clientInfo->lastSeen); + saveConfig(); + if (controlServer_ != nullptr) + { + // Check if there is no session of this client left + // Can happen in case of ungraceful disconnect/reconnect or + // in case of a duplicate client id + if (streamServer_->getStreamSession(clientInfo->id) == nullptr) + { + // clang-format off + // Notification: + // {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} + // clang-format on + json notification = + jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json(); + controlServer_->send(notification.dump()); + // cout << "Notification: " << notification.dump() << "\n"; + } + } +} + + +void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const +{ + try + { + // LOG(INFO, LOG_TAG) << "Server::processRequest method: " << request->method << ", " << "id: " << request->id() << "\n"; + Json result; + + if (request->method().find("Client.") == 0) + { + ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get("id")); + if (clientInfo == nullptr) + throw jsonrpcpp::InternalErrorException("Client not found", request->id()); + + if (request->method() == "Client.GetStatus") + { + // clang-format off + // Request: {"id":8,"jsonrpc":"2.0","method":"Client.GetStatus","params":{"id":"00:21:6a:7d:74:fc"}} + // Response: {"id":8,"jsonrpc":"2.0","result":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026416,"usec":135973},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}}} + // clang-format on + result["client"] = clientInfo->toJson(); + } + else if (request->method() == "Client.SetVolume") + { + // clang-format off + // Request: {"id":8,"jsonrpc":"2.0","method":"Client.SetVolume","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}} + // Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}} + // Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}} + // clang-format on + + std::lock_guard lock(clientMutex_); + clientInfo->config.volume.fromJson(request->params().get("volume")); + result["volume"] = clientInfo->config.volume.toJson(); + notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged", + jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson()))); + } + else if (request->method() == "Client.SetLatency") + { + // clang-format off + // Request: {"id":7,"jsonrpc":"2.0","method":"Client.SetLatency","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}} + // Response: {"id":7,"jsonrpc":"2.0","result":{"latency":10}} + // Notification: {"jsonrpc":"2.0","method":"Client.OnLatencyChanged","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}} + // clang-format on + int latency = request->params().get("latency"); + if (latency < -10000) + latency = -10000; + else if (latency > settings_.stream.bufferMs) + latency = settings_.stream.bufferMs; + clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs); + result["latency"] = clientInfo->config.latency; + notification.reset( + new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency))); + } + else if (request->method() == "Client.SetName") + { + // clang-format off + // Request: {"id":6,"jsonrpc":"2.0","method":"Client.SetName","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}} + // Response: {"id":6,"jsonrpc":"2.0","result":{"name":"Laptop"}} + // Notification: {"jsonrpc":"2.0","method":"Client.OnNameChanged","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}} + // clang-format on + clientInfo->config.name = request->params().get("name"); + result["name"] = clientInfo->config.name; + notification.reset( + new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name))); + } + else + throw jsonrpcpp::MethodNotFoundException(request->id()); + + + if (request->method().find("Client.Set") == 0) + { + /// Update client + session_ptr session = streamServer_->getStreamSession(clientInfo->id); + if (session != nullptr) + { + auto serverSettings = make_shared(); + serverSettings->setBufferMs(settings_.stream.bufferMs); + serverSettings->setVolume(clientInfo->config.volume.percent); + GroupPtr group = Config::instance().getGroupFromClient(clientInfo); + serverSettings->setMuted(clientInfo->config.volume.muted || group->muted); + serverSettings->setLatency(clientInfo->config.latency); + session->send(serverSettings); + } + } + } + else if (request->method().find("Group.") == 0) + { + GroupPtr group = Config::instance().getGroup(request->params().get("id")); + if (group == nullptr) + throw jsonrpcpp::InternalErrorException("Group not found", request->id()); + + if (request->method() == "Group.GetStatus") + { + // clang-format off + // Request: {"id":5,"jsonrpc":"2.0","method":"Group.GetStatus","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}} + // Response: {"id":5,"jsonrpc":"2.0","result":{"group":{"clients":[{"config":{"instance":2,"latency":10,"name":"Laptop","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488026485,"usec":644997},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026481,"usec":223747},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":true,"name":"","stream_id":"stream 1"}}} + // clang-format on + result["group"] = group->toJson(); + } + else if (request->method() == "Group.SetName") + { + // clang-format off + // Request: {"id":6,"jsonrpc":"2.0","method":"Group.SetName","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","name":"Laptop"}} + // Response: {"id":6,"jsonrpc":"2.0","result":{"name":"MediaPlayer"}} + // Notification: {"jsonrpc":"2.0","method":"Group.OnNameChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","MediaPlayer":"Laptop"}} + // clang-format on + group->name = request->params().get("name"); + result["name"] = group->name; + notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name))); + } + else if (request->method() == "Group.SetMute") + { + // clang-format off + // Request: {"id":5,"jsonrpc":"2.0","method":"Group.SetMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}} + // Response: {"id":5,"jsonrpc":"2.0","result":{"mute":true}} + // Notification: {"jsonrpc":"2.0","method":"Group.OnMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}} + // clang-format on + bool muted = request->params().get("mute"); + group->muted = muted; + + /// Update clients + for (auto client : group->clients) + { + session_ptr session = streamServer_->getStreamSession(client->id); + if (session != nullptr) + { + auto serverSettings = make_shared(); + serverSettings->setBufferMs(settings_.stream.bufferMs); + serverSettings->setVolume(client->config.volume.percent); + GroupPtr group = Config::instance().getGroupFromClient(client); + serverSettings->setMuted(client->config.volume.muted || group->muted); + serverSettings->setLatency(client->config.latency); + session->send(serverSettings); + } + } + + result["mute"] = group->muted; + notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted))); + } + else if (request->method() == "Group.SetStream") + { + // clang-format off + // Request: {"id":4,"jsonrpc":"2.0","method":"Group.SetStream","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"stream 1"}} + // Notification: {"jsonrpc":"2.0","method":"Group.OnStreamChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}} + // clang-format on + string streamId = request->params().get("stream_id"); + PcmStreamPtr stream = streamManager_->getStream(streamId); + if (stream == nullptr) + throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); + + group->streamId = streamId; + + // Update clients + for (auto client : group->clients) + { + session_ptr session = streamServer_->getStreamSession(client->id); + if (session && (session->pcmStream() != stream)) + { + session->send(stream->getMeta()); + session->send(stream->getHeader()); + session->setPcmStream(stream); + } + } + + // Notify others + result["stream_id"] = group->streamId; + notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId))); + } + else if (request->method() == "Group.SetClients") + { + // clang-format off + // Request: {"id":3,"jsonrpc":"2.0","method":"Group.SetClients","params":{"clients":["00:21:6a:7d:74:fc#2","00:21:6a:7d:74:fc"],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}} + // Response: {"id":3,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // clang-format on + vector clients = request->params().get("clients"); + // Remove clients from group + for (auto iter = group->clients.begin(); iter != group->clients.end();) + { + auto client = *iter; + if (find(clients.begin(), clients.end(), client->id) != clients.end()) + { + ++iter; + continue; + } + iter = group->clients.erase(iter); + GroupPtr newGroup = Config::instance().addClientInfo(client); + newGroup->streamId = group->streamId; + } + + // Add clients to group + PcmStreamPtr stream = streamManager_->getStream(group->streamId); + for (const auto& clientId : clients) + { + ClientInfoPtr client = Config::instance().getClientInfo(clientId); + if (!client) + continue; + GroupPtr oldGroup = Config::instance().getGroupFromClient(client); + if (oldGroup && (oldGroup->id == group->id)) + continue; + + if (oldGroup) + { + oldGroup->removeClient(client); + Config::instance().remove(oldGroup); + } + + group->addClient(client); + + // assign new stream + session_ptr session = streamServer_->getStreamSession(client->id); + if (session && stream && (session->pcmStream() != stream)) + { + session->send(stream->getMeta()); + session->send(stream->getHeader()); + session->setPcmStream(stream); + } + } + + if (group->empty()) + Config::instance().remove(group); + + json server = Config::instance().getServerStatus(streamManager_->toJson()); + result["server"] = server; + + // Notify others: since at least two groups are affected, send a complete server update + notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); + } + else + throw jsonrpcpp::MethodNotFoundException(request->id()); + } + else if (request->method().find("Server.") == 0) + { + if (request->method().find("Server.GetRPCVersion") == 0) + { + // Request: {"id":8,"jsonrpc":"2.0","method":"Server.GetRPCVersion"} + // Response: {"id":8,"jsonrpc":"2.0","result":{"major":2,"minor":0,"patch":0}} + // : backwards incompatible change + result["major"] = 2; + // : feature addition to the API + result["minor"] = 0; + // : bugfix release + result["patch"] = 0; + } + else if (request->method() == "Server.GetStatus") + { + // clang-format off + // Request: {"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"} + // Response: {"id":1,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025696,"usec":578142},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025696,"usec":611255},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // clang-format on + result["server"] = Config::instance().getServerStatus(streamManager_->toJson()); + } + else if (request->method() == "Server.DeleteClient") + { + // clang-format off + // Request: {"id":2,"jsonrpc":"2.0","method":"Server.DeleteClient","params":{"id":"00:21:6a:7d:74:fc"}} + // Response: {"id":2,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // clang-format on + ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get("id")); + if (clientInfo == nullptr) + throw jsonrpcpp::InternalErrorException("Client not found", request->id()); + + Config::instance().remove(clientInfo); + + json server = Config::instance().getServerStatus(streamManager_->toJson()); + result["server"] = server; + + /// Notify others + notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); + } + else + throw jsonrpcpp::MethodNotFoundException(request->id()); + } + else if (request->method().find("Stream.") == 0) + { + if (request->method().find("Stream.SetMeta") == 0) + { + // clang-format off + // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications + // clang-format on + + LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get("id") << ")" << request->params().get("meta") << "\n"; + + // Find stream + string streamId = request->params().get("id"); + PcmStreamPtr stream = streamManager_->getStream(streamId); + if (stream == nullptr) + throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); + + // Set metadata from request + stream->setMeta(request->params().get("meta")); + + // Setup response + result["id"] = streamId; + } + else if (request->method() == "Stream.AddStream") + { + // clang-format off + // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications + // clang-format on + + LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")" + << "\n"; + + // Find stream + string streamUri = request->params().get("streamUri"); + PcmStreamPtr stream = streamManager_->addStream(streamUri); + if (stream == nullptr) + throw jsonrpcpp::InternalErrorException("Stream not created", request->id()); + stream->start(); // We start the stream, otherwise it would be silent + // Setup response + result["id"] = stream->getId(); + } + else if (request->method() == "Stream.RemoveStream") + { + // clang-format off + // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}} + // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} + // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications + // clang-format on + + LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")" + << "\n"; + + // Find stream + string streamId = request->params().get("id"); + streamManager_->removeStream(streamId); + // Setup response + result["id"] = streamId; + } + else + throw jsonrpcpp::MethodNotFoundException(request->id()); + } + else + throw jsonrpcpp::MethodNotFoundException(request->id()); + + response.reset(new jsonrpcpp::Response(*request, result)); + } + catch (const jsonrpcpp::RequestException& e) + { + LOG(ERROR, LOG_TAG) << "Server::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n"; + response.reset(new jsonrpcpp::RequestException(e)); + } + catch (const exception& e) + { + LOG(ERROR, LOG_TAG) << "Server::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; + response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id())); + } +} + + +std::string Server::onMessageReceived(ControlSession* controlSession, const std::string& message) +{ + // LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n"; + jsonrpcpp::entity_ptr entity(nullptr); + try + { + entity = jsonrpcpp::Parser::do_parse(message); + if (!entity) + return ""; + } + catch (const jsonrpcpp::ParseErrorException& e) + { + return e.to_json().dump(); + } + catch (const std::exception& e) + { + return jsonrpcpp::ParseErrorException(e.what()).to_json().dump(); + } + + jsonrpcpp::entity_ptr response(nullptr); + jsonrpcpp::notification_ptr notification(nullptr); + if (entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); + processRequest(request, response, notification); + saveConfig(); + ////cout << "Request: " << request->to_json().dump() << "\n"; + if (notification) + { + ////cout << "Notification: " << notification->to_json().dump() << "\n"; + controlServer_->send(notification->to_json().dump(), controlSession); + } + if (response) + { + ////cout << "Response: " << response->to_json().dump() << "\n"; + return response->to_json().dump(); + } + return ""; + } + else if (entity->is_batch()) + { + jsonrpcpp::batch_ptr batch = dynamic_pointer_cast(entity); + ////cout << "Batch: " << batch->to_json().dump() << "\n"; + jsonrpcpp::Batch responseBatch; + jsonrpcpp::Batch notificationBatch; + for (const auto& batch_entity : batch->entities) + { + if (batch_entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(batch_entity); + processRequest(request, response, notification); + if (response != nullptr) + responseBatch.add_ptr(response); + if (notification != nullptr) + notificationBatch.add_ptr(notification); + } + } + saveConfig(); + if (!notificationBatch.entities.empty()) + controlServer_->send(notificationBatch.to_json().dump(), controlSession); + if (!responseBatch.entities.empty()) + return responseBatch.to_json().dump(); + return ""; + } + return ""; +} + + + +void Server::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer) +{ + LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id + << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec + << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; + if (baseMessage.type == message_type::kTime) + { + auto timeMsg = make_shared(); + timeMsg->deserialize(baseMessage, buffer); + timeMsg->refersTo = timeMsg->id; + timeMsg->latency = timeMsg->received - timeMsg->sent; + // LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; + streamSession->send(timeMsg); + + // refresh streamSession state + ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId); + if (client != nullptr) + { + chronos::systemtimeofday(&client->lastSeen); + client->connected = true; + } + } + else if (baseMessage.type == message_type::kClientInfo) + { + ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId); + if (clientInfo == nullptr) + { + LOG(ERROR, LOG_TAG) << "client not found: " << streamSession->clientId << "\n"; + return; + } + msg::ClientInfo infoMsg; + infoMsg.deserialize(baseMessage, buffer); + + clientInfo->config.volume.percent = infoMsg.getVolume(); + clientInfo->config.volume.muted = infoMsg.isMuted(); + jsonrpcpp::notification_ptr notification = make_shared( + "Client.OnVolumeChanged", jsonrpcpp::Parameter("id", streamSession->clientId, "volume", clientInfo->config.volume.toJson())); + controlServer_->send(notification->to_json().dump()); + } + else if (baseMessage.type == message_type::kHello) + { + msg::Hello helloMsg; + helloMsg.deserialize(baseMessage, buffer); + streamSession->clientId = helloMsg.getUniqueId(); + LOG(INFO, LOG_TAG) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() + << ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch() + << ", Protocol version: " << helloMsg.getProtocolVersion() << "\n"; + + bool newGroup(false); + GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId); + if (group == nullptr) + { + group = Config::instance().addClientInfo(streamSession->clientId); + newGroup = true; + } + + ClientInfoPtr client = group->getClient(streamSession->clientId); + + LOG(DEBUG, LOG_TAG) << "Sending ServerSettings to " << streamSession->clientId << "\n"; + auto serverSettings = make_shared(); + serverSettings->setVolume(client->config.volume.percent); + serverSettings->setMuted(client->config.volume.muted || group->muted); + serverSettings->setLatency(client->config.latency); + serverSettings->setBufferMs(settings_.stream.bufferMs); + serverSettings->refersTo = helloMsg.id; + streamSession->send(serverSettings); + + client->host.mac = helloMsg.getMacAddress(); + client->host.ip = streamSession->getIP(); + client->host.name = helloMsg.getHostName(); + client->host.os = helloMsg.getOS(); + client->host.arch = helloMsg.getArch(); + client->snapclient.version = helloMsg.getVersion(); + client->snapclient.name = helloMsg.getClientName(); + client->snapclient.protocolVersion = helloMsg.getProtocolVersion(); + client->config.instance = helloMsg.getInstance(); + client->connected = true; + chronos::systemtimeofday(&client->lastSeen); + + // Assign and update stream + PcmStreamPtr stream = streamManager_->getStream(group->streamId); + if (!stream) + { + stream = streamManager_->getDefaultStream(); + group->streamId = stream->getId(); + } + LOG(DEBUG, LOG_TAG) << "Group: " << group->id << ", stream: " << group->streamId << "\n"; + + saveConfig(); + + LOG(DEBUG, LOG_TAG) << "Sending meta data to " << streamSession->clientId << "\n"; + streamSession->send(stream->getMeta()); + streamSession->setPcmStream(stream); + auto headerChunk = stream->getHeader(); + LOG(DEBUG, LOG_TAG) << "Sending codec header to " << streamSession->clientId << "\n"; + streamSession->send(headerChunk); + + if (newGroup) + { + // clang-format off + // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025796,"usec":714671},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"},{"clients":[{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025798,"usec":728305},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"c5da8f7a-f377-1e51-8266-c5cc61099b71","muted":false,"name":"","stream_id":"stream 1"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} + // clang-format on + json server = Config::instance().getServerStatus(streamManager_->toJson()); + json notification = jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)).to_json(); + controlServer_->send(notification.dump()); + } + else + { + // clang-format off + // Notification: {"jsonrpc":"2.0","method":"Client.OnConnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025524,"usec":876332},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} + // clang-format on + json notification = jsonrpcpp::Notification("Client.OnConnect", jsonrpcpp::Parameter("id", client->id, "client", client->toJson())).to_json(); + controlServer_->send(notification.dump()); + // cout << "Notification: " << notification.dump() << "\n"; + } + // cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n"; + // cout << group->toJson().dump(4) << "\n"; + } +} + + +void Server::saveConfig(const std::chrono::milliseconds& deferred) +{ + config_timer_.cancel(); + config_timer_.expires_after(deferred); + config_timer_.async_wait([](const boost::system::error_code& ec) { + if (!ec) + { + LOG(DEBUG, LOG_TAG) << "Saving config\n"; + Config::instance().save(); + } + }); +} + + +void Server::start() +{ + try + { + controlServer_ = std::make_unique(io_context_, settings_.tcp, settings_.http, this); + streamServer_ = std::make_unique(io_context_, settings_, this); + streamManager_ = + std::make_unique(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs); + // throw SnapException("xxx"); + for (const auto& streamUri : settings_.stream.pcmStreams) + { + PcmStreamPtr stream = streamManager_->addStream(streamUri); + if (stream) + LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n"; + } + + streamManager_->start(); + controlServer_->start(); + streamServer_->start(); + } + catch (const std::exception& e) + { + LOG(NOTICE, LOG_TAG) << "Server::start: " << e.what() << endl; + stop(); + throw; + } +} + + +void Server::stop() +{ + if (streamManager_) + { + streamManager_->stop(); + streamManager_ = nullptr; + } + + if (controlServer_) + { + controlServer_->stop(); + controlServer_ = nullptr; + } + + if (streamServer_) + { + streamServer_->stop(); + streamServer_ = nullptr; + } +} diff --git a/server/server.hpp b/server/server.hpp new file mode 100644 index 00000000..f0927fbb --- /dev/null +++ b/server/server.hpp @@ -0,0 +1,106 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef SERVER_HPP +#define SERVER_HPP + +#include +#include +#include +#include +#include +#include + +#include "common/queue.h" +#include "common/sample_format.hpp" +#include "control_server.hpp" +#include "stream_server.hpp" +#include "jsonrpcpp.hpp" +#include "message/codec_header.hpp" +#include "message/message.hpp" +#include "message/server_settings.hpp" +#include "server_settings.hpp" +#include "stream_session.hpp" +#include "streamreader/stream_manager.hpp" + +using namespace streamreader; + +using boost::asio::ip::tcp; +using acceptor_ptr = std::unique_ptr; +using session_ptr = std::shared_ptr; + + +/// Forwars PCM data to the connected clients +/** + * Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream. + * Accepts and holds client connections (StreamSession) + * Receives (via the MessageReceiver interface) and answers messages from the clients + * Forwards PCM data to the clients + */ +class Server : public MessageReceiver, public ControlMessageReceiver, public PcmListener +{ +public: + Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings); + virtual ~Server(); + + void start(); + void stop(); + + /// Send a message to all connceted clients + // void send(const msg::BaseMessage* message); + + /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived + void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override; + void onDisconnect(StreamSession* connection) override; + + /// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived + std::string onMessageReceived(ControlSession* connection, const std::string& message) override; + // TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one. + void onNewSession(const std::shared_ptr& session) override + { + std::ignore = session; + }; + void onNewSession(const std::shared_ptr& session) override; + + /// Implementation of PcmListener + void onMetaChanged(const PcmStream* pcmStream) override; + void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override; + void onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; + void onResync(const PcmStream* pcmStream, double ms) override; + +private: + void processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; + /// Save the server state deferred to prevent blocking and lower disk io + /// @param deferred the delay after the last call to saveConfig + void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2)); + + mutable std::recursive_mutex sessionsMutex_; + mutable std::recursive_mutex clientMutex_; + boost::asio::io_context& io_context_; + boost::asio::steady_timer config_timer_; + + ServerSettings settings_; + Queue> messages_; + std::unique_ptr controlServer_; + std::unique_ptr streamServer_; + std::unique_ptr streamManager_; +}; + + + +#endif diff --git a/server/snapserver.cpp b/server/snapserver.cpp index c9094b74..862390ee 100644 --- a/server/snapserver.cpp +++ b/server/snapserver.cpp @@ -31,7 +31,7 @@ #include "encoder/encoder_factory.hpp" #include "message/message.hpp" #include "server_settings.hpp" -#include "stream_server.hpp" +#include "server.hpp" #if defined(HAS_AVAHI) || defined(HAS_BONJOUR) #include "publishZeroConf/publish_mdns.hpp" #endif @@ -284,8 +284,8 @@ int main(int argc, char* argv[]) settings.stream.bufferMs = 400; } - auto streamServer = std::make_unique(io_context, settings); - streamServer->start(); + auto server = std::make_unique(io_context, settings); + server->start(); if (settings.server.threads < 0) settings.server.threads = std::max(2, std::min(4, static_cast(std::thread::hardware_concurrency()))); @@ -311,7 +311,7 @@ int main(int argc, char* argv[]) t.join(); LOG(INFO) << "Stopping streamServer" << endl; - streamServer->stop(); + server->stop(); LOG(INFO) << "done" << endl; } catch (const std::exception& e) diff --git a/server/stream_server.cpp b/server/stream_server.cpp index 5bee0680..3d6da842 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -33,8 +33,8 @@ using json = nlohmann::json; static constexpr auto LOG_TAG = "StreamServer"; -StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings) - : io_context_(io_context), config_timer_(io_context), settings_(serverSettings) +StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver) + : io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver) { } @@ -54,15 +54,25 @@ void StreamServer::cleanup() } -void StreamServer::onMetaChanged(const PcmStream* pcmStream) +void StreamServer::addSession(const std::shared_ptr& session) +{ + session->setMessageReceiver(this); + session->setBufferMs(settings_.stream.bufferMs); + session->start(); + + std::lock_guard mlock(sessionsMutex_); + sessions_.emplace_back(session); + cleanup(); +} + + +void StreamServer::onMetaChanged(const PcmStream* pcmStream, std::shared_ptr meta) { // clang-format off // Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}} // clang-format on // Send meta to all connected clients - const auto meta = pcmStream->getMeta(); - LOG(DEBUG, LOG_TAG) << "metadata = " << meta->msg.dump(3) << "\n"; std::lock_guard mlock(sessionsMutex_); for (auto s : sessions_) @@ -73,33 +83,15 @@ void StreamServer::onMetaChanged(const PcmStream* pcmStream) session->send(meta); } } - - LOG(INFO, LOG_TAG) << "onMetaChanged (" << pcmStream->getName() << ")\n"; - json notification = jsonrpcpp::Notification("Stream.OnMetadata", jsonrpcpp::Parameter("id", pcmStream->getId(), "meta", meta->msg)).to_json(); - controlServer_->send(notification.dump(), nullptr); - // cout << "Notification: " << notification.dump() << "\n"; -} - -void StreamServer::onStateChanged(const PcmStream* pcmStream, const ReaderState& state) -{ - // clang-format off - // Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}} - // clang-format on - LOG(INFO, LOG_TAG) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast(state) << "\n"; - // LOG(INFO, LOG_TAG) << pcmStream->toJson().dump(4); - json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json(); - controlServer_->send(notification.dump(), nullptr); - // cout << "Notification: " << notification.dump() << "\n"; } -void StreamServer::onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double /*duration*/) +void StreamServer::onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr chunk, double /*duration*/) { // LOG(INFO, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n"; - bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get()); - shared_const_buffer buffer(*chunk); + // make a copy of the sessions to avoid that a session get's deleted std::vector> sessions; { std::lock_guard mlock(sessionsMutex_); @@ -137,21 +129,10 @@ void StreamServer::onNewChunk(const PcmStream* pcmStream, std::shared_ptrgetName() << "): " << ms << " ms\n"; -} - - -void StreamServer::onNewSession(const std::shared_ptr& session) -{ - session->setMessageReceiver(this); - session->setBufferMs(settings_.stream.bufferMs); - session->start(); - - std::lock_guard mlock(sessionsMutex_); - sessions_.emplace_back(session); - cleanup(); + if (messageReceiver_) + messageReceiver_->onMessageReceived(streamSession, baseMessage, buffer); } @@ -172,597 +153,12 @@ void StreamServer::onDisconnect(StreamSession* streamSession) }), sessions_.end()); LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n"; - - // notify controllers if not yet done - ClientInfoPtr clientInfo = Config::instance().getClientInfo(session->clientId); - if (!clientInfo || !clientInfo->connected) - return; - - clientInfo->connected = false; - chronos::systemtimeofday(&clientInfo->lastSeen); - saveConfig(); - if (controlServer_ != nullptr) - { - // Check if there is no session of this client is left - // Can happen in case of ungraceful disconnect/reconnect or - // in case of a duplicate client id - if (getStreamSession(clientInfo->id) == nullptr) - { - // clang-format off - // Notification: - // {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} - // clang-format on - json notification = - jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json(); - controlServer_->send(notification.dump()); - // cout << "Notification: " << notification.dump() << "\n"; - } - } + if (messageReceiver_) + messageReceiver_->onDisconnect(streamSession); cleanup(); } -void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const -{ - try - { - // LOG(INFO, LOG_TAG) << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id() << "\n"; - Json result; - - if (request->method().find("Client.") == 0) - { - ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get("id")); - if (clientInfo == nullptr) - throw jsonrpcpp::InternalErrorException("Client not found", request->id()); - - if (request->method() == "Client.GetStatus") - { - // clang-format off - // Request: {"id":8,"jsonrpc":"2.0","method":"Client.GetStatus","params":{"id":"00:21:6a:7d:74:fc"}} - // Response: {"id":8,"jsonrpc":"2.0","result":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026416,"usec":135973},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}}} - // clang-format on - result["client"] = clientInfo->toJson(); - } - else if (request->method() == "Client.SetVolume") - { - // clang-format off - // Request: {"id":8,"jsonrpc":"2.0","method":"Client.SetVolume","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}} - // Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}} - // Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}} - // clang-format on - - std::lock_guard lock(clientMutex_); - clientInfo->config.volume.fromJson(request->params().get("volume")); - result["volume"] = clientInfo->config.volume.toJson(); - notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged", - jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson()))); - } - else if (request->method() == "Client.SetLatency") - { - // clang-format off - // Request: {"id":7,"jsonrpc":"2.0","method":"Client.SetLatency","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}} - // Response: {"id":7,"jsonrpc":"2.0","result":{"latency":10}} - // Notification: {"jsonrpc":"2.0","method":"Client.OnLatencyChanged","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}} - // clang-format on - int latency = request->params().get("latency"); - if (latency < -10000) - latency = -10000; - else if (latency > settings_.stream.bufferMs) - latency = settings_.stream.bufferMs; - clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs); - result["latency"] = clientInfo->config.latency; - notification.reset( - new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency))); - } - else if (request->method() == "Client.SetName") - { - // clang-format off - // Request: {"id":6,"jsonrpc":"2.0","method":"Client.SetName","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}} - // Response: {"id":6,"jsonrpc":"2.0","result":{"name":"Laptop"}} - // Notification: {"jsonrpc":"2.0","method":"Client.OnNameChanged","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}} - // clang-format on - clientInfo->config.name = request->params().get("name"); - result["name"] = clientInfo->config.name; - notification.reset( - new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name))); - } - else - throw jsonrpcpp::MethodNotFoundException(request->id()); - - - if (request->method().find("Client.Set") == 0) - { - /// Update client - session_ptr session = getStreamSession(clientInfo->id); - if (session != nullptr) - { - auto serverSettings = make_shared(); - serverSettings->setBufferMs(settings_.stream.bufferMs); - serverSettings->setVolume(clientInfo->config.volume.percent); - GroupPtr group = Config::instance().getGroupFromClient(clientInfo); - serverSettings->setMuted(clientInfo->config.volume.muted || group->muted); - serverSettings->setLatency(clientInfo->config.latency); - session->send(serverSettings); - } - } - } - else if (request->method().find("Group.") == 0) - { - GroupPtr group = Config::instance().getGroup(request->params().get("id")); - if (group == nullptr) - throw jsonrpcpp::InternalErrorException("Group not found", request->id()); - - if (request->method() == "Group.GetStatus") - { - // clang-format off - // Request: {"id":5,"jsonrpc":"2.0","method":"Group.GetStatus","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}} - // Response: {"id":5,"jsonrpc":"2.0","result":{"group":{"clients":[{"config":{"instance":2,"latency":10,"name":"Laptop","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488026485,"usec":644997},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026481,"usec":223747},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":true,"name":"","stream_id":"stream 1"}}} - // clang-format on - result["group"] = group->toJson(); - } - else if (request->method() == "Group.SetName") - { - // clang-format off - // Request: {"id":6,"jsonrpc":"2.0","method":"Group.SetName","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","name":"Laptop"}} - // Response: {"id":6,"jsonrpc":"2.0","result":{"name":"MediaPlayer"}} - // Notification: {"jsonrpc":"2.0","method":"Group.OnNameChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","MediaPlayer":"Laptop"}} - // clang-format on - group->name = request->params().get("name"); - result["name"] = group->name; - notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name))); - } - else if (request->method() == "Group.SetMute") - { - // clang-format off - // Request: {"id":5,"jsonrpc":"2.0","method":"Group.SetMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}} - // Response: {"id":5,"jsonrpc":"2.0","result":{"mute":true}} - // Notification: {"jsonrpc":"2.0","method":"Group.OnMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}} - // clang-format on - bool muted = request->params().get("mute"); - group->muted = muted; - - /// Update clients - for (auto client : group->clients) - { - session_ptr session = getStreamSession(client->id); - if (session != nullptr) - { - auto serverSettings = make_shared(); - serverSettings->setBufferMs(settings_.stream.bufferMs); - serverSettings->setVolume(client->config.volume.percent); - GroupPtr group = Config::instance().getGroupFromClient(client); - serverSettings->setMuted(client->config.volume.muted || group->muted); - serverSettings->setLatency(client->config.latency); - session->send(serverSettings); - } - } - - result["mute"] = group->muted; - notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted))); - } - else if (request->method() == "Group.SetStream") - { - // clang-format off - // Request: {"id":4,"jsonrpc":"2.0","method":"Group.SetStream","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"stream 1"}} - // Notification: {"jsonrpc":"2.0","method":"Group.OnStreamChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}} - // clang-format on - string streamId = request->params().get("stream_id"); - PcmStreamPtr stream = streamManager_->getStream(streamId); - if (stream == nullptr) - throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); - - group->streamId = streamId; - - // Update clients - for (auto client : group->clients) - { - session_ptr session = getStreamSession(client->id); - if (session && (session->pcmStream() != stream)) - { - session->send(stream->getMeta()); - session->send(stream->getHeader()); - session->setPcmStream(stream); - } - } - - // Notify others - result["stream_id"] = group->streamId; - notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId))); - } - else if (request->method() == "Group.SetClients") - { - // clang-format off - // Request: {"id":3,"jsonrpc":"2.0","method":"Group.SetClients","params":{"clients":["00:21:6a:7d:74:fc#2","00:21:6a:7d:74:fc"],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}} - // Response: {"id":3,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // clang-format on - vector clients = request->params().get("clients"); - // Remove clients from group - for (auto iter = group->clients.begin(); iter != group->clients.end();) - { - auto client = *iter; - if (find(clients.begin(), clients.end(), client->id) != clients.end()) - { - ++iter; - continue; - } - iter = group->clients.erase(iter); - GroupPtr newGroup = Config::instance().addClientInfo(client); - newGroup->streamId = group->streamId; - } - - // Add clients to group - PcmStreamPtr stream = streamManager_->getStream(group->streamId); - for (const auto& clientId : clients) - { - ClientInfoPtr client = Config::instance().getClientInfo(clientId); - if (!client) - continue; - GroupPtr oldGroup = Config::instance().getGroupFromClient(client); - if (oldGroup && (oldGroup->id == group->id)) - continue; - - if (oldGroup) - { - oldGroup->removeClient(client); - Config::instance().remove(oldGroup); - } - - group->addClient(client); - - // assign new stream - session_ptr session = getStreamSession(client->id); - if (session && stream && (session->pcmStream() != stream)) - { - session->send(stream->getMeta()); - session->send(stream->getHeader()); - session->setPcmStream(stream); - } - } - - if (group->empty()) - Config::instance().remove(group); - - json server = Config::instance().getServerStatus(streamManager_->toJson()); - result["server"] = server; - - // Notify others: since at least two groups are affected, send a complete server update - notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); - } - else - throw jsonrpcpp::MethodNotFoundException(request->id()); - } - else if (request->method().find("Server.") == 0) - { - if (request->method().find("Server.GetRPCVersion") == 0) - { - // Request: {"id":8,"jsonrpc":"2.0","method":"Server.GetRPCVersion"} - // Response: {"id":8,"jsonrpc":"2.0","result":{"major":2,"minor":0,"patch":0}} - // : backwards incompatible change - result["major"] = 2; - // : feature addition to the API - result["minor"] = 0; - // : bugfix release - result["patch"] = 0; - } - else if (request->method() == "Server.GetStatus") - { - // clang-format off - // Request: {"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"} - // Response: {"id":1,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025696,"usec":578142},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025696,"usec":611255},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // clang-format on - result["server"] = Config::instance().getServerStatus(streamManager_->toJson()); - } - else if (request->method() == "Server.DeleteClient") - { - // clang-format off - // Request: {"id":2,"jsonrpc":"2.0","method":"Server.DeleteClient","params":{"id":"00:21:6a:7d:74:fc"}} - // Response: {"id":2,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // clang-format on - ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get("id")); - if (clientInfo == nullptr) - throw jsonrpcpp::InternalErrorException("Client not found", request->id()); - - Config::instance().remove(clientInfo); - - json server = Config::instance().getServerStatus(streamManager_->toJson()); - result["server"] = server; - - /// Notify others - notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); - } - else - throw jsonrpcpp::MethodNotFoundException(request->id()); - } - else if (request->method().find("Stream.") == 0) - { - if (request->method().find("Stream.SetMeta") == 0) - { - /// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify", - /// "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}} - /// - /// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} - /// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications - - LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get("id") << ")" << request->params().get("meta") << "\n"; - - // Find stream - string streamId = request->params().get("id"); - PcmStreamPtr stream = streamManager_->getStream(streamId); - if (stream == nullptr) - throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); - - // Set metadata from request - stream->setMeta(request->params().get("meta")); - - // Setup response - result["id"] = streamId; - } - else if (request->method() == "Stream.AddStream") - { - // clang-format off - // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} - // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications - // clang-format on - - LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")" - << "\n"; - - // Find stream - string streamUri = request->params().get("streamUri"); - PcmStreamPtr stream = streamManager_->addStream(streamUri); - if (stream == nullptr) - throw jsonrpcpp::InternalErrorException("Stream not created", request->id()); - stream->start(); // We start the stream, otherwise it would be silent - // Setup response - result["id"] = stream->getId(); - } - else if (request->method() == "Stream.RemoveStream") - { - // clang-format off - // Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}} - // Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} - // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications - // clang-format on - - LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")" - << "\n"; - - // Find stream - string streamId = request->params().get("id"); - streamManager_->removeStream(streamId); - // Setup response - result["id"] = streamId; - } - else - throw jsonrpcpp::MethodNotFoundException(request->id()); - } - else - throw jsonrpcpp::MethodNotFoundException(request->id()); - - response.reset(new jsonrpcpp::Response(*request, result)); - } - catch (const jsonrpcpp::RequestException& e) - { - LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() - << "\n"; - response.reset(new jsonrpcpp::RequestException(e)); - } - catch (const exception& e) - { - LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; - response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id())); - } -} - - -std::string StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) -{ - // LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n"; - jsonrpcpp::entity_ptr entity(nullptr); - try - { - entity = jsonrpcpp::Parser::do_parse(message); - if (!entity) - return ""; - } - catch (const jsonrpcpp::ParseErrorException& e) - { - return e.to_json().dump(); - } - catch (const std::exception& e) - { - return jsonrpcpp::ParseErrorException(e.what()).to_json().dump(); - } - - jsonrpcpp::entity_ptr response(nullptr); - jsonrpcpp::notification_ptr notification(nullptr); - if (entity->is_request()) - { - jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); - ProcessRequest(request, response, notification); - saveConfig(); - ////cout << "Request: " << request->to_json().dump() << "\n"; - if (notification) - { - ////cout << "Notification: " << notification->to_json().dump() << "\n"; - controlServer_->send(notification->to_json().dump(), controlSession); - } - if (response) - { - ////cout << "Response: " << response->to_json().dump() << "\n"; - return response->to_json().dump(); - } - return ""; - } - else if (entity->is_batch()) - { - jsonrpcpp::batch_ptr batch = dynamic_pointer_cast(entity); - ////cout << "Batch: " << batch->to_json().dump() << "\n"; - jsonrpcpp::Batch responseBatch; - jsonrpcpp::Batch notificationBatch; - for (const auto& batch_entity : batch->entities) - { - if (batch_entity->is_request()) - { - jsonrpcpp::request_ptr request = dynamic_pointer_cast(batch_entity); - ProcessRequest(request, response, notification); - if (response != nullptr) - responseBatch.add_ptr(response); - if (notification != nullptr) - notificationBatch.add_ptr(notification); - } - } - saveConfig(); - if (!notificationBatch.entities.empty()) - controlServer_->send(notificationBatch.to_json().dump(), controlSession); - if (!responseBatch.entities.empty()) - return responseBatch.to_json().dump(); - return ""; - } - return ""; -} - - - -void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer) -{ - LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id - << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec - << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; - if (baseMessage.type == message_type::kTime) - { - auto timeMsg = make_shared(); - timeMsg->deserialize(baseMessage, buffer); - timeMsg->refersTo = timeMsg->id; - timeMsg->latency = timeMsg->received - timeMsg->sent; - // LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; - streamSession->send(timeMsg); - - // refresh streamSession state - ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId); - if (client != nullptr) - { - chronos::systemtimeofday(&client->lastSeen); - client->connected = true; - } - } - else if (baseMessage.type == message_type::kClientInfo) - { - ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId); - if (clientInfo == nullptr) - { - LOG(ERROR, LOG_TAG) << "client not found: " << streamSession->clientId << "\n"; - return; - } - msg::ClientInfo infoMsg; - infoMsg.deserialize(baseMessage, buffer); - - clientInfo->config.volume.percent = infoMsg.getVolume(); - clientInfo->config.volume.muted = infoMsg.isMuted(); - jsonrpcpp::notification_ptr notification = make_shared( - "Client.OnVolumeChanged", jsonrpcpp::Parameter("id", streamSession->clientId, "volume", clientInfo->config.volume.toJson())); - controlServer_->send(notification->to_json().dump()); - } - else if (baseMessage.type == message_type::kHello) - { - msg::Hello helloMsg; - helloMsg.deserialize(baseMessage, buffer); - streamSession->clientId = helloMsg.getUniqueId(); - LOG(INFO, LOG_TAG) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() - << ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch() - << ", Protocol version: " << helloMsg.getProtocolVersion() << "\n"; - - bool newGroup(false); - GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId); - if (group == nullptr) - { - group = Config::instance().addClientInfo(streamSession->clientId); - newGroup = true; - } - - ClientInfoPtr client = group->getClient(streamSession->clientId); - - LOG(DEBUG, LOG_TAG) << "Sending ServerSettings to " << streamSession->clientId << "\n"; - auto serverSettings = make_shared(); - serverSettings->setVolume(client->config.volume.percent); - serverSettings->setMuted(client->config.volume.muted || group->muted); - serverSettings->setLatency(client->config.latency); - serverSettings->setBufferMs(settings_.stream.bufferMs); - serverSettings->refersTo = helloMsg.id; - streamSession->send(serverSettings); - - client->host.mac = helloMsg.getMacAddress(); - client->host.ip = streamSession->getIP(); - client->host.name = helloMsg.getHostName(); - client->host.os = helloMsg.getOS(); - client->host.arch = helloMsg.getArch(); - client->snapclient.version = helloMsg.getVersion(); - client->snapclient.name = helloMsg.getClientName(); - client->snapclient.protocolVersion = helloMsg.getProtocolVersion(); - client->config.instance = helloMsg.getInstance(); - client->connected = true; - chronos::systemtimeofday(&client->lastSeen); - - // Assign and update stream - PcmStreamPtr stream = streamManager_->getStream(group->streamId); - if (!stream) - { - stream = streamManager_->getDefaultStream(); - group->streamId = stream->getId(); - } - LOG(DEBUG, LOG_TAG) << "Group: " << group->id << ", stream: " << group->streamId << "\n"; - - saveConfig(); - - LOG(DEBUG, LOG_TAG) << "Sending meta data to " << streamSession->clientId << "\n"; - streamSession->send(stream->getMeta()); - streamSession->setPcmStream(stream); - auto headerChunk = stream->getHeader(); - LOG(DEBUG, LOG_TAG) << "Sending codec header to " << streamSession->clientId << "\n"; - streamSession->send(headerChunk); - - if (newGroup) - { - // clang-format off - // Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025796,"usec":714671},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"},{"clients":[{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025798,"usec":728305},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"c5da8f7a-f377-1e51-8266-c5cc61099b71","muted":false,"name":"","stream_id":"stream 1"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}} - // clang-format on - json server = Config::instance().getServerStatus(streamManager_->toJson()); - json notification = jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)).to_json(); - controlServer_->send(notification.dump()); - } - else - { - // clang-format off - // Notification: {"jsonrpc":"2.0","method":"Client.OnConnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025524,"usec":876332},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} - // clang-format on - json notification = jsonrpcpp::Notification("Client.OnConnect", jsonrpcpp::Parameter("id", client->id, "client", client->toJson())).to_json(); - controlServer_->send(notification.dump()); - // cout << "Notification: " << notification.dump() << "\n"; - } - // cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n"; - // cout << group->toJson().dump(4) << "\n"; - } -} - - -void StreamServer::saveConfig(const std::chrono::milliseconds& deferred) -{ - config_timer_.cancel(); - config_timer_.expires_after(deferred); - config_timer_.async_wait([](const boost::system::error_code& ec) { - if (!ec) - { - LOG(DEBUG, LOG_TAG) << "Saving config\n"; - Config::instance().save(); - } - }); -} - - session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const { std::lock_guard mlock(sessionsMutex_); @@ -820,7 +216,7 @@ void StreamServer::handleAccept(tcp::socket socket) LOG(NOTICE, LOG_TAG) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(io_context_, this, std::move(socket)); - onNewSession(session); + addSession(session); } catch (const std::exception& e) { @@ -832,44 +228,21 @@ void StreamServer::handleAccept(tcp::socket socket) void StreamServer::start() { - try + for (const auto& address : settings_.stream.bind_to_address) { - controlServer_ = std::make_unique(io_context_, settings_.tcp, settings_.http, this); - controlServer_->start(); - - streamManager_ = - std::make_unique(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs); - // throw SnapException("xxx"); - for (const auto& streamUri : settings_.stream.pcmStreams) + try { - PcmStreamPtr stream = streamManager_->addStream(streamUri); - if (stream) - LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n"; + LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; + acceptor_.emplace_back( + make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); } - streamManager_->start(); - - for (const auto& address : settings_.stream.bind_to_address) + catch (const boost::system::system_error& e) { - try - { - LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; - acceptor_.emplace_back( - make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); - } - catch (const boost::system::system_error& e) - { - LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; - } + LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; } + } - startAccept(); - } - catch (const std::exception& e) - { - LOG(NOTICE, LOG_TAG) << "StreamServer::start: " << e.what() << endl; - stop(); - throw; - } + startAccept(); } @@ -879,18 +252,6 @@ void StreamServer::stop() acceptor->cancel(); acceptor_.clear(); - if (streamManager_) - { - streamManager_->stop(); - streamManager_ = nullptr; - } - - if (controlServer_) - { - controlServer_->stop(); - controlServer_ = nullptr; - } - std::lock_guard mlock(sessionsMutex_); cleanup(); for (auto s : sessions_) diff --git a/server/stream_server.hpp b/server/stream_server.hpp index 1407b83a..a636a24f 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -51,10 +51,10 @@ using session_ptr = std::shared_ptr; * Receives (via the MessageReceiver interface) and answers messages from the clients * Forwards PCM data to the clients */ -class StreamServer : public MessageReceiver, public ControlMessageReceiver, public PcmListener +class StreamServer : public MessageReceiver { public: - StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings); + StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver = nullptr); virtual ~StreamServer(); void start(); @@ -63,35 +63,21 @@ public: /// Send a message to all connceted clients // void send(const msg::BaseMessage* message); - /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived - void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override; - void onDisconnect(StreamSession* connection) override; + void addSession(const std::shared_ptr& session); + void onMetaChanged(const PcmStream* pcmStream, std::shared_ptr meta); + void onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr chunk, double duration); - /// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived - std::string onMessageReceived(ControlSession* connection, const std::string& message) override; - // TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one. - void onNewSession(const std::shared_ptr& session) override - { - std::ignore = session; - }; - void onNewSession(const std::shared_ptr& session) override; - - /// Implementation of PcmListener - void onMetaChanged(const PcmStream* pcmStream) override; - void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override; - void onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; - void onResync(const PcmStream* pcmStream, double ms) override; + session_ptr getStreamSession(const std::string& mac) const; + session_ptr getStreamSession(StreamSession* session) const; private: void startAccept(); void handleAccept(tcp::socket socket); - session_ptr getStreamSession(const std::string& mac) const; - session_ptr getStreamSession(StreamSession* session) const; - void ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; void cleanup(); - /// Save the server state deferred to prevent blocking and lower disk io - /// @param deferred the delay after the last call to saveConfig - void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2)); + + /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived + void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override; + void onDisconnect(StreamSession* connection) override; mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex clientMutex_; @@ -102,8 +88,7 @@ private: ServerSettings settings_; Queue> messages_; - std::unique_ptr controlServer_; - std::unique_ptr streamManager_; + MessageReceiver* messageReceiver_; };