diff --git a/server/stream_server.cpp b/server/stream_server.cpp index 55e75fdd..5fe04516 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -30,7 +30,8 @@ using namespace streamreader; using json = nlohmann::json; -StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings) : io_context_(io_context), settings_(serverSettings) +StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings) + : io_context_(io_context), config_timer_(io_context), settings_(serverSettings) { } @@ -169,7 +170,7 @@ void StreamServer::onDisconnect(StreamSession* streamSession) clientInfo->connected = false; chronos::systemtimeofday(&clientInfo->lastSeen); - Config::instance().save(); + saveConfig(); if (controlServer_ != nullptr) { // Check if there is no session of this client is left @@ -572,7 +573,7 @@ std::string StreamServer::onMessageReceived(ControlSession* controlSession, cons { jsonrpcpp::request_ptr request = dynamic_pointer_cast(entity); ProcessRequest(request, response, notification); - Config::instance().save(); + saveConfig(); ////cout << "Request: " << request->to_json().dump() << "\n"; if (notification) { @@ -604,7 +605,7 @@ std::string StreamServer::onMessageReceived(ControlSession* controlSession, cons notificationBatch.add_ptr(notification); } } - Config::instance().save(); + saveConfig(); if (!notificationBatch.entities.empty()) controlServer_->send(notificationBatch.to_json().dump(), controlSession); if (!responseBatch.entities.empty()) @@ -690,7 +691,7 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba } LOG(DEBUG) << "Group: " << group->id << ", stream: " << group->streamId << "\n"; - Config::instance().save(); + saveConfig(); streamSession->sendAsync(stream->getMeta()); streamSession->setPcmStream(stream); @@ -721,6 +722,19 @@ void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::Ba } +void StreamServer::saveConfig() +{ + config_timer_.cancel(); + config_timer_.expires_after(2s); + config_timer_.async_wait([this](const boost::system::error_code& ec) { + if (!ec) + { + LOG(DEBUG) << "Saving config\n"; + Config::instance().save(); + } + }); +} + session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const { diff --git a/server/stream_server.hpp b/server/stream_server.hpp index 73319c4c..a8178215 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -83,12 +83,15 @@ private: session_ptr getStreamSession(StreamSession* session) const; void ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcpp::entity_ptr& response, jsonrpcpp::notification_ptr& notification) const; void cleanup(); + /// save the server state deferred after 2s without a change to prevent blocking and too much disk io + void saveConfig(); mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex clientMutex_; std::vector> sessions_; boost::asio::io_context& io_context_; std::vector acceptor_; + boost::asio::steady_timer config_timer_; ServerSettings settings_; Queue> messages_;