diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index ac36b2f3..77f200f8 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -3,9 +3,12 @@ set(SERVER_SOURCES control_server.cpp control_session_tcp.cpp control_session_http.cpp + control_session_ws.cpp snapserver.cpp stream_server.cpp stream_session.cpp + stream_session_tcp.cpp + stream_session_ws.cpp encoder/encoder_factory.cpp encoder/pcm_encoder.cpp streamreader/base64.cpp diff --git a/server/Makefile b/server/Makefile index c64b75f7..e9c9d5cb 100644 --- a/server/Makefile +++ b/server/Makefile @@ -44,7 +44,7 @@ endif CXXFLAGS += $(ADD_CFLAGS) -std=c++14 -Wall -Wextra -Wpedantic -Wno-unused-function -DBOOST_ERROR_CODE_HEADER_ONLY -DHAS_FLAC -DHAS_OGG -DHAS_VORBIS -DHAS_VORBIS_ENC -DHAS_OPUS -DVERSION=\"$(VERSION)\" -I. -I.. -I../common LDFLAGS += $(ADD_LDFLAGS) -lvorbis -lvorbisenc -logg -lFLAC -lopus -OBJ = snapserver.o config.o control_server.o control_session_tcp.o control_session_http.o stream_server.o stream_session.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o +OBJ = snapserver.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o ifneq (,$(TARGET)) CXXFLAGS += -D$(TARGET) diff --git a/server/control_server.cpp b/server/control_server.cpp index 4f920069..190338d0 100644 --- a/server/control_server.cpp +++ b/server/control_server.cpp @@ -31,6 +31,8 @@ using namespace std; using json = nlohmann::json; +static constexpr auto LOG_TAG = "ControlServer"; + ControlServer::ControlServer(boost::asio::io_context& io_context, const ServerSettings::Tcp& tcp_settings, const ServerSettings::Http& http_settings, ControlMessageReceiver* controlMessageReceiver) @@ -51,7 +53,7 @@ void ControlServer::cleanup() auto count = distance(new_end, sessions_.end()); if (count > 0) { - LOG(ERROR) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n"; + LOG(ERROR, LOG_TAG) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n"; sessions_.erase(new_end, sessions_.end()); } } @@ -72,29 +74,45 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu } -std::string ControlServer::onMessageReceived(ControlSession* connection, const std::string& message) +std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message) { - // LOG(DEBUG) << "received: \"" << message << "\"\n"; + // LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n"; if (controlMessageReceiver_ != nullptr) - return controlMessageReceiver_->onMessageReceived(connection, message); + return controlMessageReceiver_->onMessageReceived(session, message); return ""; } +void ControlServer::onNewSession(const shared_ptr& session) +{ + std::lock_guard mlock(session_mutex_); + session->start(); + sessions_.emplace_back(session); + cleanup(); +} + + +void ControlServer::onNewSession(const std::shared_ptr& session) +{ + if (controlMessageReceiver_ != nullptr) + controlMessageReceiver_->onNewSession(session); +} + + void ControlServer::startAccept() { auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) { if (!ec) handleAccept(std::move(socket)); else - LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n"; }; auto accept_handler_http = [this](error_code ec, tcp::socket socket) { if (!ec) handleAccept(std::move(socket), http_settings_); else - LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n"; }; for (auto& acceptor : acceptor_tcp_) @@ -116,18 +134,13 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args) setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); // socket->set_option(boost::asio::ip::tcp::no_delay(false)); - LOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; + LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(this, io_context_, std::move(socket), std::forward(args)...); - { - std::lock_guard mlock(session_mutex_); - session->start(); - sessions_.emplace_back(session); - cleanup(); - } + onNewSession(session); } catch (const std::exception& e) { - LOG(ERROR) << "Exception in ControlServer::handleAccept: " << e.what() << endl; + LOG(ERROR, LOG_TAG) << "Exception in ControlServer::handleAccept: " << e.what() << endl; } startAccept(); } @@ -142,13 +155,13 @@ void ControlServer::start() { try { - LOG(INFO) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n"; + LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n"; acceptor_tcp_.emplace_back( make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port))); } catch (const boost::system::system_error& e) { - LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; + LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; } } } @@ -158,13 +171,13 @@ void ControlServer::start() { try { - LOG(INFO) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n"; + LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n"; acceptor_http_.emplace_back( make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port))); } catch (const boost::system::system_error& e) { - LOG(ERROR) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n"; + LOG(ERROR, LOG_TAG) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n"; } } } diff --git a/server/control_server.hpp b/server/control_server.hpp index 32fb3ad0..4d460ffc 100644 --- a/server/control_server.hpp +++ b/server/control_server.hpp @@ -54,7 +54,9 @@ public: void send(const std::string& message, const ControlSession* excludeSession = nullptr); /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived - std::string onMessageReceived(ControlSession* connection, const std::string& message) override; + std::string onMessageReceived(ControlSession* session, const std::string& message) override; + void onNewSession(const std::shared_ptr& session) override; + void onNewSession(const std::shared_ptr& session) override; private: void startAccept(); diff --git a/server/control_session.hpp b/server/control_session.hpp index 3b78f5a0..98539682 100644 --- a/server/control_session.hpp +++ b/server/control_session.hpp @@ -33,14 +33,16 @@ using boost::asio::ip::tcp; class ControlSession; - +class StreamSession; /// Interface: callback for a received message. class ControlMessageReceiver { public: // TODO: rename, error handling - virtual std::string onMessageReceived(ControlSession* connection, const std::string& message) = 0; + virtual std::string onMessageReceived(ControlSession* session, const std::string& message) = 0; + virtual void onNewSession(const std::shared_ptr& session) = 0; + virtual void onNewSession(const std::shared_ptr& session) = 0; }; @@ -50,7 +52,7 @@ public: * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSession +class ControlSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to MessageReceiver @@ -61,9 +63,6 @@ public: virtual void start() = 0; virtual void stop() = 0; - /// Sends a message to the client (synchronous) - virtual bool send(const std::string& message) = 0; - /// Sends a message to the client (asynchronous) virtual void sendAsync(const std::string& message) = 0; diff --git a/server/control_session_http.cpp b/server/control_session_http.cpp index 0c33e2e5..d199712e 100644 --- a/server/control_session_http.cpp +++ b/server/control_session_http.cpp @@ -18,12 +18,17 @@ #include "control_session_http.hpp" #include "common/aixlog.hpp" +#include "control_session_ws.hpp" #include "message/pcm_chunk.hpp" +#include "stream_session_ws.hpp" #include #include using namespace std; +static constexpr auto LOG_TAG = "ControlSessionHTTP"; + + static constexpr const char* HTTP_SERVER_NAME = "Snapcast"; namespace @@ -102,13 +107,13 @@ ControlSessionHttp::ControlSessionHttp(ControlMessageReceiver* receiver, boost:: const ServerSettings::Http& settings) : ControlSession(receiver), socket_(std::move(socket)), settings_(settings), strand_(ioc) { - LOG(DEBUG) << "ControlSessionHttp\n"; + LOG(DEBUG, LOG_TAG) << "ControlSessionHttp\n"; } ControlSessionHttp::~ControlSessionHttp() { - LOG(DEBUG) << "ControlSessionHttp::~ControlSessionHttp()\n"; + LOG(DEBUG, LOG_TAG) << "ControlSessionHttp::~ControlSessionHttp()\n"; stop(); } @@ -193,7 +198,7 @@ void ControlSessionHttp::handle_request(http::request(std::move(socket_), state_)->run(std::move(req_)); - ws_ = make_unique>(std::move(socket_)); - ws_->async_accept(req_, [ this, self = shared_from_this() ](beast::error_code ec) { on_accept_ws(ec); }); - LOG(DEBUG) << "websocket upgrade\n"; + LOG(DEBUG, LOG_TAG) << "websocket upgrade, target: " << req_.target() << "\n"; + if (req_.target() == "/jsonrpc") + { + // Create a WebSocket session by transferring the socket + // std::make_shared(std::move(socket_), state_)->run(std::move(req_)); + auto ws = std::make_shared>(std::move(socket_)); + ws->async_accept(req_, [ this, ws, self = shared_from_this() ](beast::error_code ec) { + auto ws_session = make_shared(message_receiver_, strand_.context(), std::move(*ws)); + message_receiver_->onNewSession(ws_session); + }); + } + else if (req_.target() == "/stream") + { + // Create a WebSocket session by transferring the socket + // std::make_shared(std::move(socket_), state_)->run(std::move(req_)); + auto ws = std::make_shared>(std::move(socket_)); + ws->async_accept(req_, [ this, ws, self = shared_from_this() ](beast::error_code ec) { + auto ws_session = make_shared(strand_.context(), nullptr, std::move(*ws)); + message_receiver_->onNewSession(ws_session); + }); + } return; } @@ -282,7 +303,7 @@ void ControlSessionHttp::on_write(beast::error_code ec, std::size_t, bool close) // Handle the error, if any if (ec) { - LOG(ERROR) << "ControlSessionHttp::on_write, error: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "ControlSessionHttp::on_write, error: " << ec.message() << "\n"; return; } @@ -308,103 +329,7 @@ void ControlSessionHttp::stop() { } + void ControlSessionHttp::sendAsync(const std::string& message) { - if (!ws_) - return; - - strand_.post([ this, self = shared_from_this(), msg = message ]() { - messages_.push_back(std::move(msg)); - if (messages_.size() > 1) - { - LOG(DEBUG) << "HTTP session outstanding async_writes: " << messages_.size() << "\n"; - return; - } - send_next(); - }); -} - -void ControlSessionHttp::send_next() -{ - if (!ws_) - return; - - const std::string& message = messages_.front(); - ws_->async_write(boost::asio::buffer(message), - boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](std::error_code ec, std::size_t length) { - messages_.pop_front(); - if (ec) - { - LOG(ERROR) << "Error while writing to web socket: " << ec.message() << "\n"; - } - else - { - LOG(DEBUG) << "Wrote " << length << " bytes to web socket\n"; - } - if (!messages_.empty()) - send_next(); - })); -} - - -bool ControlSessionHttp::send(const std::string& message) -{ - if (!ws_) - return false; - - boost::system::error_code ec; - ws_->write(boost::asio::buffer(message), ec); - return !ec; -} - -void ControlSessionHttp::on_accept_ws(beast::error_code ec) -{ - if (ec) - { - LOG(ERROR) << "ControlSessionWs::on_accept, error: " << ec.message() << "\n"; - return; - } - - // Read a message - do_read_ws(); -} - -void ControlSessionHttp::do_read_ws() -{ - // Read a message into our buffer - ws_->async_read(buffer_, boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes_transferred) { - on_read_ws(ec, bytes_transferred); - })); -} - - -void ControlSessionHttp::on_read_ws(beast::error_code ec, std::size_t bytes_transferred) -{ - boost::ignore_unused(bytes_transferred); - - // This indicates that the session was closed - if (ec == websocket::error::closed) - return; - - if (ec) - { - LOG(ERROR) << "ControlSessionHttp::on_read_ws error: " << ec.message() << "\n"; - return; - } - - std::string line{boost::beast::buffers_to_string(buffer_.data())}; - if (!line.empty()) - { - // LOG(DEBUG) << "received: " << line << "\n"; - if ((message_receiver_ != nullptr) && !line.empty()) - { - string response = message_receiver_->onMessageReceived(this, line); - if (!response.empty()) - { - sendAsync(response); - } - } - } - buffer_.consume(bytes_transferred); - do_read_ws(); } diff --git a/server/control_session_http.hpp b/server/control_session_http.hpp index 93689418..d9807a8a 100644 --- a/server/control_session_http.hpp +++ b/server/control_session_http.hpp @@ -36,7 +36,7 @@ namespace net = boost::asio; // from * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSessionHttp : public ControlSession, public std::enable_shared_from_this +class ControlSessionHttp : public ControlSession { public: /// ctor. Received message from the client are passed to MessageReceiver @@ -45,9 +45,6 @@ public: void start() override; void stop() override; - /// Sends a message to the client (synchronous) - bool send(const std::string& message) override; - /// Sends a message to the client (asynchronous) void sendAsync(const std::string& message) override; @@ -59,18 +56,8 @@ protected: template void handle_request(http::request>&& req, Send&& send); - void send_next(); - http::request req_; -protected: - // Websocket methods - void on_accept_ws(beast::error_code ec); - void on_read_ws(beast::error_code ec, std::size_t bytes_transferred); - void do_read_ws(); - - std::unique_ptr> ws_; - protected: tcp::socket socket_; beast::flat_buffer buffer_; diff --git a/server/control_session_tcp.cpp b/server/control_session_tcp.cpp index 936a52b2..ff14e16c 100644 --- a/server/control_session_tcp.cpp +++ b/server/control_session_tcp.cpp @@ -22,6 +22,8 @@ using namespace std; +static constexpr auto LOG_TAG = "ControlSessionTCP"; + // https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls/7756894 @@ -33,7 +35,7 @@ ControlSessionTcp::ControlSessionTcp(ControlMessageReceiver* receiver, boost::as ControlSessionTcp::~ControlSessionTcp() { - LOG(DEBUG) << "ControlSessionTcp::~ControlSessionTcp()\n"; + LOG(DEBUG, LOG_TAG) << "ControlSessionTcp::~ControlSessionTcp()\n"; stop(); } @@ -46,7 +48,7 @@ void ControlSessionTcp::do_read() boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), delimiter ](const std::error_code& ec, std::size_t bytes_transferred) { if (ec) { - LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n"; return; } @@ -56,7 +58,7 @@ void ControlSessionTcp::do_read() { if (line.back() == '\r') line.resize(line.size() - 1); - // LOG(DEBUG) << "received: " << line << "\n"; + // LOG(DEBUG, LOG_TAG) << "received: " << line << "\n"; if ((message_receiver_ != nullptr) && !line.empty()) { string response = message_receiver_->onMessageReceived(this, line); @@ -78,15 +80,15 @@ void ControlSessionTcp::start() void ControlSessionTcp::stop() { - LOG(DEBUG) << "ControlSession::stop\n"; + LOG(DEBUG, LOG_TAG) << "ControlSession::stop\n"; boost::system::error_code ec; socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (ec) - LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << "\n"; socket_.close(ec); if (ec) - LOG(ERROR) << "Error in socket close: " << ec.message() << "\n"; - LOG(DEBUG) << "ControlSession ControlSession stopped\n"; + LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + LOG(DEBUG, LOG_TAG) << "ControlSession ControlSession stopped\n"; } @@ -96,7 +98,7 @@ void ControlSessionTcp::sendAsync(const std::string& message) messages_.emplace_back(message + "\r\n"); if (messages_.size() > 1) { - LOG(DEBUG) << "TCP session outstanding async_writes: " << messages_.size() << "\n"; + LOG(DEBUG, LOG_TAG) << "TCP session outstanding async_writes: " << messages_.size() << "\n"; return; } send_next(); @@ -110,20 +112,13 @@ void ControlSessionTcp::send_next() messages_.pop_front(); if (ec) { - LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error while writing to control socket: " << ec.message() << "\n"; } else { - LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + LOG(DEBUG, LOG_TAG) << "Wrote " << length << " bytes to control socket\n"; } if (!messages_.empty()) send_next(); })); } - -bool ControlSessionTcp::send(const std::string& message) -{ - boost::system::error_code ec; - boost::asio::write(socket_, boost::asio::buffer(message + "\r\n"), ec); - return !ec; -} diff --git a/server/control_session_tcp.hpp b/server/control_session_tcp.hpp index 3d0a6a3b..854ed640 100644 --- a/server/control_session_tcp.hpp +++ b/server/control_session_tcp.hpp @@ -28,7 +28,7 @@ * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSessionTcp : public ControlSession, public std::enable_shared_from_this +class ControlSessionTcp : public ControlSession { public: /// ctor. Received message from the client are passed to MessageReceiver @@ -37,9 +37,6 @@ public: void start() override; void stop() override; - /// Sends a message to the client (synchronous) - bool send(const std::string& message) override; - /// Sends a message to the client (asynchronous) void sendAsync(const std::string& message) override; diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp new file mode 100644 index 00000000..a5835c76 --- /dev/null +++ b/server/control_session_ws.cpp @@ -0,0 +1,134 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include "control_session_ws.hpp" +#include "common/aixlog.hpp" +#include "message/pcm_chunk.hpp" +#include + +using namespace std; + +static constexpr auto LOG_TAG = "ControlSessionWS"; + + +ControlSessionWebsocket::ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream&& socket) + : ControlSession(receiver), ws_(std::move(socket)), strand_(ioc) +{ + LOG(DEBUG, LOG_TAG) << "ControlSessionWebsocket\n"; +} + + +ControlSessionWebsocket::~ControlSessionWebsocket() +{ + LOG(DEBUG, LOG_TAG) << "ControlSessionWebsocket::~ControlSessionWebsocket()\n"; + stop(); +} + + +void ControlSessionWebsocket::start() +{ + // Read a message + do_read_ws(); +} + + +void ControlSessionWebsocket::stop() +{ + if (ws_.is_open()) + { + boost::beast::error_code ec; + ws_.close(beast::websocket::close_code::normal, ec); + if (ec) + LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + } +} + + +void ControlSessionWebsocket::sendAsync(const std::string& message) +{ + strand_.post([ this, self = shared_from_this(), msg = message ]() { + messages_.push_back(std::move(msg)); + if (messages_.size() > 1) + { + LOG(DEBUG, LOG_TAG) << "HTTP session outstanding async_writes: " << messages_.size() << "\n"; + return; + } + send_next(); + }); +} + + +void ControlSessionWebsocket::send_next() +{ + const std::string& message = messages_.front(); + ws_.async_write(boost::asio::buffer(message), + boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](std::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error while writing to web socket: " << ec.message() << "\n"; + } + else + { + LOG(DEBUG, LOG_TAG) << "Wrote " << length << " bytes to web socket\n"; + } + if (!messages_.empty()) + send_next(); + })); +} + + +void ControlSessionWebsocket::do_read_ws() +{ + // Read a message into our buffer + ws_.async_read(buffer_, boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes_transferred) { + on_read_ws(ec, bytes_transferred); + })); +} + + +void ControlSessionWebsocket::on_read_ws(beast::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == websocket::error::closed) + return; + + if (ec) + { + LOG(ERROR, LOG_TAG) << "ControlSessionWebsocket::on_read_ws error: " << ec.message() << "\n"; + return; + } + + std::string line{boost::beast::buffers_to_string(buffer_.data())}; + if (!line.empty()) + { + // LOG(DEBUG, LOG_TAG) << "received: " << line << "\n"; + if ((message_receiver_ != nullptr) && !line.empty()) + { + string response = message_receiver_->onMessageReceived(this, line); + if (!response.empty()) + { + sendAsync(response); + } + } + } + buffer_.consume(bytes_transferred); + do_read_ws(); +} diff --git a/server/control_session_ws.hpp b/server/control_session_ws.hpp new file mode 100644 index 00000000..c6d9e2d6 --- /dev/null +++ b/server/control_session_ws.hpp @@ -0,0 +1,67 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef CONTROL_SESSION_WS_HPP +#define CONTROL_SESSION_WS_HPP + +#include "control_session.hpp" +#include +#include +#include + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace net = boost::asio; // from + + +/// Endpoint for a connected control client. +/** + * Endpoint for a connected control client. + * Messages are sent to the client with the "send" method. + * Received messages from the client are passed to the ControlMessageReceiver callback + */ +class ControlSessionWebsocket : public ControlSession +{ +public: + /// ctor. Received message from the client are passed to MessageReceiver + ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream&& socket); + ~ControlSessionWebsocket() override; + void start() override; + void stop() override; + + /// Sends a message to the client (asynchronous) + void sendAsync(const std::string& message) override; + +protected: + // Websocket methods + void on_read_ws(beast::error_code ec, std::size_t bytes_transferred); + void do_read_ws(); + void send_next(); + + websocket::stream ws_; + +protected: + beast::flat_buffer buffer_; + boost::asio::io_context::strand strand_; + std::deque messages_; +}; + + + +#endif diff --git a/server/stream_server.cpp b/server/stream_server.cpp index 283a8b85..5bee0680 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -23,6 +23,7 @@ #include "message/hello.hpp" #include "message/stream_tags.hpp" #include "message/time.hpp" +#include "stream_session_tcp.hpp" #include using namespace std; @@ -30,6 +31,7 @@ using namespace streamreader; using json = nlohmann::json; +static constexpr auto LOG_TAG = "StreamServer"; StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings) : io_context_(io_context), config_timer_(io_context), settings_(serverSettings) @@ -46,7 +48,7 @@ void StreamServer::cleanup() auto count = distance(new_end, sessions_.end()); if (count > 0) { - LOG(ERROR) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n"; + LOG(ERROR, LOG_TAG) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n"; sessions_.erase(new_end, sessions_.end()); } } @@ -60,7 +62,7 @@ void StreamServer::onMetaChanged(const PcmStream* pcmStream) // Send meta to all connected clients const auto meta = pcmStream->getMeta(); - LOG(DEBUG) << "metadata = " << meta->msg.dump(3) << "\n"; + LOG(DEBUG, LOG_TAG) << "metadata = " << meta->msg.dump(3) << "\n"; std::lock_guard mlock(sessionsMutex_); for (auto s : sessions_) @@ -68,11 +70,11 @@ void StreamServer::onMetaChanged(const PcmStream* pcmStream) if (auto session = s.lock()) { if (session->pcmStream().get() == pcmStream) - session->sendAsync(meta); + session->send(meta); } } - LOG(INFO) << "onMetaChanged (" << pcmStream->getName() << ")\n"; + LOG(INFO, LOG_TAG) << "onMetaChanged (" << pcmStream->getName() << ")\n"; json notification = jsonrpcpp::Notification("Stream.OnMetadata", jsonrpcpp::Parameter("id", pcmStream->getId(), "meta", meta->msg)).to_json(); controlServer_->send(notification.dump(), nullptr); // cout << "Notification: " << notification.dump() << "\n"; @@ -83,17 +85,17 @@ void StreamServer::onStateChanged(const PcmStream* pcmStream, const ReaderState& // clang-format off // Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}} // clang-format on - LOG(INFO) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast(state) << "\n"; - // LOG(INFO) << pcmStream->toJson().dump(4); + LOG(INFO, LOG_TAG) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast(state) << "\n"; + // LOG(INFO, LOG_TAG) << pcmStream->toJson().dump(4); json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json(); controlServer_->send(notification.dump(), nullptr); // cout << "Notification: " << notification.dump() << "\n"; } -void StreamServer::onChunkRead(const PcmStream* pcmStream, std::shared_ptr chunk, double /*duration*/) +void StreamServer::onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double /*duration*/) { - // LOG(INFO) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n"; + // LOG(INFO, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n"; bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get()); shared_const_buffer buffer(*chunk); @@ -128,16 +130,28 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, std::shared_ptrpcmStream() && isDefaultStream) //->getName() == "default") - session->sendAsync(buffer); + session->send(buffer); else if (session->pcmStream().get() == pcmStream) - session->sendAsync(buffer); + session->send(buffer); } } void StreamServer::onResync(const PcmStream* pcmStream, double ms) { - LOG(INFO) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n"; + LOG(INFO, LOG_TAG) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n"; +} + + +void StreamServer::onNewSession(const std::shared_ptr& session) +{ + session->setMessageReceiver(this); + session->setBufferMs(settings_.stream.bufferMs); + session->start(); + + std::lock_guard mlock(sessionsMutex_); + sessions_.emplace_back(session); + cleanup(); } @@ -149,15 +163,15 @@ void StreamServer::onDisconnect(StreamSession* streamSession) if (session == nullptr) return; - LOG(INFO) << "onDisconnect: " << session->clientId << "\n"; - LOG(DEBUG) << "sessions: " << sessions_.size() << "\n"; + LOG(INFO, LOG_TAG) << "onDisconnect: " << session->clientId << "\n"; + LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n"; sessions_.erase(std::remove_if(sessions_.begin(), sessions_.end(), [streamSession](std::weak_ptr session) { auto s = session.lock(); return s.get() == streamSession; }), sessions_.end()); - LOG(DEBUG) << "sessions: " << sessions_.size() << "\n"; + LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n"; // notify controllers if not yet done ClientInfoPtr clientInfo = Config::instance().getClientInfo(session->clientId); @@ -192,7 +206,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp { try { - // LOG(INFO) << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id() << "\n"; + // LOG(INFO, LOG_TAG) << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id() << "\n"; Json result; if (request->method().find("Client.") == 0) @@ -268,7 +282,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp GroupPtr group = Config::instance().getGroupFromClient(clientInfo); serverSettings->setMuted(clientInfo->config.volume.muted || group->muted); serverSettings->setLatency(clientInfo->config.latency); - session->sendAsync(serverSettings); + session->send(serverSettings); } } } @@ -319,7 +333,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp GroupPtr group = Config::instance().getGroupFromClient(client); serverSettings->setMuted(client->config.volume.muted || group->muted); serverSettings->setLatency(client->config.latency); - session->sendAsync(serverSettings); + session->send(serverSettings); } } @@ -346,8 +360,8 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp session_ptr session = getStreamSession(client->id); if (session && (session->pcmStream() != stream)) { - session->sendAsync(stream->getMeta()); - session->sendAsync(stream->getHeader()); + session->send(stream->getMeta()); + session->send(stream->getHeader()); session->setPcmStream(stream); } } @@ -401,8 +415,8 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp session_ptr session = getStreamSession(client->id); if (session && stream && (session->pcmStream() != stream)) { - session->sendAsync(stream->getMeta()); - session->sendAsync(stream->getHeader()); + session->send(stream->getMeta()); + session->send(stream->getHeader()); session->setPcmStream(stream); } } @@ -472,7 +486,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp /// Response: {"id":4,"jsonrpc":"2.0","result":{"stream_id":"Spotify"}} /// Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications - LOG(INFO) << "Stream.SetMeta(" << request->params().get("id") << ")" << request->params().get("meta") << "\n"; + LOG(INFO, LOG_TAG) << "Stream.SetMeta(" << request->params().get("id") << ")" << request->params().get("meta") << "\n"; // Find stream string streamId = request->params().get("id"); @@ -494,8 +508,8 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications // clang-format on - LOG(INFO) << "Stream.AddStream(" << request->params().get("streamUri") << ")" - << "\n"; + LOG(INFO, LOG_TAG) << "Stream.AddStream(" << request->params().get("streamUri") << ")" + << "\n"; // Find stream string streamUri = request->params().get("streamUri"); @@ -514,8 +528,8 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp // Call onMetaChanged(const PcmStream* pcmStream) for updates and notifications // clang-format on - LOG(INFO) << "Stream.RemoveStream(" << request->params().get("id") << ")" - << "\n"; + LOG(INFO, LOG_TAG) << "Stream.RemoveStream(" << request->params().get("id") << ")" + << "\n"; // Find stream string streamId = request->params().get("id"); @@ -533,12 +547,13 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp } catch (const jsonrpcpp::RequestException& e) { - LOG(ERROR) << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n"; + LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() + << "\n"; response.reset(new jsonrpcpp::RequestException(e)); } catch (const exception& e) { - LOG(ERROR) << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; + LOG(ERROR, LOG_TAG) << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id())); } } @@ -546,7 +561,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp std::string StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) { - // LOG(DEBUG) << "onMessageReceived: " << message << "\n"; + // LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n"; jsonrpcpp::entity_ptr entity(nullptr); try { @@ -615,17 +630,17 @@ std::string StreamServer::onMessageReceived(ControlSession* controlSession, cons void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer) { - // LOG(DEBUG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << - // baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << - // baseMessage.received.usec << "\n"; + LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id + << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec + << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (baseMessage.type == message_type::kTime) { auto timeMsg = make_shared(); timeMsg->deserialize(baseMessage, buffer); timeMsg->refersTo = timeMsg->id; timeMsg->latency = timeMsg->received - timeMsg->sent; - // LOG(INFO) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; - streamSession->sendAsync(timeMsg); + // LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; + streamSession->send(timeMsg); // refresh streamSession state ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId); @@ -640,7 +655,7 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId); if (clientInfo == nullptr) { - LOG(ERROR) << "client not found: " << streamSession->clientId << "\n"; + LOG(ERROR, LOG_TAG) << "client not found: " << streamSession->clientId << "\n"; return; } msg::ClientInfo infoMsg; @@ -657,12 +672,10 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba msg::Hello helloMsg; helloMsg.deserialize(baseMessage, buffer); streamSession->clientId = helloMsg.getUniqueId(); - LOG(INFO) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() - << ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch() - << ", Protocol version: " << helloMsg.getProtocolVersion() << "\n"; + LOG(INFO, LOG_TAG) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() + << ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch() + << ", Protocol version: " << helloMsg.getProtocolVersion() << "\n"; - LOG(DEBUG) << "request kServerSettings: " << streamSession->clientId << "\n"; - // std::lock_guard mlock(mutex_); bool newGroup(false); GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId); if (group == nullptr) @@ -673,14 +686,14 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba ClientInfoPtr client = group->getClient(streamSession->clientId); - LOG(DEBUG) << "request kServerSettings\n"; + LOG(DEBUG, LOG_TAG) << "Sending ServerSettings to " << streamSession->clientId << "\n"; auto serverSettings = make_shared(); serverSettings->setVolume(client->config.volume.percent); serverSettings->setMuted(client->config.volume.muted || group->muted); serverSettings->setLatency(client->config.latency); serverSettings->setBufferMs(settings_.stream.bufferMs); serverSettings->refersTo = helloMsg.id; - streamSession->sendAsync(serverSettings); + streamSession->send(serverSettings); client->host.mac = helloMsg.getMacAddress(); client->host.ip = streamSession->getIP(); @@ -701,14 +714,16 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba stream = streamManager_->getDefaultStream(); group->streamId = stream->getId(); } - LOG(DEBUG) << "Group: " << group->id << ", stream: " << group->streamId << "\n"; + LOG(DEBUG, LOG_TAG) << "Group: " << group->id << ", stream: " << group->streamId << "\n"; saveConfig(); - streamSession->sendAsync(stream->getMeta()); + LOG(DEBUG, LOG_TAG) << "Sending meta data to " << streamSession->clientId << "\n"; + streamSession->send(stream->getMeta()); streamSession->setPcmStream(stream); auto headerChunk = stream->getHeader(); - streamSession->sendAsync(headerChunk); + LOG(DEBUG, LOG_TAG) << "Sending codec header to " << streamSession->clientId << "\n"; + streamSession->send(headerChunk); if (newGroup) { @@ -741,7 +756,7 @@ void StreamServer::saveConfig(const std::chrono::milliseconds& deferred) config_timer_.async_wait([](const boost::system::error_code& ec) { if (!ec) { - LOG(DEBUG) << "Saving config\n"; + LOG(DEBUG, LOG_TAG) << "Saving config\n"; Config::instance().save(); } }); @@ -782,7 +797,7 @@ void StreamServer::startAccept() if (!ec) handleAccept(std::move(socket)); else - LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n"; }; for (auto& acceptor : acceptor_) @@ -803,19 +818,13 @@ void StreamServer::handleAccept(tcp::socket socket) /// experimental: turn on tcp::no_delay socket.set_option(tcp::no_delay(true)); - LOG(NOTICE) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; - shared_ptr session = make_shared(io_context_, this, std::move(socket)); - - session->setBufferMs(settings_.stream.bufferMs); - session->start(); - - std::lock_guard mlock(sessionsMutex_); - sessions_.emplace_back(session); - cleanup(); + LOG(NOTICE, LOG_TAG) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; + shared_ptr session = make_shared(io_context_, this, std::move(socket)); + onNewSession(session); } catch (const std::exception& e) { - LOG(ERROR) << "Exception in StreamServer::handleAccept: " << e.what() << endl; + LOG(ERROR, LOG_TAG) << "Exception in StreamServer::handleAccept: " << e.what() << endl; } startAccept(); } @@ -835,7 +844,7 @@ void StreamServer::start() { PcmStreamPtr stream = streamManager_->addStream(streamUri); if (stream) - LOG(INFO) << "Stream: " << stream->getUri().toJson() << "\n"; + LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n"; } streamManager_->start(); @@ -843,13 +852,13 @@ void StreamServer::start() { try { - LOG(INFO) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; + LOG(INFO, LOG_TAG) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n"; acceptor_.emplace_back( make_unique(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port))); } catch (const boost::system::system_error& e) { - LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; + LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; } } @@ -857,7 +866,7 @@ void StreamServer::start() } catch (const std::exception& e) { - LOG(NOTICE) << "StreamServer::start: " << e.what() << endl; + LOG(NOTICE, LOG_TAG) << "StreamServer::start: " << e.what() << endl; stop(); throw; } diff --git a/server/stream_server.hpp b/server/stream_server.hpp index fed49641..1407b83a 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -69,11 +69,17 @@ public: /// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived std::string onMessageReceived(ControlSession* connection, const std::string& message) override; + // TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one. + void onNewSession(const std::shared_ptr& session) override + { + std::ignore = session; + }; + void onNewSession(const std::shared_ptr& session) override; /// Implementation of PcmListener void onMetaChanged(const PcmStream* pcmStream) override; void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override; - void onChunkRead(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; + void onNewChunk(const PcmStream* pcmStream, std::shared_ptr chunk, double duration) override; void onResync(const PcmStream* pcmStream, double ms) override; private: diff --git a/server/stream_session.cpp b/server/stream_session.cpp index 35c2cfc7..b143db62 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -29,83 +29,13 @@ using namespace streamreader; static constexpr auto LOG_TAG = "StreamSession"; -StreamSession::StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket) - : socket_(std::move(socket)), messageReceiver_(receiver), pcmStream_(nullptr), strand_(ioc) +StreamSession::StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver) : messageReceiver_(receiver), pcmStream_(nullptr), strand_(ioc) { base_msg_size_ = baseMessage_.getSize(); buffer_.resize(base_msg_size_); } -StreamSession::~StreamSession() -{ - stop(); -} - - -std::string StreamSession::getIP() -{ - try - { - return socket_.remote_endpoint().address().to_string(); - } - catch (...) - { - return "0.0.0.0"; - } -} - - -void StreamSession::read_next() -{ - boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), - boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - - baseMessage_.deserialize(buffer_.data()); - LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id - << ", refers: " << baseMessage_.refersTo << "\n"; - if (baseMessage_.type > message_type::kLast) - { - LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - else if (baseMessage_.size > msg::max_size) - { - LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - - if (baseMessage_.size > buffer_.size()) - buffer_.resize(baseMessage_.size); - - boost::asio::async_read( - socket_, boost::asio::buffer(buffer_, baseMessage_.size), - boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - - tv t; - baseMessage_.received = t; - if (messageReceiver_ != nullptr) - messageReceiver_->onMessageReceived(this, baseMessage_, buffer_.data()); - read_next(); - })); - })); -} - - void StreamSession::setPcmStream(PcmStreamPtr pcmStream) { pcmStream_ = pcmStream; @@ -118,49 +48,29 @@ const PcmStreamPtr StreamSession::pcmStream() const } -void StreamSession::start() -{ - read_next(); - // strand_.post([this]() { read_next(); }); -} - - -void StreamSession::stop() -{ - LOG(DEBUG, LOG_TAG) << "StreamSession::stop\n"; - boost::system::error_code ec; - socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - if (ec) - LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << "\n"; - socket_.close(ec); - if (ec) - LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; - LOG(DEBUG, LOG_TAG) << "StreamSession stopped\n"; -} - - void StreamSession::send_next() { auto& buffer = messages_.front(); buffer.on_air = true; - boost::asio::async_write(socket_, buffer, - boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), buffer ](boost::system::error_code ec, std::size_t length) { - messages_.pop_front(); - if (ec) - { - LOG(ERROR, LOG_TAG) << "StreamSession write error (msg length: " << length << "): " << ec.message() << "\n"; - messageReceiver_->onDisconnect(this); - return; - } - if (!messages_.empty()) - send_next(); - })); + strand_.post([ this, self = shared_from_this(), buffer ]() { + sendAsync(buffer, [this](boost::system::error_code ec, std::size_t length) { + messages_.pop_front(); + if (ec) + { + LOG(ERROR, LOG_TAG) << "StreamSession write error (msg length: " << length << "): " << ec.message() << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + if (!messages_.empty()) + send_next(); + }); + }); } -void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) +void StreamSession::send(shared_const_buffer const_buf) { - strand_.post([ this, self = shared_from_this(), const_buf, send_now ]() { + strand_.post([ this, self = shared_from_this(), const_buf ]() { // delete PCM chunks that are older than the overall buffer duration messages_.erase(std::remove_if(messages_.begin(), messages_.end(), [this](const shared_const_buffer& buffer) { @@ -172,10 +82,7 @@ void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) }), messages_.end()); - if (send_now) - messages_.push_front(const_buf); - else - messages_.push_back(const_buf); + messages_.push_back(const_buf); if (messages_.size() > 1) { @@ -187,13 +94,13 @@ void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) } -void StreamSession::sendAsync(msg::message_ptr message, bool send_now) +void StreamSession::send(msg::message_ptr message) { if (!message) return; // TODO: better set the timestamp in send_next for more accurate time sync - sendAsync(shared_const_buffer(*message), send_now); + send(shared_const_buffer(*message)); } diff --git a/server/stream_session.hpp b/server/stream_session.hpp index 8d3f42d8..e1ef8a37 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -55,6 +55,7 @@ class shared_const_buffer { std::vector data; bool is_pcm_chunk; + uint16_t type; chronos::time_point_clk rec_time; }; @@ -65,6 +66,7 @@ public: message.sent = t; const msg::PcmChunk* pcm_chunk = dynamic_cast(&message); message_ = std::make_shared(); + message_->type = message.type; message_->is_pcm_chunk = (pcm_chunk != nullptr); if (message_->is_pcm_chunk) message_->rec_time = pcm_chunk->start(); @@ -102,6 +104,8 @@ private: }; +using WriteHandler = std::function; + /// Endpoint for a connected client. /** * Endpoint for a connected client. @@ -112,35 +116,43 @@ class StreamSession : public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to MessageReceiver - StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket); - ~StreamSession(); - void start(); - void stop(); + StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver); + virtual ~StreamSession() = default; + + virtual std::string getIP() = 0; + + virtual void start() = 0; + virtual void stop() = 0; + + void setMessageReceiver(MessageReceiver* receiver) + { + messageReceiver_ = receiver; + } + +protected: + virtual void sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) = 0; + +public: + /// Sends a message to the client (asynchronous) + void send(msg::message_ptr message); /// Sends a message to the client (asynchronous) - void sendAsync(msg::message_ptr message, bool send_now = false); - - /// Sends a message to the client (asynchronous) - void sendAsync(shared_const_buffer const_buf, bool send_now = false); + void send(shared_const_buffer const_buf); /// Max playout latency. No need to send PCM data that is older than bufferMs void setBufferMs(size_t bufferMs); std::string clientId; - std::string getIP(); - void setPcmStream(streamreader::PcmStreamPtr pcmStream); const streamreader::PcmStreamPtr pcmStream() const; protected: - void read_next(); void send_next(); msg::BaseMessage baseMessage_; std::vector buffer_; size_t base_msg_size_; - tcp::socket socket_; MessageReceiver* messageReceiver_; size_t bufferMs_; streamreader::PcmStreamPtr pcmStream_; diff --git a/server/stream_session_tcp.cpp b/server/stream_session_tcp.cpp new file mode 100644 index 00000000..3a62bf96 --- /dev/null +++ b/server/stream_session_tcp.cpp @@ -0,0 +1,136 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include "stream_session_tcp.hpp" + +#include "common/aixlog.hpp" +#include "message/pcm_chunk.hpp" +#include + +using namespace std; +using namespace streamreader; + + +static constexpr auto LOG_TAG = "StreamSessionTCP"; + + +StreamSessionTcp::StreamSessionTcp(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket) + : StreamSession(ioc, receiver), socket_(std::move(socket)) +{ +} + + +StreamSessionTcp::~StreamSessionTcp() +{ + LOG(DEBUG, LOG_TAG) << "~StreamSessionTcp\n"; + stop(); +} + + +void StreamSessionTcp::start() +{ + read_next(); +} + + +void StreamSessionTcp::stop() +{ + LOG(DEBUG, LOG_TAG) << "stop\n"; + if (socket_.is_open()) + { + boost::system::error_code ec; + socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + if (ec) + LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << "\n"; + socket_.close(ec); + if (ec) + LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + LOG(DEBUG, LOG_TAG) << "stopped\n"; + } +} + + +std::string StreamSessionTcp::getIP() +{ + try + { + return socket_.remote_endpoint().address().to_string(); + } + catch (...) + { + return "0.0.0.0"; + } +} + + +void StreamSessionTcp::read_next() +{ + boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), + boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + + baseMessage_.deserialize(buffer_.data()); + LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id + << ", refers: " << baseMessage_.refersTo << "\n"; + if (baseMessage_.type > message_type::kLast) + { + LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + else if (baseMessage_.size > msg::max_size) + { + LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + + if (baseMessage_.size > buffer_.size()) + buffer_.resize(baseMessage_.size); + + boost::asio::async_read( + socket_, boost::asio::buffer(buffer_, baseMessage_.size), + boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + + tv t; + baseMessage_.received = t; + if (messageReceiver_ != nullptr) + messageReceiver_->onMessageReceived(this, baseMessage_, buffer_.data()); + read_next(); + })); + })); +} + + +void StreamSessionTcp::sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) +{ + boost::asio::async_write(socket_, buffer, + boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), buffer, + handler ](boost::system::error_code ec, std::size_t length) { handler(ec, length); })); +} diff --git a/server/stream_session_tcp.hpp b/server/stream_session_tcp.hpp new file mode 100644 index 00000000..2cb31734 --- /dev/null +++ b/server/stream_session_tcp.hpp @@ -0,0 +1,53 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef STREAM_SESSION_TCP_HPP +#define STREAM_SESSION_TCP_HPP + +#include "stream_session.hpp" + +using boost::asio::ip::tcp; + + +/// Endpoint for a connected client. +/** + * Endpoint for a connected client. + * Messages are sent to the client with the "send" method. + * Received messages from the client are passed to the MessageReceiver callback + */ +class StreamSessionTcp : public StreamSession +{ +public: + /// ctor. Received message from the client are passed to MessageReceiver + StreamSessionTcp(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket); + ~StreamSessionTcp() override; + void start() override; + void stop() override; + std::string getIP() override; + +protected: + void read_next(); + void sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) override; + +private: + tcp::socket socket_; +}; + + + +#endif diff --git a/server/stream_session_ws.cpp b/server/stream_session_ws.cpp new file mode 100644 index 00000000..23b750aa --- /dev/null +++ b/server/stream_session_ws.cpp @@ -0,0 +1,125 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include "stream_session_ws.hpp" +#include "common/aixlog.hpp" +#include "message/pcm_chunk.hpp" +#include + +using namespace std; + +static constexpr auto LOG_TAG = "StreamSessionWS"; + + +StreamSessionWebsocket::StreamSessionWebsocket(boost::asio::io_context& ioc, MessageReceiver* receiver, websocket::stream&& socket) + : StreamSession(ioc, receiver), ws_(std::move(socket)) +{ + LOG(DEBUG, LOG_TAG) << "StreamSessionWS\n"; +} + + +StreamSessionWebsocket::~StreamSessionWebsocket() +{ + LOG(DEBUG, LOG_TAG) << "~StreamSessionWS\n"; + stop(); +} + + +void StreamSessionWebsocket::start() +{ + // Read a message + LOG(DEBUG, LOG_TAG) << "start\n"; + ws_.binary(true); + do_read_ws(); +} + + +void StreamSessionWebsocket::stop() +{ + if (ws_.is_open()) + { + boost::beast::error_code ec; + ws_.close(beast::websocket::close_code::normal, ec); + if (ec) + LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + } +} + + +std::string StreamSessionWebsocket::getIP() +{ + try + { + return ws_.next_layer().socket().remote_endpoint().address().to_string(); + } + catch (...) + { + return "0.0.0.0"; + } +} + + +void StreamSessionWebsocket::sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) +{ + LOG(TRACE, LOG_TAG) << "sendAsync: " << buffer.message().type << "\n"; + ws_.async_write(buffer, boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), buffer, + handler ](boost::system::error_code ec, std::size_t length) { handler(ec, length); })); +} + + +void StreamSessionWebsocket::do_read_ws() +{ + // Read a message into our buffer + ws_.async_read(buffer_, boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](beast::error_code ec, std::size_t bytes_transferred) { + on_read_ws(ec, bytes_transferred); + })); +} + + +void StreamSessionWebsocket::on_read_ws(beast::error_code ec, std::size_t bytes_transferred) +{ + LOG(DEBUG, LOG_TAG) << "on_read_ws, ec: " << ec << ", bytes_transferred: " << bytes_transferred << "\n"; + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == websocket::error::closed) + { + messageReceiver_->onDisconnect(this); + return; + } + + if (ec) + { + LOG(ERROR) << "ControlSessionWebsocket::on_read_ws error: " << ec.message() << "\n"; + messageReceiver_->onDisconnect(this); + return; + } + + auto data = boost::asio::buffer_cast(buffer_.data()); + baseMessage_.deserialize(data); + LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id + << ", refers: " << baseMessage_.refersTo << "\n"; + + tv t; + baseMessage_.received = t; + if (messageReceiver_ != nullptr) + messageReceiver_->onMessageReceived(this, baseMessage_, data + base_msg_size_); + + buffer_.consume(bytes_transferred); + do_read_ws(); +} diff --git a/server/stream_session_ws.hpp b/server/stream_session_ws.hpp new file mode 100644 index 00000000..75d60d3a --- /dev/null +++ b/server/stream_session_ws.hpp @@ -0,0 +1,63 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef STREAM_SESSION_WS_HPP +#define STREAM_SESSION_WS_HPP + +#include "stream_session.hpp" +#include +#include +#include + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace net = boost::asio; // from + + +/// Endpoint for a connected control client. +/** + * Endpoint for a connected control client. + * Messages are sent to the client with the "send" method. + * Received messages from the client are passed to the ControlMessageReceiver callback + */ +class StreamSessionWebsocket : public StreamSession +{ +public: + /// ctor. Received message from the client are passed to MessageReceiver + StreamSessionWebsocket(boost::asio::io_context& ioc, MessageReceiver* receiver, websocket::stream&& socket); + ~StreamSessionWebsocket() override; + void start() override; + void stop() override; + std::string getIP() override; + +protected: + // Websocket methods + void sendAsync(const shared_const_buffer& buffer, const WriteHandler& handler) override; + void on_read_ws(beast::error_code ec, std::size_t bytes_transferred); + void do_read_ws(); + + websocket::stream ws_; + +protected: + beast::flat_buffer buffer_; +}; + + + +#endif