diff --git a/server/streamreader/asio_stream.hpp b/server/streamreader/asio_stream.hpp index 617fb0e1..8720df41 100644 --- a/server/streamreader/asio_stream.hpp +++ b/server/streamreader/asio_stream.hpp @@ -20,9 +20,11 @@ #define ASIO_STREAM_HPP #include "pcm_stream.hpp" +#include #include + template class AsioStream : public PcmStream, public std::enable_shared_from_this> { @@ -38,20 +40,45 @@ protected: virtual void disconnect() = 0; virtual void on_connect(); virtual void do_read(); + void check_state(); std::unique_ptr chunk_; timeval tv_chunk_; bool first_; long nextTick_; boost::asio::deadline_timer timer_; + boost::asio::deadline_timer idle_timer_; std::unique_ptr stream_; + std::atomic bytes_read_; }; template -AsioStream::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), timer_(ioc) +AsioStream::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) + : PcmStream(pcmListener, ioc, uri), timer_(ioc), idle_timer_(ioc) { chunk_ = std::make_unique(sampleFormat_, pcmReadMs_); + bytes_read_ = 0; +} + + +template +void AsioStream::check_state() +{ + uint64_t last_read = bytes_read_; + auto self = this->shared_from_this(); + idle_timer_.expires_from_now(boost::posix_time::milliseconds(500 + pcmReadMs_)); + idle_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) { + if (!ec) + { + LOG(DEBUG) << "check state last: " << last_read << ", read: " << bytes_read_ << "\n"; + if (bytes_read_ != last_read) + setState(kPlaying); + else + setState(kIdle); + check_state(); + } + }); } @@ -59,6 +86,7 @@ template void AsioStream::start() { encoder_->init(this, sampleFormat_); + check_state(); connect(); } @@ -67,6 +95,7 @@ template void AsioStream::stop() { timer_.cancel(); + idle_timer_.cancel(); disconnect(); } @@ -97,6 +126,8 @@ void AsioStream::do_read() connect(); return; } + + bytes_read_ += length; // LOG(DEBUG) << "Read: " << length << " bytes\n"; if (first_) { @@ -114,7 +145,6 @@ void AsioStream::do_read() if (nextTick_ >= currentTick) { - setState(kPlaying); timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); timer_.async_wait([self, this](const boost::system::error_code& ec) { if (ec) diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 63a36ed6..74d09154 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -138,6 +138,7 @@ void PcmStream::setState(const ReaderState& newState) { if (newState != state_) { + LOG(DEBUG) << "State changed: " << state_ << " => " << newState << "\n"; state_ = newState; if (pcmListener_) pcmListener_->onStateChanged(this, newState); diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index 17da72de..3cd9830e 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -25,7 +25,6 @@ #include "common/aixlog.hpp" #include "common/snap_exception.hpp" #include "common/str_compat.hpp" -#include "encoder/encoder_factory.hpp" #include "pipe_stream.hpp" @@ -33,8 +32,7 @@ using namespace std; -PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) - : AsioStream(pcmListener, ioc, uri) +PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) { umask(0); string mode = uri_.getQuery("mode", "create");