handle Json RPC Batches

This commit is contained in:
badaix 2017-02-05 16:32:32 +01:00
parent 49c176dcab
commit c84c64dd15
2 changed files with 90 additions and 46 deletions

View file

@ -104,55 +104,52 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
} }
void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const
{ {
logD << "onMessageReceived: " << json << "\n";
jsonrpcpp::Request request;
try try
{ {
request.parse(json); logO << "StreamServer::ProcessRequest method: " << request->method << ", " << "id: " << request->id << "\n";
logO << "StreamServer::ProcessJson method: " << request.method << ", " << "id: " << request.id << "\n";
Json result; Json result;
if (request.method.find("Client.") == 0) if (request->method.find("Client.") == 0)
{ {
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.params.get("client")); ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params.get("client"));
if (clientInfo == nullptr) if (clientInfo == nullptr)
throw jsonrpcpp::InternalErrorException("Client not found", request.id); throw jsonrpcpp::InternalErrorException("Client not found", request->id);
if (request.method == "Client.GetStatus") if (request->method == "Client.GetStatus")
{ {
result = clientInfo->toJson(); result = clientInfo->toJson();
} }
else if (request.method == "Client.SetVolume") else if (request->method == "Client.SetVolume")
{ {
clientInfo->config.volume.fromJson(request.params.get("volume")); clientInfo->config.volume.fromJson(request->params.get("volume"));
} }
else if (request.method == "Client.SetLatency") else if (request->method == "Client.SetLatency")
{ {
int latency = request.params.get("latency"); int latency = request->params.get("latency");
if (latency < -10000) if (latency < -10000)
latency = -10000; latency = -10000;
else if (latency > settings_.bufferMs) else if (latency > settings_.bufferMs)
latency = settings_.bufferMs; latency = settings_.bufferMs;
clientInfo->config.latency = latency; //, -10000, settings_.bufferMs); clientInfo->config.latency = latency; //, -10000, settings_.bufferMs);
} }
else if (request.method == "Client.SetName") else if (request->method == "Client.SetName")
{ {
clientInfo->config.name = request.params.get("name"); clientInfo->config.name = request->params.get("name");
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request->id);
if (request.method.find("Client.Set") == 0) if (request->method.find("Client.Set") == 0)
{ {
/// Response: updated client /// Response: updated client
result = {{"method", "Client.OnUpdate"}, {"params", clientInfo->toJson()}}; result = {{"method", "Client.OnUpdate"}, {"params", clientInfo->toJson()}};
/// Update client /// Update client
session_ptr session = getStreamSession(request.params.get("client")); session_ptr session = getStreamSession(request->params.get("client"));
if (session != nullptr) if (session != nullptr)
{ {
msg::ServerSettings serverSettings; msg::ServerSettings serverSettings;
@ -167,22 +164,22 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r
notification.reset(new jsonrpcpp::Notification("Client.OnUpdate", clientInfo->toJson())); notification.reset(new jsonrpcpp::Notification("Client.OnUpdate", clientInfo->toJson()));
} }
} }
else if (request.method.find("Group.") == 0) else if (request->method.find("Group.") == 0)
{ {
GroupPtr group = Config::instance().getGroup(request.params.get("group")); GroupPtr group = Config::instance().getGroup(request->params.get("group"));
if (group == nullptr) if (group == nullptr)
throw jsonrpcpp::InternalErrorException("Group not found", request.id); throw jsonrpcpp::InternalErrorException("Group not found", request->id);
if (request.method == "Group.GetStatus") if (request->method == "Group.GetStatus")
{ {
result = group->toJson(); result = group->toJson();
} }
else if (request.method == "Group.SetStream") else if (request->method == "Group.SetStream")
{ {
string streamId = request.params.get("id"); string streamId = request->params.get("id");
PcmStreamPtr stream = streamManager_->getStream(streamId); PcmStreamPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr) if (stream == nullptr)
throw jsonrpcpp::InternalErrorException("Stream not found", request.id); throw jsonrpcpp::InternalErrorException("Stream not found", request->id);
group->streamId = streamId; group->streamId = streamId;
@ -203,10 +200,10 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r
/// Notify others /// Notify others
notification.reset(new jsonrpcpp::Notification("Group.OnUpdate", group->toJson()));; notification.reset(new jsonrpcpp::Notification("Group.OnUpdate", group->toJson()));;
} }
else if (request.method == "Group.SetClients") else if (request->method == "Group.SetClients")
{ {
vector<string> clients = request.params.get("clients"); vector<string> clients = request->params.get("clients");
string groupId = request.params.get("group"); string groupId = request->params.get("group");
GroupPtr group = Config::instance().getGroup(groupId); GroupPtr group = Config::instance().getGroup(groupId);
/// Remove clients from group /// Remove clients from group
@ -261,19 +258,19 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson)); notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson));
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request->id);
} }
else if (request.method.find("Server.") == 0) else if (request->method.find("Server.") == 0)
{ {
if (request.method == "Server.GetStatus") if (request->method == "Server.GetStatus")
{ {
result = Config::instance().getServerStatus(streamManager_->toJson()); result = Config::instance().getServerStatus(streamManager_->toJson());
} }
else if (request.method == "Server.DeleteClient") else if (request->method == "Server.DeleteClient")
{ {
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.params.get("client")); ClientInfoPtr clientInfo = Config::instance().getClientInfo(request->params.get("client"));
if (clientInfo == nullptr) if (clientInfo == nullptr)
throw jsonrpcpp::InternalErrorException("Client not found", request.id); throw jsonrpcpp::InternalErrorException("Client not found", request->id);
Config::instance().remove(clientInfo); Config::instance().remove(clientInfo);
@ -284,36 +281,83 @@ void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& r
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson)); notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson));
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request->id);
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request->id);
Config::instance().save(); Config::instance().save();
response.reset(new jsonrpcpp::Response(request, result)); response.reset(new jsonrpcpp::Response(*request, result));
} }
catch (const jsonrpcpp::RequestException& e) catch (const jsonrpcpp::RequestException& e)
{ {
logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << json << "\n"; logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n";
response.reset(new jsonrpcpp::RequestException(e)); response.reset(new jsonrpcpp::RequestException(e));
} }
catch (const exception& e) catch (const exception& e)
{ {
logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << json << "\n"; logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n";
response.reset(new jsonrpcpp::InternalErrorException(e.what(), request.id)); response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id));
} }
} }
void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
{ {
logO << "onMessageReceived: " << message << "\n";
jsonrpcpp::entity_ptr entity(nullptr);
try
{
entity = jsonrpcpp::Parser::parse(message);
if (!entity)
return;
}
catch(const jsonrpcpp::ParseErrorException& e)
{
controlSession->send(e.to_json().dump());
return;
}
catch(const std::exception& e)
{
controlSession->send(jsonrpcpp::ParseErrorException(e.what()).to_json().dump());
return;
}
jsonrpcpp::entity_ptr response(nullptr); jsonrpcpp::entity_ptr response(nullptr);
jsonrpcpp::notification_ptr notification(nullptr); jsonrpcpp::notification_ptr notification(nullptr);
ProcessJson(message, response, notification); if (entity->is_request())
if (response != nullptr) {
controlSession->send(response->to_json().dump()); jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
if (notification != nullptr) logO << "isRequest: " << request->to_json().dump() << "\n";
controlServer_->send(notification->to_json().dump(), controlSession); ProcessRequest(request, response, notification);
if (response)
controlSession->send(response->to_json().dump());
if (notification)
controlServer_->send(notification->to_json().dump(), controlSession);
}
else if (entity->is_batch())
{
jsonrpcpp::batch_ptr batch = dynamic_pointer_cast<jsonrpcpp::Batch>(entity);
logO << "isBatch: " << batch->to_json().dump() << "\n";
jsonrpcpp::Batch responseBatch;
jsonrpcpp::Batch notificationBatch;
for (const auto& batch_entity: batch->entities)
{
if (batch_entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(batch_entity);
ProcessRequest(request, response, notification);
if (response != nullptr)
responseBatch.add_ptr(response);
if (notification != nullptr)
notificationBatch.add_ptr(notification);
}
}
if (!responseBatch.entities.empty())
controlSession->send(responseBatch.to_json().dump());
if (!notificationBatch.entities.empty())
controlServer_->send(notificationBatch.to_json().dump(), controlSession);
}
} }

View file

@ -99,7 +99,7 @@ private:
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
session_ptr getStreamSession(const std::string& mac) const; session_ptr getStreamSession(const std::string& mac) const;
session_ptr getStreamSession(StreamSession* session) const; session_ptr getStreamSession(StreamSession* session) const;
void ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; void ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const;
mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex sessionsMutex_;
std::set<session_ptr> sessions_; std::set<session_ptr> sessions_;
asio::io_service* io_service_; asio::io_service* io_service_;