fixed deadlock in server shutdown

This commit is contained in:
badaix 2016-11-09 11:45:35 +01:00
parent 341466ceb5
commit 935899b0a2
5 changed files with 35 additions and 23 deletions

View file

@ -404,6 +404,13 @@ void StreamServer::start()
void StreamServer::stop() void StreamServer::stop()
{ {
if (streamManager_)
{
streamManager_->stop();
streamManager_ = nullptr;
}
{
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it) for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)
{ {
@ -414,6 +421,7 @@ void StreamServer::stop()
} }
} }
sessions_.clear(); sessions_.clear();
}
if (controlServer_) if (controlServer_)
{ {
@ -426,11 +434,5 @@ void StreamServer::stop()
acceptor_->cancel(); acceptor_->cancel();
acceptor_ = nullptr; acceptor_ = nullptr;
} }
if (streamManager_)
{
streamManager_->stop();
streamManager_ = nullptr;
}
} }

View file

@ -84,6 +84,7 @@ void FileStream::worker()
ifs.read(chunk->payload + count, toRead - count); ifs.read(chunk->payload + count, toRead - count);
encoder_->encode(chunk.get()); encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_; nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000); chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount(); long currentTick = chronos::getTickCount();

View file

@ -130,6 +130,7 @@ void PcmStream::setState(const ReaderState& newState)
if (newState != state_) if (newState != state_)
{ {
state_ = newState; state_ = newState;
if (pcmListener_)
pcmListener_->onStateChanged(this, newState); pcmListener_->onStateChanged(this, newState);
} }
} }
@ -144,6 +145,7 @@ void PcmStream::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, dou
chunk->timestamp.sec = tvEncodedChunk_.tv_sec; chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec; chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000); chronos::addUs(tvEncodedChunk_, duration * 1000);
if (pcmListener_)
pcmListener_->onChunkRead(this, chunk, duration); pcmListener_->onChunkRead(this, chunk, duration);
} }

View file

@ -88,7 +88,8 @@ void PipeStream::worker()
if (count < 0) if (count < 0)
{ {
setState(kIdle); setState(kIdle);
chronos::sleep(100); if (!sleep(100))
break;
} }
else if (count == 0) else if (count == 0)
throw SnapException("end of file"); throw SnapException("end of file");
@ -97,10 +98,12 @@ void PipeStream::worker()
} }
while ((len < toRead) && active_); while ((len < toRead) && active_);
if (!active_) if (!active_) break;
break;
encoder_->encode(chunk.get()); encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_; nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000); chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount(); long currentTick = chronos::getTickCount();
@ -108,7 +111,8 @@ void PipeStream::worker()
if (nextTick >= currentTick) if (nextTick >= currentTick)
{ {
setState(kPlaying); setState(kPlaying);
chronos::sleep(nextTick - currentTick); if (!sleep(nextTick - currentTick))
break;
} }
else else
{ {
@ -122,7 +126,8 @@ void PipeStream::worker()
catch(const std::exception& e) catch(const std::exception& e)
{ {
logE << "(PipeStream) Exception: " << e.what() << std::endl; logE << "(PipeStream) Exception: " << e.what() << std::endl;
chronos::sleep(100); if (!sleep(100))
break;
} }
} }
} }

View file

@ -185,10 +185,12 @@ void ProcessStream::worker()
} }
while ((len < toRead) && active_); while ((len < toRead) && active_);
if (!active_) if (!active_) break;
break;
encoder_->encode(chunk.get()); encoder_->encode(chunk.get());
if (!active_) break;
nextTick += pcmReadMs_; nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000); chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount(); long currentTick = chronos::getTickCount();