From bc1d96ef89ec24e3c4290b14d12388b05a36fd9c Mon Sep 17 00:00:00 2001 From: badaix Date: Fri, 11 Nov 2016 12:24:55 +0100 Subject: [PATCH] cleaned up locking --- client/clientConnection.cpp | 16 ++++--------- client/clientConnection.h | 1 - client/decoder/flacDecoder.cpp | 9 +++++++ client/stream.cpp | 8 +------ client/stream.h | 3 --- common/queue.h | 44 ++++++++++++++++++++++------------ 6 files changed, 44 insertions(+), 37 deletions(-) diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index 2695a4b2..c22dbf44 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -145,13 +145,10 @@ shared_ptr ClientConnection::sendRequest(const msg::Base // logO << "Req: " << message->id << "\n"; shared_ptr pendingRequest(new PendingRequest(reqId_)); - { - std::unique_lock mlock(mutex_); - pendingRequests_.insert(pendingRequest); - } - std::unique_lock lck(requestMutex_); + std::unique_lock mlock(mutex_); + pendingRequests_.insert(pendingRequest); send(message); - if (pendingRequest->cv.wait_for(lck, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) + if (pendingRequest->cv.wait_for(mlock, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) { response = pendingRequest->response; sumTimeout_ = chronos::msec(0); @@ -164,10 +161,7 @@ shared_ptr ClientConnection::sendRequest(const msg::Base if (sumTimeout_ > chronos::sec(10)) throw SnapException("sum timeout exceeded 10s"); } - { - std::unique_lock mlock(mutex_); - pendingRequests_.erase(pendingRequest); - } + pendingRequests_.erase(pendingRequest); return response; } @@ -198,7 +192,7 @@ void ClientConnection::getNextMessage() req->response->message = baseMessage; req->response->buffer = (char*)malloc(baseMessage.size); memcpy(req->response->buffer, &buffer[0], baseMessage.size); - std::unique_lock lck(requestMutex_); + mlock.unlock(); req->cv.notify_one(); return; } diff --git a/client/clientConnection.h b/client/clientConnection.h index a4c05feb..576d9469 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -113,7 +113,6 @@ protected: std::atomic connected_; MessageReceiver* messageReceiver_; mutable std::mutex mutex_; - mutable std::mutex requestMutex_; std::set> pendingRequests_; uint16_t reqId_; std::string host_; diff --git a/client/decoder/flacDecoder.cpp b/client/decoder/flacDecoder.cpp index f8e8cd61..a25cb8da 100644 --- a/client/decoder/flacDecoder.cpp +++ b/client/decoder/flacDecoder.cpp @@ -196,6 +196,15 @@ void error_callback(const FLAC__StreamDecoder *decoder, FLAC__StreamDecoderError { (void)decoder, (void)client_data; logS(kLogErr) << "Got error callback: " << FLAC__StreamDecoderErrorStatusString[status] << "\n"; + /// TODO, see issue #120: + // Thu Nov 10 07:26:44 2016 daemon.warn dnsmasq-dhcp[1194]: no address range available for DHCP request via wlan0 + // Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC + // Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC + // + // and: + // Oct 27 17:37:38 kitchen snapclient[869]: Connected to 192.168.222.10 + // Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_UNPARSEABLE_STREAM + // Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC } diff --git a/client/stream.cpp b/client/stream.cpp index 06c87a11..1000a720 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -76,19 +76,13 @@ void Stream::addChunk(msg::PcmChunk* chunk) while (chunks_.size() * chunk->duration().count() > 10000) chunks_.pop(); chunks_.push(shared_ptr(chunk)); - std::unique_lock lck(cvMutex_); - cv_.notify_one(); // logD << "new chunk: " << chunk->duration().count() << ", Chunks: " << chunks_.size() << "\n"; } bool Stream::waitForChunk(size_t ms) const { - if (!chunks_.empty()) - return true; - - std::unique_lock lck(cvMutex_); - return (cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !chunks_.empty(); })); + return chunks_.wait_for(std::chrono::milliseconds(ms)); } diff --git a/client/stream.h b/client/stream.h index 74c70b0b..a56af107 100644 --- a/client/stream.h +++ b/client/stream.h @@ -89,9 +89,6 @@ private: unsigned long playedFrames_; long correctAfterXFrames_; chronos::msec bufferMs_; - - mutable std::condition_variable cv_; - mutable std::mutex cvMutex_; }; diff --git a/common/queue.h b/common/queue.h index 7a36d06f..fbe2527f 100644 --- a/common/queue.h +++ b/common/queue.h @@ -34,9 +34,9 @@ public: { std::unique_lock mlock(mutex_); while (queue_.empty()) - { cond_.wait(mlock); - } + +// std::lock_guard lock(mutex_); auto val = queue_.front(); queue_.pop(); return val; @@ -48,20 +48,33 @@ public: while (queue_.empty()) cond_.wait(mlock); +// std::lock_guard lock(mutex_); return queue_.front(); } void abort_wait() { - abort_ = true; + { + std::lock_guard mlock(mutex_); + abort_ = true; + } cond_.notify_one(); } + bool wait_for(std::chrono::milliseconds timeout) const + { + std::unique_lock mlock(mutex_); + abort_ = false; + if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) + return false; + + return !queue_.empty() && !abort_; + } + bool try_pop(T& item, std::chrono::microseconds timeout) { std::unique_lock mlock(mutex_); abort_ = false; - if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) return false; @@ -83,32 +96,33 @@ public: { std::unique_lock mlock(mutex_); while (queue_.empty()) - { cond_.wait(mlock); - } + item = queue_.front(); queue_.pop(); } void push(const T& item) { - std::unique_lock mlock(mutex_); - queue_.push(item); - mlock.unlock(); + { + std::lock_guard mlock(mutex_); + queue_.push(item); + } cond_.notify_one(); } void push(T&& item) { - std::unique_lock mlock(mutex_); - queue_.push(std::move(item)); - mlock.unlock(); + { + std::lock_guard mlock(mutex_); + queue_.push(std::move(item)); + } cond_.notify_one(); } size_t size() const { - std::unique_lock mlock(mutex_); + std::lock_guard mlock(mutex_); return queue_.size(); } @@ -123,9 +137,9 @@ public: private: std::queue queue_; - std::atomic abort_; + mutable std::atomic abort_; mutable std::mutex mutex_; - std::condition_variable cond_; + mutable std::condition_variable cond_; };