diff --git a/client/stream.cpp b/client/stream.cpp index 746c117c..cd55ac74 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -89,6 +89,7 @@ void Stream::setBufferLen(size_t bufferLenMs) void Stream::clearChunks() { + std::lock_guard lock(mutex_); while (chunks_.size() > 0) chunks_.pop(); resetBuffers(); @@ -102,11 +103,24 @@ void Stream::addChunk(unique_ptr chunk) if (age > 5s + bufferMs_) return; - // LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n"; - auto resampled = resampler_->resample(std::move(chunk)); if (resampled) - chunks_.push(move(resampled)); + { + std::lock_guard lock(mutex_); + recent_ = resampled; + chunks_.push(resampled); + + std::shared_ptr front_; + while (chunks_.front_copy(front_)) + { + auto age = std::chrono::duration_cast(TimeProvider::serverNow() - front_->start()); + if ((age > 5s + bufferMs_) && chunks_.try_pop(front_)) + LOG(TRACE, LOG_TAG) << "Oldest chunk too old: " << age.count() << " ms, removing. Chunks in queue left: " << chunks_.size() << "\n"; + else + break; + } + } + // LOG(TRACE, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, age: " << age.count() << " ms, Chunks: " << chunks_.size() << "\n"; } @@ -229,6 +243,7 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT return false; } + std::lock_guard lock(mutex_); time_t now = time(nullptr); if (!chunk_ && !chunks_.try_pop(chunk_)) { diff --git a/client/stream.hpp b/client/stream.hpp index 1d018a0b..4025e4e9 100644 --- a/client/stream.hpp +++ b/client/stream.hpp @@ -110,6 +110,8 @@ private: int frame_delta_; // int64_t next_us_; + mutable std::mutex mutex_; + bool hard_sync_; }; diff --git a/common/queue.h b/common/queue.h index 593e6537..fcb50488 100644 --- a/common/queue.h +++ b/common/queue.h @@ -103,8 +103,16 @@ public: std::lock_guard mlock(mutex_); if (queue_.empty()) return false; - T t = queue_.back(); - copy = t; + copy = queue_.back(); + return true; + } + + bool front_copy(T& copy) + { + std::lock_guard mlock(mutex_); + if (queue_.empty()) + return false; + copy = queue_.front(); return true; }