delegate json requests to ProcessJson function

This commit is contained in:
badaix 2017-02-03 21:33:21 +01:00
parent 1c4fad8f46
commit da7811b5a4
3 changed files with 32 additions and 32 deletions

View file

@ -104,7 +104,7 @@ void ControlSession::sendAsync(const std::string& message)
bool ControlSession::send(const std::string& message) const bool ControlSession::send(const std::string& message) const
{ {
// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; //logO << "send: " << message << ", size: " << message.length() << "\n";
std::lock_guard<std::mutex> socketLock(socketMutex_); std::lock_guard<std::mutex> socketLock(socketMutex_);
{ {
std::lock_guard<std::mutex> activeLock(activeMutex_); std::lock_guard<std::mutex> activeLock(activeMutex_);
@ -115,7 +115,7 @@ bool ControlSession::send(const std::string& message) const
std::ostream request_stream(&streambuf); std::ostream request_stream(&streambuf);
request_stream << message << "\r\n"; request_stream << message << "\r\n";
asio::write(*socket_.get(), streambuf); asio::write(*socket_.get(), streambuf);
// logO << "done: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; //logO << "done\n";
return true; return true;
} }

View file

@ -16,7 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#include "jsonrp.hpp"
#include "streamServer.h" #include "streamServer.h"
#include "message/time.h" #include "message/time.h"
#include "message/hello.h" #include "message/hello.h"
@ -105,17 +104,16 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
} }
void StreamServer::ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const
void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
{ {
logO << "onMessageReceived: " << message << "\n"; logD << "onMessageReceived: " << json << "\n";
jsonrpcpp::Request request; jsonrpcpp::Request request;
try try
{ {
request.parse(message); request.parse(json);
logO << "method: " << request.method << ", " << "id: " << request.id << "\n"; logO << "StreamServer::ProcessJson method: " << request.method << ", " << "id: " << request.id << "\n";
json result; Json result;
if (request.method.find("Client.") == 0) if (request.method.find("Client.") == 0)
{ {
@ -166,9 +164,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
/// Notify others /// Notify others
json notification = jsonrpcpp::Notification("Client.OnUpdate", clientInfo->toJson()).to_json(); notification.reset(new jsonrpcpp::Notification("Client.OnUpdate", clientInfo->toJson()));
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
} }
} }
else if (request.method.find("Group.") == 0) else if (request.method.find("Group.") == 0)
@ -205,9 +201,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
/// Notify others /// Notify others
json notification = jsonrpcpp::Notification("Group.OnUpdate", group->toJson()).to_json(); notification.reset(new jsonrpcpp::Notification("Group.OnUpdate", group->toJson()));;
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
} }
else if (request.method == "Group.SetClients") else if (request.method == "Group.SetClients")
{ {
@ -260,13 +254,11 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
if (group->empty()) if (group->empty())
Config::instance().remove(group); Config::instance().remove(group);
json serverJson = Config::instance().getServerStatus(streamManager_->toJson()); Json serverJson = Config::instance().getServerStatus(streamManager_->toJson());
result = {{"method", "Server.OnUpdate"}, {"params", serverJson}}; result = {{"method", "Server.OnUpdate"}, {"params", serverJson}};
/// Notify others: since at least two groups are affected, send a complete server update /// Notify others: since at least two groups are affected, send a complete server update
json notification = jsonrpcpp::Notification("Server.OnUpdate", serverJson).to_json(); notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson));
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request.id);
@ -285,13 +277,11 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
Config::instance().remove(clientInfo); Config::instance().remove(clientInfo);
json serverJson = Config::instance().getServerStatus(streamManager_->toJson()); Json serverJson = Config::instance().getServerStatus(streamManager_->toJson());
result = {{"method", "Server.OnUpdate"}, {"params", serverJson}}; result = {{"method", "Server.OnUpdate"}, {"params", serverJson}};
/// Notify others /// Notify others
json notification = jsonrpcpp::Notification("Server.OnUpdate", serverJson).to_json(); notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", serverJson));
logO << "Notification: " << notification.dump() << "\n";
controlServer_->send(notification.dump(), controlSession);
} }
else else
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request.id);
@ -300,24 +290,33 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
throw jsonrpcpp::MethodNotFoundException(request.id); throw jsonrpcpp::MethodNotFoundException(request.id);
Config::instance().save(); Config::instance().save();
string responseJson = jsonrpcpp::Response(request, result).to_json().dump(); response.reset(new jsonrpcpp::Response(request, result));
logO << "Response: " << responseJson << "\n";
controlSession->send(responseJson);
} }
catch (const jsonrpcpp::RequestException& e) catch (const jsonrpcpp::RequestException& e)
{ {
logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << message << "\n"; logE << "StreamServer::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << json << "\n";
controlSession->send(e.to_json().dump()); response.reset(new jsonrpcpp::RequestException(e));
} }
catch (const exception& e) catch (const exception& e)
{ {
logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << message << "\n"; logE << "StreamServer::onMessageReceived exception: " << e.what() << ", message: " << json << "\n";
jsonrpcpp::InternalErrorException jsonException(e.what(), request.id); response.reset(new jsonrpcpp::InternalErrorException(e.what(), request.id));
controlSession->send(jsonException.to_json().dump()); //jsonrpcpp::Response(jsonException).to_json().dump());
} }
} }
void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
{
jsonrpcpp::entity_ptr response(nullptr);
jsonrpcpp::notification_ptr notification(nullptr);
ProcessJson(message, response, notification);
if (response != nullptr)
controlSession->send(response->to_json().dump());
if (notification != nullptr)
controlServer_->send(notification->to_json().dump(), controlSession);
}
void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
@ -405,7 +404,6 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
else else
{ {
json notification = jsonrpcpp::Notification("Client.OnConnect", client->toJson()).to_json(); json notification = jsonrpcpp::Notification("Client.OnConnect", client->toJson()).to_json();
// logO << notification.dump(4) << "\n";
controlServer_->send(notification.dump()); controlServer_->send(notification.dump());
} }
// cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n"; // cout << Config::instance().getServerStatus(streamManager_->toJson()).dump(4) << "\n";

View file

@ -27,6 +27,7 @@
#include <sstream> #include <sstream>
#include <mutex> #include <mutex>
#include "jsonrp.hpp"
#include "streamSession.h" #include "streamSession.h"
#include "streamreader/streamManager.h" #include "streamreader/streamManager.h"
#include "common/queue.h" #include "common/queue.h"
@ -98,6 +99,7 @@ private:
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
session_ptr getStreamSession(const std::string& mac) const; session_ptr getStreamSession(const std::string& mac) const;
session_ptr getStreamSession(StreamSession* session) const; session_ptr getStreamSession(StreamSession* session) const;
void ProcessJson(const std::string& json, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const;
mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex sessionsMutex_;
std::set<session_ptr> sessions_; std::set<session_ptr> sessions_;
asio::io_service* io_service_; asio::io_service* io_service_;