diff --git a/server/control_server.cpp b/server/control_server.cpp index bea4de71..fcc75a7c 100644 --- a/server/control_server.cpp +++ b/server/control_server.cpp @@ -74,28 +74,27 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu } -std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message) +void ControlServer::onMessageReceived(std::shared_ptr session, const std::string& message, const ResponseHander& response_handler) { // LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n"; if (controlMessageReceiver_ != nullptr) - return controlMessageReceiver_->onMessageReceived(session, message); - return ""; + controlMessageReceiver_->onMessageReceived(std::move(session), message, response_handler); } -void ControlServer::onNewSession(const shared_ptr& session) +void ControlServer::onNewSession(shared_ptr session) { std::lock_guard mlock(session_mutex_); session->start(); - sessions_.emplace_back(session); + sessions_.emplace_back(std::move(session)); cleanup(); } -void ControlServer::onNewSession(const std::shared_ptr& session) +void ControlServer::onNewSession(std::shared_ptr session) { if (controlMessageReceiver_ != nullptr) - controlMessageReceiver_->onNewSession(session); + controlMessageReceiver_->onNewSession(std::move(session)); } @@ -136,7 +135,7 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args) // socket->set_option(boost::asio::ip::tcp::no_delay(false)); LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(this, io_context_, std::move(socket), std::forward(args)...); - onNewSession(session); + onNewSession(std::move(session)); } catch (const std::exception& e) { diff --git a/server/control_server.hpp b/server/control_server.hpp index e956457f..fe9843ce 100644 --- a/server/control_server.hpp +++ b/server/control_server.hpp @@ -61,9 +61,9 @@ private: void cleanup(); /// Implementation of ControlMessageReceiver - std::string onMessageReceived(ControlSession* session, const std::string& message) override; - void onNewSession(const std::shared_ptr& session) override; - void onNewSession(const std::shared_ptr& session) override; + void onMessageReceived(std::shared_ptr session, const std::string& message, const ResponseHander& response_handler) override; + void onNewSession(std::shared_ptr session) override; + void onNewSession(std::shared_ptr session) override; mutable std::recursive_mutex session_mutex_; std::vector> sessions_; diff --git a/server/control_session.hpp b/server/control_session.hpp index c9d833b9..0cc8d390 100644 --- a/server/control_session.hpp +++ b/server/control_session.hpp @@ -39,10 +39,11 @@ class StreamSession; class ControlMessageReceiver { public: + using ResponseHander = std::function; // TODO: rename, error handling - virtual std::string onMessageReceived(ControlSession* session, const std::string& message) = 0; - virtual void onNewSession(const std::shared_ptr& session) = 0; - virtual void onNewSession(const std::shared_ptr& session) = 0; + virtual void onMessageReceived(std::shared_ptr session, const std::string& message, const ResponseHander& response_handler) = 0; + virtual void onNewSession(std::shared_ptr session) = 0; + virtual void onNewSession(std::shared_ptr session) = 0; }; diff --git a/server/control_session_http.cpp b/server/control_session_http.cpp index 6255ccd2..97e76000 100644 --- a/server/control_session_http.cpp +++ b/server/control_session_http.cpp @@ -220,14 +220,15 @@ void ControlSessionHttp::handle_request(http::requestonMessageReceived(this, req.body()); - http::response res{http::status::ok, req.version()}; - res.set(http::field::server, HTTP_SERVER_NAME); - res.set(http::field::content_type, "application/json"); - res.keep_alive(req.keep_alive()); - res.body() = response; - res.prepare_payload(); - return send(std::move(res)); + message_receiver_->onMessageReceived(shared_from_this(), req.body(), [this, req = std::move(req), send = std::move(send)](const std::string& response) { + http::response res{http::status::ok, req.version()}; + res.set(http::field::server, HTTP_SERVER_NAME); + res.set(http::field::content_type, "application/json"); + res.keep_alive(req.keep_alive()); + res.body() = response; + res.prepare_payload(); + return send(std::move(res)); + }); } // Request path must be absolute and not contain "..". @@ -322,7 +323,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe else { auto ws_session = make_shared(message_receiver_, strand_.context(), std::move(*ws)); - message_receiver_->onNewSession(ws_session); + message_receiver_->onNewSession(std::move(ws_session)); } }); } @@ -339,7 +340,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe else { auto ws_session = make_shared(strand_.context(), nullptr, std::move(*ws)); - message_receiver_->onNewSession(ws_session); + message_receiver_->onNewSession(std::move(ws_session)); } }); } diff --git a/server/control_session_tcp.cpp b/server/control_session_tcp.cpp index 407b5f11..26cfa31f 100644 --- a/server/control_session_tcp.cpp +++ b/server/control_session_tcp.cpp @@ -61,9 +61,10 @@ void ControlSessionTcp::do_read() // LOG(DEBUG, LOG_TAG) << "received: " << line << "\n"; if ((message_receiver_ != nullptr) && !line.empty()) { - string response = message_receiver_->onMessageReceived(this, line); - if (!response.empty()) - sendAsync(response); + message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) { + if (!response.empty()) + sendAsync(response); + }); } } streambuf_.consume(bytes_transferred); diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp index 9f6b3d4a..3157294c 100644 --- a/server/control_session_ws.cpp +++ b/server/control_session_ws.cpp @@ -122,11 +122,12 @@ void ControlSessionWebsocket::on_read_ws(beast::error_code ec, std::size_t bytes // LOG(DEBUG, LOG_TAG) << "received: " << line << "\n"; if ((message_receiver_ != nullptr) && !line.empty()) { - string response = message_receiver_->onMessageReceived(this, line); - if (!response.empty()) - { - sendAsync(response); - } + message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) { + if (!response.empty()) + { + sendAsync(response); + } + }); } } buffer_.consume(bytes_transferred); diff --git a/server/server.cpp b/server/server.cpp index 72f1d212..3d9c0274 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -42,10 +42,10 @@ Server::Server(boost::asio::io_context& io_context, const ServerSettings& server Server::~Server() = default; -void Server::onNewSession(const std::shared_ptr& session) +void Server::onNewSession(std::shared_ptr session) { LOG(DEBUG, LOG_TAG) << "onNewSession\n"; - streamServer_->addSession(session); + streamServer_->addSession(std::move(session)); } @@ -140,8 +140,10 @@ void Server::onDisconnect(StreamSession* streamSession) } -void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const +void Server::processRequest(const jsonrpcpp::request_ptr request, const OnResponse& on_response) const { + jsonrpcpp::entity_ptr response; + jsonrpcpp::notification_ptr notification; try { // LOG(INFO, LOG_TAG) << "Server::processRequest method: " << request->method << ", " << "id: " << request->id() << "\n"; @@ -172,8 +174,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent std::lock_guard lock(clientMutex_); clientInfo->config.volume.fromJson(request->params().get("volume")); result["volume"] = clientInfo->config.volume.toJson(); - notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged", - jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson()))); + notification = std::make_shared( + "Client.OnVolumeChanged", jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson())); } else if (request->method() == "Client.SetLatency") { @@ -189,8 +191,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent latency = settings_.stream.bufferMs; clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs); result["latency"] = clientInfo->config.latency; - notification.reset( - new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency))); + notification = std::make_shared("Client.OnLatencyChanged", + jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency)); } else if (request->method() == "Client.SetName") { @@ -201,8 +203,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent // clang-format on clientInfo->config.name = request->params().get("name"); result["name"] = clientInfo->config.name; - notification.reset( - new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name))); + notification = std::make_shared("Client.OnNameChanged", + jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name)); } else throw jsonrpcpp::MethodNotFoundException(request->id()); @@ -247,7 +249,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent // clang-format on group->name = request->params().get("name"); result["name"] = group->name; - notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name))); + notification = std::make_shared("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name)); } else if (request->method() == "Group.SetMute") { @@ -276,7 +278,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent } result["mute"] = group->muted; - notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted))); + notification = std::make_shared("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted)); } else if (request->method() == "Group.SetStream") { @@ -306,7 +308,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent // Notify others result["stream_id"] = group->streamId; - notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId))); + notification = + std::make_shared("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId)); } else if (request->method() == "Group.SetClients") { @@ -366,7 +369,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent result["server"] = server; // Notify others: since at least two groups are affected, send a complete server update - notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); + notification = std::make_shared("Server.OnUpdate", jsonrpcpp::Parameter("server", server)); } else throw jsonrpcpp::MethodNotFoundException(request->id()); @@ -409,7 +412,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent result["server"] = server; /// Notify others - notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server))); + notification = std::make_shared("Server.OnUpdate", jsonrpcpp::Parameter("server", server)); } else throw jsonrpcpp::MethodNotFoundException(request->id()); @@ -457,13 +460,14 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent if (stream == nullptr) throw jsonrpcpp::InternalErrorException("Stream not found", request->id()); - stream->control(*request, [](const jsonrpcpp::Response& response) { - LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << response.id() << ", result: " << response.result() - << ", error: " << response.error().code() << "\n"; + stream->control(*request, [request, on_response](const jsonrpcpp::Response& ctrl_response) { + LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << ctrl_response.id() << ", result: " << ctrl_response.result() + << ", error: " << ctrl_response.error().code() << "\n"; + auto response = make_shared(request->id(), ctrl_response.result()); + on_response(response, nullptr); }); - // Setup response - result["id"] = streamId; + return; } else if (request->method().find("Stream.SetProperty") == 0) { @@ -529,22 +533,23 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent else throw jsonrpcpp::MethodNotFoundException(request->id()); - response.reset(new jsonrpcpp::Response(*request, result)); + response = std::make_shared(*request, result); } catch (const jsonrpcpp::RequestException& e) { LOG(ERROR, LOG_TAG) << "Server::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n"; - response.reset(new jsonrpcpp::RequestException(e)); + response = std::make_shared(e); } catch (const exception& e) { LOG(ERROR, LOG_TAG) << "Server::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; - response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id())); + response = std::make_shared(e.what(), request->id()); } + on_response(std::move(response), std::move(notification)); } -std::string Server::onMessageReceived(ControlSession* controlSession, const std::string& message) +void Server::onMessageReceived(std::shared_ptr controlSession, const std::string& message, const ResponseHander& response_handler) { // LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n"; jsonrpcpp::entity_ptr entity(nullptr); @@ -552,15 +557,15 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std: { entity = jsonrpcpp::Parser::do_parse(message); if (!entity) - return ""; + return response_handler(""); } catch (const jsonrpcpp::ParseErrorException& e) { - return e.to_json().dump(); + return response_handler(e.to_json().dump()); } catch (const std::exception& e) { - return jsonrpcpp::ParseErrorException(e.what()).to_json().dump(); + return response_handler(jsonrpcpp::ParseErrorException(e.what()).to_json().dump()); } jsonrpcpp::entity_ptr response(nullptr); @@ -568,23 +573,26 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std: if (entity->is_request()) { jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); - processRequest(request, response, notification); - saveConfig(); - ////cout << "Request: " << request->to_json().dump() << "\n"; - if (notification) - { - ////cout << "Notification: " << notification->to_json().dump() << "\n"; - controlServer_->send(notification->to_json().dump(), controlSession); - } - if (response) - { - ////cout << "Response: " << response->to_json().dump() << "\n"; - return response->to_json().dump(); - } - return ""; + processRequest(request, [this, controlSession, response_handler](jsonrpcpp::entity_ptr response, jsonrpcpp::notification_ptr notification) { + saveConfig(); + ////cout << "Request: " << request->to_json().dump() << "\n"; + if (notification) + { + ////cout << "Notification: " << notification->to_json().dump() << "\n"; + controlServer_->send(notification->to_json().dump(), controlSession.get()); + } + if (response) + { + ////cout << "Response: " << response->to_json().dump() << "\n"; + return response_handler(response->to_json().dump()); + } + return response_handler(""); + }); } else if (entity->is_batch()) { + /// Attention: this will only work as long as the response handler in processRequest is called synchronously. + /// This is true for volume changes, which is the only batch request, but not for Control commands! jsonrpcpp::batch_ptr batch = dynamic_pointer_cast(entity); ////cout << "Batch: " << batch->to_json().dump() << "\n"; jsonrpcpp::Batch responseBatch; @@ -594,21 +602,23 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std: if (batch_entity->is_request()) { jsonrpcpp::request_ptr request = dynamic_pointer_cast(batch_entity); - processRequest(request, response, notification); - if (response != nullptr) - responseBatch.add_ptr(response); - if (notification != nullptr) - notificationBatch.add_ptr(notification); + processRequest(request, [this, controlSession, response_handler, &responseBatch, ¬ificationBatch](jsonrpcpp::entity_ptr response, + jsonrpcpp::notification_ptr notification) { + if (response != nullptr) + responseBatch.add_ptr(response); + if (notification != nullptr) + notificationBatch.add_ptr(notification); + }); } } saveConfig(); if (!notificationBatch.entities.empty()) - controlServer_->send(notificationBatch.to_json().dump(), controlSession); + controlServer_->send(notificationBatch.to_json().dump(), controlSession.get()); if (!responseBatch.entities.empty()) - return responseBatch.to_json().dump(); - return ""; + return response_handler(responseBatch.to_json().dump()); + return response_handler(""); } - return ""; + return response_handler(""); } @@ -624,7 +634,8 @@ void Server::onMessageReceived(StreamSession* streamSession, const msg::BaseMess timeMsg->deserialize(baseMessage, buffer); timeMsg->refersTo = timeMsg->id; timeMsg->latency = timeMsg->received - timeMsg->sent; - // LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; + // LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << + // "\n"; streamSession->send(timeMsg); // refresh streamSession state diff --git a/server/server.hpp b/server/server.hpp index 553e91d0..d8fcba60 100644 --- a/server/server.hpp +++ b/server/server.hpp @@ -55,6 +55,9 @@ using session_ptr = std::shared_ptr; class Server : public StreamMessageReceiver, public ControlMessageReceiver, public PcmListener { public: + // TODO: revise handler names + using OnResponse = std::function; + Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings); virtual ~Server(); @@ -67,12 +70,12 @@ private: void onDisconnect(StreamSession* streamSession) override; /// Implementation of ControllMessageReceiver - std::string onMessageReceived(ControlSession* controlSession, const std::string& message) override; - void onNewSession(const std::shared_ptr& session) override + void onMessageReceived(std::shared_ptr controlSession, const std::string& message, const ResponseHander& response_handler) override; + void onNewSession(std::shared_ptr session) override { std::ignore = session; }; - void onNewSession(const std::shared_ptr& session) override; + void onNewSession(std::shared_ptr session) override; /// Implementation of PcmListener void onMetaChanged(const PcmStream* pcmStream) override; @@ -83,7 +86,7 @@ private: void onResync(const PcmStream* pcmStream, double ms) override; private: - void processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; + void processRequest(const jsonrpcpp::request_ptr request, const OnResponse& on_response) const; /// Save the server state deferred to prevent blocking and lower disk io /// @param deferred the delay after the last call to saveConfig void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2)); diff --git a/server/stream_server.cpp b/server/stream_server.cpp index 9b5c7962..61567552 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -54,14 +54,14 @@ void StreamServer::cleanup() } -void StreamServer::addSession(const std::shared_ptr& session) +void StreamServer::addSession(std::shared_ptr session) { session->setMessageReceiver(this); session->setBufferMs(settings_.stream.bufferMs); session->start(); std::lock_guard mlock(sessionsMutex_); - sessions_.emplace_back(session); + sessions_.emplace_back(std::move(session)); cleanup(); } diff --git a/server/stream_server.hpp b/server/stream_server.hpp index decc1527..7cb719dc 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -63,7 +63,7 @@ public: /// Send a message to all connceted clients // void send(const msg::BaseMessage* message); - void addSession(const std::shared_ptr& session); + void addSession(std::shared_ptr session); // void onMetaChanged(const PcmStream* pcmStream, std::shared_ptr meta); void onChunkEncoded(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr chunk, double duration);