From 06ce37be1cc4ff6c471ffea63bc0a3ab9dba5785 Mon Sep 17 00:00:00 2001 From: badaix Date: Wed, 9 Nov 2016 21:57:24 +0100 Subject: [PATCH] fixed queue::abort --- client/stream.cpp | 3 +-- common/queue.h | 37 ++++++++++++++++++------------- server/streamreader/pcmStream.cpp | 2 +- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/client/stream.cpp b/client/stream.cpp index bd24b6c4..06c87a11 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -88,8 +88,7 @@ bool Stream::waitForChunk(size_t ms) const return true; std::unique_lock lck(cvMutex_); - cv_.wait_for(lck, std::chrono::milliseconds(ms)); - return !chunks_.empty(); + return (cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !chunks_.empty(); })); } diff --git a/common/queue.h b/common/queue.h index 00da2390..7a36d06f 100644 --- a/common/queue.h +++ b/common/queue.h @@ -20,6 +20,7 @@ #define QUEUE_H #include +#include #include #include #include @@ -33,8 +34,9 @@ public: { std::unique_lock mlock(mutex_); while (queue_.empty()) + { cond_.wait(mlock); - + } auto val = queue_.front(); queue_.pop(); return val; @@ -49,12 +51,21 @@ public: return queue_.front(); } + void abort_wait() + { + abort_ = true; + cond_.notify_one(); + } + bool try_pop(T& item, std::chrono::microseconds timeout) { std::unique_lock mlock(mutex_); - cond_.wait_for(mlock, timeout); + abort_ = false; - if (queue_.empty()) + if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) + return false; + + if (queue_.empty() || abort_) return false; item = std::move(queue_.front()); @@ -68,22 +79,15 @@ public: return try_pop(item, std::chrono::duration_cast(timeout)); } - void abort_wait() - { - cond_.notify_one(); - } - - bool pop(T& item) + void pop(T& item) { std::unique_lock mlock(mutex_); - cond_.wait(mlock); - - if (queue_.empty()) - return false; - - item = std::move(queue_.front()); + while (queue_.empty()) + { + cond_.wait(mlock); + } + item = queue_.front(); queue_.pop(); - return true; } void push(const T& item) @@ -119,6 +123,7 @@ public: private: std::queue queue_; + std::atomic abort_; mutable std::mutex mutex_; std::condition_variable cond_; }; diff --git a/server/streamreader/pcmStream.cpp b/server/streamreader/pcmStream.cpp index 3d265dbb..a2d2d49a 100644 --- a/server/streamreader/pcmStream.cpp +++ b/server/streamreader/pcmStream.cpp @@ -115,7 +115,7 @@ bool PcmStream::sleep(int32_t ms) if (ms < 0) return true; std::unique_lock lck(mtx_); - return (cv_.wait_for(lck, std::chrono::milliseconds(ms)) == std::cv_status::timeout); + return (!cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !active_; })); }