fixed queue::abort

This commit is contained in:
badaix 2016-11-09 21:57:24 +01:00
parent e875555d3c
commit 06ce37be1c
3 changed files with 23 additions and 19 deletions

View file

@ -88,8 +88,7 @@ bool Stream::waitForChunk(size_t ms) const
return true; return true;
std::unique_lock<std::mutex> lck(cvMutex_); std::unique_lock<std::mutex> lck(cvMutex_);
cv_.wait_for(lck, std::chrono::milliseconds(ms)); return (cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !chunks_.empty(); }));
return !chunks_.empty();
} }

View file

@ -20,6 +20,7 @@
#define QUEUE_H #define QUEUE_H
#include <queue> #include <queue>
#include <atomic>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
@ -33,8 +34,9 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty()) while (queue_.empty())
{
cond_.wait(mlock); cond_.wait(mlock);
}
auto val = queue_.front(); auto val = queue_.front();
queue_.pop(); queue_.pop();
return val; return val;
@ -49,12 +51,21 @@ public:
return queue_.front(); return queue_.front();
} }
void abort_wait()
{
abort_ = true;
cond_.notify_one();
}
bool try_pop(T& item, std::chrono::microseconds timeout) bool try_pop(T& item, std::chrono::microseconds timeout)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
cond_.wait_for(mlock, timeout); abort_ = false;
if (queue_.empty()) if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false;
if (queue_.empty() || abort_)
return false; return false;
item = std::move(queue_.front()); item = std::move(queue_.front());
@ -68,22 +79,15 @@ public:
return try_pop(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout)); return try_pop(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout));
} }
void abort_wait() void pop(T& item)
{
cond_.notify_one();
}
bool pop(T& item)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
cond_.wait(mlock); while (queue_.empty())
{
if (queue_.empty()) cond_.wait(mlock);
return false; }
item = queue_.front();
item = std::move(queue_.front());
queue_.pop(); queue_.pop();
return true;
} }
void push(const T& item) void push(const T& item)
@ -119,6 +123,7 @@ public:
private: private:
std::queue<T> queue_; std::queue<T> queue_;
std::atomic<bool> abort_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::condition_variable cond_; std::condition_variable cond_;
}; };

View file

@ -115,7 +115,7 @@ bool PcmStream::sleep(int32_t ms)
if (ms < 0) if (ms < 0)
return true; return true;
std::unique_lock<std::mutex> lck(mtx_); std::unique_lock<std::mutex> lck(mtx_);
return (cv_.wait_for(lck, std::chrono::milliseconds(ms)) == std::cv_status::timeout); return (!cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !active_; }));
} }