mirror of
https://github.com/badaix/snapcast.git
synced 2025-05-02 11:46:34 +02:00
Move logic from StreamServer into new Server class
This commit is contained in:
parent
bd19c51d58
commit
d52015ff09
11 changed files with 912 additions and 713 deletions
|
@ -1,6 +1,6 @@
|
||||||
cmake_minimum_required(VERSION 3.2)
|
cmake_minimum_required(VERSION 3.2)
|
||||||
|
|
||||||
project(snapcast LANGUAGES CXX VERSION 0.20.0)
|
project(snapcast LANGUAGES CXX VERSION 0.21.0)
|
||||||
set(PROJECT_DESCRIPTION "Multiroom client-server audio player")
|
set(PROJECT_DESCRIPTION "Multiroom client-server audio player")
|
||||||
set(PROJECT_URL "https://github.com/badaix/snapcast")
|
set(PROJECT_URL "https://github.com/badaix/snapcast")
|
||||||
|
|
||||||
|
|
12
changelog.md
12
changelog.md
|
@ -1,5 +1,17 @@
|
||||||
# Snapcast changelog
|
# Snapcast changelog
|
||||||
|
|
||||||
|
## Version 0.21.0
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
- Server: Support for WebSocket streaming clients
|
||||||
|
|
||||||
|
### Bugfixes
|
||||||
|
|
||||||
|
### General
|
||||||
|
|
||||||
|
_Johannes Pohl <snapcast@badaix.de> Sat, 13 Jun 2020 00:13:37 +0200_
|
||||||
|
|
||||||
## Version 0.20.0
|
## Version 0.20.0
|
||||||
|
|
||||||
### Features
|
### Features
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
VERSION = 0.20.0
|
VERSION = 0.21.0
|
||||||
BIN = snapclient
|
BIN = snapclient
|
||||||
|
|
||||||
ifeq ($(TARGET), FREEBSD)
|
ifeq ($(TARGET), FREEBSD)
|
||||||
|
|
|
@ -5,6 +5,7 @@ set(SERVER_SOURCES
|
||||||
control_session_http.cpp
|
control_session_http.cpp
|
||||||
control_session_ws.cpp
|
control_session_ws.cpp
|
||||||
snapserver.cpp
|
snapserver.cpp
|
||||||
|
server.cpp
|
||||||
stream_server.cpp
|
stream_server.cpp
|
||||||
stream_session.cpp
|
stream_session.cpp
|
||||||
stream_session_tcp.cpp
|
stream_session_tcp.cpp
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
VERSION = 0.20.0
|
VERSION = 0.21.0
|
||||||
BIN = snapserver
|
BIN = snapserver
|
||||||
|
|
||||||
ifeq ($(TARGET), FREEBSD)
|
ifeq ($(TARGET), FREEBSD)
|
||||||
|
@ -44,7 +44,7 @@ endif
|
||||||
|
|
||||||
CXXFLAGS += $(ADD_CFLAGS) -std=c++14 -Wall -Wextra -Wpedantic -Wno-unused-function -DBOOST_ERROR_CODE_HEADER_ONLY -DHAS_FLAC -DHAS_OGG -DHAS_VORBIS -DHAS_VORBIS_ENC -DHAS_OPUS -DVERSION=\"$(VERSION)\" -I. -I.. -I../common
|
CXXFLAGS += $(ADD_CFLAGS) -std=c++14 -Wall -Wextra -Wpedantic -Wno-unused-function -DBOOST_ERROR_CODE_HEADER_ONLY -DHAS_FLAC -DHAS_OGG -DHAS_VORBIS -DHAS_VORBIS_ENC -DHAS_OPUS -DVERSION=\"$(VERSION)\" -I. -I.. -I../common
|
||||||
LDFLAGS += $(ADD_LDFLAGS) -lvorbis -lvorbisenc -logg -lFLAC -lopus
|
LDFLAGS += $(ADD_LDFLAGS) -lvorbis -lvorbisenc -logg -lFLAC -lopus
|
||||||
OBJ = snapserver.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o
|
OBJ = snapserver.o server.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o
|
||||||
|
|
||||||
ifneq (,$(TARGET))
|
ifneq (,$(TARGET))
|
||||||
CXXFLAGS += -D$(TARGET)
|
CXXFLAGS += -D$(TARGET)
|
||||||
|
|
|
@ -49,13 +49,13 @@ void ControlSessionWebsocket::start()
|
||||||
|
|
||||||
void ControlSessionWebsocket::stop()
|
void ControlSessionWebsocket::stop()
|
||||||
{
|
{
|
||||||
if (ws_.is_open())
|
// if (ws_.is_open())
|
||||||
{
|
// {
|
||||||
boost::beast::error_code ec;
|
// boost::beast::error_code ec;
|
||||||
ws_.close(beast::websocket::close_code::normal, ec);
|
// ws_.close(beast::websocket::close_code::normal, ec);
|
||||||
if (ec)
|
// if (ec)
|
||||||
LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n";
|
// LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n";
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
734
server/server.cpp
Normal file
734
server/server.cpp
Normal file
|
@ -0,0 +1,734 @@
|
||||||
|
/***
|
||||||
|
This file is part of snapcast
|
||||||
|
Copyright (C) 2014-2020 Johannes Pohl
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
***/
|
||||||
|
|
||||||
|
#include "server.hpp"
|
||||||
|
#include "common/aixlog.hpp"
|
||||||
|
#include "config.hpp"
|
||||||
|
#include "message/client_info.hpp"
|
||||||
|
#include "message/hello.hpp"
|
||||||
|
#include "message/stream_tags.hpp"
|
||||||
|
#include "message/time.hpp"
|
||||||
|
#include "stream_session_tcp.hpp"
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
using namespace streamreader;
|
||||||
|
|
||||||
|
using json = nlohmann::json;
|
||||||
|
|
||||||
|
static constexpr auto LOG_TAG = "Server";
|
||||||
|
|
||||||
|
Server::Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings)
|
||||||
|
: io_context_(io_context), config_timer_(io_context), settings_(serverSettings)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Server::~Server() = default;
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onNewSession(const std::shared_ptr<StreamSession>& session)
|
||||||
|
{
|
||||||
|
LOG(INFO, LOG_TAG) << "onNewSession\n";
|
||||||
|
streamServer_->addSession(session);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onMetaChanged(const PcmStream* pcmStream)
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
const auto meta = pcmStream->getMeta();
|
||||||
|
LOG(DEBUG, LOG_TAG) << "metadata = " << meta->msg.dump(3) << "\n";
|
||||||
|
LOG(INFO, LOG_TAG) << "onMetaChanged (" << pcmStream->getName() << ")\n";
|
||||||
|
|
||||||
|
streamServer_->onMetaChanged(pcmStream, meta);
|
||||||
|
|
||||||
|
// Send meta to all connected clients
|
||||||
|
json notification = jsonrpcpp::Notification("Stream.OnMetadata", jsonrpcpp::Parameter("id", pcmStream->getId(), "meta", meta->msg)).to_json();
|
||||||
|
controlServer_->send(notification.dump(), nullptr);
|
||||||
|
// cout << "Notification: " << notification.dump() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onStateChanged(const PcmStream* pcmStream, const ReaderState& state)
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}}
|
||||||
|
// clang-format on
|
||||||
|
LOG(INFO, LOG_TAG) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast<int>(state) << "\n";
|
||||||
|
// LOG(INFO, LOG_TAG) << pcmStream->toJson().dump(4);
|
||||||
|
json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json();
|
||||||
|
controlServer_->send(notification.dump(), nullptr);
|
||||||
|
// cout << "Notification: " << notification.dump() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration)
|
||||||
|
{
|
||||||
|
streamServer_->onNewChunk(pcmStream, pcmStream == streamManager_->getDefaultStream().get(), chunk, duration);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onResync(const PcmStream* pcmStream, double ms)
|
||||||
|
{
|
||||||
|
LOG(INFO, LOG_TAG) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onDisconnect(StreamSession* streamSession)
|
||||||
|
{
|
||||||
|
// notify controllers if not yet done
|
||||||
|
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId);
|
||||||
|
if (!clientInfo || !clientInfo->connected)
|
||||||
|
return;
|
||||||
|
|
||||||
|
clientInfo->connected = false;
|
||||||
|
chronos::systemtimeofday(&clientInfo->lastSeen);
|
||||||
|
saveConfig();
|
||||||
|
if (controlServer_ != nullptr)
|
||||||
|
{
|
||||||
|
// Check if there is no session of this client left
|
||||||
|
// Can happen in case of ungraceful disconnect/reconnect or
|
||||||
|
// in case of a duplicate client id
|
||||||
|
if (streamServer_->getStreamSession(clientInfo->id) == nullptr)
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Notification:
|
||||||
|
// {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}}
|
||||||
|
// clang-format on
|
||||||
|
json notification =
|
||||||
|
jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json();
|
||||||
|
controlServer_->send(notification.dump());
|
||||||
|
// cout << "Notification: " << notification.dump() << "\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// LOG(INFO, LOG_TAG) << "Server::processRequest method: " << request->method << ", " << "id: " << request->id() << "\n";
|
||||||
|
Json result;
|
||||||
|
|
||||||
|
if (request->method().find("Client.") == 0)
|
||||||
|
{
|
||||||
|
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get<std::string>("id"));
|
||||||
|
if (clientInfo == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Client not found", request->id());
|
||||||
|
|
||||||
|
if (request->method() == "Client.GetStatus")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":8,"jsonrpc":"2.0","method":"Client.GetStatus","params":{"id":"00:21:6a:7d:74:fc"}}
|
||||||
|
// Response: {"id":8,"jsonrpc":"2.0","result":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026416,"usec":135973},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}}}
|
||||||
|
// clang-format on
|
||||||
|
result["client"] = clientInfo->toJson();
|
||||||
|
}
|
||||||
|
else if (request->method() == "Client.SetVolume")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":8,"jsonrpc":"2.0","method":"Client.SetVolume","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}}
|
||||||
|
// Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}}
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> lock(clientMutex_);
|
||||||
|
clientInfo->config.volume.fromJson(request->params().get("volume"));
|
||||||
|
result["volume"] = clientInfo->config.volume.toJson();
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged",
|
||||||
|
jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson())));
|
||||||
|
}
|
||||||
|
else if (request->method() == "Client.SetLatency")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":7,"jsonrpc":"2.0","method":"Client.SetLatency","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}}
|
||||||
|
// Response: {"id":7,"jsonrpc":"2.0","result":{"latency":10}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Client.OnLatencyChanged","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}}
|
||||||
|
// clang-format on
|
||||||
|
int latency = request->params().get("latency");
|
||||||
|
if (latency < -10000)
|
||||||
|
latency = -10000;
|
||||||
|
else if (latency > settings_.stream.bufferMs)
|
||||||
|
latency = settings_.stream.bufferMs;
|
||||||
|
clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs);
|
||||||
|
result["latency"] = clientInfo->config.latency;
|
||||||
|
notification.reset(
|
||||||
|
new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency)));
|
||||||
|
}
|
||||||
|
else if (request->method() == "Client.SetName")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":6,"jsonrpc":"2.0","method":"Client.SetName","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}}
|
||||||
|
// Response: {"id":6,"jsonrpc":"2.0","result":{"name":"Laptop"}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Client.OnNameChanged","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}}
|
||||||
|
// clang-format on
|
||||||
|
clientInfo->config.name = request->params().get<std::string>("name");
|
||||||
|
result["name"] = clientInfo->config.name;
|
||||||
|
notification.reset(
|
||||||
|
new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw jsonrpcpp::MethodNotFoundException(request->id());
|
||||||
|
|
||||||
|
|
||||||
|
if (request->method().find("Client.Set") == 0)
|
||||||
|
{
|
||||||
|
/// Update client
|
||||||
|
session_ptr session = streamServer_->getStreamSession(clientInfo->id);
|
||||||
|
if (session != nullptr)
|
||||||
|
{
|
||||||
|
auto serverSettings = make_shared<msg::ServerSettings>();
|
||||||
|
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
||||||
|
serverSettings->setVolume(clientInfo->config.volume.percent);
|
||||||
|
GroupPtr group = Config::instance().getGroupFromClient(clientInfo);
|
||||||
|
serverSettings->setMuted(clientInfo->config.volume.muted || group->muted);
|
||||||
|
serverSettings->setLatency(clientInfo->config.latency);
|
||||||
|
session->send(serverSettings);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (request->method().find("Group.") == 0)
|
||||||
|
{
|
||||||
|
GroupPtr group = Config::instance().getGroup(request->params().get<std::string>("id"));
|
||||||
|
if (group == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Group not found", request->id());
|
||||||
|
|
||||||
|
if (request->method() == "Group.GetStatus")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":5,"jsonrpc":"2.0","method":"Group.GetStatus","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}}
|
||||||
|
// Response: {"id":5,"jsonrpc":"2.0","result":{"group":{"clients":[{"config":{"instance":2,"latency":10,"name":"Laptop","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488026485,"usec":644997},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026481,"usec":223747},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":true,"name":"","stream_id":"stream 1"}}}
|
||||||
|
// clang-format on
|
||||||
|
result["group"] = group->toJson();
|
||||||
|
}
|
||||||
|
else if (request->method() == "Group.SetName")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":6,"jsonrpc":"2.0","method":"Group.SetName","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","name":"Laptop"}}
|
||||||
|
// Response: {"id":6,"jsonrpc":"2.0","result":{"name":"MediaPlayer"}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Group.OnNameChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","MediaPlayer":"Laptop"}}
|
||||||
|
// clang-format on
|
||||||
|
group->name = request->params().get<std::string>("name");
|
||||||
|
result["name"] = group->name;
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name)));
|
||||||
|
}
|
||||||
|
else if (request->method() == "Group.SetMute")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":5,"jsonrpc":"2.0","method":"Group.SetMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}}
|
||||||
|
// Response: {"id":5,"jsonrpc":"2.0","result":{"mute":true}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Group.OnMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}}
|
||||||
|
// clang-format on
|
||||||
|
bool muted = request->params().get<bool>("mute");
|
||||||
|
group->muted = muted;
|
||||||
|
|
||||||
|
/// Update clients
|
||||||
|
for (auto client : group->clients)
|
||||||
|
{
|
||||||
|
session_ptr session = streamServer_->getStreamSession(client->id);
|
||||||
|
if (session != nullptr)
|
||||||
|
{
|
||||||
|
auto serverSettings = make_shared<msg::ServerSettings>();
|
||||||
|
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
||||||
|
serverSettings->setVolume(client->config.volume.percent);
|
||||||
|
GroupPtr group = Config::instance().getGroupFromClient(client);
|
||||||
|
serverSettings->setMuted(client->config.volume.muted || group->muted);
|
||||||
|
serverSettings->setLatency(client->config.latency);
|
||||||
|
session->send(serverSettings);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result["mute"] = group->muted;
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted)));
|
||||||
|
}
|
||||||
|
else if (request->method() == "Group.SetStream")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":4,"jsonrpc":"2.0","method":"Group.SetStream","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}}
|
||||||
|
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"stream 1"}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Group.OnStreamChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}}
|
||||||
|
// clang-format on
|
||||||
|
string streamId = request->params().get<std::string>("stream_id");
|
||||||
|
PcmStreamPtr stream = streamManager_->getStream(streamId);
|
||||||
|
if (stream == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
||||||
|
|
||||||
|
group->streamId = streamId;
|
||||||
|
|
||||||
|
// Update clients
|
||||||
|
for (auto client : group->clients)
|
||||||
|
{
|
||||||
|
session_ptr session = streamServer_->getStreamSession(client->id);
|
||||||
|
if (session && (session->pcmStream() != stream))
|
||||||
|
{
|
||||||
|
session->send(stream->getMeta());
|
||||||
|
session->send(stream->getHeader());
|
||||||
|
session->setPcmStream(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify others
|
||||||
|
result["stream_id"] = group->streamId;
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId)));
|
||||||
|
}
|
||||||
|
else if (request->method() == "Group.SetClients")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":3,"jsonrpc":"2.0","method":"Group.SetClients","params":{"clients":["00:21:6a:7d:74:fc#2","00:21:6a:7d:74:fc"],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}}
|
||||||
|
// Response: {"id":3,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// clang-format on
|
||||||
|
vector<string> clients = request->params().get("clients");
|
||||||
|
// Remove clients from group
|
||||||
|
for (auto iter = group->clients.begin(); iter != group->clients.end();)
|
||||||
|
{
|
||||||
|
auto client = *iter;
|
||||||
|
if (find(clients.begin(), clients.end(), client->id) != clients.end())
|
||||||
|
{
|
||||||
|
++iter;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
iter = group->clients.erase(iter);
|
||||||
|
GroupPtr newGroup = Config::instance().addClientInfo(client);
|
||||||
|
newGroup->streamId = group->streamId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add clients to group
|
||||||
|
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
|
||||||
|
for (const auto& clientId : clients)
|
||||||
|
{
|
||||||
|
ClientInfoPtr client = Config::instance().getClientInfo(clientId);
|
||||||
|
if (!client)
|
||||||
|
continue;
|
||||||
|
GroupPtr oldGroup = Config::instance().getGroupFromClient(client);
|
||||||
|
if (oldGroup && (oldGroup->id == group->id))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (oldGroup)
|
||||||
|
{
|
||||||
|
oldGroup->removeClient(client);
|
||||||
|
Config::instance().remove(oldGroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
group->addClient(client);
|
||||||
|
|
||||||
|
// assign new stream
|
||||||
|
session_ptr session = streamServer_->getStreamSession(client->id);
|
||||||
|
if (session && stream && (session->pcmStream() != stream))
|
||||||
|
{
|
||||||
|
session->send(stream->getMeta());
|
||||||
|
session->send(stream->getHeader());
|
||||||
|
session->setPcmStream(stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (group->empty())
|
||||||
|
Config::instance().remove(group);
|
||||||
|
|
||||||
|
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
||||||
|
result["server"] = server;
|
||||||
|
|
||||||
|
// Notify others: since at least two groups are affected, send a complete server update
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw jsonrpcpp::MethodNotFoundException(request->id());
|
||||||
|
}
|
||||||
|
else if (request->method().find("Server.") == 0)
|
||||||
|
{
|
||||||
|
if (request->method().find("Server.GetRPCVersion") == 0)
|
||||||
|
{
|
||||||
|
// Request: {"id":8,"jsonrpc":"2.0","method":"Server.GetRPCVersion"}
|
||||||
|
// Response: {"id":8,"jsonrpc":"2.0","result":{"major":2,"minor":0,"patch":0}}
|
||||||
|
// <major>: backwards incompatible change
|
||||||
|
result["major"] = 2;
|
||||||
|
// <minor>: feature addition to the API
|
||||||
|
result["minor"] = 0;
|
||||||
|
// <patch>: bugfix release
|
||||||
|
result["patch"] = 0;
|
||||||
|
}
|
||||||
|
else if (request->method() == "Server.GetStatus")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"}
|
||||||
|
// Response: {"id":1,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025696,"usec":578142},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025696,"usec":611255},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// clang-format on
|
||||||
|
result["server"] = Config::instance().getServerStatus(streamManager_->toJson());
|
||||||
|
}
|
||||||
|
else if (request->method() == "Server.DeleteClient")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":2,"jsonrpc":"2.0","method":"Server.DeleteClient","params":{"id":"00:21:6a:7d:74:fc"}}
|
||||||
|
// Response: {"id":2,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// clang-format on
|
||||||
|
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get<std::string>("id"));
|
||||||
|
if (clientInfo == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Client not found", request->id());
|
||||||
|
|
||||||
|
Config::instance().remove(clientInfo);
|
||||||
|
|
||||||
|
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
||||||
|
result["server"] = server;
|
||||||
|
|
||||||
|
/// Notify others
|
||||||
|
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw jsonrpcpp::MethodNotFoundException(request->id());
|
||||||
|
}
|
||||||
|
else if (request->method().find("Stream.") == 0)
|
||||||
|
{
|
||||||
|
if (request->method().find("Stream.SetMeta") == 0)
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}}
|
||||||
|
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
||||||
|
// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get<std::string>("id") << ")" << request->params().get("meta") << "\n";
|
||||||
|
|
||||||
|
// Find stream
|
||||||
|
string streamId = request->params().get<std::string>("id");
|
||||||
|
PcmStreamPtr stream = streamManager_->getStream(streamId);
|
||||||
|
if (stream == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
||||||
|
|
||||||
|
// Set metadata from request
|
||||||
|
stream->setMeta(request->params().get("meta"));
|
||||||
|
|
||||||
|
// Setup response
|
||||||
|
result["id"] = streamId;
|
||||||
|
}
|
||||||
|
else if (request->method() == "Stream.AddStream")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}}
|
||||||
|
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
||||||
|
// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")"
|
||||||
|
<< "\n";
|
||||||
|
|
||||||
|
// Find stream
|
||||||
|
string streamUri = request->params().get("streamUri");
|
||||||
|
PcmStreamPtr stream = streamManager_->addStream(streamUri);
|
||||||
|
if (stream == nullptr)
|
||||||
|
throw jsonrpcpp::InternalErrorException("Stream not created", request->id());
|
||||||
|
stream->start(); // We start the stream, otherwise it would be silent
|
||||||
|
// Setup response
|
||||||
|
result["id"] = stream->getId();
|
||||||
|
}
|
||||||
|
else if (request->method() == "Stream.RemoveStream")
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}}
|
||||||
|
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
||||||
|
// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
||||||
|
// clang-format on
|
||||||
|
|
||||||
|
LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")"
|
||||||
|
<< "\n";
|
||||||
|
|
||||||
|
// Find stream
|
||||||
|
string streamId = request->params().get("id");
|
||||||
|
streamManager_->removeStream(streamId);
|
||||||
|
// Setup response
|
||||||
|
result["id"] = streamId;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw jsonrpcpp::MethodNotFoundException(request->id());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
throw jsonrpcpp::MethodNotFoundException(request->id());
|
||||||
|
|
||||||
|
response.reset(new jsonrpcpp::Response(*request, result));
|
||||||
|
}
|
||||||
|
catch (const jsonrpcpp::RequestException& e)
|
||||||
|
{
|
||||||
|
LOG(ERROR, LOG_TAG) << "Server::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n";
|
||||||
|
response.reset(new jsonrpcpp::RequestException(e));
|
||||||
|
}
|
||||||
|
catch (const exception& e)
|
||||||
|
{
|
||||||
|
LOG(ERROR, LOG_TAG) << "Server::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n";
|
||||||
|
response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::string Server::onMessageReceived(ControlSession* controlSession, const std::string& message)
|
||||||
|
{
|
||||||
|
// LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n";
|
||||||
|
jsonrpcpp::entity_ptr entity(nullptr);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
entity = jsonrpcpp::Parser::do_parse(message);
|
||||||
|
if (!entity)
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
catch (const jsonrpcpp::ParseErrorException& e)
|
||||||
|
{
|
||||||
|
return e.to_json().dump();
|
||||||
|
}
|
||||||
|
catch (const std::exception& e)
|
||||||
|
{
|
||||||
|
return jsonrpcpp::ParseErrorException(e.what()).to_json().dump();
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonrpcpp::entity_ptr response(nullptr);
|
||||||
|
jsonrpcpp::notification_ptr notification(nullptr);
|
||||||
|
if (entity->is_request())
|
||||||
|
{
|
||||||
|
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
|
||||||
|
processRequest(request, response, notification);
|
||||||
|
saveConfig();
|
||||||
|
////cout << "Request: " << request->to_json().dump() << "\n";
|
||||||
|
if (notification)
|
||||||
|
{
|
||||||
|
////cout << "Notification: " << notification->to_json().dump() << "\n";
|
||||||
|
controlServer_->send(notification->to_json().dump(), controlSession);
|
||||||
|
}
|
||||||
|
if (response)
|
||||||
|
{
|
||||||
|
////cout << "Response: " << response->to_json().dump() << "\n";
|
||||||
|
return response->to_json().dump();
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
else if (entity->is_batch())
|
||||||
|
{
|
||||||
|
jsonrpcpp::batch_ptr batch = dynamic_pointer_cast<jsonrpcpp::Batch>(entity);
|
||||||
|
////cout << "Batch: " << batch->to_json().dump() << "\n";
|
||||||
|
jsonrpcpp::Batch responseBatch;
|
||||||
|
jsonrpcpp::Batch notificationBatch;
|
||||||
|
for (const auto& batch_entity : batch->entities)
|
||||||
|
{
|
||||||
|
if (batch_entity->is_request())
|
||||||
|
{
|
||||||
|
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(batch_entity);
|
||||||
|
processRequest(request, response, notification);
|
||||||
|
if (response != nullptr)
|
||||||
|
responseBatch.add_ptr(response);
|
||||||
|
if (notification != nullptr)
|
||||||
|
notificationBatch.add_ptr(notification);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
saveConfig();
|
||||||
|
if (!notificationBatch.entities.empty())
|
||||||
|
controlServer_->send(notificationBatch.to_json().dump(), controlSession);
|
||||||
|
if (!responseBatch.entities.empty())
|
||||||
|
return responseBatch.to_json().dump();
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void Server::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer)
|
||||||
|
{
|
||||||
|
LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id
|
||||||
|
<< ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec
|
||||||
|
<< ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
|
||||||
|
if (baseMessage.type == message_type::kTime)
|
||||||
|
{
|
||||||
|
auto timeMsg = make_shared<msg::Time>();
|
||||||
|
timeMsg->deserialize(baseMessage, buffer);
|
||||||
|
timeMsg->refersTo = timeMsg->id;
|
||||||
|
timeMsg->latency = timeMsg->received - timeMsg->sent;
|
||||||
|
// LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
|
||||||
|
streamSession->send(timeMsg);
|
||||||
|
|
||||||
|
// refresh streamSession state
|
||||||
|
ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId);
|
||||||
|
if (client != nullptr)
|
||||||
|
{
|
||||||
|
chronos::systemtimeofday(&client->lastSeen);
|
||||||
|
client->connected = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (baseMessage.type == message_type::kClientInfo)
|
||||||
|
{
|
||||||
|
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId);
|
||||||
|
if (clientInfo == nullptr)
|
||||||
|
{
|
||||||
|
LOG(ERROR, LOG_TAG) << "client not found: " << streamSession->clientId << "\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
msg::ClientInfo infoMsg;
|
||||||
|
infoMsg.deserialize(baseMessage, buffer);
|
||||||
|
|
||||||
|
clientInfo->config.volume.percent = infoMsg.getVolume();
|
||||||
|
clientInfo->config.volume.muted = infoMsg.isMuted();
|
||||||
|
jsonrpcpp::notification_ptr notification = make_shared<jsonrpcpp::Notification>(
|
||||||
|
"Client.OnVolumeChanged", jsonrpcpp::Parameter("id", streamSession->clientId, "volume", clientInfo->config.volume.toJson()));
|
||||||
|
controlServer_->send(notification->to_json().dump());
|
||||||
|
}
|
||||||
|
else if (baseMessage.type == message_type::kHello)
|
||||||
|
{
|
||||||
|
msg::Hello helloMsg;
|
||||||
|
helloMsg.deserialize(baseMessage, buffer);
|
||||||
|
streamSession->clientId = helloMsg.getUniqueId();
|
||||||
|
LOG(INFO, LOG_TAG) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion()
|
||||||
|
<< ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch()
|
||||||
|
<< ", Protocol version: " << helloMsg.getProtocolVersion() << "\n";
|
||||||
|
|
||||||
|
bool newGroup(false);
|
||||||
|
GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId);
|
||||||
|
if (group == nullptr)
|
||||||
|
{
|
||||||
|
group = Config::instance().addClientInfo(streamSession->clientId);
|
||||||
|
newGroup = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientInfoPtr client = group->getClient(streamSession->clientId);
|
||||||
|
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Sending ServerSettings to " << streamSession->clientId << "\n";
|
||||||
|
auto serverSettings = make_shared<msg::ServerSettings>();
|
||||||
|
serverSettings->setVolume(client->config.volume.percent);
|
||||||
|
serverSettings->setMuted(client->config.volume.muted || group->muted);
|
||||||
|
serverSettings->setLatency(client->config.latency);
|
||||||
|
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
||||||
|
serverSettings->refersTo = helloMsg.id;
|
||||||
|
streamSession->send(serverSettings);
|
||||||
|
|
||||||
|
client->host.mac = helloMsg.getMacAddress();
|
||||||
|
client->host.ip = streamSession->getIP();
|
||||||
|
client->host.name = helloMsg.getHostName();
|
||||||
|
client->host.os = helloMsg.getOS();
|
||||||
|
client->host.arch = helloMsg.getArch();
|
||||||
|
client->snapclient.version = helloMsg.getVersion();
|
||||||
|
client->snapclient.name = helloMsg.getClientName();
|
||||||
|
client->snapclient.protocolVersion = helloMsg.getProtocolVersion();
|
||||||
|
client->config.instance = helloMsg.getInstance();
|
||||||
|
client->connected = true;
|
||||||
|
chronos::systemtimeofday(&client->lastSeen);
|
||||||
|
|
||||||
|
// Assign and update stream
|
||||||
|
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
|
||||||
|
if (!stream)
|
||||||
|
{
|
||||||
|
stream = streamManager_->getDefaultStream();
|
||||||
|
group->streamId = stream->getId();
|
||||||
|
}
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Group: " << group->id << ", stream: " << group->streamId << "\n";
|
||||||
|
|
||||||
|
saveConfig();
|
||||||
|
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Sending meta data to " << streamSession->clientId << "\n";
|
||||||
|
streamSession->send(stream->getMeta());
|
||||||
|
streamSession->setPcmStream(stream);
|
||||||
|
auto headerChunk = stream->getHeader();
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Sending codec header to " << streamSession->clientId << "\n";
|
||||||
|
streamSession->send(headerChunk);
|
||||||
|
|
||||||
|
if (newGroup)
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025796,"usec":714671},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"},{"clients":[{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025798,"usec":728305},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"c5da8f7a-f377-1e51-8266-c5cc61099b71","muted":false,"name":"","stream_id":"stream 1"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
||||||
|
// clang-format on
|
||||||
|
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
||||||
|
json notification = jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)).to_json();
|
||||||
|
controlServer_->send(notification.dump());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// clang-format off
|
||||||
|
// Notification: {"jsonrpc":"2.0","method":"Client.OnConnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025524,"usec":876332},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}}
|
||||||
|
// clang-format on
|
||||||
|
json notification = jsonrpcpp::Notification("Client.OnConnect", jsonrpcpp::Parameter("id", client->id, "client", client->toJson())).to_json();
|
||||||
|
controlServer_->send(notification.dump());
|
||||||
|
// cout << "Notification: " << notification.dump() << "\n";
|
||||||
|
}
|
||||||
|
// cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n";
|
||||||
|
// cout << group->toJson().dump(4) << "\n";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::saveConfig(const std::chrono::milliseconds& deferred)
|
||||||
|
{
|
||||||
|
config_timer_.cancel();
|
||||||
|
config_timer_.expires_after(deferred);
|
||||||
|
config_timer_.async_wait([](const boost::system::error_code& ec) {
|
||||||
|
if (!ec)
|
||||||
|
{
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Saving config\n";
|
||||||
|
Config::instance().save();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::start()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
controlServer_ = std::make_unique<ControlServer>(io_context_, settings_.tcp, settings_.http, this);
|
||||||
|
streamServer_ = std::make_unique<StreamServer>(io_context_, settings_, this);
|
||||||
|
streamManager_ =
|
||||||
|
std::make_unique<StreamManager>(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs);
|
||||||
|
// throw SnapException("xxx");
|
||||||
|
for (const auto& streamUri : settings_.stream.pcmStreams)
|
||||||
|
{
|
||||||
|
PcmStreamPtr stream = streamManager_->addStream(streamUri);
|
||||||
|
if (stream)
|
||||||
|
LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
streamManager_->start();
|
||||||
|
controlServer_->start();
|
||||||
|
streamServer_->start();
|
||||||
|
}
|
||||||
|
catch (const std::exception& e)
|
||||||
|
{
|
||||||
|
LOG(NOTICE, LOG_TAG) << "Server::start: " << e.what() << endl;
|
||||||
|
stop();
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Server::stop()
|
||||||
|
{
|
||||||
|
if (streamManager_)
|
||||||
|
{
|
||||||
|
streamManager_->stop();
|
||||||
|
streamManager_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (controlServer_)
|
||||||
|
{
|
||||||
|
controlServer_->stop();
|
||||||
|
controlServer_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamServer_)
|
||||||
|
{
|
||||||
|
streamServer_->stop();
|
||||||
|
streamServer_ = nullptr;
|
||||||
|
}
|
||||||
|
}
|
106
server/server.hpp
Normal file
106
server/server.hpp
Normal file
|
@ -0,0 +1,106 @@
|
||||||
|
/***
|
||||||
|
This file is part of snapcast
|
||||||
|
Copyright (C) 2014-2020 Johannes Pohl
|
||||||
|
|
||||||
|
This program is free software: you can redistribute it and/or modify
|
||||||
|
it under the terms of the GNU General Public License as published by
|
||||||
|
the Free Software Foundation, either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
This program is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
***/
|
||||||
|
|
||||||
|
#ifndef SERVER_HPP
|
||||||
|
#define SERVER_HPP
|
||||||
|
|
||||||
|
#include <boost/asio.hpp>
|
||||||
|
#include <memory>
|
||||||
|
#include <mutex>
|
||||||
|
#include <set>
|
||||||
|
#include <sstream>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "common/queue.h"
|
||||||
|
#include "common/sample_format.hpp"
|
||||||
|
#include "control_server.hpp"
|
||||||
|
#include "stream_server.hpp"
|
||||||
|
#include "jsonrpcpp.hpp"
|
||||||
|
#include "message/codec_header.hpp"
|
||||||
|
#include "message/message.hpp"
|
||||||
|
#include "message/server_settings.hpp"
|
||||||
|
#include "server_settings.hpp"
|
||||||
|
#include "stream_session.hpp"
|
||||||
|
#include "streamreader/stream_manager.hpp"
|
||||||
|
|
||||||
|
using namespace streamreader;
|
||||||
|
|
||||||
|
using boost::asio::ip::tcp;
|
||||||
|
using acceptor_ptr = std::unique_ptr<tcp::acceptor>;
|
||||||
|
using session_ptr = std::shared_ptr<StreamSession>;
|
||||||
|
|
||||||
|
|
||||||
|
/// Forwars PCM data to the connected clients
|
||||||
|
/**
|
||||||
|
* Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream.
|
||||||
|
* Accepts and holds client connections (StreamSession)
|
||||||
|
* Receives (via the MessageReceiver interface) and answers messages from the clients
|
||||||
|
* Forwards PCM data to the clients
|
||||||
|
*/
|
||||||
|
class Server : public MessageReceiver, public ControlMessageReceiver, public PcmListener
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings);
|
||||||
|
virtual ~Server();
|
||||||
|
|
||||||
|
void start();
|
||||||
|
void stop();
|
||||||
|
|
||||||
|
/// Send a message to all connceted clients
|
||||||
|
// void send(const msg::BaseMessage* message);
|
||||||
|
|
||||||
|
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
|
||||||
|
void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
|
||||||
|
void onDisconnect(StreamSession* connection) override;
|
||||||
|
|
||||||
|
/// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived
|
||||||
|
std::string onMessageReceived(ControlSession* connection, const std::string& message) override;
|
||||||
|
// TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one.
|
||||||
|
void onNewSession(const std::shared_ptr<ControlSession>& session) override
|
||||||
|
{
|
||||||
|
std::ignore = session;
|
||||||
|
};
|
||||||
|
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
|
||||||
|
|
||||||
|
/// Implementation of PcmListener
|
||||||
|
void onMetaChanged(const PcmStream* pcmStream) override;
|
||||||
|
void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override;
|
||||||
|
void onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
|
||||||
|
void onResync(const PcmStream* pcmStream, double ms) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const;
|
||||||
|
/// Save the server state deferred to prevent blocking and lower disk io
|
||||||
|
/// @param deferred the delay after the last call to saveConfig
|
||||||
|
void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2));
|
||||||
|
|
||||||
|
mutable std::recursive_mutex sessionsMutex_;
|
||||||
|
mutable std::recursive_mutex clientMutex_;
|
||||||
|
boost::asio::io_context& io_context_;
|
||||||
|
boost::asio::steady_timer config_timer_;
|
||||||
|
|
||||||
|
ServerSettings settings_;
|
||||||
|
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
|
||||||
|
std::unique_ptr<ControlServer> controlServer_;
|
||||||
|
std::unique_ptr<StreamServer> streamServer_;
|
||||||
|
std::unique_ptr<StreamManager> streamManager_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#endif
|
|
@ -31,7 +31,7 @@
|
||||||
#include "encoder/encoder_factory.hpp"
|
#include "encoder/encoder_factory.hpp"
|
||||||
#include "message/message.hpp"
|
#include "message/message.hpp"
|
||||||
#include "server_settings.hpp"
|
#include "server_settings.hpp"
|
||||||
#include "stream_server.hpp"
|
#include "server.hpp"
|
||||||
#if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
|
#if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
|
||||||
#include "publishZeroConf/publish_mdns.hpp"
|
#include "publishZeroConf/publish_mdns.hpp"
|
||||||
#endif
|
#endif
|
||||||
|
@ -284,8 +284,8 @@ int main(int argc, char* argv[])
|
||||||
settings.stream.bufferMs = 400;
|
settings.stream.bufferMs = 400;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto streamServer = std::make_unique<StreamServer>(io_context, settings);
|
auto server = std::make_unique<Server>(io_context, settings);
|
||||||
streamServer->start();
|
server->start();
|
||||||
|
|
||||||
if (settings.server.threads < 0)
|
if (settings.server.threads < 0)
|
||||||
settings.server.threads = std::max(2, std::min(4, static_cast<int>(std::thread::hardware_concurrency())));
|
settings.server.threads = std::max(2, std::min(4, static_cast<int>(std::thread::hardware_concurrency())));
|
||||||
|
@ -311,7 +311,7 @@ int main(int argc, char* argv[])
|
||||||
t.join();
|
t.join();
|
||||||
|
|
||||||
LOG(INFO) << "Stopping streamServer" << endl;
|
LOG(INFO) << "Stopping streamServer" << endl;
|
||||||
streamServer->stop();
|
server->stop();
|
||||||
LOG(INFO) << "done" << endl;
|
LOG(INFO) << "done" << endl;
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
|
|
|
@ -33,8 +33,8 @@ using json = nlohmann::json;
|
||||||
|
|
||||||
static constexpr auto LOG_TAG = "StreamServer";
|
static constexpr auto LOG_TAG = "StreamServer";
|
||||||
|
|
||||||
StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings)
|
StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver)
|
||||||
: io_context_(io_context), config_timer_(io_context), settings_(serverSettings)
|
: io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,15 +54,25 @@ void StreamServer::cleanup()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::onMetaChanged(const PcmStream* pcmStream)
|
void StreamServer::addSession(const std::shared_ptr<StreamSession>& session)
|
||||||
|
{
|
||||||
|
session->setMessageReceiver(this);
|
||||||
|
session->setBufferMs(settings_.stream.bufferMs);
|
||||||
|
session->start();
|
||||||
|
|
||||||
|
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
||||||
|
sessions_.emplace_back(session);
|
||||||
|
cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void StreamServer::onMetaChanged(const PcmStream* pcmStream, std::shared_ptr<msg::StreamTags> meta)
|
||||||
{
|
{
|
||||||
// clang-format off
|
// clang-format off
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}
|
// Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// Send meta to all connected clients
|
// Send meta to all connected clients
|
||||||
const auto meta = pcmStream->getMeta();
|
|
||||||
LOG(DEBUG, LOG_TAG) << "metadata = " << meta->msg.dump(3) << "\n";
|
|
||||||
|
|
||||||
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
||||||
for (auto s : sessions_)
|
for (auto s : sessions_)
|
||||||
|
@ -73,33 +83,15 @@ void StreamServer::onMetaChanged(const PcmStream* pcmStream)
|
||||||
session->send(meta);
|
session->send(meta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(INFO, LOG_TAG) << "onMetaChanged (" << pcmStream->getName() << ")\n";
|
|
||||||
json notification = jsonrpcpp::Notification("Stream.OnMetadata", jsonrpcpp::Parameter("id", pcmStream->getId(), "meta", meta->msg)).to_json();
|
|
||||||
controlServer_->send(notification.dump(), nullptr);
|
|
||||||
// cout << "Notification: " << notification.dump() << "\n";
|
|
||||||
}
|
|
||||||
|
|
||||||
void StreamServer::onStateChanged(const PcmStream* pcmStream, const ReaderState& state)
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}}
|
|
||||||
// clang-format on
|
|
||||||
LOG(INFO, LOG_TAG) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast<int>(state) << "\n";
|
|
||||||
// LOG(INFO, LOG_TAG) << pcmStream->toJson().dump(4);
|
|
||||||
json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json();
|
|
||||||
controlServer_->send(notification.dump(), nullptr);
|
|
||||||
// cout << "Notification: " << notification.dump() << "\n";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double /*duration*/)
|
void StreamServer::onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double /*duration*/)
|
||||||
{
|
{
|
||||||
// LOG(INFO, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
|
// LOG(INFO, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
|
||||||
bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get());
|
|
||||||
|
|
||||||
shared_const_buffer buffer(*chunk);
|
shared_const_buffer buffer(*chunk);
|
||||||
|
|
||||||
|
// make a copy of the sessions to avoid that a session get's deleted
|
||||||
std::vector<std::shared_ptr<StreamSession>> sessions;
|
std::vector<std::shared_ptr<StreamSession>> sessions;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
||||||
|
@ -137,21 +129,10 @@ void StreamServer::onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::P
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::onResync(const PcmStream* pcmStream, double ms)
|
void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer)
|
||||||
{
|
{
|
||||||
LOG(INFO, LOG_TAG) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n";
|
if (messageReceiver_)
|
||||||
}
|
messageReceiver_->onMessageReceived(streamSession, baseMessage, buffer);
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::onNewSession(const std::shared_ptr<StreamSession>& session)
|
|
||||||
{
|
|
||||||
session->setMessageReceiver(this);
|
|
||||||
session->setBufferMs(settings_.stream.bufferMs);
|
|
||||||
session->start();
|
|
||||||
|
|
||||||
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
|
||||||
sessions_.emplace_back(session);
|
|
||||||
cleanup();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -172,597 +153,12 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
|
||||||
}),
|
}),
|
||||||
sessions_.end());
|
sessions_.end());
|
||||||
LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n";
|
LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n";
|
||||||
|
if (messageReceiver_)
|
||||||
// notify controllers if not yet done
|
messageReceiver_->onDisconnect(streamSession);
|
||||||
ClientInfoPtr clientInfo = Config::instance().getClientInfo(session->clientId);
|
|
||||||
if (!clientInfo || !clientInfo->connected)
|
|
||||||
return;
|
|
||||||
|
|
||||||
clientInfo->connected = false;
|
|
||||||
chronos::systemtimeofday(&clientInfo->lastSeen);
|
|
||||||
saveConfig();
|
|
||||||
if (controlServer_ != nullptr)
|
|
||||||
{
|
|
||||||
// Check if there is no session of this client is left
|
|
||||||
// Can happen in case of ungraceful disconnect/reconnect or
|
|
||||||
// in case of a duplicate client id
|
|
||||||
if (getStreamSession(clientInfo->id) == nullptr)
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Notification:
|
|
||||||
// {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}}
|
|
||||||
// clang-format on
|
|
||||||
json notification =
|
|
||||||
jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json();
|
|
||||||
controlServer_->send(notification.dump());
|
|
||||||
// cout << "Notification: " << notification.dump() << "\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cleanup();
|
cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// LOG(INFO, LOG_TAG) << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id() << "\n";
|
|
||||||
Json result;
|
|
||||||
|
|
||||||
if (request->method().find("Client.") == 0)
|
|
||||||
{
|
|
||||||
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get<std::string>("id"));
|
|
||||||
if (clientInfo == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Client not found", request->id());
|
|
||||||
|
|
||||||
if (request->method() == "Client.GetStatus")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":8,"jsonrpc":"2.0","method":"Client.GetStatus","params":{"id":"00:21:6a:7d:74:fc"}}
|
|
||||||
// Response: {"id":8,"jsonrpc":"2.0","result":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026416,"usec":135973},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}}}
|
|
||||||
// clang-format on
|
|
||||||
result["client"] = clientInfo->toJson();
|
|
||||||
}
|
|
||||||
else if (request->method() == "Client.SetVolume")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":8,"jsonrpc":"2.0","method":"Client.SetVolume","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}}
|
|
||||||
// Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}}
|
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
std::lock_guard<std::recursive_mutex> lock(clientMutex_);
|
|
||||||
clientInfo->config.volume.fromJson(request->params().get("volume"));
|
|
||||||
result["volume"] = clientInfo->config.volume.toJson();
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged",
|
|
||||||
jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson())));
|
|
||||||
}
|
|
||||||
else if (request->method() == "Client.SetLatency")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":7,"jsonrpc":"2.0","method":"Client.SetLatency","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}}
|
|
||||||
// Response: {"id":7,"jsonrpc":"2.0","result":{"latency":10}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Client.OnLatencyChanged","params":{"id":"00:21:6a:7d:74:fc#2","latency":10}}
|
|
||||||
// clang-format on
|
|
||||||
int latency = request->params().get("latency");
|
|
||||||
if (latency < -10000)
|
|
||||||
latency = -10000;
|
|
||||||
else if (latency > settings_.stream.bufferMs)
|
|
||||||
latency = settings_.stream.bufferMs;
|
|
||||||
clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs);
|
|
||||||
result["latency"] = clientInfo->config.latency;
|
|
||||||
notification.reset(
|
|
||||||
new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency)));
|
|
||||||
}
|
|
||||||
else if (request->method() == "Client.SetName")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":6,"jsonrpc":"2.0","method":"Client.SetName","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}}
|
|
||||||
// Response: {"id":6,"jsonrpc":"2.0","result":{"name":"Laptop"}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Client.OnNameChanged","params":{"id":"00:21:6a:7d:74:fc#2","name":"Laptop"}}
|
|
||||||
// clang-format on
|
|
||||||
clientInfo->config.name = request->params().get<std::string>("name");
|
|
||||||
result["name"] = clientInfo->config.name;
|
|
||||||
notification.reset(
|
|
||||||
new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name)));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw jsonrpcpp::MethodNotFoundException(request->id());
|
|
||||||
|
|
||||||
|
|
||||||
if (request->method().find("Client.Set") == 0)
|
|
||||||
{
|
|
||||||
/// Update client
|
|
||||||
session_ptr session = getStreamSession(clientInfo->id);
|
|
||||||
if (session != nullptr)
|
|
||||||
{
|
|
||||||
auto serverSettings = make_shared<msg::ServerSettings>();
|
|
||||||
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
|
||||||
serverSettings->setVolume(clientInfo->config.volume.percent);
|
|
||||||
GroupPtr group = Config::instance().getGroupFromClient(clientInfo);
|
|
||||||
serverSettings->setMuted(clientInfo->config.volume.muted || group->muted);
|
|
||||||
serverSettings->setLatency(clientInfo->config.latency);
|
|
||||||
session->send(serverSettings);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (request->method().find("Group.") == 0)
|
|
||||||
{
|
|
||||||
GroupPtr group = Config::instance().getGroup(request->params().get<std::string>("id"));
|
|
||||||
if (group == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Group not found", request->id());
|
|
||||||
|
|
||||||
if (request->method() == "Group.GetStatus")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":5,"jsonrpc":"2.0","method":"Group.GetStatus","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}}
|
|
||||||
// Response: {"id":5,"jsonrpc":"2.0","result":{"group":{"clients":[{"config":{"instance":2,"latency":10,"name":"Laptop","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488026485,"usec":644997},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":74}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488026481,"usec":223747},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":true,"name":"","stream_id":"stream 1"}}}
|
|
||||||
// clang-format on
|
|
||||||
result["group"] = group->toJson();
|
|
||||||
}
|
|
||||||
else if (request->method() == "Group.SetName")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":6,"jsonrpc":"2.0","method":"Group.SetName","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","name":"Laptop"}}
|
|
||||||
// Response: {"id":6,"jsonrpc":"2.0","result":{"name":"MediaPlayer"}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Group.OnNameChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","MediaPlayer":"Laptop"}}
|
|
||||||
// clang-format on
|
|
||||||
group->name = request->params().get<std::string>("name");
|
|
||||||
result["name"] = group->name;
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name)));
|
|
||||||
}
|
|
||||||
else if (request->method() == "Group.SetMute")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":5,"jsonrpc":"2.0","method":"Group.SetMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}}
|
|
||||||
// Response: {"id":5,"jsonrpc":"2.0","result":{"mute":true}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Group.OnMute","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","mute":true}}
|
|
||||||
// clang-format on
|
|
||||||
bool muted = request->params().get<bool>("mute");
|
|
||||||
group->muted = muted;
|
|
||||||
|
|
||||||
/// Update clients
|
|
||||||
for (auto client : group->clients)
|
|
||||||
{
|
|
||||||
session_ptr session = getStreamSession(client->id);
|
|
||||||
if (session != nullptr)
|
|
||||||
{
|
|
||||||
auto serverSettings = make_shared<msg::ServerSettings>();
|
|
||||||
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
|
||||||
serverSettings->setVolume(client->config.volume.percent);
|
|
||||||
GroupPtr group = Config::instance().getGroupFromClient(client);
|
|
||||||
serverSettings->setMuted(client->config.volume.muted || group->muted);
|
|
||||||
serverSettings->setLatency(client->config.latency);
|
|
||||||
session->send(serverSettings);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
result["mute"] = group->muted;
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted)));
|
|
||||||
}
|
|
||||||
else if (request->method() == "Group.SetStream")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":4,"jsonrpc":"2.0","method":"Group.SetStream","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}}
|
|
||||||
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"stream 1"}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Group.OnStreamChanged","params":{"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","stream_id":"stream 1"}}
|
|
||||||
// clang-format on
|
|
||||||
string streamId = request->params().get<std::string>("stream_id");
|
|
||||||
PcmStreamPtr stream = streamManager_->getStream(streamId);
|
|
||||||
if (stream == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
|
||||||
|
|
||||||
group->streamId = streamId;
|
|
||||||
|
|
||||||
// Update clients
|
|
||||||
for (auto client : group->clients)
|
|
||||||
{
|
|
||||||
session_ptr session = getStreamSession(client->id);
|
|
||||||
if (session && (session->pcmStream() != stream))
|
|
||||||
{
|
|
||||||
session->send(stream->getMeta());
|
|
||||||
session->send(stream->getHeader());
|
|
||||||
session->setPcmStream(stream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify others
|
|
||||||
result["stream_id"] = group->streamId;
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId)));
|
|
||||||
}
|
|
||||||
else if (request->method() == "Group.SetClients")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":3,"jsonrpc":"2.0","method":"Group.SetClients","params":{"clients":["00:21:6a:7d:74:fc#2","00:21:6a:7d:74:fc"],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1"}}
|
|
||||||
// Response: {"id":3,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025901,"usec":864472},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025905,"usec":45238},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// clang-format on
|
|
||||||
vector<string> clients = request->params().get("clients");
|
|
||||||
// Remove clients from group
|
|
||||||
for (auto iter = group->clients.begin(); iter != group->clients.end();)
|
|
||||||
{
|
|
||||||
auto client = *iter;
|
|
||||||
if (find(clients.begin(), clients.end(), client->id) != clients.end())
|
|
||||||
{
|
|
||||||
++iter;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
iter = group->clients.erase(iter);
|
|
||||||
GroupPtr newGroup = Config::instance().addClientInfo(client);
|
|
||||||
newGroup->streamId = group->streamId;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add clients to group
|
|
||||||
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
|
|
||||||
for (const auto& clientId : clients)
|
|
||||||
{
|
|
||||||
ClientInfoPtr client = Config::instance().getClientInfo(clientId);
|
|
||||||
if (!client)
|
|
||||||
continue;
|
|
||||||
GroupPtr oldGroup = Config::instance().getGroupFromClient(client);
|
|
||||||
if (oldGroup && (oldGroup->id == group->id))
|
|
||||||
continue;
|
|
||||||
|
|
||||||
if (oldGroup)
|
|
||||||
{
|
|
||||||
oldGroup->removeClient(client);
|
|
||||||
Config::instance().remove(oldGroup);
|
|
||||||
}
|
|
||||||
|
|
||||||
group->addClient(client);
|
|
||||||
|
|
||||||
// assign new stream
|
|
||||||
session_ptr session = getStreamSession(client->id);
|
|
||||||
if (session && stream && (session->pcmStream() != stream))
|
|
||||||
{
|
|
||||||
session->send(stream->getMeta());
|
|
||||||
session->send(stream->getHeader());
|
|
||||||
session->setPcmStream(stream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (group->empty())
|
|
||||||
Config::instance().remove(group);
|
|
||||||
|
|
||||||
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
|
||||||
result["server"] = server;
|
|
||||||
|
|
||||||
// Notify others: since at least two groups are affected, send a complete server update
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw jsonrpcpp::MethodNotFoundException(request->id());
|
|
||||||
}
|
|
||||||
else if (request->method().find("Server.") == 0)
|
|
||||||
{
|
|
||||||
if (request->method().find("Server.GetRPCVersion") == 0)
|
|
||||||
{
|
|
||||||
// Request: {"id":8,"jsonrpc":"2.0","method":"Server.GetRPCVersion"}
|
|
||||||
// Response: {"id":8,"jsonrpc":"2.0","result":{"major":2,"minor":0,"patch":0}}
|
|
||||||
// <major>: backwards incompatible change
|
|
||||||
result["major"] = 2;
|
|
||||||
// <minor>: feature addition to the API
|
|
||||||
result["minor"] = 0;
|
|
||||||
// <patch>: bugfix release
|
|
||||||
result["patch"] = 0;
|
|
||||||
}
|
|
||||||
else if (request->method() == "Server.GetStatus")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":1,"jsonrpc":"2.0","method":"Server.GetStatus"}
|
|
||||||
// Response: {"id":1,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025696,"usec":578142},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025696,"usec":611255},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// clang-format on
|
|
||||||
result["server"] = Config::instance().getServerStatus(streamManager_->toJson());
|
|
||||||
}
|
|
||||||
else if (request->method() == "Server.DeleteClient")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":2,"jsonrpc":"2.0","method":"Server.DeleteClient","params":{"id":"00:21:6a:7d:74:fc"}}
|
|
||||||
// Response: {"id":2,"jsonrpc":"2.0","result":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025751,"usec":654777},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// clang-format on
|
|
||||||
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params().get<std::string>("id"));
|
|
||||||
if (clientInfo == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Client not found", request->id());
|
|
||||||
|
|
||||||
Config::instance().remove(clientInfo);
|
|
||||||
|
|
||||||
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
|
||||||
result["server"] = server;
|
|
||||||
|
|
||||||
/// Notify others
|
|
||||||
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw jsonrpcpp::MethodNotFoundException(request->id());
|
|
||||||
}
|
|
||||||
else if (request->method().find("Stream.") == 0)
|
|
||||||
{
|
|
||||||
if (request->method().find("Stream.SetMeta") == 0)
|
|
||||||
{
|
|
||||||
/// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.SetMeta","params":{"id":"Spotify",
|
|
||||||
/// "meta": {"album": "some album", "artist": "some artist", "track": "some track"...}}}
|
|
||||||
///
|
|
||||||
/// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
|
||||||
/// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
|
||||||
|
|
||||||
LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get<std::string>("id") << ")" << request->params().get("meta") << "\n";
|
|
||||||
|
|
||||||
// Find stream
|
|
||||||
string streamId = request->params().get<std::string>("id");
|
|
||||||
PcmStreamPtr stream = streamManager_->getStream(streamId);
|
|
||||||
if (stream == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
|
|
||||||
|
|
||||||
// Set metadata from request
|
|
||||||
stream->setMeta(request->params().get("meta"));
|
|
||||||
|
|
||||||
// Setup response
|
|
||||||
result["id"] = streamId;
|
|
||||||
}
|
|
||||||
else if (request->method() == "Stream.AddStream")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.AddStream","params":{"streamUri":"uri"}}
|
|
||||||
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
|
||||||
// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")"
|
|
||||||
<< "\n";
|
|
||||||
|
|
||||||
// Find stream
|
|
||||||
string streamUri = request->params().get("streamUri");
|
|
||||||
PcmStreamPtr stream = streamManager_->addStream(streamUri);
|
|
||||||
if (stream == nullptr)
|
|
||||||
throw jsonrpcpp::InternalErrorException("Stream not created", request->id());
|
|
||||||
stream->start(); // We start the stream, otherwise it would be silent
|
|
||||||
// Setup response
|
|
||||||
result["id"] = stream->getId();
|
|
||||||
}
|
|
||||||
else if (request->method() == "Stream.RemoveStream")
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Request: {"id":4,"jsonrpc":"2.0","method":"Stream.RemoveStream","params":{"id":"Spotify"}}
|
|
||||||
// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}}
|
|
||||||
// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications
|
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")"
|
|
||||||
<< "\n";
|
|
||||||
|
|
||||||
// Find stream
|
|
||||||
string streamId = request->params().get("id");
|
|
||||||
streamManager_->removeStream(streamId);
|
|
||||||
// Setup response
|
|
||||||
result["id"] = streamId;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw jsonrpcpp::MethodNotFoundException(request->id());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
throw jsonrpcpp::MethodNotFoundException(request->id());
|
|
||||||
|
|
||||||
response.reset(new jsonrpcpp::Response(*request, result));
|
|
||||||
}
|
|
||||||
catch (const jsonrpcpp::RequestException& e)
|
|
||||||
{
|
|
||||||
LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump()
|
|
||||||
<< "\n";
|
|
||||||
response.reset(new jsonrpcpp::RequestException(e));
|
|
||||||
}
|
|
||||||
catch (const exception& e)
|
|
||||||
{
|
|
||||||
LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n";
|
|
||||||
response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
std::string StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
|
|
||||||
{
|
|
||||||
// LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n";
|
|
||||||
jsonrpcpp::entity_ptr entity(nullptr);
|
|
||||||
try
|
|
||||||
{
|
|
||||||
entity = jsonrpcpp::Parser::do_parse(message);
|
|
||||||
if (!entity)
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
catch (const jsonrpcpp::ParseErrorException& e)
|
|
||||||
{
|
|
||||||
return e.to_json().dump();
|
|
||||||
}
|
|
||||||
catch (const std::exception& e)
|
|
||||||
{
|
|
||||||
return jsonrpcpp::ParseErrorException(e.what()).to_json().dump();
|
|
||||||
}
|
|
||||||
|
|
||||||
jsonrpcpp::entity_ptr response(nullptr);
|
|
||||||
jsonrpcpp::notification_ptr notification(nullptr);
|
|
||||||
if (entity->is_request())
|
|
||||||
{
|
|
||||||
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
|
|
||||||
ProcessRequest(request, response, notification);
|
|
||||||
saveConfig();
|
|
||||||
////cout << "Request: " << request->to_json().dump() << "\n";
|
|
||||||
if (notification)
|
|
||||||
{
|
|
||||||
////cout << "Notification: " << notification->to_json().dump() << "\n";
|
|
||||||
controlServer_->send(notification->to_json().dump(), controlSession);
|
|
||||||
}
|
|
||||||
if (response)
|
|
||||||
{
|
|
||||||
////cout << "Response: " << response->to_json().dump() << "\n";
|
|
||||||
return response->to_json().dump();
|
|
||||||
}
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
else if (entity->is_batch())
|
|
||||||
{
|
|
||||||
jsonrpcpp::batch_ptr batch = dynamic_pointer_cast<jsonrpcpp::Batch>(entity);
|
|
||||||
////cout << "Batch: " << batch->to_json().dump() << "\n";
|
|
||||||
jsonrpcpp::Batch responseBatch;
|
|
||||||
jsonrpcpp::Batch notificationBatch;
|
|
||||||
for (const auto& batch_entity : batch->entities)
|
|
||||||
{
|
|
||||||
if (batch_entity->is_request())
|
|
||||||
{
|
|
||||||
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(batch_entity);
|
|
||||||
ProcessRequest(request, response, notification);
|
|
||||||
if (response != nullptr)
|
|
||||||
responseBatch.add_ptr(response);
|
|
||||||
if (notification != nullptr)
|
|
||||||
notificationBatch.add_ptr(notification);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
saveConfig();
|
|
||||||
if (!notificationBatch.entities.empty())
|
|
||||||
controlServer_->send(notificationBatch.to_json().dump(), controlSession);
|
|
||||||
if (!responseBatch.entities.empty())
|
|
||||||
return responseBatch.to_json().dump();
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
return "";
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer)
|
|
||||||
{
|
|
||||||
LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id
|
|
||||||
<< ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec
|
|
||||||
<< ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
|
|
||||||
if (baseMessage.type == message_type::kTime)
|
|
||||||
{
|
|
||||||
auto timeMsg = make_shared<msg::Time>();
|
|
||||||
timeMsg->deserialize(baseMessage, buffer);
|
|
||||||
timeMsg->refersTo = timeMsg->id;
|
|
||||||
timeMsg->latency = timeMsg->received - timeMsg->sent;
|
|
||||||
// LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
|
|
||||||
streamSession->send(timeMsg);
|
|
||||||
|
|
||||||
// refresh streamSession state
|
|
||||||
ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId);
|
|
||||||
if (client != nullptr)
|
|
||||||
{
|
|
||||||
chronos::systemtimeofday(&client->lastSeen);
|
|
||||||
client->connected = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (baseMessage.type == message_type::kClientInfo)
|
|
||||||
{
|
|
||||||
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId);
|
|
||||||
if (clientInfo == nullptr)
|
|
||||||
{
|
|
||||||
LOG(ERROR, LOG_TAG) << "client not found: " << streamSession->clientId << "\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
msg::ClientInfo infoMsg;
|
|
||||||
infoMsg.deserialize(baseMessage, buffer);
|
|
||||||
|
|
||||||
clientInfo->config.volume.percent = infoMsg.getVolume();
|
|
||||||
clientInfo->config.volume.muted = infoMsg.isMuted();
|
|
||||||
jsonrpcpp::notification_ptr notification = make_shared<jsonrpcpp::Notification>(
|
|
||||||
"Client.OnVolumeChanged", jsonrpcpp::Parameter("id", streamSession->clientId, "volume", clientInfo->config.volume.toJson()));
|
|
||||||
controlServer_->send(notification->to_json().dump());
|
|
||||||
}
|
|
||||||
else if (baseMessage.type == message_type::kHello)
|
|
||||||
{
|
|
||||||
msg::Hello helloMsg;
|
|
||||||
helloMsg.deserialize(baseMessage, buffer);
|
|
||||||
streamSession->clientId = helloMsg.getUniqueId();
|
|
||||||
LOG(INFO, LOG_TAG) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion()
|
|
||||||
<< ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch()
|
|
||||||
<< ", Protocol version: " << helloMsg.getProtocolVersion() << "\n";
|
|
||||||
|
|
||||||
bool newGroup(false);
|
|
||||||
GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId);
|
|
||||||
if (group == nullptr)
|
|
||||||
{
|
|
||||||
group = Config::instance().addClientInfo(streamSession->clientId);
|
|
||||||
newGroup = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
ClientInfoPtr client = group->getClient(streamSession->clientId);
|
|
||||||
|
|
||||||
LOG(DEBUG, LOG_TAG) << "Sending ServerSettings to " << streamSession->clientId << "\n";
|
|
||||||
auto serverSettings = make_shared<msg::ServerSettings>();
|
|
||||||
serverSettings->setVolume(client->config.volume.percent);
|
|
||||||
serverSettings->setMuted(client->config.volume.muted || group->muted);
|
|
||||||
serverSettings->setLatency(client->config.latency);
|
|
||||||
serverSettings->setBufferMs(settings_.stream.bufferMs);
|
|
||||||
serverSettings->refersTo = helloMsg.id;
|
|
||||||
streamSession->send(serverSettings);
|
|
||||||
|
|
||||||
client->host.mac = helloMsg.getMacAddress();
|
|
||||||
client->host.ip = streamSession->getIP();
|
|
||||||
client->host.name = helloMsg.getHostName();
|
|
||||||
client->host.os = helloMsg.getOS();
|
|
||||||
client->host.arch = helloMsg.getArch();
|
|
||||||
client->snapclient.version = helloMsg.getVersion();
|
|
||||||
client->snapclient.name = helloMsg.getClientName();
|
|
||||||
client->snapclient.protocolVersion = helloMsg.getProtocolVersion();
|
|
||||||
client->config.instance = helloMsg.getInstance();
|
|
||||||
client->connected = true;
|
|
||||||
chronos::systemtimeofday(&client->lastSeen);
|
|
||||||
|
|
||||||
// Assign and update stream
|
|
||||||
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
|
|
||||||
if (!stream)
|
|
||||||
{
|
|
||||||
stream = streamManager_->getDefaultStream();
|
|
||||||
group->streamId = stream->getId();
|
|
||||||
}
|
|
||||||
LOG(DEBUG, LOG_TAG) << "Group: " << group->id << ", stream: " << group->streamId << "\n";
|
|
||||||
|
|
||||||
saveConfig();
|
|
||||||
|
|
||||||
LOG(DEBUG, LOG_TAG) << "Sending meta data to " << streamSession->clientId << "\n";
|
|
||||||
streamSession->send(stream->getMeta());
|
|
||||||
streamSession->setPcmStream(stream);
|
|
||||||
auto headerChunk = stream->getHeader();
|
|
||||||
LOG(DEBUG, LOG_TAG) << "Sending codec header to " << streamSession->clientId << "\n";
|
|
||||||
streamSession->send(headerChunk);
|
|
||||||
|
|
||||||
if (newGroup)
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Server.OnUpdate","params":{"server":{"groups":[{"clients":[{"config":{"instance":2,"latency":6,"name":"123 456","volume":{"muted":false,"percent":48}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc#2","lastSeen":{"sec":1488025796,"usec":714671},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"4dcc4e3b-c699-a04b-7f0c-8260d23c43e1","muted":false,"name":"","stream_id":"stream 2"},{"clients":[{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":100}},"connected":true,"host":{"arch":"x86_64","ip":"127.0.0.1","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025798,"usec":728305},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}}],"id":"c5da8f7a-f377-1e51-8266-c5cc61099b71","muted":false,"name":"","stream_id":"stream 1"}],"server":{"host":{"arch":"x86_64","ip":"","mac":"","name":"T400","os":"Linux Mint 17.3 Rosa"},"snapserver":{"controlProtocolVersion":1,"name":"Snapserver","protocolVersion":1,"version":"0.10.0"}},"streams":[{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}},{"id":"stream 2","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 2","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 2","scheme":"pipe"}}]}}}
|
|
||||||
// clang-format on
|
|
||||||
json server = Config::instance().getServerStatus(streamManager_->toJson());
|
|
||||||
json notification = jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)).to_json();
|
|
||||||
controlServer_->send(notification.dump());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// clang-format off
|
|
||||||
// Notification: {"jsonrpc":"2.0","method":"Client.OnConnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":true,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025524,"usec":876332},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}}
|
|
||||||
// clang-format on
|
|
||||||
json notification = jsonrpcpp::Notification("Client.OnConnect", jsonrpcpp::Parameter("id", client->id, "client", client->toJson())).to_json();
|
|
||||||
controlServer_->send(notification.dump());
|
|
||||||
// cout << "Notification: " << notification.dump() << "\n";
|
|
||||||
}
|
|
||||||
// cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n";
|
|
||||||
// cout << group->toJson().dump(4) << "\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void StreamServer::saveConfig(const std::chrono::milliseconds& deferred)
|
|
||||||
{
|
|
||||||
config_timer_.cancel();
|
|
||||||
config_timer_.expires_after(deferred);
|
|
||||||
config_timer_.async_wait([](const boost::system::error_code& ec) {
|
|
||||||
if (!ec)
|
|
||||||
{
|
|
||||||
LOG(DEBUG, LOG_TAG) << "Saving config\n";
|
|
||||||
Config::instance().save();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const
|
session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const
|
||||||
{
|
{
|
||||||
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
||||||
|
@ -820,7 +216,7 @@ void StreamServer::handleAccept(tcp::socket socket)
|
||||||
|
|
||||||
LOG(NOTICE, LOG_TAG) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
|
LOG(NOTICE, LOG_TAG) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
|
||||||
shared_ptr<StreamSession> session = make_shared<StreamSessionTcp>(io_context_, this, std::move(socket));
|
shared_ptr<StreamSession> session = make_shared<StreamSessionTcp>(io_context_, this, std::move(socket));
|
||||||
onNewSession(session);
|
addSession(session);
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
{
|
{
|
||||||
|
@ -832,44 +228,21 @@ void StreamServer::handleAccept(tcp::socket socket)
|
||||||
|
|
||||||
void StreamServer::start()
|
void StreamServer::start()
|
||||||
{
|
{
|
||||||
try
|
for (const auto& address : settings_.stream.bind_to_address)
|
||||||
{
|
{
|
||||||
controlServer_ = std::make_unique<ControlServer>(io_context_, settings_.tcp, settings_.http, this);
|
try
|
||||||
controlServer_->start();
|
|
||||||
|
|
||||||
streamManager_ =
|
|
||||||
std::make_unique<StreamManager>(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs);
|
|
||||||
// throw SnapException("xxx");
|
|
||||||
for (const auto& streamUri : settings_.stream.pcmStreams)
|
|
||||||
{
|
{
|
||||||
PcmStreamPtr stream = streamManager_->addStream(streamUri);
|
LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n";
|
||||||
if (stream)
|
acceptor_.emplace_back(
|
||||||
LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n";
|
make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port)));
|
||||||
}
|
}
|
||||||
streamManager_->start();
|
catch (const boost::system::system_error& e)
|
||||||
|
|
||||||
for (const auto& address : settings_.stream.bind_to_address)
|
|
||||||
{
|
{
|
||||||
try
|
LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
|
||||||
{
|
|
||||||
LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n";
|
|
||||||
acceptor_.emplace_back(
|
|
||||||
make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port)));
|
|
||||||
}
|
|
||||||
catch (const boost::system::system_error& e)
|
|
||||||
{
|
|
||||||
LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
startAccept();
|
startAccept();
|
||||||
}
|
|
||||||
catch (const std::exception& e)
|
|
||||||
{
|
|
||||||
LOG(NOTICE, LOG_TAG) << "StreamServer::start: " << e.what() << endl;
|
|
||||||
stop();
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -879,18 +252,6 @@ void StreamServer::stop()
|
||||||
acceptor->cancel();
|
acceptor->cancel();
|
||||||
acceptor_.clear();
|
acceptor_.clear();
|
||||||
|
|
||||||
if (streamManager_)
|
|
||||||
{
|
|
||||||
streamManager_->stop();
|
|
||||||
streamManager_ = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (controlServer_)
|
|
||||||
{
|
|
||||||
controlServer_->stop();
|
|
||||||
controlServer_ = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
|
||||||
cleanup();
|
cleanup();
|
||||||
for (auto s : sessions_)
|
for (auto s : sessions_)
|
||||||
|
|
|
@ -51,10 +51,10 @@ using session_ptr = std::shared_ptr<StreamSession>;
|
||||||
* Receives (via the MessageReceiver interface) and answers messages from the clients
|
* Receives (via the MessageReceiver interface) and answers messages from the clients
|
||||||
* Forwards PCM data to the clients
|
* Forwards PCM data to the clients
|
||||||
*/
|
*/
|
||||||
class StreamServer : public MessageReceiver, public ControlMessageReceiver, public PcmListener
|
class StreamServer : public MessageReceiver
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings);
|
StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver = nullptr);
|
||||||
virtual ~StreamServer();
|
virtual ~StreamServer();
|
||||||
|
|
||||||
void start();
|
void start();
|
||||||
|
@ -63,35 +63,21 @@ public:
|
||||||
/// Send a message to all connceted clients
|
/// Send a message to all connceted clients
|
||||||
// void send(const msg::BaseMessage* message);
|
// void send(const msg::BaseMessage* message);
|
||||||
|
|
||||||
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
|
void addSession(const std::shared_ptr<StreamSession>& session);
|
||||||
void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
|
void onMetaChanged(const PcmStream* pcmStream, std::shared_ptr<msg::StreamTags> meta);
|
||||||
void onDisconnect(StreamSession* connection) override;
|
void onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double duration);
|
||||||
|
|
||||||
/// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived
|
session_ptr getStreamSession(const std::string& mac) const;
|
||||||
std::string onMessageReceived(ControlSession* connection, const std::string& message) override;
|
session_ptr getStreamSession(StreamSession* session) const;
|
||||||
// TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one.
|
|
||||||
void onNewSession(const std::shared_ptr<ControlSession>& session) override
|
|
||||||
{
|
|
||||||
std::ignore = session;
|
|
||||||
};
|
|
||||||
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
|
|
||||||
|
|
||||||
/// Implementation of PcmListener
|
|
||||||
void onMetaChanged(const PcmStream* pcmStream) override;
|
|
||||||
void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override;
|
|
||||||
void onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
|
|
||||||
void onResync(const PcmStream* pcmStream, double ms) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void startAccept();
|
void startAccept();
|
||||||
void handleAccept(tcp::socket socket);
|
void handleAccept(tcp::socket socket);
|
||||||
session_ptr getStreamSession(const std::string& mac) const;
|
|
||||||
session_ptr getStreamSession(StreamSession* session) const;
|
|
||||||
void ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const;
|
|
||||||
void cleanup();
|
void cleanup();
|
||||||
/// Save the server state deferred to prevent blocking and lower disk io
|
|
||||||
/// @param deferred the delay after the last call to saveConfig
|
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
|
||||||
void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2));
|
void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
|
||||||
|
void onDisconnect(StreamSession* connection) override;
|
||||||
|
|
||||||
mutable std::recursive_mutex sessionsMutex_;
|
mutable std::recursive_mutex sessionsMutex_;
|
||||||
mutable std::recursive_mutex clientMutex_;
|
mutable std::recursive_mutex clientMutex_;
|
||||||
|
@ -102,8 +88,7 @@ private:
|
||||||
|
|
||||||
ServerSettings settings_;
|
ServerSettings settings_;
|
||||||
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
|
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
|
||||||
std::unique_ptr<ControlServer> controlServer_;
|
MessageReceiver* messageReceiver_;
|
||||||
std::unique_ptr<StreamManager> streamManager_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue