Process messages asynchronously

This commit is contained in:
badaix 2021-06-10 08:55:31 +02:00
parent 5e2d14d39a
commit 0eaee48f10
10 changed files with 106 additions and 89 deletions

View file

@ -74,28 +74,27 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu
}
std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message)
void ControlServer::onMessageReceived(std::shared_ptr<ControlSession> session, const std::string& message, const ResponseHander& response_handler)
{
// LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n";
if (controlMessageReceiver_ != nullptr)
return controlMessageReceiver_->onMessageReceived(session, message);
return "";
controlMessageReceiver_->onMessageReceived(std::move(session), message, response_handler);
}
void ControlServer::onNewSession(const shared_ptr<ControlSession>& session)
void ControlServer::onNewSession(shared_ptr<ControlSession> session)
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
sessions_.emplace_back(session);
sessions_.emplace_back(std::move(session));
cleanup();
}
void ControlServer::onNewSession(const std::shared_ptr<StreamSession>& session)
void ControlServer::onNewSession(std::shared_ptr<StreamSession> session)
{
if (controlMessageReceiver_ != nullptr)
controlMessageReceiver_->onNewSession(session);
controlMessageReceiver_->onNewSession(std::move(session));
}
@ -136,7 +135,7 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args)
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<SessionType> session = make_shared<SessionType>(this, io_context_, std::move(socket), std::forward<Args>(args)...);
onNewSession(session);
onNewSession(std::move(session));
}
catch (const std::exception& e)
{

View file

@ -61,9 +61,9 @@ private:
void cleanup();
/// Implementation of ControlMessageReceiver
std::string onMessageReceived(ControlSession* session, const std::string& message) override;
void onNewSession(const std::shared_ptr<ControlSession>& session) override;
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
void onMessageReceived(std::shared_ptr<ControlSession> session, const std::string& message, const ResponseHander& response_handler) override;
void onNewSession(std::shared_ptr<ControlSession> session) override;
void onNewSession(std::shared_ptr<StreamSession> session) override;
mutable std::recursive_mutex session_mutex_;
std::vector<std::weak_ptr<ControlSession>> sessions_;

View file

@ -39,10 +39,11 @@ class StreamSession;
class ControlMessageReceiver
{
public:
using ResponseHander = std::function<void(const std::string& response)>;
// TODO: rename, error handling
virtual std::string onMessageReceived(ControlSession* session, const std::string& message) = 0;
virtual void onNewSession(const std::shared_ptr<ControlSession>& session) = 0;
virtual void onNewSession(const std::shared_ptr<StreamSession>& session) = 0;
virtual void onMessageReceived(std::shared_ptr<ControlSession> session, const std::string& message, const ResponseHander& response_handler) = 0;
virtual void onNewSession(std::shared_ptr<ControlSession> session) = 0;
virtual void onNewSession(std::shared_ptr<StreamSession> session) = 0;
};

View file

@ -220,7 +220,7 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
if (req.target() != "/jsonrpc")
return send(bad_request("Illegal request-target"));
string response = message_receiver_->onMessageReceived(this, req.body());
message_receiver_->onMessageReceived(shared_from_this(), req.body(), [this, req = std::move(req), send = std::move(send)](const std::string& response) {
http::response<http::string_body> res{http::status::ok, req.version()};
res.set(http::field::server, HTTP_SERVER_NAME);
res.set(http::field::content_type, "application/json");
@ -228,6 +228,7 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
res.body() = response;
res.prepare_payload();
return send(std::move(res));
});
}
// Request path must be absolute and not contain "..".
@ -322,7 +323,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
else
{
auto ws_session = make_shared<ControlSessionWebsocket>(message_receiver_, strand_.context(), std::move(*ws));
message_receiver_->onNewSession(ws_session);
message_receiver_->onNewSession(std::move(ws_session));
}
});
}
@ -339,7 +340,7 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
else
{
auto ws_session = make_shared<StreamSessionWebsocket>(strand_.context(), nullptr, std::move(*ws));
message_receiver_->onNewSession(ws_session);
message_receiver_->onNewSession(std::move(ws_session));
}
});
}

View file

