Json RPC: Object.Set methods have the updated object in the response

This commit is contained in:
BadAix 2016-12-30 14:24:10 +01:00
parent c1eca56e38
commit 9e96b3e13b
5 changed files with 248 additions and 167 deletions

View file

@ -112,153 +112,191 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
try
{
request.parse(message);
logD << "method: " << request.method << ", " << "id: " << request.id << "\n";
logO << "method: " << request.method << ", " << "id: " << request.id << "\n";
json response;
ClientInfoPtr clientInfo = nullptr;
GroupPtr group = nullptr;
msg::ServerSettings serverSettings;
serverSettings.setBufferMs(settings_.bufferMs);
json result;
if (request.method.find("Group.Set") == 0)
if (request.method.find("Client.") == 0)
{
group = Config::instance().getGroup(request.getParam("group").get<string>());
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.getParam("client").get<string>());
if (clientInfo == nullptr)
throw JsonInternalErrorException("Client not found", request.id);
if (request.method == "Client.GetStatus")
{
result = clientInfo->toJson();
}
else if (request.method == "Client.SetVolume")
{
clientInfo->config.volume.fromJson(request.getParam("volume"));
}
else if (request.method == "Client.SetLatency")
{
clientInfo->config.latency = request.getParam<int>("latency", -10000, settings_.bufferMs);
}
else if (request.method == "Client.SetName")
{
clientInfo->config.name = request.getParam("name").get<string>();
}
else
throw JsonMethodNotFoundException(request.id);
if (request.method.find("Client.Set") == 0)
{
/// Response: updated client
result = {{"method", "Client.OnUpdate"}, {"params", clientInfo->toJson()}};
/// Update client
session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != nullptr)
{
msg::ServerSettings serverSettings;
serverSettings.setBufferMs(settings_.bufferMs);
serverSettings.setVolume(clientInfo->config.volume.percent);
serverSettings.setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency);
session->send(&serverSettings);
}
/// Notify others
json notification = JsonNotification::getJson("Client.OnUpdate", clientInfo->toJson());
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
}
}
else if (request.method.find("Group.") == 0)
{
GroupPtr group = Config::instance().getGroup(request.getParam("group").get<string>());
if (group == nullptr)
throw JsonInternalErrorException("Group not found", request.id);
}
if (request.method.find("Client.Set") == 0)
{
clientInfo = Config::instance().getClientInfo(request.getParam("client").get<string>());
if (clientInfo == nullptr)
throw JsonInternalErrorException("Client not found", request.id);
}
if (request.method == "Server.GetStatus")
{
/// TODO: rpc
string clientId = request.hasParam("client") ? request.getParam("client").get<string>() : "";
response = Config::instance().getServerStatus(/*clientId,*/ streamManager_->toJson());
// logO << response.dump(4);
}
else if (request.method == "Server.DeleteClient")
{
clientInfo = Config::instance().getClientInfo(request.getParam("client").get<string>());
if (clientInfo == nullptr)
throw JsonInternalErrorException("Client not found", request.id);
response = clientInfo->host.mac;
Config::instance().remove(clientInfo);
Config::instance().save();
json notification = JsonNotification::getJson("Client.OnDelete", clientInfo->toJson());
controlServer_->send(notification.dump(), controlSession);
clientInfo = nullptr;
}
else if (request.method == "Client.SetVolume")
{
clientInfo->config.volume.fromJson(request.getParam("volume"));
response = clientInfo->config.volume.toJson();
}
else if (request.method == "Group.SetStream")
{
string streamId = request.getParam("id").get<string>();
PcmStreamPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr)
throw JsonInternalErrorException("Stream not found", request.id);
group->streamId = streamId;
response = group->streamId;
for (auto client: group->clients)
if (request.method == "Group.GetStatus")
{
session_ptr session = getStreamSession(client->id);
if (session && (session->pcmStream() != stream))
{
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
}
result = group->toJson();
}
}
else if (request.method == "Group.SetClients")
{
vector<string> clients = request.getParam("clients").get<vector<string>>();
string groupId = request.getParam("group").get<string>();
GroupPtr group = Config::instance().getGroup(groupId);
/// Remove clients from group
for (auto iter = group->clients.begin(); iter != group->clients.end();)
else if (request.method == "Group.SetStream")
{
auto client = *iter;
if (find(clients.begin(), clients.end(), client->id) != clients.end())
{
++iter;
continue;
}
iter = group->clients.erase(iter);
GroupPtr newGroup = Config::instance().addClientInfo(client);
newGroup->streamId = group->streamId;
}
string streamId = request.getParam("id").get<string>();
PcmStreamPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr)
throw JsonInternalErrorException("Stream not found", request.id);
/// Add clients to group
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
for (const auto& clientId: clients)
group->streamId = streamId;
/// Response: updated group
result = {{"method", "Group.OnUpdate"}, {"params", group->toJson()}};
/// Update clients
for (auto client: group->clients)
{
session_ptr session = getStreamSession(client->id);
if (session && (session->pcmStream() != stream))
{
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
}
}
/// Notify others
json notification = JsonNotification::getJson("Group.OnUpdate", group->toJson());
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
}
else if (request.method == "Group.SetClients")
{
ClientInfoPtr client = Config::instance().getClientInfo(clientId);
if (!client)
continue;
GroupPtr oldGroup = Config::instance().getGroupFromClient(client);
if (oldGroup && (oldGroup->id == groupId))
continue;
if (oldGroup)
{
oldGroup->removeClient(client);
Config::instance().remove(oldGroup);
}
group->addClient(client);
vector<string> clients = request.getParam("clients").get<vector<string>>();
string groupId = request.getParam("group").get<string>();
/// assign new stream
session_ptr session = getStreamSession(client->id);
if (session && stream && (session->pcmStream() != stream))
GroupPtr group = Config::instance().getGroup(groupId);
/// Remove clients from group
for (auto iter = group->clients.begin(); iter != group->clients.end();)
{
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
auto client = *iter;
if (find(clients.begin(), clients.end(), client->id) != clients.end())
{
++iter;
continue;
}
iter = group->clients.erase(iter);
GroupPtr newGroup = Config::instance().addClientInfo(client);
newGroup->streamId = group->streamId;
}
}
if (group->empty())
Config::instance().remove(group);
// response = Config::instance().getServerStatus(/*clientId,*/ streamManager_->toJson());
/// Add clients to group
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
for (const auto& clientId: clients)
{
ClientInfoPtr client = Config::instance().getClientInfo(clientId);
if (!client)
continue;
GroupPtr oldGroup = Config::instance().getGroupFromClient(client);
if (oldGroup && (oldGroup->id == groupId))
continue;
if (oldGroup)
{
oldGroup->removeClient(client);
Config::instance().remove(oldGroup);
}
group->addClient(client);
/// assign new stream
session_ptr session = getStreamSession(client->id);
if (session && stream && (session->pcmStream() != stream))
{
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
}
}
if (group->empty())
Config::instance().remove(group);
json serverJson = Config::instance().getServerStatus(streamManager_->toJson());
result = {{"method", "Server.OnUpdate"}, {"params", serverJson}};
/// Notify others: since at least two groups are affected, send a complete server update
json notification = JsonNotification::getJson("Server.OnUpdate", serverJson);
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
}
else
throw JsonMethodNotFoundException(request.id);
}
else if (request.method == "Client.SetLatency")
else if (request.method.find("Server.") == 0)
{
clientInfo->config.latency = request.getParam<int>("latency", -10000, settings_.bufferMs);
response = clientInfo->config.latency;
}
else if (request.method == "Client.SetName")
{
clientInfo->config.name = request.getParam("name").get<string>();
response = clientInfo->config.name;
if (request.method == "Server.GetStatus")
{
result = Config::instance().getServerStatus(streamManager_->toJson());
}
else if (request.method == "Server.DeleteClient")
{
ClientInfoPtr clientInfo = Config::instance().getClientInfo(request.getParam("client").get<string>());
if (clientInfo == nullptr)
throw JsonInternalErrorException("Client not found", request.id);
Config::instance().remove(clientInfo);
json serverJson = Config::instance().getServerStatus(streamManager_->toJson());
result = {{"method", "Server.OnUpdate"}, {"params", serverJson}};
/// Notify others
json notification = JsonNotification::getJson("Server.OnUpdate", serverJson);
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
}
else
throw JsonMethodNotFoundException(request.id);
}
else
throw JsonMethodNotFoundException(request.id);
if (clientInfo != nullptr)
{
serverSettings.setVolume(clientInfo->config.volume.percent);
serverSettings.setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency);
session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != nullptr)
session->send(&serverSettings);
Config::instance().save();
json notification = JsonNotification::getJson("Client.OnUpdate", clientInfo->toJson());
controlServer_->send(notification.dump(), controlSession);
}
controlSession->send(request.getResponse(response).dump());
Config::instance().save();
string responseJson = request.getResponse(result).dump();
logO << "Response: " << responseJson << "\n";
controlSession->send(responseJson);
}
catch (const JsonRequestException& e)
{
@ -394,7 +432,7 @@ void StreamServer::handleAccept(socket_ptr socket)
setsockopt(socket->native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
/// experimental: turn on tcp::no_delay
/// experimental: turn on tcp::no_delay
socket->set_option(tcp::no_delay(true));
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
@ -465,7 +503,7 @@ void StreamServer::stop()
controlServer_->stop();
controlServer_ = nullptr;
}
if (acceptor_)
{
acceptor_->cancel();