diff --git a/client/player/alsa_player.cpp b/client/player/alsa_player.cpp
index 92946198..be9bcf82 100644
--- a/client/player/alsa_player.cpp
+++ b/client/player/alsa_player.cpp
@@ -253,7 +253,7 @@ void AlsaPlayer::worker()
else
{
LOG(INFO) << "Failed to get chunk\n";
- while (active_ && !stream_->waitForChunk(100))
+ while (active_ && !stream_->waitForChunk(100ms))
{
LOG(DEBUG) << "Waiting for chunk\n";
if ((handle_ != nullptr) && (chronos::getTickCount() - lastChunkTick > 5000))
diff --git a/client/player/coreaudio_player.cpp b/client/player/coreaudio_player.cpp
index 9479a565..7a24be9f 100644
--- a/client/player/coreaudio_player.cpp
+++ b/client/player/coreaudio_player.cpp
@@ -134,7 +134,7 @@ void CoreAudioPlayer::worker()
{
while (active_)
{
- if (pubStream_->waitForChunk(100))
+ if (pubStream_->waitForChunk(100ms))
{
try
{
diff --git a/client/stream.cpp b/client/stream.cpp
index 51ff5b8f..bc52befd 100644
--- a/client/stream.cpp
+++ b/client/stream.cpp
@@ -1,20 +1,20 @@
/***
- This file is part of snapcast
- Copyright (C) 2014-2020 Johannes Pohl
+ This file is part of snapcast
+ Copyright (C) 2014-2020 Johannes Pohl
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program. If not, see .
-***/
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see .
+ ***/
#include "stream.hpp"
#include "common/aixlog.hpp"
@@ -70,6 +70,7 @@ Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format)
if (error)
{
LOG(ERROR, LOG_TAG) << "Error soxr_create: " << error << "\n";
+ soxr_ = nullptr;
}
// initialize the buffer with 20ms (~latency of the reampler)
resample_buffer_.resize(format_.frameSize * ceil(format_.msRate()) * 20);
@@ -93,7 +94,7 @@ void Stream::setRealSampleRate(double sampleRate)
else
{
correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.));
- // LOG(DEBUG, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n";
+ // LOG(TRACE, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n";
}
}
@@ -114,8 +115,16 @@ void Stream::clearChunks()
void Stream::addChunk(unique_ptr chunk)
{
- while (chunks_.size() * chunk->duration().count() > 10000)
- chunks_.pop();
+ // drop chunks that should have been played 10s before. Just in case, this shouldn't happen.
+ while (!chunks_.empty())
+ {
+ auto first_chunk = chunks_.front();
+ cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - first_chunk->start());
+ if (age > 10s + bufferMs_)
+ chunks_.pop();
+ else
+ break;
+ }
// chunks_.push(move(chunk));
// LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->duration().count() << ", Chunks: " << chunks_.size() << "\n";
@@ -141,7 +150,6 @@ void Stream::addChunk(unique_ptr chunk)
if (error)
{
LOG(ERROR, LOG_TAG) << "Error soxr_process: " << error << "\n";
- // delete out;
}
else
{
@@ -185,7 +193,8 @@ void Stream::addChunk(unique_ptr chunk)
{
// buffer for resampled data too small, add space for 5ms
resample_buffer_.resize(resample_buffer_.size() + format_.frameSize * ceil(format_.msRate()) * 5);
- LOG(INFO, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size() << " bytes\n";
+ LOG(DEBUG, LOG_TAG) << "Resample buffer completely filled, adding space for 5ms; new buffer size: " << resample_buffer_.size()
+ << " bytes\n";
}
// //LOG(TRACE, LOG_TAG) << "ts: " << out->timestamp.sec << "s, " << out->timestamp.usec/1000.f << " ms, duration: " << odone / format_.msRate()
@@ -199,25 +208,21 @@ void Stream::addChunk(unique_ptr chunk)
}
-bool Stream::waitForChunk(size_t ms) const
+bool Stream::waitForChunk(const std::chrono::milliseconds& timeout) const
{
- return chunks_.wait_for(std::chrono::milliseconds(ms));
+ return chunks_.wait_for(timeout);
}
-cs::time_point_clk Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames)
+void Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const
{
- if (!chunk_)
- chunk_ = chunks_.pop();
- cs::time_point_clk tp = chunk_->start();
memset(outputBuffer, 0, frames * format_.frameSize);
- return tp;
}
-cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames)
+cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames)
{
- if (!chunk_ && !chunks_.try_pop(chunk_, timeout))
+ if (!chunk_ && !chunks_.try_pop(chunk_))
throw 0;
cs::time_point_clk tp = chunk_->start();
@@ -225,14 +230,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
while (read < frames)
{
read += chunk_->readFrames(static_cast(outputBuffer) + read * format_.frameSize, frames - read);
- if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_, timeout))
+ if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_))
throw 0;
}
return tp;
}
-cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames, long framesCorrection)
+cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection)
{
if (framesCorrection < 0 && frames + framesCorrection <= 0)
{
@@ -240,14 +245,14 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
framesCorrection = -frames + 1;
}
- frame_delta_ -= framesCorrection;
-
if (framesCorrection == 0)
- return getNextPlayerChunk(outputBuffer, timeout, frames);
+ return getNextPlayerChunk(outputBuffer, frames);
+
+ frame_delta_ -= framesCorrection;
long toRead = frames + framesCorrection;
char* buffer = new char[toRead * format_.frameSize];
- cs::time_point_clk tp = getNextPlayerChunk(buffer, timeout, toRead);
+ cs::time_point_clk tp = getNextPlayerChunk(buffer, toRead);
const auto max = framesCorrection < 0 ? frames : toRead;
// Divide the buffer into one more slice than frames that need to be dropped.
@@ -296,34 +301,6 @@ cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec
return tp;
}
-/*
-2020-01-12 20-25-26 [Info] Chunk: 7 7 11 15 179 120
-2020-01-12 20-25-27 [Info] Chunk: 6 6 8 15 212 122
-2020-01-12 20-25-28 [Info] Chunk: 6 6 7 12 245 123
-2020-01-12 20-25-29 [Info] Chunk: 5 6 6 9 279 117
-2020-01-12 20-25-30 [Info] Chunk: 4 5 6 8 312 117
-2020-01-12 20-25-30 [Error] Controller::onException: read_some: End of file
-2020-01-12 20-25-30 [Error] Exception in Controller::worker(): read_some: End of file
-2020-01-12 20-25-31 [Error] Exception in Controller::worker(): connect: Connection refused
-2020-01-12 20-25-31 [Error] Error in socket shutdown: Transport endpoint is not connected
-2020-01-12 20-25-32 [Error] Exception in Controller::worker(): connect: Connection refused
-2020-01-12 20-25-32 [Error] Error in socket shutdown: Transport endpoint is not connected
-^C2020-01-12 20-25-32 [Info] Received signal 2: Interrupt
-2020-01-12 20-25-32 [Info] Stopping controller
-2020-01-12 20-25-32 [Error] Error in socket shutdown: Bad file descriptor
-2020-01-12 20-25-32 [Error] Exception: Invalid argument
-2020-01-12 20-25-32 [Notice] daemon terminated.
-
-=================================================================
-==22383==ERROR: LeakSanitizer: detected memory leaks
-
-Direct leak of 5756 byte(s) in 1 object(s) allocated from:
- #0 0x7f3d60635602 in malloc (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x98602)
- #1 0x448fc2 in Stream::getNextPlayerChunk(void*, std::chrono::duration > const&, unsigned long, long)
-/home/johannes/Develop/snapcast/client/stream.cpp:163
-
-SUMMARY: AddressSanitizer: 5756 byte(s) leaked in 1 allocation(s).
-*/
void Stream::updateBuffers(int age)
{
@@ -350,14 +327,12 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
return false;
}
- if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime))
+ if (!chunk_ && !chunks_.try_pop(chunk_))
{
LOG(INFO, LOG_TAG) << "no chunks available\n";
return false;
}
- cs::nsec chunk_duration = cs::nsec(static_cast(frames / format_.nsRate()));
-
/// we have a chunk
/// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC
/// age = 0 => play now
@@ -368,6 +343,7 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
{
if (hard_sync_)
{
+ cs::nsec chunk_duration = cs::nsec(static_cast(frames / format_.nsRate()));
cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
// LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast(chunk_duration).count()
// << "\n";
@@ -384,18 +360,18 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
if (age.count() > 0)
{
// age > 0: the top of the stream is too old. We must fast foward.
- while (chunks_.try_pop(chunk_, 0ms))
+ while (chunks_.try_pop(chunk_))
{
age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000
- << ", chunk_duration: " << -std::chrono::duration_cast(chunk_duration).count()
- << ", duration: " << chunk_->duration().count() << "\n";
- if (age <= 0ms)
+ << ", chunk_duration: " << -std::chrono::duration_cast(chunk_duration).count()
+ << ", duration: " << chunk_->duration().count() << "\n";
+ if (age.count() <= 0)
break;
}
}
- if (age.count() < 0)
+ if (age.count() <= 0)
{
// the oldest chunk (top of the stream) can be played in this iteration
// e.g. age = -20ms (=> should be played in 20ms)
@@ -403,9 +379,9 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
// and can play 30ms of the stream
size_t silent_frames = static_cast(-chunk_->format.nsRate() * std::chrono::duration_cast(age).count());
LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames
- << ", age: " << std::chrono::duration_cast(age).count() / 1000. << "\n";
+ << ", age: " << std::chrono::duration_cast(age).count() / 1000. << "\n";
getSilentPlayerChunk(outputBuffer, silent_frames);
- getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), 0ms, frames - silent_frames);
+ getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), frames - silent_frames);
hard_sync_ = false;
resetBuffers();
@@ -428,8 +404,8 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
}
}
- cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, 0ms, frames, framesCorrection) -
- bufferMs_ + outputBufferDacTime);
+ cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, frames, framesCorrection) - bufferMs_ +
+ outputBufferDacTime);
setRealSampleRate(format_.rate);
// check if we need a hard sync
diff --git a/client/stream.hpp b/client/stream.hpp
index 4ec22032..48429082 100644
--- a/client/stream.hpp
+++ b/client/stream.hpp
@@ -56,12 +56,12 @@ public:
return format_;
}
- bool waitForChunk(size_t ms) const;
+ bool waitForChunk(const std::chrono::milliseconds& timeout) const;
private:
- chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames);
- chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames, long framesCorrection);
- chronos::time_point_clk getSilentPlayerChunk(void* outputBuffer, unsigned long frames);
+ chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames);
+ chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, unsigned long frames, long framesCorrection);
+ void getSilentPlayerChunk(void* outputBuffer, unsigned long frames) const;
void updateBuffers(int age);
void resetBuffers();
@@ -72,8 +72,8 @@ private:
Queue> chunks_;
DoubleBuffer miniBuffer_;
- DoubleBuffer buffer_;
DoubleBuffer shortBuffer_;
+ DoubleBuffer buffer_;
std::shared_ptr chunk_;
int median_;
diff --git a/common/queue.h b/common/queue.h
index d3be5482..154b1557 100644
--- a/common/queue.h
+++ b/common/queue.h
@@ -59,7 +59,7 @@ public:
cond_.notify_one();
}
- bool wait_for(std::chrono::milliseconds timeout) const
+ bool wait_for(const std::chrono::microseconds& timeout) const
{
std::unique_lock mlock(mutex_);
abort_ = false;
@@ -69,12 +69,15 @@ public:
return !queue_.empty() && !abort_;
}
- bool try_pop(T& item, std::chrono::microseconds timeout)
+ bool try_pop(T& item, const std::chrono::microseconds& timeout = std::chrono::microseconds(0))
{
std::unique_lock mlock(mutex_);
abort_ = false;
- if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
- return false;
+ if (timeout.count() > 0)
+ {
+ if (!cond_.wait_for(mlock, timeout, [this] { return (!queue_.empty() || abort_); }))
+ return false;
+ }
if (queue_.empty() || abort_)
return false;
@@ -85,11 +88,6 @@ public:
return true;
}
- bool try_pop(T& item, std::chrono::milliseconds timeout)
- {
- return try_pop(item, std::chrono::duration_cast(timeout));
- }
-
void pop(T& item)
{
std::unique_lock mlock(mutex_);