diff --git a/server/stream_server.cpp b/server/stream_server.cpp index f2939f01..dd1fb716 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -80,7 +80,7 @@ void StreamServer::onStateChanged(const PcmStream* pcmStream, const ReaderState& // clang-format off // Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"buffer_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}} // clang-format on - LOG(INFO) << "onStateChanged (" << pcmStream->getName() << "): " << state << "\n"; + LOG(INFO) << "onStateChanged (" << pcmStream->getName() << "): " << static_cast(state) << "\n"; // LOG(INFO) << pcmStream->toJson().dump(4); json notification = jsonrpcpp::Notification("Stream.OnUpdate", jsonrpcpp::Parameter("id", pcmStream->getId(), "stream", pcmStream->toJson())).to_json(); controlServer_->send(notification.dump(), nullptr); diff --git a/server/streamreader/asio_stream.hpp b/server/streamreader/asio_stream.hpp index 349250b5..b4b9f240 100644 --- a/server/streamreader/asio_stream.hpp +++ b/server/streamreader/asio_stream.hpp @@ -46,8 +46,8 @@ protected: bool first_; long nextTick_; uint32_t buffer_ms_; - boost::asio::deadline_timer timer_; - boost::asio::deadline_timer idle_timer_; + boost::asio::deadline_timer read_timer_; + boost::asio::deadline_timer state_timer_; std::unique_ptr stream_; std::atomic bytes_read_; }; @@ -56,7 +56,7 @@ protected: template AsioStream::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) - : PcmStream(pcmListener, ioc, uri), timer_(ioc), idle_timer_(ioc) + : PcmStream(pcmListener, ioc, uri), read_timer_(ioc), state_timer_(ioc) { chunk_ = std::make_unique(sampleFormat_, pcmReadMs_); bytes_read_ = 0; @@ -77,15 +77,15 @@ void AsioStream::check_state() { uint64_t last_read = bytes_read_; auto self = this->shared_from_this(); - idle_timer_.expires_from_now(boost::posix_time::milliseconds(500 + pcmReadMs_)); - idle_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) { + state_timer_.expires_from_now(boost::posix_time::milliseconds(500 + pcmReadMs_)); + state_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) { if (!ec) { LOG(DEBUG) << "check state last: " << last_read << ", read: " << bytes_read_ << "\n"; if (bytes_read_ != last_read) - setState(kPlaying); + setState(ReaderState::kPlaying); else - setState(kIdle); + setState(ReaderState::kIdle); check_state(); } }); @@ -106,8 +106,8 @@ template void AsioStream::stop() { active_ = false; - timer_.cancel(); - idle_timer_.cancel(); + read_timer_.cancel(); + state_timer_.cancel(); disconnect(); } @@ -115,9 +115,6 @@ void AsioStream::stop() template void AsioStream::on_connect() { - chronos::systemtimeofday(&tv_chunk_); - tvEncodedChunk_ = tv_chunk_; - nextTick_ = chronos::getTickCount(); first_ = true; do_read(); } @@ -128,8 +125,6 @@ void AsioStream::do_read() { // LOG(DEBUG) << "do_read\n"; auto self = this->shared_from_this(); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), [this, self](boost::system::error_code ec, std::size_t length) mutable { if (ec) @@ -144,21 +139,17 @@ void AsioStream::do_read() if (first_) { first_ = false; - chronos::systemtimeofday(&tv_chunk_); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; - tvEncodedChunk_ = tv_chunk_; + chronos::systemtimeofday(&tvEncodedChunk_); nextTick_ = chronos::getTickCount() + buffer_ms_; } encoder_->encode(chunk_.get()); nextTick_ += pcmReadMs_; - chronos::addUs(tv_chunk_, pcmReadMs_ * 1000); long currentTick = chronos::getTickCount(); if (nextTick_ >= currentTick) { - timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); - timer_.async_wait([self, this](const boost::system::error_code& ec) { + read_timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); + read_timer_.async_wait([self, this](const boost::system::error_code& ec) { if (ec) { LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; @@ -172,8 +163,6 @@ void AsioStream::do_read() } else { - chronos::systemtimeofday(&tv_chunk_); - tvEncodedChunk_ = tv_chunk_; pcmListener_->onResync(this, currentTick - nextTick_); nextTick_ = currentTick + buffer_ms_; do_read(); diff --git a/server/streamreader/file_stream.cpp b/server/streamreader/file_stream.cpp index 6ef7f02d..c8c193ef 100644 --- a/server/streamreader/file_stream.cpp +++ b/server/streamreader/file_stream.cpp @@ -56,7 +56,7 @@ void FileStream::worker() size_t length = ifs.tellg(); ifs.seekg(0, ifs.beg); - setState(kPlaying); + setState(ReaderState::kPlaying); while (active_) { diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 74d09154..7d6c59ad 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -32,7 +32,7 @@ using namespace std; PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) - : active_(false), pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(kIdle), ioc_(ioc) + : active_(false), pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(ReaderState::kIdle), ioc_(ioc) { encoder::EncoderFactory encoderFactory; if (uri_.query.find("codec") == uri_.query.end()) @@ -51,11 +51,6 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con if (uri_.query.find("buffer_ms") != uri_.query.end()) pcmReadMs_ = cpt::stoul(uri_.query["buffer_ms"]); - if (uri_.query.find("dryout_ms") != uri_.query.end()) - dryoutMs_ = cpt::stoul(uri_.query["dryout_ms"]); - else - dryoutMs_ = 2000; - // meta_.reset(new msg::StreamTags()); // meta_->msg["stream"] = name_; setMeta(json()); @@ -138,7 +133,7 @@ void PcmStream::setState(const ReaderState& newState) { if (newState != state_) { - LOG(DEBUG) << "State changed: " << state_ << " => " << newState << "\n"; + LOG(DEBUG) << "State changed: " << static_cast(state_) << " => " << static_cast(newState) << "\n"; state_ = newState; if (pcmListener_) pcmListener_->onStateChanged(this, newState); @@ -163,15 +158,17 @@ void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, msg::PcmChun json PcmStream::toJson() const { string state("unknown"); - if (state_ == kIdle) + if (state_ == ReaderState::kIdle) state = "idle"; - else if (state_ == kPlaying) + else if (state_ == ReaderState::kPlaying) state = "playing"; - else if (state_ == kDisabled) + else if (state_ == ReaderState::kDisabled) state = "disabled"; json j = { - {"uri", uri_.toJson()}, {"id", getId()}, {"status", state}, + {"uri", uri_.toJson()}, + {"id", getId()}, + {"status", state}, }; if (meta_) @@ -185,7 +182,7 @@ std::shared_ptr PcmStream::getMeta() const return meta_; } -void PcmStream::setMeta(json jtag) +void PcmStream::setMeta(const json& jtag) { meta_.reset(new msg::StreamTags(jtag)); meta_->msg["STREAM"] = name_; diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index 3bc019a6..0063f218 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -36,7 +36,7 @@ class PcmStream; -enum ReaderState +enum class ReaderState { kUnknown = 0, kIdle = 1, @@ -85,7 +85,7 @@ public: virtual const SampleFormat& getSampleFormat() const; std::shared_ptr getMeta() const; - void setMeta(json j); + void setMeta(const json& j); virtual ReaderState getState() const; virtual json toJson() const; @@ -106,7 +106,6 @@ protected: StreamUri uri_; SampleFormat sampleFormat_; size_t pcmReadMs_; - size_t dryoutMs_; std::unique_ptr encoder_; std::string name_; ReaderState state_; diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index 3cd9830e..cbdabede 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -65,97 +65,3 @@ void PipeStream::disconnect() stream_->close(); } -// void PipeStream::worker() -// { -// timeval tvChunk; -// std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_)); -// string lastException = ""; - -// while (active_) -// { -// if (fd_ != -1) -// close(fd_); -// fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); -// chronos::systemtimeofday(&tvChunk); -// tvEncodedChunk_ = tvChunk; -// long nextTick = chronos::getTickCount(); -// int idleBytes = 0; -// int maxIdleBytes = sampleFormat_.rate * sampleFormat_.frameSize * dryoutMs_ / 1000; -// try -// { -// if (fd_ == -1) -// throw SnapException("failed to open fifo: \"" + uri_.path + "\""); - -// while (active_) -// { -// chunk->timestamp.sec = tvChunk.tv_sec; -// chunk->timestamp.usec = tvChunk.tv_usec; -// int toRead = chunk->payloadSize; -// int len = 0; -// do -// { -// int count = read(fd_, chunk->payload + len, toRead - len); -// if (count < 0 && idleBytes < maxIdleBytes) -// { -// memset(chunk->payload + len, 0, toRead - len); -// idleBytes += toRead - len; -// len += toRead - len; -// continue; -// } -// if (count < 0) -// { -// setState(kIdle); -// if (!sleep(100)) -// break; -// } -// else if (count == 0) -// throw SnapException("end of file"); -// else -// { -// len += count; -// idleBytes = 0; -// } -// } while ((len < toRead) && active_); - -// if (!active_) -// break; - -// /// TODO: use less raw pointers, make this encoding more transparent -// encoder_->encode(chunk.get()); - -// if (!active_) -// break; - -// nextTick += pcmReadMs_; -// chronos::addUs(tvChunk, pcmReadMs_ * 1000); -// long currentTick = chronos::getTickCount(); - -// if (nextTick >= currentTick) -// { -// setState(kPlaying); -// if (!sleep(nextTick - currentTick)) -// break; -// } -// else -// { -// chronos::systemtimeofday(&tvChunk); -// tvEncodedChunk_ = tvChunk; -// pcmListener_->onResync(this, currentTick - nextTick); -// nextTick = currentTick; -// } - -// lastException = ""; -// } -// } -// catch (const std::exception& e) -// { -// if (lastException != e.what()) -// { -// LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl; -// lastException = e.what(); -// } -// if (!sleep(100)) -// break; -// } -// } -// } diff --git a/server/streamreader/process_stream.cpp b/server/streamreader/process_stream.cpp index 6e38f369..317064f5 100644 --- a/server/streamreader/process_stream.cpp +++ b/server/streamreader/process_stream.cpp @@ -36,6 +36,11 @@ ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& { params_ = uri_.getQuery("params"); logStderr_ = (uri_.getQuery("logStderr", "false") == "true"); + + if (uri_.query.find("dryout_ms") != uri_.query.end()) + dryoutMs_ = cpt::stoul(uri_.query["dryout_ms"]); + else + dryoutMs_ = 2000; } @@ -143,7 +148,7 @@ void ProcessStream::worker() { timeval tvChunk; std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_)); - setState(kPlaying); + setState(ReaderState::kPlaying); string lastException = ""; while (active_) @@ -180,7 +185,7 @@ void ProcessStream::worker() } if (count < 0) { - setState(kIdle); + setState(ReaderState::kIdle); if (!sleep(100)) break; } @@ -207,7 +212,7 @@ void ProcessStream::worker() if (nextTick >= currentTick) { - setState(kPlaying); + setState(ReaderState::kPlaying); if (!sleep(nextTick - currentTick)) break; } diff --git a/server/streamreader/process_stream.hpp b/server/streamreader/process_stream.hpp index badcacbe..53ec18e3 100644 --- a/server/streamreader/process_stream.hpp +++ b/server/streamreader/process_stream.hpp @@ -25,6 +25,7 @@ #include "pcm_stream.hpp" #include "process.hpp" +// TODO: switch to AsioStream, maybe use boost::process library /// Starts an external process and reads and PCM data from stdout /** @@ -49,6 +50,7 @@ protected: std::unique_ptr process_; std::thread stderrReaderThread_; bool logStderr_; + size_t dryoutMs_; void worker() override; virtual void stderrReader();