Change stream reader to use actual chunk duration

e.g. a configured chunk_ms of 11 will read a 44.1kHz stream in chunks of 485 frames (=10.9977ms)
This commit is contained in:
badaix 2020-02-29 19:24:47 +01:00
parent 12f828ed43
commit a8998997e9
8 changed files with 45 additions and 33 deletions

View file

@ -128,8 +128,7 @@ void Stream::addChunk(unique_ptr<msg::PcmChunk> chunk)
break; break;
} }
// chunks_.push(move(chunk)); // LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->durationMs() << " ms, Chunks: " << chunks_.size() << "\n";
// LOG(DEBUG, LOG_TAG) << "new chunk: " << chunk->duration<cs::msec>().count() << ", Chunks: " << chunks_.size() << "\n";
if (soxr_ == nullptr) if (soxr_ == nullptr)
{ {
@ -345,11 +344,11 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
{ {
if (hard_sync_) if (hard_sync_)
{ {
cs::nsec chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate())); cs::nsec req_chunk_duration = cs::nsec(static_cast<cs::nsec::rep>(frames / format_.nsRate()));
cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; cs::usec age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
// LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast<chrono::milliseconds>(chunk_duration).count() // LOG(INFO, LOG_TAG) << "age: " << age.count() / 1000 << ", buffer: " << std::chrono::duration_cast<chrono::milliseconds>(chunk_duration).count()
// << "\n"; // << "\n";
if (age < -chunk_duration) if (age < -req_chunk_duration)
{ {
// the oldest chunk (top of the stream) is too young for the buffer // the oldest chunk (top of the stream) is too young for the buffer
// e.g. age = -100ms (=> should be played in 100ms) // e.g. age = -100ms (=> should be played in 100ms)
@ -365,8 +364,8 @@ bool Stream::getPlayerChunk(void* outputBuffer, const cs::usec& outputBufferDacT
while (chunks_.try_pop(chunk_)) while (chunks_.try_pop(chunk_))
{ {
age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime; age = std::chrono::duration_cast<cs::usec>(TimeProvider::serverNow() - chunk_->start()) - bufferMs_ + outputBufferDacTime;
LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 LOG(DEBUG, LOG_TAG) << "age: " << age.count() / 1000 << ", requested chunk_duration: "
<< ", chunk_duration: " << -std::chrono::duration_cast<std::chrono::milliseconds>(chunk_duration).count() << std::chrono::duration_cast<std::chrono::milliseconds>(req_chunk_duration).count()
<< ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n"; << ", duration: " << chunk_->duration<std::chrono::milliseconds>().count() << "\n";
if (age.count() <= 0) if (age.count() <= 0)
break; break;

View file

@ -36,7 +36,8 @@ namespace msg
class PcmChunk : public WireChunk class PcmChunk : public WireChunk
{ {
public: public:
PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate() * sampleFormat.frameSize() * ms / 1000), format(sampleFormat), idx_(0) PcmChunk(const SampleFormat& sampleFormat, size_t ms)
: WireChunk((sampleFormat.rate() * ms / 1000) * sampleFormat.frameSize()), format(sampleFormat), idx_(0)
{ {
} }
@ -104,6 +105,11 @@ public:
return std::chrono::duration_cast<T>(chronos::nsec(static_cast<chronos::nsec::rep>(1000000 * getFrameCount() / format.msRate()))); return std::chrono::duration_cast<T>(chronos::nsec(static_cast<chronos::nsec::rep>(1000000 * getFrameCount() / format.msRate())));
} }
double durationMs() const
{
return static_cast<double>(getFrameCount()) / format.msRate();
}
template <typename T> template <typename T>
inline T durationLeft() const inline T durationLeft() const
{ {

View file

@ -28,9 +28,8 @@ using namespace std;
namespace encoder namespace encoder
{ {
FlacEncoder::FlacEncoder(const std::string& codecOptions) : Encoder(codecOptions), encoder_(nullptr), pcmBufferSize_(0), encodedSamples_(0) FlacEncoder::FlacEncoder(const std::string& codecOptions) : Encoder(codecOptions), encoder_(nullptr), pcmBufferSize_(0), encodedSamples_(0), flacChunk_(nullptr)
{ {
flacChunk_ = new msg::PcmChunk();
headerChunk_.reset(new msg::CodecHeader("flac")); headerChunk_.reset(new msg::CodecHeader("flac"));
pcmBuffer_ = (FLAC__int32*)malloc(pcmBufferSize_ * sizeof(FLAC__int32)); pcmBuffer_ = (FLAC__int32*)malloc(pcmBufferSize_ * sizeof(FLAC__int32));
} }
@ -46,7 +45,8 @@ FlacEncoder::~FlacEncoder()
FLAC__stream_encoder_delete(encoder_); FLAC__stream_encoder_delete(encoder_);
} }
delete flacChunk_; if (flacChunk_ != nullptr)
delete flacChunk_;
free(pcmBuffer_); free(pcmBuffer_);
} }
@ -71,6 +71,9 @@ std::string FlacEncoder::name() const
void FlacEncoder::encode(const msg::PcmChunk* chunk) void FlacEncoder::encode(const msg::PcmChunk* chunk)
{ {
if (flacChunk_ == nullptr)
flacChunk_ = new msg::PcmChunk(chunk->format, 0);
int samples = chunk->getSampleCount(); int samples = chunk->getSampleCount();
int frames = chunk->getFrameCount(); int frames = chunk->getFrameCount();
// LOG(INFO) << "payload: " << chunk->payloadSize << "\tframes: " << frames << "\tsamples: " << samples << "\tduration: " << // LOG(INFO) << "payload: " << chunk->payloadSize << "\tframes: " << frames << "\tsamples: " << samples << "\tduration: " <<
@ -106,11 +109,11 @@ void FlacEncoder::encode(const msg::PcmChunk* chunk)
if (encodedSamples_ > 0) if (encodedSamples_ > 0)
{ {
double resMs = encodedSamples_ / sampleFormat_.msRate(); double resMs = static_cast<double>(encodedSamples_) / sampleFormat_.msRate();
// LOG(INFO) << "encoded: " << chunk->payloadSize << "\tframes: " << encodedSamples_ << "\tres: " << resMs << "\n"; // LOG(INFO) << "encoded: " << chunk->payloadSize << "\tframes: " << encodedSamples_ << "\tres: " << resMs << "\n";
encodedSamples_ = 0; encodedSamples_ = 0;
listener_->onChunkEncoded(this, flacChunk_, resMs); listener_->onChunkEncoded(this, flacChunk_, resMs);
flacChunk_ = new msg::PcmChunk(chunk->format, 0); flacChunk_ = nullptr;
} }
} }

View file

@ -67,8 +67,8 @@ std::string OggEncoder::name() const
void OggEncoder::encode(const msg::PcmChunk* chunk) void OggEncoder::encode(const msg::PcmChunk* chunk)
{ {
double res = 0; double res = 0;
LOG(DEBUG) << "payload: " << chunk->payloadSize << "\tframes: " << chunk->getFrameCount() << "\tduration: " << chunk->duration<chronos::msec>().count() // LOG(TRACE) << "payload: " << chunk->payloadSize << "\tframes: " << chunk->getFrameCount() << "\tduration: " << chunk->duration<chronos::msec>().count()
<< "\n"; // << "\n";
int frames = chunk->getFrameCount(); int frames = chunk->getFrameCount();
float** buffer = vorbis_analysis_buffer(&vd_, frames); float** buffer = vorbis_analysis_buffer(&vd_, frames);

View file

@ -50,7 +50,7 @@ PcmEncoder::PcmEncoder(const std::string& codecOptions) : Encoder(codecOptions)
void PcmEncoder::encode(const msg::PcmChunk* chunk) void PcmEncoder::encode(const msg::PcmChunk* chunk)
{ {
auto* pcmChunk = new msg::PcmChunk(*chunk); auto* pcmChunk = new msg::PcmChunk(*chunk);
listener_->onChunkEncoded(this, pcmChunk, pcmChunk->duration<chronos::msec>().count()); listener_->onChunkEncoded(this, pcmChunk, pcmChunk->durationMs());
} }

View file

@ -54,7 +54,7 @@ protected:
std::unique_ptr<msg::PcmChunk> chunk_; std::unique_ptr<msg::PcmChunk> chunk_;
timeval tv_chunk_; timeval tv_chunk_;
bool first_; bool first_;
long nextTick_; std::chrono::time_point<std::chrono::steady_clock> nextTick_;
uint32_t buffer_ms_; uint32_t buffer_ms_;
boost::asio::steady_timer read_timer_; boost::asio::steady_timer read_timer_;
boost::asio::steady_timer state_timer_; boost::asio::steady_timer state_timer_;
@ -86,6 +86,8 @@ AsioStream<ReadStream>::AsioStream(PcmListener* pcmListener, boost::asio::io_con
: PcmStream(pcmListener, ioc, uri), read_timer_(ioc), state_timer_(ioc) : PcmStream(pcmListener, ioc, uri), read_timer_(ioc), state_timer_(ioc)
{ {
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_); chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
LOG(DEBUG) << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize << "\n";
bytes_read_ = 0; bytes_read_ = 0;
buffer_ms_ = 50; buffer_ms_ = 50;
@ -191,17 +193,17 @@ void AsioStream<ReadStream>::do_read()
{ {
first_ = false; first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_); chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_; nextTick_ = std::chrono::steady_clock::now() + std::chrono::milliseconds(buffer_ms_);
} }
encoder_->encode(chunk_.get()); encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_; nextTick_ += chunk_->duration<std::chrono::nanoseconds>();
long currentTick = chronos::getTickCount(); auto currentTick = std::chrono::steady_clock::now();
// Synchronize read to chunk_ms_ // Synchronize read to chunk_ms_
if (nextTick_ >= currentTick) if (nextTick_ >= currentTick)
{ {
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick)); read_timer_.expires_after(nextTick_ - currentTick);
read_timer_.async_wait([this](const boost::system::error_code& ec) { read_timer_.async_wait([this](const boost::system::error_code& ec) {
if (ec) if (ec)
{ {
@ -217,8 +219,8 @@ void AsioStream<ReadStream>::do_read()
// Read took longer, wait for the buffer to fill up // Read took longer, wait for the buffer to fill up
else else
{ {
pcmListener_->onResync(this, currentTick - nextTick_); pcmListener_->onResync(this, std::chrono::duration_cast<std::chrono::milliseconds>(currentTick - nextTick_).count());
nextTick_ = currentTick + buffer_ms_; nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true; first_ = true;
do_read(); do_read();
} }

View file

@ -32,6 +32,8 @@ using namespace std;
namespace streamreader namespace streamreader
{ {
static constexpr auto LOG_TAG = "PcmStream";
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: active_(false), pcmListener_(pcmListener), uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc) : active_(false), pcmListener_(pcmListener), uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc)
@ -48,7 +50,7 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
if (uri_.query.find(kUriSampleFormat) == uri_.query.end()) if (uri_.query.find(kUriSampleFormat) == uri_.query.end())
throw SnapException("Stream URI must have a sampleformat"); throw SnapException("Stream URI must have a sampleformat");
sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]); sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]);
LOG(INFO) << "PcmStream sampleFormat: " << sampleFormat_.getFormat() << "\n"; LOG(INFO, LOG_TAG) << "PcmStream sampleFormat: " << sampleFormat_.getFormat() << "\n";
if (uri_.query.find(kUriChunkMs) != uri_.query.end()) if (uri_.query.find(kUriChunkMs) != uri_.query.end())
chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]); chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]);
@ -95,7 +97,7 @@ const SampleFormat& PcmStream::getSampleFormat() const
void PcmStream::start() void PcmStream::start()
{ {
LOG(DEBUG) << "PcmStream start: " << sampleFormat_.getFormat() << "\n"; LOG(DEBUG, LOG_TAG) << "Start, sampleformat: " << sampleFormat_.getFormat() << "\n";
encoder_->init(this, sampleFormat_); encoder_->init(this, sampleFormat_);
active_ = true; active_ = true;
} }
@ -117,7 +119,7 @@ void PcmStream::setState(const ReaderState& newState)
{ {
if (newState != state_) if (newState != state_)
{ {
LOG(DEBUG) << "State changed: " << static_cast<int>(state_) << " => " << static_cast<int>(newState) << "\n"; LOG(DEBUG, LOG_TAG) << "State changed: " << static_cast<int>(state_) << " => " << static_cast<int>(newState) << "\n";
state_ = newState; state_ = newState;
if (pcmListener_) if (pcmListener_)
pcmListener_->onStateChanged(this, newState); pcmListener_->onStateChanged(this, newState);
@ -127,7 +129,7 @@ void PcmStream::setState(const ReaderState& newState)
void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, msg::PcmChunk* chunk, double duration) void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, msg::PcmChunk* chunk, double duration)
{ {
// LOG(INFO) << "onChunkEncoded: " << duration << " us\n"; // LOG(TRACE, LOG_TAG) << "onChunkEncoded: " << duration << " ms, compression ratio: " << 100 - ceil(100 * (chunk->durationMs() / duration)) << "%\n";
if (duration <= 0) if (duration <= 0)
return; return;
@ -168,7 +170,7 @@ void PcmStream::setMeta(const json& jtag)
{ {
meta_.reset(new msg::StreamTags(jtag)); meta_.reset(new msg::StreamTags(jtag));
meta_->msg["STREAM"] = name_; meta_->msg["STREAM"] = name_;
LOG(INFO) << "metadata=" << meta_->msg.dump(4) << "\n"; LOG(INFO, LOG_TAG) << "metadata=" << meta_->msg.dump(4) << "\n";
// Trigger a stream update // Trigger a stream update
if (pcmListener_) if (pcmListener_)

View file

@ -119,23 +119,23 @@ void PosixStream::do_read()
{ {
first_ = false; first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_); chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_; nextTick_ = std::chrono::steady_clock::now() + std::chrono::milliseconds(buffer_ms_);
} }
encoder_->encode(chunk_.get()); encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_; nextTick_ += chunk_->duration<std::chrono::nanoseconds>();
long currentTick = chronos::getTickCount(); auto currentTick = std::chrono::steady_clock::now();
if (nextTick_ >= currentTick) if (nextTick_ >= currentTick)
{ {
// synchronize reads to an interval of chunk_ms_ // synchronize reads to an interval of chunk_ms_
wait(read_timer_, std::chrono::milliseconds(nextTick_ - currentTick), [this] { do_read(); }); wait(read_timer_, nextTick_ - currentTick, [this] { do_read(); });
return; return;
} }
else else
{ {
// reading chunk_ms_ took longer than chunk_ms_ // reading chunk_ms_ took longer than chunk_ms_
pcmListener_->onResync(this, currentTick - nextTick_); pcmListener_->onResync(this, std::chrono::duration_cast<std::chrono::milliseconds>(currentTick - nextTick_).count());
nextTick_ = currentTick + buffer_ms_; nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true; first_ = true;
do_read(); do_read();
} }