Remove timeout in getNextPlayerChunk

This commit is contained in:
badaix 2020-02-16 21:23:26 +01:00
parent 3e91984d3e
commit 99e147c5aa
5 changed files with 63 additions and 89 deletions

View file

@ -253,7 +253,7 @@ void AlsaPlayer::worker()
else else
{ {
LOG(INFO) << "Failed to get chunk\n"; LOG(INFO) << "Failed to get chunk\n";
while (active_ && !stream_->waitForChunk(100)) while (active_ && !stream_->waitForChunk(100ms))
{ {
LOG(DEBUG) << "Waiting for chunk\n"; LOG(DEBUG) << "Waiting for chunk\n";
if ((handle_ != nullptr) && (chronos::getTickCount() - lastChunkTick > 5000)) if ((handle_ != nullptr) && (chronos::getTickCount() - lastChunkTick > 5000))

View file

@ -134,7 +134,7 @@ void CoreAudioPlayer::worker()
{ {
while (active_) while (active_)
{ {
if (pubStream_->waitForChunk(100)) if (pubStream_->waitForChunk(100ms))
{ {
try try
{ {

View file

@ -1,20 +1,20 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or the Free Software Foundation, either version 3 of the License, or
(at your option) any later version. (at your option) any later version.
This program is distributed in the hope that it will be useful, This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details. GNU General Public License for more details.
You should have received a copy of the GNU General Public License You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#include "stream.hpp" #include "stream.hpp"
#include "common/aixlog.hpp" #include "common/aixlog.hpp"
@ -70,6 +70,7 @@ Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format)
if (error) if (error)
{ {
LOG(ERROR, LOG_TAG) << "Error soxr_create: " << error << "\n"; LOG(ERROR, LOG_TAG) << "Error soxr_create: " << error << "\n";
soxr_ = nullptr;
} }
// initialize the buffer with 20ms (~latency of the reampler) // initialize the buffer with 20ms (~latency of the reampler)
resample_buffer_.resize(format_.frameSize * ceil(format_.msRate()) * 20); resample_buffer_.resize(format_.frameSize * ceil(format_.msRate()) * 20);
@ -93,7 +94,7 @@ void Stream::setRealSampleRate(double sampleRate)
else else
{ {
correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.)); correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.));
// LOG(DEBUG, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; // LOG(TRACE, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n";
} }
} }
@ -114,8 +115,16 @@ void Stream::clearChunks()
void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk) void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
{ {
while (chunks_.size() * chunk->duration<cs::msec>().count() > 10000) // drop chunks that should have been played 10s before. Just in case, this shouldn't happen.
chunks_.pop(); while (!chunks_.empty())
{
auto first_chunk = chunks_.front();
cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - first_chunk->start());
if (age > 10s + bufferMs_)
chunks_.pop();
else
break;
}
// chunks_.push(move(chunk)); // chunks_.push(move(chunk));
// LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n"; // LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n";
@ -141,7 +150,6 @@ void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
if (error) if (error)
{ {
LOG(ERROR, LOG_TAG) << "Error soxr_process: " << error << "\n"; LOG(ERROR, LOG_TAG) << "Error soxr_process: " << error << "\n";
// delete out;
} }
else else
{ {
@ -185,7 +193,8 @@ void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
{ {
// buffer for resampled data too small, add space for 5ms // buffer for resampled data too small, add space for 5ms
resample_buffer_.resize(resample_buffer_.size() + format_.frameSize * ceil(format_.msRate()) * 5); resample_buffer_.resize(resample_buffer_.size() + format_.frameSize * ceil(format_.msRate()) * 5);
LOG(INFO, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size() << " bytes\n"; LOG(DEBUG, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size()
<< " bytes\n";
} }
// //LOG(TRACE, LOG_TAG) << "ts: " << out->timestamp.sec << "s, " << out->timestamp.usec/1000.f << " ms, duration: " << odone / format_.msRate() // //LOG(TRACE, LOG_TAG) << "ts: " << out->timestamp.sec << "s, " << out->timestamp.usec/1000.f << " ms, duration: " << odone / format_.msRate()
@ -199,25 +208,21 @@ void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
} }
bool Stream::waitForChunk(size_t ms) const bool Stream::waitForChunk(const std::chrono::milliseconds& timeout) const
{ {
return chunks_.wait_for(std::chrono::milliseconds(ms)); return chunks_.wait_for(timeout);
} }
cs::time_point_clk Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames) void Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const
{ {
if (!chunk_)
chunk_ = chunks_.pop();
cs::time_point_clk tp = chunk_->start();
memset(outputBuffer, 0, frames * format_.frameSize); memset(outputBuffer, 0, frames * format_.frameSize);
return tp;
} }
cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames) cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames)
{ {
if (!chunk_ && !chunks_.try_pop(chunk_, timeout)) if (!chunk_ && !chunks_.try_pop(chunk_))
throw 0; throw 0;
cs::time_point_clk tp = chunk_->start(); cs::time_point_clk tp = chunk_->start();
@ -225,14 +230,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
while (read < frames) while (read < frames)
{ {
read += chunk_->readFrames(static_cast<char*>(outputBuffer) + read * format_.frameSize, frames - read); read += chunk_->readFrames(static_cast<char*>(outputBuffer) + read * format_.frameSize, frames - read);
if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_, timeout)) if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_))
throw 0; throw 0;
} }
return tp; return tp;
} }
cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames, long framesCorrection) cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection)
{ {
if (framesCorrection < 0 && frames + framesCorrection <= 0) if (framesCorrection < 0 && frames + framesCorrection <= 0)
{ {
@ -240,14 +245,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
framesCorrection = -frames + 1; framesCorrection = -frames + 1;
} }
frame_delta_ -= framesCorrection;
if (framesCorrection == 0) if (framesCorrection == 0)
return getNextPlayerChunk(outputBuffer, timeout, frames); return getNextPlayerChunk(outputBuffer, frames);
frame_delta_ -= framesCorrection;
long toRead = frames + framesCorrection; long toRead = frames + framesCorrection;
char* buffer = new char[toRead * format_.frameSize]; char* buffer = new char[toRead * format_.frameSize];
cs::time_point_clk tp = getNextPlayerChunk(buffer, timeout, toRead); cs::time_point_clk tp = getNextPlayerChunk(buffer, toRead);
const auto max = framesCorrection < 0 ? frames : toRead; const auto max = framesCorrection < 0 ? frames : toRead;
// Divide the buffer into one more slice than frames that need to be dropped. // Divide the buffer into one more slice than frames that need to be dropped.
@ -296,34 +301,6 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
return tp; return tp;
} }
/*
2020-01-12 20-25-26 [Info] Chunk: 7 7 11 15 179 120
2020-01-12 20-25-27 [Info] Chunk: 6 6 8 15 212 122
2020-01-12 20-25-28 [Info] Chunk: 6 6 7 12 245 123
2020-01-12 20-25-29 [Info] Chunk: 5 6 6 9 279 117
2020-01-12 20-25-30 [Info] Chunk: 4 5 6 8 312 117
2020-01-12 20-25-30 [Error] Controller::onException: read_some: End of file
2020-01-12 20-25-30 [Error] Exception in Controller::worker(): read_some: End of file
2020-01-12 20-25-31 [Error] Exception in Controller::worker(): connect: Connection refused
2020-01-12 20-25-31 [Error] Error in socket shutdown: Transport endpoint is not connected
2020-01-12 20-25-32 [Error] Exception in Controller::worker(): connect: Connection refused
2020-01-12 20-25-32 [Error] Error in socket shutdown: Transport endpoint is not connected
^C2020-01-12 20-25-32 [Info] Received signal 2: Interrupt
2020-01-12 20-25-32 [Info] Stopping controller
2020-01-12 20-25-32 [Error] Error in socket shutdown: Bad file descriptor
2020-01-12 20-25-32 [Error] Exception: Invalid argument
2020-01-12 20-25-32 [Notice] daemon terminated.
=================================================================
==22383==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 5756 byte(s) in 1 object(s) allocated from:
#0 0x7f3d60635602 in malloc (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x98602)
#1 0x448fc2 in Stream::getNextPlayerChunk(void*, std::chrono::duration<long, std::ratio<1l, 1000000l> > const&, unsigned long, long)
/home/johannes/Develop/snapcast/client/stream.cpp:163
SUMMARY: AddressSanitizer: 5756 byte(s) leaked in 1 allocation(s).
*/
void Stream::updateBuffers(int age) void Stream::updateBuffers(int age)
{ {
@ -350,14 +327,12 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
return false; return false;
} }
if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime)) if (!chunk_ && !chunks_.try_pop(chunk_))
{ {
LOG(INFO, LOG_TAG) << "no chunks available\n"; LOG(INFO, LOG_TAG) << "no chunks available\n";
return false; return false;
} }
cs::nsec chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
/// we have a chunk /// we have a chunk
/// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC /// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC
/// age = 0 => play now /// age = 0 => play now
@ -368,6 +343,7 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
{ {
if (hard_sync_) if (hard_sync_)
{ {
cs::nsec chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
// LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast<chrono::milliseconds>(chunk_duration).count() // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast<chrono::milliseconds>(chunk_duration).count()
// << "\n"; // << "\n";
@ -384,18 +360,18 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
if (age.count() > 0) if (age.count() > 0)
{ {
// age > 0: the top of the stream is too old. We must fast foward. // age > 0: the top of the stream is too old. We must fast foward.
while (chunks_.try_pop(chunk_, 0ms)) while (chunks_.try_pop(chunk_))
{ {
age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000
<< ", chunk_duration: " << -std::chrono::duration_cast<std::chrono::milliseconds>(chunk_duration).count() << ", chunk_duration: " << -std::chrono::duration_cast<std::chrono::milliseconds>(chunk_duration).count()
<< ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n"; << ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n";
if (age <= 0ms) if (age.count() <= 0)
break; break;
} }
} }
if (age.count() < 0) if (age.count() <= 0)
{ {
// the oldest chunk (top of the stream) can be played in this iteration // the oldest chunk (top of the stream) can be played in this iteration
// e.g. age = -20ms (=> should be played in 20ms) // e.g. age = -20ms (=> should be played in 20ms)
@ -403,9 +379,9 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
// and can play 30ms of the stream // and can play 30ms of the stream
size_t silent_frames = static_cast<size_t>(-chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count()); size_t silent_frames = static_cast<size_t>(-chunk_->format.nsRate() * std::chrono::duration_cast<cs::nsec>(age).count());
LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames
<< ", age: " << std::chrono::duration_cast<cs::usec>(age).count() / 1000. << "\n"; << ", age: " << std::chrono::duration_cast<cs::usec>(age).count() / 1000. << "\n";
getSilentPlayerChunk(outputBuffer, silent_frames); getSilentPlayerChunk(outputBuffer, silent_frames);
getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), 0ms, frames - silent_frames); getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), frames - silent_frames);
hard_sync_ = false; hard_sync_ = false;
resetBuffers(); resetBuffers();
@ -428,8 +404,8 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
} }
} }
cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, 0ms, frames, framesCorrection) - cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, frames, framesCorrection) - bufferMs_ +
bufferMs_ + outputBufferDacTime); outputBufferDacTime);
setRealSampleRate(format_.rate); setRealSampleRate(format_.rate);
// check if we need a hard sync // check if we need a hard sync

