From c84c64dd154acb222e9ef4c5b4922f9d8c81e36e Mon Sep 17 00:00:00 2001 From: badaix Date: Sun, 5 Feb 2017 16:32:32 +0100 Subject: [PATCH] handle Json RPC Batches --- server/streamServer.cpp | 134 ++++++++++++++++++++++++++-------------- server/streamServer.h | 2 +- 2 files changed, 90 insertions(+), 46 deletions(-) diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 7c9f3dfa..31c3ff88 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -104,55 +104,52 @@ void StreamServer::onDisconnect(StreamSession* streamSession) } -void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const +void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const { - logD << "onMessageReceived: " << json << "\n"; - jsonrpcpp::Request request; try { - request.parse(json); - logO << "StreamServer::ProcessJson method: " << request.method << ", " << "id: " << request.id << "\n"; + logO << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id << "\n"; Json result; - if (request.method.find("Client.") == 0) + if (request->method.find("Client.") == 0) { - ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.params.get("client")); + ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params.get("client")); if (clientInfo == nullptr) - throw jsonrpcpp::InternalErrorException("Client not found", request.id); + throw jsonrpcpp::InternalErrorException("Client not found", request->id); - if (request.method == "Client.GetStatus") + if (request->method == "Client.GetStatus") { result = clientInfo->toJson(); } - else if (request.method == "Client.SetVolume") + else if (request->method == "Client.SetVolume") { - clientInfo->config.volume.fromJson(request.params.get("volume")); + clientInfo->config.volume.fromJson(request->params.get("volume")); } - else if (request.method == "Client.SetLatency") + else if (request->method == "Client.SetLatency") { - int latency = request.params.get("latency"); + int latency = request->params.get("latency"); if (latency < -10000) latency = -10000; else if (latency > settings_.bufferMs) latency = settings_.bufferMs; clientInfo->config.latency = latency; //, -10000, settings_.bufferMs); } - else if (request.method == "Client.SetName") + else if (request->method == "Client.SetName") { - clientInfo->config.name = request.params.get("name"); + clientInfo->config.name = request->params.get("name"); } else - throw jsonrpcpp::MethodNotFoundException(request.id); + throw jsonrpcpp::MethodNotFoundException(request->id); - if (request.method.find("Client.Set") == 0) + if (request->method.find("Client.Set") == 0) { /// Response: updated client result = {{"method", "Client.OnUpdate"}, {"params", clientInfo->toJson()}}; /// Update client - session_ptr session = getStreamSession(request.params.get("client")); + session_ptr session = getStreamSession(request->params.get("client")); if (session != nullptr) { msg::ServerSettings serverSettings; @@ -167,22 +164,22 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r notification.reset(new jsonrpcpp::Notification("Client.OnUpdate", clientInfo->toJson())); } } - else if (request.method.find("Group.") == 0) + else if (request->method.find("Group.") == 0) { - GroupPtr group = Config::instance().getGroup(request.params.get("group")); + GroupPtr group = Config::instance().getGroup(request->params.get("group")); if (group == nullptr) - throw jsonrpcpp::InternalErrorException("Group not found", request.id); + throw jsonrpcpp::InternalErrorException("Group not found", request->id); - if (request.method == "Group.GetStatus") + if (request->method == "Group.GetStatus") { result = group->toJson(); } - else if (request.method == "Group.SetStream") + else if (request->method == "Group.SetStream") { - string streamId = request.params.get("id"); + string streamId = request->params.get("id"); PcmStreamPtr stream = streamManager_->getStream(streamId); if (stream == nullptr) - throw jsonrpcpp::InternalErrorException("Stream not found", request.id); + throw jsonrpcpp::InternalErrorException("Stream not found", request->id); group->streamId = streamId; @@ -203,10 +200,10 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r /// Notify others notification.reset(new jsonrpcpp::Notification("Group.OnUpdate", group->toJson()));; } - else if (request.method == "Group.SetClients") + else if (request->method == "Group.SetClients") { - vector clients = request.params.get("clients"); - string groupId = request.params.get("group"); + vector clients = request->params.get("clients"); + string groupId = request->params.get("group"); GroupPtr group = Config::instance().getGroup(groupId); /// Remove clients from group @@ -261,19 +258,19 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson)); } else - throw jsonrpcpp::MethodNotFoundException(request.id); + throw jsonrpcpp::MethodNotFoundException(request->id); } - else if (request.method.find("Server.") == 0) + else if (request->method.find("Server.") == 0) { - if (request.method == "Server.GetStatus") + if (request->method == "Server.GetStatus") { result = Config::instance().getServerStatus(streamManager_->toJson()); } - else if (request.method == "Server.DeleteClient") + else if (request->method == "Server.DeleteClient") { - ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.params.get("client")); + ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params.get("client")); if (clientInfo == nullptr) - throw jsonrpcpp::InternalErrorException("Client not found", request.id); + throw jsonrpcpp::InternalErrorException("Client not found", request->id); Config::instance().remove(clientInfo); @@ -284,36 +281,83 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson)); } else - throw jsonrpcpp::MethodNotFoundException(request.id); + throw jsonrpcpp::MethodNotFoundException(request->id); } else - throw jsonrpcpp::MethodNotFoundException(request.id); + throw jsonrpcpp::MethodNotFoundException(request->id); Config::instance().save(); - response.reset(new jsonrpcpp::Response(request, result)); + response.reset(new jsonrpcpp::Response(*request, result)); } catch (const jsonrpcpp::RequestException& e) { - logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << json << "\n"; + logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n"; response.reset(new jsonrpcpp::RequestException(e)); } catch (const exception& e) { - logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << json << "\n"; - response.reset(new jsonrpcpp::InternalErrorException(e.what(), request.id)); + logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n"; + response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id)); } } void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) { + logO << "onMessageReceived: " << message << "\n"; + jsonrpcpp::entity_ptr entity(nullptr); + try + { + entity = jsonrpcpp::Parser::parse(message); + if (!entity) + return; + } + catch(const jsonrpcpp::ParseErrorException& e) + { + controlSession->send(e.to_json().dump()); + return; + } + catch(const std::exception& e) + { + controlSession->send(jsonrpcpp::ParseErrorException(e.what()).to_json().dump()); + return; + } + jsonrpcpp::entity_ptr response(nullptr); jsonrpcpp::notification_ptr notification(nullptr); - ProcessJson(message, response, notification); - if (response != nullptr) - controlSession->send(response->to_json().dump()); - if (notification != nullptr) - controlServer_->send(notification->to_json().dump(), controlSession); + if (entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); + logO << "isRequest: " << request->to_json().dump() << "\n"; + ProcessRequest(request, response, notification); + if (response) + controlSession->send(response->to_json().dump()); + if (notification) + controlServer_->send(notification->to_json().dump(), controlSession); + } + else if (entity->is_batch()) + { + jsonrpcpp::batch_ptr batch = dynamic_pointer_cast(entity); + logO << "isBatch: " << batch->to_json().dump() << "\n"; + jsonrpcpp::Batch responseBatch; + jsonrpcpp::Batch notificationBatch; + for (const auto& batch_entity: batch->entities) + { + if (batch_entity->is_request()) + { + jsonrpcpp::request_ptr request = dynamic_pointer_cast(batch_entity); + ProcessRequest(request, response, notification); + if (response != nullptr) + responseBatch.add_ptr(response); + if (notification != nullptr) + notificationBatch.add_ptr(notification); + } + } + if (!responseBatch.entities.empty()) + controlSession->send(responseBatch.to_json().dump()); + if (!notificationBatch.entities.empty()) + controlServer_->send(notificationBatch.to_json().dump(), controlSession); + } } diff --git a/server/streamServer.h b/server/streamServer.h index 60d696ea..ec4a6e8c 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -99,7 +99,7 @@ private: void handleAccept(socket_ptr socket); session_ptr getStreamSession(const std::string& mac) const; session_ptr getStreamSession(StreamSession* session) const; - void ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; + void ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; mutable std::recursive_mutex sessionsMutex_; std::set sessions_; asio::io_service* io_service_;