abortable sleep

This commit is contained in:
badaix 2016-11-06 18:51:29 +01:00
parent 7353383313
commit d41d314e27
5 changed files with 35 additions and 21 deletions

View file

@ -91,7 +91,8 @@ void FileStream::worker()
if (nextTick >= currentTick) if (nextTick >= currentTick)
{ {
// logO << "sleep: " << nextTick - currentTick << "\n"; // logO << "sleep: " << nextTick - currentTick << "\n";
chronos::sleep(nextTick - currentTick); if (!sleep(nextTick - currentTick))
break;
} }
else else
{ {

View file

@ -87,20 +87,29 @@ void PcmStream::start()
{ {
logD << "PcmStream start: " << sampleFormat_.getFormat() << "\n"; logD << "PcmStream start: " << sampleFormat_.getFormat() << "\n";
encoder_->init(this, sampleFormat_); encoder_->init(this, sampleFormat_);
active_ = true; active_ = true;
readerThread_ = thread(&PcmStream::worker, this); thread_ = thread(&PcmStream::worker, this);
} }
void PcmStream::stop() void PcmStream::stop()
{ {
if (active_) if (!active_ && !thread_.joinable())
{ return;
active_ = false; active_ = false;
if (readerThread_.joinable()) cv_.notify_one();
readerThread_.join(); if (thread_.joinable())
thread_.join();
} }
bool PcmStream::sleep(int32_t ms)
{
if (ms < 0)
return true;
std::unique_lock<std::mutex> lck(mtx_);
return (cv_.wait_for(lck, std::chrono::milliseconds(ms)) == std::cv_status::timeout);
} }

View file

@ -22,6 +22,8 @@
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <string> #include <string>
#include <mutex>
#include <condition_variable>
#include <map> #include <map>
#include "streamUri.h" #include "streamUri.h"
#include "encoder/encoder.h" #include "encoder/encoder.h"
@ -83,12 +85,16 @@ public:
protected: protected:
std::condition_variable cv_;
std::mutex mtx_;
std::thread thread_;
std::atomic<bool> active_;
virtual void worker() = 0; virtual void worker() = 0;
virtual bool sleep(int32_t ms);
void setState(const ReaderState& newState); void setState(const ReaderState& newState);
timeval tvEncodedChunk_; timeval tvEncodedChunk_;
std::atomic<bool> active_;
std::thread readerThread_;
PcmListener* pcmListener_; PcmListener* pcmListener_;
StreamUri uri_; StreamUri uri_;
SampleFormat sampleFormat_; SampleFormat sampleFormat_;

View file

@ -175,7 +175,8 @@ void ProcessStream::worker()
if (count < 0) if (count < 0)
{ {
setState(kIdle); setState(kIdle);
chronos::sleep(100); if (!sleep(100))
break;
} }
else if (count == 0) else if (count == 0)
throw SnapException("end of file"); throw SnapException("end of file");
@ -195,7 +196,8 @@ void ProcessStream::worker()
if (nextTick >= currentTick) if (nextTick >= currentTick)
{ {
setState(kPlaying); setState(kPlaying);
chronos::sleep(nextTick - currentTick); if (!sleep(nextTick - currentTick))
break;
} }
else else
{ {
@ -210,12 +212,8 @@ void ProcessStream::worker()
{ {
logE << "(ProcessStream) Exception: " << e.what() << std::endl; logE << "(ProcessStream) Exception: " << e.what() << std::endl;
process_->kill(); process_->kill();
int sleepMs = 30000; if (!sleep(30000))
while (active_ && (sleepMs > 0)) break;
{
chronos::sleep(100);
sleepMs -= 100;
}
} }
} }
} }

View file

@ -59,7 +59,7 @@ void Watchdog::stop()
void Watchdog::trigger() void Watchdog::trigger()
{ {
std::unique_lock<std::mutex> lck(mtx_); // std::unique_lock<std::mutex> lck(mtx_);
cv_.notify_one(); cv_.notify_one();
} }