fix data race and speedup batch commands

This commit is contained in:
badaix 2019-11-01 15:23:38 +01:00
parent 698c94bc2d
commit b5d8cefbcc

View file

@ -92,8 +92,6 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
{ {
// LOG(INFO) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n"; // LOG(INFO) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get()); bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get());
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
unique_ptr<msg::PcmChunk> chunk_ptr(chunk); unique_ptr<msg::PcmChunk> chunk_ptr(chunk);
std::ostringstream oss; std::ostringstream oss;
@ -102,7 +100,13 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
chunk_ptr->serialize(oss); chunk_ptr->serialize(oss);
shared_const_buffer buffer(oss.str()); shared_const_buffer buffer(oss.str());
for (auto session : sessions_) std::vector<std::weak_ptr<StreamSession>> sessions;
{
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
sessions = sessions_;
}
for (auto session : sessions)
{ {
if (auto s = session.lock()) if (auto s = session.lock())
{ {
@ -112,13 +116,18 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
if (group) if (group)
{ {
if (group->muted) if (group->muted)
{
continue; continue;
}
else
{
std::lock_guard<std::recursive_mutex> lock(clientMutex_);
ClientInfoPtr client = group->getClient(s->clientId); ClientInfoPtr client = group->getClient(s->clientId);
if (client && client->config.volume.muted) if (client && client->config.volume.muted)
continue; continue;
} }
} }
}
if (!s->pcmStream() && isDefaultStream) //->getName() == "default") if (!s->pcmStream() && isDefaultStream) //->getName() == "default")
s->sendAsync(buffer); s->sendAsync(buffer);
@ -210,6 +219,8 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp
// Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}} // Response: {"id":8,"jsonrpc":"2.0","result":{"volume":{"muted":false,"percent":74}}}
// Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}} // Notification: {"jsonrpc":"2.0","method":"Client.OnVolumeChanged","params":{"id":"00:21:6a:7d:74:fc","volume":{"muted":false,"percent":74}}}
// clang-format on // clang-format on
std::lock_guard<std::recursive_mutex> lock(clientMutex_);
clientInfo->config.volume.fromJson(request->params().get("volume")); clientInfo->config.volume.fromJson(request->params().get("volume"));
result["volume"] = clientInfo->config.volume.toJson(); result["volume"] = clientInfo->config.volume.toJson();
notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged", notification.reset(new jsonrpcpp::Notification("Client.OnVolumeChanged",
@ -521,7 +532,6 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp
else else
throw jsonrpcpp::MethodNotFoundException(request->id()); throw jsonrpcpp::MethodNotFoundException(request->id());
Config::instance().save();
response.reset(new jsonrpcpp::Response(*request, result)); response.reset(new jsonrpcpp::Response(*request, result));
} }
catch (const jsonrpcpp::RequestException& e) catch (const jsonrpcpp::RequestException& e)
@ -539,7 +549,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp
std::string StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) std::string StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
{ {
LOG(DEBUG) << "onMessageReceived: " << message << "\n"; // LOG(DEBUG) << "onMessageReceived: " << message << "\n";
jsonrpcpp::entity_ptr entity(nullptr); jsonrpcpp::entity_ptr entity(nullptr);
try try
{ {
@ -562,6 +572,7 @@ std::string StreamServer::onMessageReceived(ControlSession* controlSession, cons
{ {
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity); jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
ProcessRequest(request, response, notification); ProcessRequest(request, response, notification);
Config::instance().save();
////cout << "Request: " << request->to_json().dump() << "\n"; ////cout << "Request: " << request->to_json().dump() << "\n";
if (notification) if (notification)
{ {
@ -593,6 +604,7 @@ std::string StreamServer::onMessageReceived(ControlSession* controlSession, cons
notificationBatch.add_ptr(notification); notificationBatch.add_ptr(notification);
} }
} }
Config::instance().save();
if (!notificationBatch.entities.empty()) if (!notificationBatch.entities.empty())
controlServer_->send(notificationBatch.to_json().dump(), controlSession); controlServer_->send(notificationBatch.to_json().dump(), controlSession);
if (!responseBatch.entities.empty()) if (!responseBatch.entities.empty())