@ -61,9 +61,10 @@ void ControlSessionTcp::do_read()
// LOG(DEBUG, LOG_TAG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
string response = message_receiver_->onMessageReceived(this, line);
message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) {
if (!response.empty())
sendAsync(response);
});
}
}
streambuf_.consume(bytes_transferred);

View file

@ -122,11 +122,12 @@ void ControlSessionWebsocket::on_read_ws(beast::error_code ec, std::size_t bytes
// LOG(DEBUG, LOG_TAG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
string response = message_receiver_->onMessageReceived(this, line);
message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) {
if (!response.empty())
{
sendAsync(response);
}
});
}
}
buffer_.consume(bytes_transferred);

View file

@ -42,10 +42,10 @@ Server::Server(boost::asio::io_context& io_context, const ServerSettings& server
Server::~Server() = default;
void Server::onNewSession(const std::shared_ptr<StreamSession>& session)
void Server::onNewSession(std::shared_ptr<StreamSession> session)
{
LOG(DEBUG, LOG_TAG) << "onNewSession\n";
streamServer_->addSession(session);
streamServer_->addSession(std::move(session));
}
@ -140,8 +140,10 @@ void Server::onDisconnect(StreamSession* streamSession)
}
void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const
void Server::processRequest(const jsonrpcpp::request_ptr request, const OnResponse& on_response) const
{
jsonrpcpp::entity_ptr response;
jsonrpcpp::notification_ptr notification;
try
{
// LOG(INFO, LOG_TAG) << "Server::processRequest method: " << request->method << ", " << "id: " << request->id() << "\n";
@ -172,8 +174,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
std::lock_guard<std::recursive_mutex> lock(clientMutex_);
clientInfo->config.volume.fromJson(request->params().get("volume"));
result["volume"] = clientInfo->config.volume.toJson();
notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged",
jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson())));
notification = std::make_shared<jsonrpcpp::Notification>(
"Client.OnVolumeChanged", jsonrpcpp::Parameter("id", clientInfo->id, "volume", clientInfo->config.volume.toJson()));
}
else if (request->method() == "Client.SetLatency")
{
@ -189,8 +191,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
latency = settings_.stream.bufferMs;
clientInfo->config.latency = latency; //, -10000, settings_.stream.bufferMs);
result["latency"] = clientInfo->config.latency;
notification.reset(
new jsonrpcpp::Notification("Client.OnLatencyChanged", jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency)));
notification = std::make_shared<jsonrpcpp::Notification>("Client.OnLatencyChanged",
jsonrpcpp::Parameter("id", clientInfo->id, "latency", clientInfo->config.latency));
}
else if (request->method() == "Client.SetName")
{
@ -201,8 +203,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
// clang-format on
clientInfo->config.name = request->params().get<std::string>("name");
result["name"] = clientInfo->config.name;
notification.reset(
new jsonrpcpp::Notification("Client.OnNameChanged", jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name)));
notification = std::make_shared<jsonrpcpp::Notification>("Client.OnNameChanged",
jsonrpcpp::Parameter("id", clientInfo->id, "name", clientInfo->config.name));
}
else
throw jsonrpcpp::MethodNotFoundException(request->id());
@ -247,7 +249,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
// clang-format on
group->name = request->params().get<std::string>("name");
result["name"] = group->name;
notification.reset(new jsonrpcpp::Notification("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name)));
notification = std::make_shared<jsonrpcpp::Notification>("Group.OnNameChanged", jsonrpcpp::Parameter("id", group->id, "name", group->name));
}
else if (request->method() == "Group.SetMute")
{
@ -276,7 +278,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
}
result["mute"] = group->muted;
notification.reset(new jsonrpcpp::Notification("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted)));
notification = std::make_shared<jsonrpcpp::Notification>("Group.OnMute", jsonrpcpp::Parameter("id", group->id, "mute", group->muted));
}
else if (request->method() == "Group.SetStream")
{
@ -306,7 +308,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
// Notify others
result["stream_id"] = group->streamId;
notification.reset(new jsonrpcpp::Notification("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId)));
notification =
std::make_shared<jsonrpcpp::Notification>("Group.OnStreamChanged", jsonrpcpp::Parameter("id", group->id, "stream_id", group->streamId));
}
else if (request->method() == "Group.SetClients")
{
@ -366,7 +369,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
result["server"] = server;
// Notify others: since at least two groups are affected, send a complete server update
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
notification = std::make_shared<jsonrpcpp::Notification>("Server.OnUpdate", jsonrpcpp::Parameter("server", server));
}
else
throw jsonrpcpp::MethodNotFoundException(request->id());
@ -409,7 +412,7 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
result["server"] = server;
/// Notify others
notification.reset(new jsonrpcpp::Notification("Server.OnUpdate", jsonrpcpp::Parameter("server", server)));
notification = std::make_shared<jsonrpcpp::Notification>("Server.OnUpdate", jsonrpcpp::Parameter("server", server));
}
else
throw jsonrpcpp::MethodNotFoundException(request->id());
@ -457,13 +460,14 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
if (stream == nullptr)
throw jsonrpcpp::InternalErrorException("Stream not found", request->id());
stream->control(*request, [](const jsonrpcpp::Response& response) {
LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << response.id() << ", result: " << response.result()
<< ", error: " << response.error().code() << "\n";
stream->control(*request, [request, on_response](const jsonrpcpp::Response& ctrl_response) {
LOG(INFO, LOG_TAG) << "Received response for Stream.Control, id: " << ctrl_response.id() << ", result: " << ctrl_response.result()
<< ", error: " << ctrl_response.error().code() << "\n";
auto response = make_shared<jsonrpcpp::Response>(request->id(), ctrl_response.result());
on_response(response, nullptr);
});
// Setup response
result["id"] = streamId;
return;
}
else if (request->method().find("Stream.SetProperty") == 0)
{
@ -529,22 +533,23 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::ent
else
throw jsonrpcpp::MethodNotFoundException(request->id());
response.reset(new jsonrpcpp::Response(*request, result));
response = std::make_shared<jsonrpcpp::Response>(*request, result);
}
catch (const jsonrpcpp::RequestException& e)
{
LOG(ERROR, LOG_TAG) << "Server::onMessageReceived JsonRequestException: " << e.to_json().dump() << ", message: " << request->to_json().dump() << "\n";
response.reset(new jsonrpcpp::RequestException(e));
response = std::make_shared<jsonrpcpp::RequestException>(e);
}
catch (const exception& e)
{
LOG(ERROR, LOG_TAG) << "Server::onMessageReceived exception: " << e.what() << ", message: " << request->to_json().dump() << "\n";
response.reset(new jsonrpcpp::InternalErrorException(e.what(), request->id()));
response = std::make_shared<jsonrpcpp::InternalErrorException>(e.what(), request->id());
}
on_response(std::move(response), std::move(notification));
}
std::string Server::onMessageReceived(ControlSession* controlSession, const std::string& message)
void Server::onMessageReceived(std::shared_ptr<ControlSession> controlSession, const std::string& message, const ResponseHander& response_handler)
{
// LOG(DEBUG, LOG_TAG) << "onMessageReceived: " << message << "\n";
jsonrpcpp::entity_ptr entity(nullptr);
@ -552,15 +557,15 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std:
{
entity = jsonrpcpp::Parser::do_parse(message);
if (!entity)
return "";
return response_handler("");
}
catch (const jsonrpcpp::ParseErrorException& e)
{
return e.to_json().dump();
return response_handler(e.to_json().dump());
}
catch (const std::exception& e)
{
return jsonrpcpp::ParseErrorException(e.what()).to_json().dump();
return response_handler(jsonrpcpp::ParseErrorException(e.what()).to_json().dump());
}
jsonrpcpp::entity_ptr response(nullptr);
@ -568,23 +573,26 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std:
if (entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
processRequest(request, response, notification);
processRequest(request, [this, controlSession, response_handler](jsonrpcpp::entity_ptr response, jsonrpcpp::notification_ptr notification) {
saveConfig();
////cout << "Request: " << request->to_json().dump() << "\n";
if (notification)
{
////cout << "Notification: " << notification->to_json().dump() << "\n";
controlServer_->send(notification->to_json().dump(), controlSession);
controlServer_->send(notification->to_json().dump(), controlSession.get());
}
if (response)
{
////cout << "Response: " << response->to_json().dump() << "\n";
return response->to_json().dump();
return response_handler(response->to_json().dump());
}
return "";
return response_handler("");
});
}
else if (entity->is_batch())
{
/// Attention: this will only work as long as the response handler in processRequest is called synchronously.
/// This is true for volume changes, which is the only batch request, but not for Control commands!
jsonrpcpp::batch_ptr batch = dynamic_pointer_cast<jsonrpcpp::Batch>(entity);
////cout << "Batch: " << batch->to_json().dump() << "\n";
jsonrpcpp::Batch responseBatch;
@ -594,21 +602,23 @@ std::string Server::onMessageReceived(ControlSession* controlSession, const std:
if (batch_entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(batch_entity);
processRequest(request, response, notification);
processRequest(request, [this, controlSession, response_handler, &responseBatch, &notificationBatch](jsonrpcpp::entity_ptr response,
jsonrpcpp::notification_ptr notification) {
if (response != nullptr)
responseBatch.add_ptr(response);
if (notification != nullptr)
notificationBatch.add_ptr(notification);
});
}
}
saveConfig();
if (!notificationBatch.entities.empty())
controlServer_->send(notificationBatch.to_json().dump(), controlSession);
controlServer_->send(notificationBatch.to_json().dump(), controlSession.get());
if (!responseBatch.entities.empty())
return responseBatch.to_json().dump();
return "";
return response_handler(responseBatch.to_json().dump());
return response_handler("");
}
return "";
return response_handler("");
}
@ -624,7 +634,8 @@ void Server::onMessageReceived(StreamSession* streamSession, const msg::BaseMess
timeMsg->deserialize(baseMessage, buffer);
timeMsg->refersTo = timeMsg->id;
timeMsg->latency = timeMsg->received - timeMsg->sent;
// LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
// LOG(INFO, LOG_TAG) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo <<
// "\n";
streamSession->send(timeMsg);
// refresh streamSession state

View file

@ -55,6 +55,9 @@ using session_ptr = std::shared_ptr<StreamSession>;
class Server : public StreamMessageReceiver, public ControlMessageReceiver, public PcmListener
{
public:
// TODO: revise handler names
using OnResponse = std::function<void(jsonrpcpp::entity_ptr response, jsonrpcpp::notification_ptr notification)>;
Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings);
virtual ~Server();
@ -67,12 +70,12 @@ private:
void onDisconnect(StreamSession* streamSession) override;
/// Implementation of ControllMessageReceiver
std::string onMessageReceived(ControlSession* controlSession, const std::string& message) override;
void onNewSession(const std::shared_ptr<ControlSession>& session) override
void onMessageReceived(std::shared_ptr<ControlSession> controlSession, const std::string& message, const ResponseHander& response_handler) override;
void onNewSession(std::shared_ptr<ControlSession> session) override
{
std::ignore = session;
};
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
void onNewSession(std::shared_ptr<StreamSession> session) override;
/// Implementation of PcmListener
void onMetaChanged(const PcmStream* pcmStream) override;
@ -83,7 +86,7 @@ private:
void onResync(const PcmStream* pcmStream, double ms) override;
private:
void processRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const;
void processRequest(const jsonrpcpp::request_ptr request, const OnResponse& on_response) const;
/// Save the server state deferred to prevent blocking and lower disk io
/// @param deferred the delay after the last call to saveConfig
void saveConfig(const std::chrono::milliseconds& deferred = std::chrono::seconds(2));

View file

@ -54,14 +54,14 @@ void StreamServer::cleanup()
}
void StreamServer::addSession(const std::shared_ptr<StreamSession>& session)
void StreamServer::addSession(std::shared_ptr<StreamSession> session)
{
session->setMessageReceiver(this);
session->setBufferMs(settings_.stream.bufferMs);
session->start();
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
sessions_.emplace_back(session);
sessions_.emplace_back(std::move(session));
cleanup();
}

View file

@ -63,7 +63,7 @@ public:
/// Send a message to all connceted clients
// void send(const msg::BaseMessage* message);
void addSession(const std::shared_ptr<StreamSession>& session);
void addSession(std::shared_ptr<StreamSession> session);
// void onMetaChanged(const PcmStream* pcmStream, std::shared_ptr<msg::StreamTags> meta);
void onChunkEncoded(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double duration);