cleaned up locking

This commit is contained in:
badaix 2016-11-11 12:24:55 +01:00
parent 3819fb2355
commit bc1d96ef89
6 changed files with 44 additions and 37 deletions

View file

@ -145,13 +145,10 @@ shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(const msg::Base
// logO << "Req: " << message->id << "\n"; // logO << "Req: " << message->id << "\n";
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId_)); shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId_));
{
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests_.insert(pendingRequest); pendingRequests_.insert(pendingRequest);
}
std::unique_lock<std::mutex> lck(requestMutex_);
send(message); send(message);
if (pendingRequest->cv.wait_for(lck, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) if (pendingRequest->cv.wait_for(mlock, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{ {
response = pendingRequest->response; response = pendingRequest->response;
sumTimeout_ = chronos::msec(0); sumTimeout_ = chronos::msec(0);
@ -164,10 +161,7 @@ shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(const msg::Base
if (sumTimeout_ > chronos::sec(10)) if (sumTimeout_ > chronos::sec(10))
throw SnapException("sum timeout exceeded 10s"); throw SnapException("sum timeout exceeded 10s");
} }
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests_.erase(pendingRequest); pendingRequests_.erase(pendingRequest);
}
return response; return response;
} }
@ -198,7 +192,7 @@ void ClientConnection::getNextMessage()
req->response->message = baseMessage; req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size); req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size); memcpy(req->response->buffer, &buffer[0], baseMessage.size);
std::unique_lock<std::mutex> lck(requestMutex_); mlock.unlock();
req->cv.notify_one(); req->cv.notify_one();
return; return;
} }

View file

@ -113,7 +113,6 @@ protected:
std::atomic<bool> connected_; std::atomic<bool> connected_;
MessageReceiver* messageReceiver_; MessageReceiver* messageReceiver_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
mutable std::mutex requestMutex_;
std::set<std::shared_ptr<PendingRequest>> pendingRequests_; std::set<std::shared_ptr<PendingRequest>> pendingRequests_;
uint16_t reqId_; uint16_t reqId_;
std::string host_; std::string host_;

View file

@ -196,6 +196,15 @@ void error_callback(const FLAC__StreamDecoder *decoder, FLAC__StreamDecoderError
{ {
(void)decoder, (void)client_data; (void)decoder, (void)client_data;
logS(kLogErr) << "Got error callback: " << FLAC__StreamDecoderErrorStatusString[status] << "\n"; logS(kLogErr) << "Got error callback: " << FLAC__StreamDecoderErrorStatusString[status] << "\n";
/// TODO, see issue #120:
// Thu Nov 10 07:26:44 2016 daemon.warn dnsmasq-dhcp[1194]: no address range available for DHCP request via wlan0
// Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
// Thu Nov 10 07:54:39 2016 daemon.err snapclient[1158]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
//
// and:
// Oct 27 17:37:38 kitchen snapclient[869]: Connected to 192.168.222.10
// Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_UNPARSEABLE_STREAM
// Oct 27 17:47:13 kitchen snapclient[869]: Got error callback: FLAC__STREAM_DECODER_ERROR_STATUS_LOST_SYNC
} }

View file

@ -76,19 +76,13 @@ void Stream::addChunk(msg::PcmChunk* chunk)
while (chunks_.size() * chunk->duration<cs::msec>().count() > 10000) while (chunks_.size() * chunk->duration<cs::msec>().count() > 10000)
chunks_.pop(); chunks_.pop();
chunks_.push(shared_ptr<msg::PcmChunk>(chunk)); chunks_.push(shared_ptr<msg::PcmChunk>(chunk));
std::unique_lock<std::mutex> lck(cvMutex_);
cv_.notify_one();
// logD << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n"; // logD << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n";
} }
bool Stream::waitForChunk(size_t ms) const bool Stream::waitForChunk(size_t ms) const
{ {
if (!chunks_.empty()) return chunks_.wait_for(std::chrono::milliseconds(ms));
return true;
std::unique_lock<std::mutex> lck(cvMutex_);
return (cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !chunks_.empty(); }));
} }

View file

@ -89,9 +89,6 @@ private:
unsigned long playedFrames_; unsigned long playedFrames_;
long correctAfterXFrames_; long correctAfterXFrames_;
chronos::msec bufferMs_; chronos::msec bufferMs_;
mutable std::condition_variable cv_;
mutable std::mutex cvMutex_;
}; };

View file

@ -34,9 +34,9 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty()) while (queue_.empty())
{
cond_.wait(mlock); cond_.wait(mlock);
}
// std::lock_guard<std::mutex> lock(mutex_);
auto val = queue_.front(); auto val = queue_.front();
queue_.pop(); queue_.pop();
return val; return val;
@ -48,20 +48,33 @@ public:
while (queue_.empty()) while (queue_.empty())
cond_.wait(mlock); cond_.wait(mlock);
// std::lock_guard<std::mutex> lock(mutex_);
return queue_.front(); return queue_.front();
} }
void abort_wait() void abort_wait()
{ {
{
std::lock_guard<std::mutex> mlock(mutex_);
abort_ = true; abort_ = true;
}
cond_.notify_one(); cond_.notify_one();
} }
bool wait_for(std::chrono::milliseconds timeout) const
{
std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false;
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false;
return !queue_.empty() && !abort_;
}
bool try_pop(T& item, std::chrono::microseconds timeout) bool try_pop(T& item, std::chrono::microseconds timeout)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false; abort_ = false;
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false; return false;
@ -83,32 +96,33 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty()) while (queue_.empty())
{
cond_.wait(mlock); cond_.wait(mlock);
}
item = queue_.front(); item = queue_.front();
queue_.pop(); queue_.pop();
} }
void push(const T& item) void push(const T& item)
{ {
std::unique_lock<std::mutex> mlock(mutex_); {
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push(item); queue_.push(item);
mlock.unlock(); }
cond_.notify_one(); cond_.notify_one();
} }
void push(T&& item) void push(T&& item)
{ {
std::unique_lock<std::mutex> mlock(mutex_); {
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push(std::move(item)); queue_.push(std::move(item));
mlock.unlock(); }
cond_.notify_one(); cond_.notify_one();
} }
size_t size() const size_t size() const
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::lock_guard<std::mutex> mlock(mutex_);
return queue_.size(); return queue_.size();
} }
@ -123,9 +137,9 @@ public:
private: private:
std::queue<T> queue_; std::queue<T> queue_;
std::atomic<bool> abort_; mutable std::atomic<bool> abort_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::condition_variable cond_; mutable std::condition_variable cond_;
}; };