Discard old chunks if not consumed (fixes #708)

This commit is contained in:
badaix 2020-10-11 11:11:49 +02:00
parent 8e9eb5c870
commit 136766412c
3 changed files with 30 additions and 5 deletions

View file

@ -89,6 +89,7 @@ void Stream::setBufferLen(size_t bufferLenMs)
void Stream::clearChunks() void Stream::clearChunks()
{ {
std::lock_guard<std::mutex> lock(mutex_);
while (chunks_.size() > 0) while (chunks_.size() > 0)
chunks_.pop(); chunks_.pop();
resetBuffers(); resetBuffers();
@ -102,11 +103,24 @@ void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
if (age > 5s + bufferMs_) if (age > 5s + bufferMs_)
return; return;
// LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n";
auto resampled = resampler_->resample(std::move(chunk)); auto resampled = resampler_->resample(std::move(chunk));
if (resampled) if (resampled)
chunks_.push(move(resampled)); {
std::lock_guard<std::mutex> lock(mutex_);
recent_ = resampled;
chunks_.push(resampled);
std::shared_ptr<msg::PcmChunk> front_;
while (chunks_.front_copy(front_))
{
auto age = std::chrono::duration_cast<cs::msec>(TimeProvider::serverNow() - front_->start());
if ((age > 5s + bufferMs_) && chunks_.try_pop(front_))
LOG(TRACE, LOG_TAG) << "Oldest chunk too old: " << age.count() << " ms, removing. Chunks in queue left: " << chunks_.size() << "\n";
else
break;
}
}
// LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n";
} }
@ -229,6 +243,7 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
return false; return false;
} }
std::lock_guard<std::mutex> lock(mutex_);
time_t now = time(nullptr); time_t now = time(nullptr);
if (!chunk_ && !chunks_.try_pop(chunk_)) if (!chunk_ && !chunks_.try_pop(chunk_))
{ {

View file

@ -110,6 +110,8 @@ private:
int frame_delta_; int frame_delta_;
// int64_t next_us_; // int64_t next_us_;
mutable std::mutex mutex_;
bool hard_sync_; bool hard_sync_;
}; };

View file

@ -103,8 +103,16 @@ public:
std::lock_guard<std::mutex> mlock(mutex_); std::lock_guard<std::mutex> mlock(mutex_);
if (queue_.empty()) if (queue_.empty())
return false; return false;
T t = queue_.back(); copy = queue_.back();
copy = t; return true;
}
bool front_copy(T& copy)
{
std::lock_guard<std::mutex> mlock(mutex_);
if (queue_.empty())
return false;
copy = queue_.front();
return true; return true;
} }