View file

@ -56,12 +56,12 @@ public:
return format_; return format_;
} }
bool waitForChunk(size_t ms) const; bool waitForChunk(const std::chrono::milliseconds& timeout) const;
private: private:
chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames); chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames);
chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames, long framesCorrection); chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection);
chronos::time_point_clk getSilentPlayerChunk(void* outputBuffer, unsigned long frames); void getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const;
void updateBuffers(int age); void updateBuffers(int age);
void resetBuffers(); void resetBuffers();
@ -72,8 +72,8 @@ private:
Queue<std::shared_ptr<msg::PcmChunk>> chunks_; Queue<std::shared_ptr<msg::PcmChunk>> chunks_;
DoubleBuffer<chronos::usec::rep> miniBuffer_; DoubleBuffer<chronos::usec::rep> miniBuffer_;
DoubleBuffer<chronos::usec::rep> buffer_;
DoubleBuffer<chronos::usec::rep> shortBuffer_; DoubleBuffer<chronos::usec::rep> shortBuffer_;
DoubleBuffer<chronos::usec::rep> buffer_;
std::shared_ptr<msg::PcmChunk> chunk_; std::shared_ptr<msg::PcmChunk> chunk_;
int median_; int median_;

View file

@ -59,7 +59,7 @@ public:
cond_.notify_one(); cond_.notify_one();
} }
bool wait_for(std::chrono::milliseconds timeout) const bool wait_for(const std::chrono::microseconds& timeout) const
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false; abort_ = false;
@ -69,12 +69,15 @@ public:
return !queue_.empty() && !abort_; return !queue_.empty() && !abort_;
} }
bool try_pop(T& item, std::chrono::microseconds timeout) bool try_pop(T& item, const std::chrono::microseconds& timeout = std::chrono::microseconds(0))
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
abort_ = false; abort_ = false;
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); })) if (timeout.count() > 0)
return false; {
if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
return false;
}
if (queue_.empty() || abort_) if (queue_.empty() || abort_)
return false; return false;
@ -85,11 +88,6 @@ public:
return true; return true;
} }
bool try_pop(T& item, std::chrono::milliseconds timeout)
{
return try_pop(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout));
}
void pop(T& item) void pop(T& item)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);