diff --git a/server/stream_server.cpp b/server/stream_server.cpp index 4b8b8e63..ca24dd52 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -136,7 +136,7 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, std::shared_ptrgetName() << "): " << ms << "ms\n"; + LOG(INFO) << "onResync (" << pcmStream->getName() << "): " << ms << " ms\n"; } diff --git a/server/streamreader/asio_stream.hpp b/server/streamreader/asio_stream.hpp index 88d2d147..ce40cb31 100644 --- a/server/streamreader/asio_stream.hpp +++ b/server/streamreader/asio_stream.hpp @@ -154,7 +154,7 @@ template void AsioStream::on_connect() { first_ = true; - chronos::systemtimeofday(&tvEncodedChunk_); + tvEncodedChunk_ = std::chrono::steady_clock::now(); do_read(); } @@ -163,68 +163,69 @@ template void AsioStream::do_read() { // LOG(DEBUG, "AsioStream") << "do_read\n"; - boost::asio::async_read( - *stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), [this](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n"; - connect(); - return; - } + boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), + [this](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n"; + connect(); + return; + } - bytes_read_ += length; - // LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n"; - // First read after connect. Set the initial read timestamp - // the timestamp will be incremented after encoding, - // since we do not know how much the encoder actually encoded + bytes_read_ += length; + // LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n"; + // First read after connect. Set the initial read timestamp + // the timestamp will be incremented after encoding, + // since we do not know how much the encoder actually encoded - if (!first_) - { - timeval now; - chronos::systemtimeofday(&now); - auto stream2systime_diff = chronos::diff(now, tvEncodedChunk_); - if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_)) - { - LOG(WARNING, "AsioStream") << "Stream and system time out of sync: " << stream2systime_diff.count() << "ms, resetting stream time.\n"; - first_ = true; - } - } - if (first_) - { - first_ = false; - chronos::systemtimeofday(&tvEncodedChunk_); - nextTick_ = std::chrono::steady_clock::now() + std::chrono::milliseconds(buffer_ms_); - } + if (!first_) + { + auto now = std::chrono::steady_clock::now(); + auto stream2systime_diff = now - tvEncodedChunk_; + if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_)) + { + LOG(WARNING, "AsioStream") << "Stream and system time out of sync: " + << std::chrono::duration_cast(stream2systime_diff).count() / 1000. + << " ms, resetting stream time.\n"; + first_ = true; + } + } + if (first_) + { + first_ = false; + tvEncodedChunk_ = std::chrono::steady_clock::now() - chunk_->duration(); + nextTick_ = std::chrono::steady_clock::now(); + } - encoder_->encode(chunk_.get()); - nextTick_ += chunk_->duration(); - auto currentTick = std::chrono::steady_clock::now(); + encoder_->encode(chunk_.get()); + nextTick_ += chunk_->duration(); + auto currentTick = std::chrono::steady_clock::now(); - // Synchronize read to chunk_ms_ - if (nextTick_ >= currentTick) - { - read_timer_.expires_after(nextTick_ - currentTick); - read_timer_.async_wait([this](const boost::system::error_code& ec) { - if (ec) - { - LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n"; - } - else - { - do_read(); - } - }); - return; - } - // Read took longer, wait for the buffer to fill up - else - { - pcmListener_->onResync(this, std::chrono::duration_cast(currentTick - nextTick_).count()); - nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_); - first_ = true; - do_read(); - } - }); + // Synchronize read to chunk_ms_ + if (nextTick_ >= currentTick) + { + read_timer_.expires_after(nextTick_ - currentTick); + read_timer_.async_wait([this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n"; + } + else + { + do_read(); + } + }); + return; + } + // Read took longer, wait for the buffer to fill up + else + { + pcmListener_->onResync(this, std::chrono::duration_cast(currentTick - nextTick_).count()); + nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_); + first_ = true; + do_read(); + } + }); } } // namespace streamreader diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 5d600bb5..add4795e 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -133,9 +133,11 @@ void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, std::shared_ if (duration <= 0) return; - chunk->timestamp.sec = tvEncodedChunk_.tv_sec; - chunk->timestamp.usec = tvEncodedChunk_.tv_usec; - chronos::addUs(tvEncodedChunk_, duration * 1000); + auto microsecs = std::chrono::duration_cast(tvEncodedChunk_.time_since_epoch()).count(); + chunk->timestamp.sec = microsecs / 1000000; + chunk->timestamp.usec = microsecs % 1000000; + + tvEncodedChunk_ += std::chrono::nanoseconds(static_cast(duration * 1000000)); if (pcmListener_) pcmListener_->onChunkRead(this, chunk, duration); } diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index ddfdba7f..1f57d95b 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -103,7 +103,7 @@ protected: void setState(const ReaderState& newState); - timeval tvEncodedChunk_; + std::chrono::time_point tvEncodedChunk_; PcmListener* pcmListener_; StreamUri uri_; SampleFormat sampleFormat_; diff --git a/server/streamreader/posix_stream.cpp b/server/streamreader/posix_stream.cpp index 3703dfc9..4ae93c29 100644 --- a/server/streamreader/posix_stream.cpp +++ b/server/streamreader/posix_stream.cpp @@ -35,7 +35,7 @@ namespace streamreader { static constexpr auto LOG_TAG = "PosixStream"; - +static constexpr auto kResyncTolerance = 50ms; PosixStream::PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) { @@ -80,28 +80,25 @@ void PosixStream::do_read() if (!stream_->is_open()) throw SnapException("failed to open stream: \"" + uri_.path + "\""); + if (first_) + { + LOG(DEBUG, LOG_TAG) << "First read, initializing nextTick to now\n"; + nextTick_ = std::chrono::steady_clock::now(); + } + int toRead = chunk_->payloadSize; + auto duration = chunk_->duration(); int len = 0; do { int count = read(stream_->native_handle(), chunk_->payload + len, toRead - len); - if (count < 0 && idle_bytes_ < max_idle_bytes_) + if (count < 0) { - // nothing to read for a longer time now, set the chunk to silent - LOG(DEBUG, LOG_TAG) << "count < 0: " << errno - << " && idleBytes < maxIdleBytes, ms: " << 1000 * chunk_->payloadSize / (sampleFormat_.rate() * sampleFormat_.frameSize()) - << "\n"; + // no data available, fill with silence memset(chunk_->payload + len, 0, toRead - len); idle_bytes_ += toRead - len; - len += toRead - len; break; } - else if (count < 0) - { - // nothing to read, try again (chunk_ms_ / 2) later - wait(read_timer_, std::chrono::milliseconds(chunk_ms_ / 2), [this] { do_read(); }); - return; - } else if (count == 0) { throw SnapException("end of file"); @@ -115,29 +112,46 @@ void PosixStream::do_read() } } while (len < toRead); + // LOG(DEBUG, LOG_TAG) << "Received " << len << "/" << toRead << " bytes\n"; if (first_) { first_ = false; - chronos::systemtimeofday(&tvEncodedChunk_); - nextTick_ = std::chrono::steady_clock::now() + std::chrono::milliseconds(buffer_ms_); + // initialize the stream's base timestamp to now minus the chunk's duration + tvEncodedChunk_ = std::chrono::steady_clock::now() - duration; } - encoder_->encode(chunk_.get()); - nextTick_ += chunk_->duration(); - auto currentTick = std::chrono::steady_clock::now(); - if (nextTick_ >= currentTick) + if ((idle_bytes_ == 0) || (idle_bytes_ <= max_idle_bytes_)) + { + // the encoder will update the tvEncodedChunk when a chunk is encoded + encoder_->encode(chunk_.get()); + } + else + { + // no data available + // set first_ = true will cause the timestamps to be updated without encoding + first_ = true; + } + + nextTick_ += duration; + auto currentTick = std::chrono::steady_clock::now(); + auto next_read = nextTick_ - currentTick; + if (next_read >= 0ms) { // synchronize reads to an interval of chunk_ms_ wait(read_timer_, nextTick_ - currentTick, [this] { do_read(); }); return; } + else if (next_read >= -kResyncTolerance) + { + LOG(INFO) << "next read < 0 (" << getName() << "): " << std::chrono::duration_cast(next_read).count() / 1000. << " ms\n"; + do_read(); + } else { // reading chunk_ms_ took longer than chunk_ms_ - pcmListener_->onResync(this, std::chrono::duration_cast(currentTick - nextTick_).count()); - nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_); + pcmListener_->onResync(this, std::chrono::duration_cast(-next_read).count()); first_ = true; - do_read(); + wait(read_timer_, duration + kResyncTolerance, [this] { do_read(); }); } lastException_ = "";