fixed disconnect deadlock

This commit is contained in:
badaix 2015-10-25 13:09:12 +01:00
parent a46b7c6106
commit 6d1cc2d081
2 changed files with 26 additions and 7 deletions

View file

@ -45,12 +45,31 @@ StreamServer::~StreamServer()
void StreamServer::send(const msg::BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (!(*it)->active())
{
logS(kLogErr) << "Session inactive. Removing\n";
// don't block: remove ServerSession in a thread
onDisconnect(it->get());
auto func = [](shared_ptr<StreamSession> s)->void{s->stop();};
std::thread t(func, *it);
t.detach();
sessions_.erase(it++);
}
else
++it;
}
/* for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (!(*it)->active())
onDisconnect(it->get());
}
*/
std::shared_ptr<const msg::BaseMessage> shared_message(message);
for (auto s : sessions_)
s->add(shared_message);
@ -73,12 +92,12 @@ void StreamServer::onResync(const PipeReader* pipeReader, double ms)
void StreamServer::onDisconnect(StreamSession* streamSession)
{
logO << "onDisconnect: " << streamSession->macAddress << "\n";
auto func = [](StreamSession* s)->void{s->stop();};
/* auto func = [](StreamSession* s)->void{s->stop();};
std::thread t(func, streamSession);
t.detach();
*/
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress);
// don't block: remove StreamSession in a thread
/* // don't block: remove StreamSession in a thread
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (it->get() == streamSession)
@ -88,7 +107,7 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
break;
}
}
*/
// notify controllers if not yet done
if (!clientInfo->connected)
return;

View file

@ -28,7 +28,7 @@ using namespace std;
StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : active_(true), messageReceiver_(receiver)
{
socket_ = socket;
}
@ -42,7 +42,7 @@ StreamSession::~StreamSession()
void StreamSession::start()
{
active_ = true;
setActive(true);
streamActive_ = false;
readerThread_ = new thread(&StreamSession::reader, this);
writerThread_ = new thread(&StreamSession::writer, this);