From 297d74aae91fd072ae925aa9ee4f881b7ffe522e Mon Sep 17 00:00:00 2001 From: badaix Date: Tue, 3 Jul 2018 22:52:39 +0200 Subject: [PATCH] fix rare deadlock in case of pending control request for ungraceful disconnected clients --- server/controlSession.cpp | 6 ++--- server/streamServer.cpp | 50 ++++++++++++++++++++++----------------- server/streamSession.cpp | 4 ++-- 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/server/controlSession.cpp b/server/controlSession.cpp index c6f026ea..7fd9d963 100644 --- a/server/controlSession.cpp +++ b/server/controlSession.cpp @@ -68,12 +68,12 @@ void ControlSession::stop() } if (readerThread_.joinable()) { - LOG(DEBUG) << "joining readerThread\n"; + LOG(DEBUG) << "ControlSession joining readerThread\n"; readerThread_.join(); } if (writerThread_.joinable()) { - LOG(DEBUG) << "joining writerThread\n"; + LOG(DEBUG) << "ControlSession joining writerThread\n"; messages_.abort_wait(); writerThread_.join(); } @@ -82,7 +82,7 @@ void ControlSession::stop() { } socket_ = NULL; - LOG(DEBUG) << "ControlSession stopped\n"; + LOG(DEBUG) << "ControlSession ControlSession stopped\n"; } diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 662aec09..212b77f9 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -130,7 +130,7 @@ void StreamServer::onDisconnect(StreamSession* streamSession) LOG(DEBUG) << "sessions: " << sessions_.size() << "\n"; // notify controllers if not yet done - ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->clientId); + ClientInfoPtr clientInfo = Config::instance().getClientInfo(session->clientId); if (!clientInfo || !clientInfo->connected) return; @@ -139,10 +139,16 @@ void StreamServer::onDisconnect(StreamSession* streamSession) Config::instance().save(); if (controlServer_ != nullptr) { - /// Notification: {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} - json notification = jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json(); - controlServer_->send(notification.dump()); - ////cout << "Notification: " << notification.dump() << "\n"; + /// Check if there is no session of this client is left + /// Can happen in case of ungraceful disconnect/reconnect or + /// in case of a duplicate client id + if (getStreamSession(clientInfo->id) == nullptr) + { + /// Notification: {"jsonrpc":"2.0","method":"Client.OnDisconnect","params":{"client":{"config":{"instance":1,"latency":0,"name":"","volume":{"muted":false,"percent":81}},"connected":false,"host":{"arch":"x86_64","ip":"192.168.0.54","mac":"00:21:6a:7d:74:fc","name":"T400","os":"Linux Mint 17.3 Rosa"},"id":"00:21:6a:7d:74:fc","lastSeen":{"sec":1488025523,"usec":814067},"snapclient":{"name":"Snapclient","protocolVersion":2,"version":"0.10.0"}},"id":"00:21:6a:7d:74:fc"}} + json notification = jsonrpcpp::Notification("Client.OnDisconnect", jsonrpcpp::Parameter("id", clientInfo->id, "client", clientInfo->toJson())).to_json(); + controlServer_->send(notification.dump()); + ////cout << "Notification: " << notification.dump() << "\n"; + } } } @@ -214,7 +220,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp GroupPtr group = Config::instance().getGroupFromClient(clientInfo); serverSettings->setMuted(clientInfo->config.volume.muted || group->muted); serverSettings->setLatency(clientInfo->config.latency); - session->send(serverSettings); + session->sendAsync(serverSettings, true); } } } @@ -250,7 +256,7 @@ void StreamServer::ProcessRequest(const jsonrpcpp::request_ptr request, jsonrpcp GroupPtr group = Config::instance().getGroupFromClient(client); serverSettings->setMuted(client->config.volume.muted || group->muted); serverSettings->setLatency(client->config.latency); - session->send(serverSettings); + session->sendAsync(serverSettings, true); } } @@ -497,7 +503,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: -void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) +void StreamServer::onMessageReceived(StreamSession* streamSession, const msg::BaseMessage& baseMessage, char* buffer) { // LOG(DEBUG) << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (baseMessage.type == message_type::kTime) @@ -507,10 +513,10 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM timeMsg->refersTo = timeMsg->id; timeMsg->latency = timeMsg->received - timeMsg->sent; // LOG(INFO) << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; - connection->sendAsync(timeMsg, true); + streamSession->sendAsync(timeMsg, true); - // refresh connection state - ClientInfoPtr client = Config::instance().getClientInfo(connection->clientId); + // refresh streamSession state + ClientInfoPtr client = Config::instance().getClientInfo(streamSession->clientId); if (client != nullptr) { chronos::systemtimeofday(&client->lastSeen); @@ -521,22 +527,22 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM { msg::Hello helloMsg; helloMsg.deserialize(baseMessage, buffer); - connection->clientId = helloMsg.getUniqueId(); - LOG(INFO) << "Hello from " << connection->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() + streamSession->clientId = helloMsg.getUniqueId(); + LOG(INFO) << "Hello from " << streamSession->clientId << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() << ", ClientName: " << helloMsg.getClientName() << ", OS: " << helloMsg.getOS() << ", Arch: " << helloMsg.getArch() << ", Protocol version: " << helloMsg.getProtocolVersion() << "\n"; - LOG(DEBUG) << "request kServerSettings: " << connection->clientId << "\n"; + LOG(DEBUG) << "request kServerSettings: " << streamSession->clientId << "\n"; // std::lock_guard mlock(mutex_); bool newGroup(false); - GroupPtr group = Config::instance().getGroupFromClient(connection->clientId); + GroupPtr group = Config::instance().getGroupFromClient(streamSession->clientId); if (group == nullptr) { - group = Config::instance().addClientInfo(connection->clientId); + group = Config::instance().addClientInfo(streamSession->clientId); newGroup = true; } - ClientInfoPtr client = group->getClient(connection->clientId); + ClientInfoPtr client = group->getClient(streamSession->clientId); LOG(DEBUG) << "request kServerSettings\n"; auto serverSettings = make_shared(); @@ -545,10 +551,10 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM serverSettings->setLatency(client->config.latency); serverSettings->setBufferMs(settings_.bufferMs); serverSettings->refersTo = helloMsg.id; - connection->sendAsync(serverSettings); + streamSession->sendAsync(serverSettings); client->host.mac = helloMsg.getMacAddress(); - client->host.ip = connection->getIP(); + client->host.ip = streamSession->getIP(); client->host.name = helloMsg.getHostName(); client->host.os = helloMsg.getOS(); client->host.arch = helloMsg.getArch(); @@ -570,10 +576,10 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM Config::instance().save(); - connection->sendAsync(stream->getMeta()); - connection->setPcmStream(stream); + streamSession->sendAsync(stream->getMeta()); + streamSession->setPcmStream(stream); auto headerChunk = stream->getHeader(); - connection->sendAsync(headerChunk); + streamSession->sendAsync(headerChunk); if (newGroup) { diff --git a/server/streamSession.cpp b/server/streamSession.cpp index 620ae2ef..0b274786 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -86,12 +86,12 @@ void StreamSession::stop() } if (readerThread_ && readerThread_->joinable()) { - LOG(DEBUG) << "joining readerThread\n"; + LOG(DEBUG) << "StreamSession joining readerThread\n"; readerThread_->join(); } if (writerThread_ && writerThread_->joinable()) { - LOG(DEBUG) << "joining writerThread\n"; + LOG(DEBUG) << "StreamSession joining writerThread\n"; messages_.abort_wait(); writerThread_->join(); }