diff --git a/client/stream.cpp b/client/stream.cpp index 71ea8032..51ff5b8f 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -31,7 +31,8 @@ static constexpr auto kCorrectionBegin = 100us; Stream::Stream(const SampleFormat& in_format, const SampleFormat& out_format) - : in_format_(in_format), sleep_(0), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0), bufferMs_(cs::msec(500)), soxr_(nullptr), frame_delta_(0) + : in_format_(in_format), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0), bufferMs_(cs::msec(500)), soxr_(nullptr), frame_delta_(0), + hard_sync_(true) { buffer_.setSize(500); shortBuffer_.setSize(100); @@ -86,10 +87,14 @@ Stream::~Stream() void Stream::setRealSampleRate(double sampleRate) { if (sampleRate == format_.rate) + { correctAfterXFrames_ = 0; + } else + { correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.)); - // LOG(DEBUG, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; + // LOG(DEBUG, LOG_TAG) << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; + } } @@ -210,44 +215,6 @@ cs::time_point_clk Stream::getSilentPlayerChunk(void* outputBuffer, unsigned lon } -/* -time_point_ms Stream::seekTo(const time_point_ms& to) -{ - if (!chunk) - chunk_ = chunks_.pop(); -// time_point_ms tp = chunk_->timePoint(); - while (to > chunk_->timePoint())// + std::chrono::milliseconds((long int)chunk_->getDuration()))// - { - chunk_ = chunks_.pop(); - LOG(DEBUG, LOG_TAG) << "\nto: " << Chunk::getAge(to) << "\t chunk: " << Chunk::getAge(chunk_->timePoint()) << "\n"; - LOG(DEBUG, LOG_TAG) << "diff: " << std::chrono::duration_cast((to - chunk_->timePoint())).count() << "\n"; - } - chunk_->seek(std::chrono::duration_cast(to - chunk_->timePoint()).count() * format_.msRate()); - return chunk_->timePoint(); -} -*/ - -/* -time_point_clk Stream::seek(long ms) -{ - if (!chunk) - chunk_ = chunks_.pop(); - - if (ms <= 0) - return chunk_->start(); - -// time_point_ms tp = chunk_->timePoint(); - while (ms > chunk_->duration().count()) - { - chunk_ = chunks_.pop(); - ms -= min(ms, (long)chunk_->durationLeft().count()); - } - chunk_->seek(ms * format_.msRate()); - return chunk_->start(); -} -*/ - - cs::time_point_clk Stream::getNextPlayerChunk(void* outputBuffer, const cs::usec& timeout, unsigned long frames) { if (!chunk_ && !chunks_.try_pop(chunk_, timeout)) @@ -380,164 +347,124 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT { LOG(INFO, LOG_TAG) << "outputBufferDacTime > bufferMs: " << cs::duration(outputBufferDacTime) << " > " << cs::duration(bufferMs_) << "\n"; - sleep_ = cs::usec(0); return false; } if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime)) { - // LOG(INFO, LOG_TAG) << "no chunks available\n"; - sleep_ = cs::usec(0); + LOG(INFO, LOG_TAG) << "no chunks available\n"; return false; } - playedFrames_ += frames; + cs::nsec chunk_duration = cs::nsec(static_cast(frames / format_.nsRate())); /// we have a chunk /// age = chunk age (server now - rec time: some positive value) - buffer (e.g. 1000ms) + time to DAC /// age = 0 => play now - /// age < 0 => play in -age - /// age > 0 => too old - cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; - // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << "\n"; - if ((sleep_.count() == 0) && (cs::abs(age) > cs::msec(200))) - { - LOG(INFO, LOG_TAG) << "age > 200: " << cs::duration(age) << "\n"; - sleep_ = age; - } + /// age < 0 => play in -age => wait for a while, play silence in the meantime + /// age > 0 => too old => throw them away try { - - // LOG(DEBUG, LOG_TAG) << "frames: " << frames << "\tms: " << frames*2 / PLAYER_CHUNK_MS_SIZE << "\t" << PLAYER_CHUNK_SIZE << "\n"; - cs::nsec bufferDuration = cs::nsec(static_cast(frames / format_.nsRate())); - // LOG(DEBUG, LOG_TAG) << "buffer duration: " << bufferDuration.count() << "\n"; - - cs::usec correction = cs::usec(0); - if (sleep_.count() != 0) + if (hard_sync_) { - resetBuffers(); - if (sleep_ < -bufferDuration / 2) + cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; + // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast(chunk_duration).count() + // << "\n"; + if (age < -chunk_duration) { - LOG(INFO, LOG_TAG) << "sleep < -bufferDuration/2: " << cs::duration(sleep_) << " < " << -cs::duration(bufferDuration) / 2 - << ", "; - // We're early: not enough chunks_. play silence. Reference chunk_ is the oldest (front) one - sleep_ = - chrono::duration_cast(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, frames) - bufferMs_ + outputBufferDacTime); - LOG(INFO, LOG_TAG) << "sleep: " << cs::duration(sleep_) << "\n"; - if (sleep_ < -bufferDuration / 2) - return true; - } - else if (sleep_ > bufferDuration / 2) - { - LOG(INFO, LOG_TAG) << "sleep > bufferDuration/2: " << cs::duration(sleep_) << " > " << cs::duration(bufferDuration) / 2 - << "\n"; - // We're late: discard oldest chunks - while (sleep_ > chunk_->duration()) - { - LOG(INFO, LOG_TAG) << "sleep > chunkDuration: " << cs::duration(sleep_) << " > " << chunk_->duration().count() - << ", chunks: " << chunks_.size() << ", out: " << cs::duration(outputBufferDacTime) - << ", needed: " << cs::duration(bufferDuration) << "\n"; - sleep_ = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start() - bufferMs_ + outputBufferDacTime); - if (!chunks_.try_pop(chunk_, outputBufferDacTime)) - { - LOG(INFO, LOG_TAG) << "no chunks available\n"; - chunk_ = nullptr; - sleep_ = cs::usec(0); - return false; - } - } - } - - // out of sync, can be corrected by playing faster/slower - if (sleep_ < -cs::usec(100)) - { - sleep_ += cs::usec(100); - correction = -cs::usec(100); - } - else if (sleep_ > cs::usec(100)) - { - sleep_ -= cs::usec(100); - correction = cs::usec(100); + // the oldest chunk (top of the stream) is too young for the buffer + // e.g. age = -100ms (=> should be played in 100ms) + // but the requested chunk duration is 50ms, so there is not data in this iteration available + getSilentPlayerChunk(outputBuffer, frames); + return true; } else { - LOG(INFO, LOG_TAG) << "Sleep " << cs::duration(sleep_) << "\n"; - correction = sleep_; - sleep_ = cs::usec(0); + if (age.count() > 0) + { + // age > 0: the top of the stream is too old. We must fast foward. + while (chunks_.try_pop(chunk_, 0ms)) + { + age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; + LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 + << ", chunk_duration: " << -std::chrono::duration_cast(chunk_duration).count() + << ", duration: " << chunk_->duration().count() << "\n"; + if (age <= 0ms) + break; + } + } + + if (age.count() < 0) + { + // the oldest chunk (top of the stream) can be played in this iteration + // e.g. age = -20ms (=> should be played in 20ms) + // and the current chunk duration is 50ms, so we need to play 20ms silence (as we don't have data) + // and can play 30ms of the stream + size_t silent_frames = static_cast(-chunk_->format.nsRate() * std::chrono::duration_cast(age).count()); + LOG(DEBUG, LOG_TAG) << "Silent frames: " << silent_frames << ", frames: " << frames + << ", age: " << std::chrono::duration_cast(age).count() / 1000. << "\n"; + getSilentPlayerChunk(outputBuffer, silent_frames); + getNextPlayerChunk((char*)outputBuffer + (chunk_->format.frameSize * silent_frames), 0ms, frames - silent_frames); + + hard_sync_ = false; + resetBuffers(); + return true; + } + return false; } } - // framesCorrection = number of frames to be read more or less to get in-sync - long framesCorrection = correction.count() * format_.usRate(); - // sample rate correction - if ((correctAfterXFrames_ != 0) && (playedFrames_ >= (unsigned long)abs(correctAfterXFrames_))) + // framesCorrection = number of frames to be read more or less to get in-sync + long framesCorrection = 0; + if (correctAfterXFrames_ != 0) { - framesCorrection += (correctAfterXFrames_ > 0) ? 1 : -1; - playedFrames_ = 0; //-= abs(correctAfterXFrames_); + playedFrames_ += frames; + if (playedFrames_ >= (unsigned long)abs(correctAfterXFrames_)) + { + framesCorrection = static_cast(playedFrames_) / correctAfterXFrames_; + playedFrames_ %= abs(correctAfterXFrames_); + } } - age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, outputBufferDacTime, frames, framesCorrection) - - bufferMs_ + outputBufferDacTime); + cs::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, 0ms, frames, framesCorrection) - + bufferMs_ + outputBufferDacTime); setRealSampleRate(format_.rate); - if (sleep_.count() == 0) + // check if we need a hard sync + if (buffer_.full() && (cs::usec(abs(median_)) > cs::msec(2))) { - if (buffer_.full()) - { - if (cs::usec(abs(median_)) > cs::msec(2)) - { - LOG(INFO, LOG_TAG) << "pBuffer->full() && (abs(median_) > 2): " << median_ << "\n"; - sleep_ = cs::usec(median_); - } - // else if (cs::usec(median_) > cs::usec(300)) - // { - // setRealSampleRate(format_.rate - format_.rate / 1000); - // } - // else if (cs::usec(median_) < -cs::usec(300)) - // { - // setRealSampleRate(format_.rate + format_.rate / 1000); - // } - } - else if (shortBuffer_.full()) - { - if (cs::usec(abs(shortMedian_)) > cs::msec(5)) - { - LOG(INFO, LOG_TAG) << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n"; - sleep_ = cs::usec(shortMedian_); - } - // else - // { - // setRealSampleRate(format_.rate + -shortMedian_ / 100); - // } - } - else if (miniBuffer_.full() && (cs::usec(abs(miniBuffer_.median())) > cs::msec(50))) - { - LOG(INFO, LOG_TAG) << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n"; - sleep_ = cs::usec(static_cast(miniBuffer_.mean())); - } + LOG(INFO, LOG_TAG) << "pBuffer->full() && (abs(median_) > 2): " << median_ << "\n"; + hard_sync_ = true; } - - if (sleep_.count() != 0) + else if (shortBuffer_.full() && (cs::usec(abs(shortMedian_)) > cs::msec(5))) { - static int lastAge(0); - int msAge = cs::duration(age); - if (lastAge != msAge) - { - lastAge = msAge; - LOG(INFO, LOG_TAG) << "Sleep " << cs::duration(sleep_) << ", age: " << msAge - << ", bufferDuration: " << cs::duration(bufferDuration) << "\n"; - } + LOG(INFO, LOG_TAG) << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n"; + hard_sync_ = true; + } + else if (miniBuffer_.full() && (cs::usec(abs(miniBuffer_.median())) > cs::msec(50))) + { + LOG(INFO, LOG_TAG) << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n"; + hard_sync_ = true; + } + else if (cs::abs(age) > 500ms) + { + LOG(INFO, LOG_TAG) << "abs(age > 500): " << cs::abs(age).count() << "\n"; + hard_sync_ = true; } else if (shortBuffer_.full()) { + // No hard sync needed + // Check if we need a samplerate correction (change playback speed (soft sync)) auto miniMedian = miniBuffer_.median(); if ((cs::usec(shortMedian_) > kCorrectionBegin) && (cs::usec(miniMedian) > cs::usec(50)) && (cs::usec(age) > cs::usec(50))) { double rate = (shortMedian_ / 100) * 0.00005; rate = 1.0 - std::min(rate, 0.0005); // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n"; + // we are late (age > 0), this means we are not playing fast enough + // => the real sample rate seems to be lower, we have to drop some frames setRealSampleRate(format_.rate * rate); // 0.9999); } else if ((cs::usec(shortMedian_) < -kCorrectionBegin) && (cs::usec(miniMedian) < -cs::usec(50)) && (cs::usec(age) < -cs::usec(50))) @@ -545,6 +472,8 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT double rate = (-shortMedian_ / 100) * 0.00005; rate = 1.0 + std::min(rate, 0.0005); // LOG(INFO, LOG_TAG) << "Rate: " << rate << "\n"; + // we are early (age > 0), this means we are playing too fast + // => the real sample rate seems to be higher, we have to insert some frames setRealSampleRate(format_.rate * rate); // 1.0001); } } @@ -560,16 +489,13 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT shortMedian_ = shortBuffer_.median(); LOG(INFO, LOG_TAG) << "Chunk: " << age.count() / 100 << "\t" << miniBuffer_.median() / 100 << "\t" << shortMedian_ / 100 << "\t" << median_ / 100 << "\t" << buffer_.size() << "\t" << cs::duration(outputBufferDacTime) << "\t" << frame_delta_ << "\n"; - // LOG(INFO, LOG_TAG) << "Chunk: " << age.count()/1000 << "\t" << miniBuffer_.median()/1000 << "\t" << shortMedian_/1000 << "\t" << median_/1000 << - // "\t" << - // buffer_.size() << "\t" << cs::duration(outputBufferDacTime) << "\n"; frame_delta_ = 0; } return (abs(cs::duration(age)) < 500); } catch (int e) { - sleep_ = cs::usec(0); + LOG(INFO) << "Exception\n"; return false; } } diff --git a/client/stream.hpp b/client/stream.hpp index 440e15e1..4ec22032 100644 --- a/client/stream.hpp +++ b/client/stream.hpp @@ -62,8 +62,7 @@ private: chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames); chronos::time_point_clk getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long frames, long framesCorrection); chronos::time_point_clk getSilentPlayerChunk(void* outputBuffer, unsigned long frames); - chronos::time_point_clk seek(long ms); - // time_point_ms seekTo(const time_point_ms& to); + void updateBuffers(int age); void resetBuffers(); void setRealSampleRate(double sampleRate); @@ -71,10 +70,7 @@ private: SampleFormat format_; SampleFormat in_format_; - chronos::usec sleep_; - Queue> chunks_; - // DoubleBuffer cardBuffer; DoubleBuffer miniBuffer_; DoubleBuffer buffer_; DoubleBuffer shortBuffer_; @@ -91,6 +87,8 @@ private: std::vector resample_buffer_; int frame_delta_; // int64_t next_us_; + + bool hard_sync_; };