diff --git a/server/streamreader/fileStream.cpp b/server/streamreader/fileStream.cpp index b084cd6b..734b2b41 100644 --- a/server/streamreader/fileStream.cpp +++ b/server/streamreader/fileStream.cpp @@ -91,7 +91,8 @@ void FileStream::worker() if (nextTick >= currentTick) { // logO << "sleep: " << nextTick - currentTick << "\n"; - chronos::sleep(nextTick - currentTick); + if (!sleep(nextTick - currentTick)) + break; } else { diff --git a/server/streamreader/pcmStream.cpp b/server/streamreader/pcmStream.cpp index cccf1297..1e7a0873 100644 --- a/server/streamreader/pcmStream.cpp +++ b/server/streamreader/pcmStream.cpp @@ -87,20 +87,29 @@ void PcmStream::start() { logD << "PcmStream start: " << sampleFormat_.getFormat() << "\n"; encoder_->init(this, sampleFormat_); - - active_ = true; - readerThread_ = thread(&PcmStream::worker, this); + active_ = true; + thread_ = thread(&PcmStream::worker, this); } void PcmStream::stop() { - if (active_) - { - active_ = false; - if (readerThread_.joinable()) - readerThread_.join(); - } + if (!active_ && !thread_.joinable()) + return; + + active_ = false; + cv_.notify_one(); + if (thread_.joinable()) + thread_.join(); +} + + +bool PcmStream::sleep(int32_t ms) +{ + if (ms < 0) + return true; + std::unique_lock lck(mtx_); + return (cv_.wait_for(lck, std::chrono::milliseconds(ms)) == std::cv_status::timeout); } diff --git a/server/streamreader/pcmStream.h b/server/streamreader/pcmStream.h index 13ba9915..d76d3f00 100644 --- a/server/streamreader/pcmStream.h +++ b/server/streamreader/pcmStream.h @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include "streamUri.h" #include "encoder/encoder.h" @@ -83,12 +85,16 @@ public: protected: + std::condition_variable cv_; + std::mutex mtx_; + std::thread thread_; + std::atomic active_; + virtual void worker() = 0; + virtual bool sleep(int32_t ms); void setState(const ReaderState& newState); timeval tvEncodedChunk_; - std::atomic active_; - std::thread readerThread_; PcmListener* pcmListener_; StreamUri uri_; SampleFormat sampleFormat_; diff --git a/server/streamreader/processStream.cpp b/server/streamreader/processStream.cpp index b4a5856b..3a38c9c8 100644 --- a/server/streamreader/processStream.cpp +++ b/server/streamreader/processStream.cpp @@ -175,7 +175,8 @@ void ProcessStream::worker() if (count < 0) { setState(kIdle); - chronos::sleep(100); + if (!sleep(100)) + break; } else if (count == 0) throw SnapException("end of file"); @@ -195,7 +196,8 @@ void ProcessStream::worker() if (nextTick >= currentTick) { setState(kPlaying); - chronos::sleep(nextTick - currentTick); + if (!sleep(nextTick - currentTick)) + break; } else { @@ -210,12 +212,8 @@ void ProcessStream::worker() { logE << "(ProcessStream) Exception: " << e.what() << std::endl; process_->kill(); - int sleepMs = 30000; - while (active_ && (sleepMs > 0)) - { - chronos::sleep(100); - sleepMs -= 100; - } + if (!sleep(30000)) + break; } } } diff --git a/server/streamreader/watchdog.cpp b/server/streamreader/watchdog.cpp index d0dd8173..4fc9e683 100644 --- a/server/streamreader/watchdog.cpp +++ b/server/streamreader/watchdog.cpp @@ -59,7 +59,7 @@ void Watchdog::stop() void Watchdog::trigger() { - std::unique_lock lck(mtx_); +// std::unique_lock lck(mtx_); cv_.notify_one(); }