From 99e147c5aa751a30a1401aa5e1314c48bcb3d92c Mon Sep 17 00:00:00 2001 From: badaix Date: Sun, 16 Feb 2020 21:23:26 +0100 Subject: [PATCH] Remove timeout in getNextPlayerChunk --- client/player/alsa_player.cpp | 2 +- client/player/coreaudio_player.cpp | 2 +- client/stream.cpp | 122 ++++++++++++----------------- client/stream.hpp | 10 +-- common/queue.h | 16 ++-- 5 files changed, 63 insertions(+), 89 deletions(-) diff --git a/client/player/alsa_player.cpp b/client/player/alsa_player.cpp index 92946198..be9bcf82 100644 --- a/client/player/alsa_player.cpp +++ b/client/player/alsa_player.cpp @@ -253,7 +253,7 @@ void AlsaPlayer::worker() else { LOG(INFO) << "Failed to get chunk\n"; - while (active_ && !stream_->waitForChunk(100)) + while (active_ && !stream_->waitForChunk(100ms)) { LOG(DEBUG) << "Waiting for chunk\n"; if ((handle_ != nullptr) && (chronos::getTickCount() - lastChunkTick > 5000)) diff --git a/client/player/coreaudio_player.cpp b/client/player/coreaudio_player.cpp index 9479a565..7a24be9f 100644 --- a/client/player/coreaudio_player.cpp +++ b/client/player/coreaudio_player.cpp @@ -134,7 +134,7 @@ void CoreAudioPlayer::worker() { while (active_) { - if (pubStream_->waitForChunk(100)) + if (pubStream_->waitForChunk(100ms)) { try { diff --git a/client/stream.cpp b/client/stream.cpp index 51ff5b8f..bc52befd 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -1,20 +1,20 @@ /*** - This file is part of snapcast - Copyright (C) 2014-2020 Johannes Pohl + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. - You should have received a copy of the GNU General Public License - along with this program. If not, see . -***/ + You should have received a copy of the GNU General Public License + along with this program. If not, see . + ***/ #include "stream.hpp" #include "common/aixlog.hpp" @@ -70,6 +70,7 @@ Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format) if (error) { LOG(ERROR, LOG_TAG) << "Error soxr_create: " << error << "\n"; + soxr_ = nullptr; } // initialize the buffer with 20ms (~latency of the reampler) resample_buffer_.resize(format_.frameSize * ceil(format_.msRate()) * 20); @@ -93,7 +94,7 @@ void Stream::setRealSampleRate(double sampleRate) else { correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.)); - // LOG(DEBUG, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; + // LOG(TRACE, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; } } @@ -114,8 +115,16 @@ void Stream::clearChunks() void Stream::addChunk(unique_ptr chunk) { - while (chunks_.size() * chunk->duration().count() > 10000) - chunks_.pop(); + // drop chunks that should have been played 10s before. Just in case, this shouldn't happen. + while (!chunks_.empty()) + { + auto first_chunk = chunks_.front(); + cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - first_chunk->start()); + if (age > 10s + bufferMs_) + chunks_.pop(); + else + break; + } // chunks_.push(move(chunk)); // LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->duration().count() << ", Chunks: " << chunks_.size() << "\n"; @@ -141,7 +150,6 @@ void Stream::addChunk(unique_ptr chunk) if (error) { LOG(ERROR, LOG_TAG) << "Error soxr_process: " << error << "\n"; - // delete out; } else { @@ -185,7 +193,8 @@ void Stream::addChunk(unique_ptr chunk) { // buffer for resampled data too small, add space for 5ms resample_buffer_.resize(resample_buffer_.size() + format_.frameSize * ceil(format_.msRate()) * 5); - LOG(INFO, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size() << " bytes\n"; + LOG(DEBUG, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size() + << " bytes\n"; } // //LOG(TRACE, LOG_TAG) << "ts: " << out->timestamp.sec << "s, " << out->timestamp.usec/1000.f << " ms, duration: " << odone / format_.msRate() @@ -199,25 +208,21 @@ void Stream::addChunk(unique_ptr chunk) } -bool Stream::waitForChunk(size_t ms) const +bool Stream::waitForChunk(const std::chrono::milliseconds& timeout) const { - return chunks_.wait_for(std::chrono::milliseconds(ms)); + return chunks_.wait_for(timeout); } -cs::time_point_clk Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames) +void Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const { - if (!chunk_) - chunk_ = chunks_.pop(); - cs::time_point_clk tp = chunk_->start(); memset(outputBuffer, 0, frames * format_.frameSize); - return tp; } -cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames) +cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames) { - if (!chunk_ && !chunks_.try_pop(chunk_, timeout)) + if (!chunk_ && !chunks_.try_pop(chunk_)) throw 0; cs::time_point_clk tp = chunk_->start(); @@ -225,14 +230,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec while (read < frames) { read += chunk_->readFrames(static_cast(outputBuffer) + read * format_.frameSize, frames - read); - if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_, timeout)) + if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_)) throw 0; } return tp; } -cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames, long framesCorrection) +cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection) { if (framesCorrection < 0 && frames + framesCorrection <= 0) { @@ -240,14 +245,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec framesCorrection = -frames + 1; } - frame_delta_ -= framesCorrection; - if (framesCorrection == 0) - return getNextPlayerChunk(outputBuffer, timeout, frames); + return getNextPlayerChunk(outputBuffer, frames); + + frame_delta_ -= framesCorrection; long toRead = frames + framesCorrection; char* buffer = new char[toRead * format_.frameSize]; - cs::time_point_clk tp = getNextPlayerChunk(buffer, timeout, toRead); + cs::time_point_clk tp = getNextPlayerChunk(buffer, toRead); const auto max = framesCorrection < 0 ? frames : toRead; // Divide the buffer into one more slice than frames that need to be dropped. @@ -296,34 +301,6 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec return tp; } -/* -2020-01-12 20-25-26 [Info] Chunk: 7 7 11 15 179 120 -2020-01-12 20-25-27 [Info] Chunk: 6 6 8 15 212 122 -2020-01-12 20-25-28 [Info] Chunk: 6 6 7 12 245 123 -2020-01-12 20-25-29 [Info] Chunk: 5 6 6 9 279 117 -2020-01-12 20-25-30 [Info] Chunk: 4 5 6 8 312 117 -2020-01-12 20-25-30 [Error] Controller::onException: read_some: End of file -2020-01-12 20-25-30 [Error] Exception in Controller::worker(): read_some: End of file -2020-01-12 20-25-31 [Error] Exception in Controller::worker(): connect: Connection refused -2020-01-12 20-25-31 [Error] Error in socket shutdown: Transport endpoint is not connected -2020-01-12 20-25-32 [Error] Exception in Controller::worker(): connect: Connection refused -2020-01-12 20-25-32 [Error] Error in socket shutdown: Transport endpoint is not connected -^C2020-01-12 20-25-32 [Info] Received signal 2: Interrupt -2020-01-12 20-25-32 [Info] Stopping controller -2020-01-12 20-25-32 [Error] Error in socket shutdown: Bad file descriptor -2020-01-12 20-25-32 [Error] Exception: Invalid argument -2020-01-12 20-25-32 [Notice] daemon terminated. - -================================================================= -==22383==ERROR: LeakSanitizer: detected memory leaks - -Direct leak of 5756 byte(s) in 1 object(s) allocated from: - #0 0x7f3d60635602 in malloc (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x98602) - #1 0x448fc2 in Stream::getNextPlayerChunk(void*, std::chrono::duration > const&, unsigned long, long) -/home/johannes/Develop/snapcast/client/stream.cpp:163 - -SUMMARY: AddressSanitizer: 5756 byte(s) leaked in 1 allocation(s). -*/ void Stream::updateBuffers(int age) { @@ -350,14 +327,12 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT return false; } - if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime)) + if (!chunk_ && !chunks_.try_pop(chunk_)) { LOG(INFO, LOG_TAG) << "no chunks available\n"; return false; } - cs::nsec chunk_duration = cs::nsec(static_cast(frames / format_.nsRate())); - /// we have a chunk /// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC /// age = 0 => play now @@ -368,6 +343,7 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT { if (hard_sync_) { + cs::nsec chunk_duration = cs::nsec(static_cast(frames / format_.nsRate())); cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast(chunk_duration).count() // << "\n"; @@ -384,18 +360,18 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT if (age.count() > 0) { // age > 0: the top of the stream is too old. We must fast foward. - while (chunks_.try_pop(chunk_, 0ms)) + while (chunks_.try_pop(chunk_)) { age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 - << ", chunk_duration: " << -std::chrono::duration_cast(chunk_duration).count() - << ", duration: " << chunk_->duration().count() << "\n"; - if (age <= 0ms) + << ", chunk_duration: " << -std::chrono::duration_cast(chunk_duration).count() + << ", duration: " << chunk_->duration().count() << "\n"; + if (age.count() <= 0) break; } } - if (age.count() < 0) + if (age.count() <= 0) { // the oldest chunk (top of the stream) can be played in this iteration // e.g. age = -20ms (=> should be played in 20ms) @@ -403,9 +379,9 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT // and can play 30ms of the stream size_t silent_frames = static_cast(-chunk_->format.nsRate() * std::chrono::duration_cast(age).count()); LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames - << ", age: " << std::chrono::duration_cast(age).count() / 1000. << "\n"; + << ", age: " << std::chrono::duration_cast(age).count() / 1000. << "\n"; getSilentPlayerChunk(outputBuffer, silent_frames); - getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), 0ms, frames - silent_frames); + getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), frames - silent_frames); hard_sync_ = false; resetBuffers(); @@ -428,8 +404,8 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT } } - cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, 0ms, frames, framesCorrection) - - bufferMs_ + outputBufferDacTime); + cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, frames, framesCorrection) - bufferMs_ + + outputBufferDacTime); setRealSampleRate(format_.rate); // check if we need a hard sync diff --git a/client/stream.hpp b/client/stream.hpp index 4ec22032..48429082 100644 --- a/client/stream.hpp +++ b/client/stream.hpp @@ -56,12 +56,12 @@ public: return format_; } - bool waitForChunk(size_t ms) const; + bool waitForChunk(const std::chrono::milliseconds& timeout) const; private: - chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames); - chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames, long framesCorrection); - chronos::time_point_clk getSilentPlayerChunk(void* outputBuffer, unsigned long frames); + chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames); + chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection); + void getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const; void updateBuffers(int age); void resetBuffers(); @@ -72,8 +72,8 @@ private: Queue> chunks_; DoubleBuffer miniBuffer_; - DoubleBuffer buffer_; DoubleBuffer shortBuffer_; + DoubleBuffer buffer_; std::shared_ptr chunk_; int median_; diff --git a/common/queue.h b/common/queue.h index d3be5482..154b1557 100644 --- a/common/queue.h +++ b/common/queue.h @@ -59,7 +59,7 @@ public: cond_.notify_one(); } - bool wait_for(std::chrono::milliseconds timeout) const + bool wait_for(const std::chrono::microseconds& timeout) const { std::unique_lock mlock(mutex_); abort_ = false; @@ -69,12 +69,15 @@ public: return !queue_.empty() && !abort_; } - bool try_pop(T& item, std::chrono::microseconds timeout) + bool try_pop(T& item, const std::chrono::microseconds& timeout = std::chrono::microseconds(0)) { std::unique_lock mlock(mutex_); abort_ = false; - if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) - return false; + if (timeout.count() > 0) + { + if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) + return false; + } if (queue_.empty() || abort_) return false; @@ -85,11 +88,6 @@ public: return true; } - bool try_pop(T& item, std::chrono::milliseconds timeout) - { - return try_pop(item, std::chrono::duration_cast(timeout)); - } - void pop(T& item) { std::unique_lock mlock(mutex_);