diff --git a/doc/player_setup.md b/doc/player_setup.md index 327b4a3a..3ce71f76 100644 --- a/doc/player_setup.md +++ b/doc/player_setup.md @@ -159,7 +159,7 @@ Snapserver supports [librespot](https://github.com/librespot-org/librespot) with ### Process Snapserver can start any process and read PCM data from the stdout of the process: -Configure snapserver with `stream = process:///path/to/process?name=Process[¶ms=<--my list --of params>][&logStderr=false]` +Configure snapserver with `stream = process:///path/to/process?name=Process[¶ms=<--my list --of params>][&log_stderr=false]` ### Line-in diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 35ee0526..40bf39ec 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -14,6 +14,7 @@ set(SERVER_SOURCES streamreader/pcm_stream.cpp streamreader/tcp_stream.cpp streamreader/pipe_stream.cpp + streamreader/posix_stream.cpp streamreader/file_stream.cpp streamreader/airplay_stream.cpp streamreader/librespot_stream.cpp diff --git a/server/stream_server.cpp b/server/stream_server.cpp index f0bb0527..489c9d5c 100644 --- a/server/stream_server.cpp +++ b/server/stream_server.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,6 +25,7 @@ #include using namespace std; +using namespace streamreader; using json = nlohmann::json; diff --git a/server/stream_server.hpp b/server/stream_server.hpp index 338221b3..baf403e5 100644 --- a/server/stream_server.hpp +++ b/server/stream_server.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef STREAM_SERVER_H -#define STREAM_SERVER_H +#ifndef STREAM_SERVER_HPP +#define STREAM_SERVER_HPP #include #include @@ -38,6 +38,7 @@ #include "stream_session.hpp" #include "streamreader/stream_manager.hpp" +using namespace streamreader; using boost::asio::ip::tcp; using acceptor_ptr = std::unique_ptr; diff --git a/server/stream_session.cpp b/server/stream_session.cpp index d69a21af..3029dc7b 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -23,6 +23,10 @@ #include using namespace std; +using namespace streamreader; + + +static constexpr auto LOG_TAG = "StreamSession"; StreamSession::StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket) @@ -48,7 +52,7 @@ void StreamSession::read_next() } catch (const std::bad_weak_ptr& e) { - LOG(ERROR) << "read_next: Error getting shared from this\n"; + LOG(ERROR, LOG_TAG) << "read_next: Error getting shared from this\n"; return; } @@ -56,23 +60,23 @@ void StreamSession::read_next() boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { if (ec) { - LOG(ERROR) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n"; messageReceiver_->onDisconnect(this); return; } baseMessage_.deserialize(buffer_.data()); - LOG(DEBUG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id - << ", refers: " << baseMessage_.refersTo << "\n"; + LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id + << ", refers: " << baseMessage_.refersTo << "\n"; if (baseMessage_.type > message_type::kLast) { - LOG(ERROR) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; + LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; messageReceiver_->onDisconnect(this); return; } else if (baseMessage_.size > msg::max_size) { - LOG(ERROR) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; + LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; messageReceiver_->onDisconnect(this); return; } @@ -85,7 +89,7 @@ void StreamSession::read_next() boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable { if (ec) { - LOG(ERROR) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n"; messageReceiver_->onDisconnect(this); return; } @@ -121,15 +125,15 @@ void StreamSession::start() void StreamSession::stop() { - LOG(DEBUG) << "StreamSession::stop\n"; + LOG(DEBUG, LOG_TAG) << "StreamSession::stop\n"; boost::system::error_code ec; socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (ec) - LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << "\n"; socket_.close(ec); if (ec) - LOG(ERROR) << "Error in socket close: " << ec.message() << "\n"; - LOG(DEBUG) << "StreamSession stopped\n"; + LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n"; + LOG(DEBUG, LOG_TAG) << "StreamSession stopped\n"; } @@ -142,7 +146,7 @@ void StreamSession::send_next() } catch (const std::bad_weak_ptr& e) { - LOG(ERROR) << "send_next: Error getting shared from this\n"; + LOG(ERROR, LOG_TAG) << "send_next: Error getting shared from this\n"; return; } @@ -152,7 +156,7 @@ void StreamSession::send_next() messages_.pop_front(); if (ec) { - LOG(ERROR) << "StreamSession write error (msg lenght: " << length << "): " << ec.message() << "\n"; + LOG(ERROR, LOG_TAG) << "StreamSession write error (msg lenght: " << length << "): " << ec.message() << "\n"; messageReceiver_->onDisconnect(this); return; } @@ -172,7 +176,7 @@ void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) messages_.push_back(const_buf); if (messages_.size() > 1) { - LOG(DEBUG) << "outstanding async_write\n"; + LOG(DEBUG, LOG_TAG) << "outstanding async_write\n"; return; } send_next(); diff --git a/server/stream_session.hpp b/server/stream_session.hpp index bcf74c9b..826e6023 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef STREAM_SESSION_H -#define STREAM_SESSION_H +#ifndef STREAM_SESSION_HPP +#define STREAM_SESSION_HPP #include "common/queue.h" #include "message/message.hpp" @@ -121,8 +121,8 @@ public: return socket_.remote_endpoint().address().to_string(); } - void setPcmStream(PcmStreamPtr pcmStream); - const PcmStreamPtr pcmStream() const; + void setPcmStream(streamreader::PcmStreamPtr pcmStream); + const streamreader::PcmStreamPtr pcmStream() const; protected: void read_next(); @@ -134,7 +134,7 @@ protected: tcp::socket socket_; MessageReceiver* messageReceiver_; size_t bufferMs_; - PcmStreamPtr pcmStream_; + streamreader::PcmStreamPtr pcmStream_; boost::asio::io_context::strand strand_; std::deque messages_; }; diff --git a/server/streamreader/airplay_stream.cpp b/server/streamreader/airplay_stream.cpp index bb87a9e5..a652c347 100644 --- a/server/streamreader/airplay_stream.cpp +++ b/server/streamreader/airplay_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,13 +25,21 @@ using namespace std; -static string hex2str(string input) +namespace streamreader +{ + +static constexpr auto LOG_TAG = "AirplayStream"; + +namespace +{ +string hex2str(string input) { typedef unsigned char byte; unsigned long x = strtoul(input.c_str(), nullptr, 16); byte a[] = {byte(x >> 24), byte(x >> 16), byte(x >> 8), byte(x), 0}; return string((char*)a); } +} // namespace /* * Expat is used in metadata parsing from Shairport-sync. @@ -115,7 +123,7 @@ void AirplayStream::push() if (entry_->type == "ssnc" && entry_->code == "mden") { - // LOG(INFO) << "metadata=" << jtag_.dump(4) << "\n"; + // LOG(INFO, LOG_TAG) << "metadata=" << jtag_.dump(4) << "\n"; setMeta(jtag_); } } @@ -167,22 +175,21 @@ void AirplayStream::initExeAndPath(const string& filename) } -void AirplayStream::onStderrMsg(const char* buffer, size_t n) +void AirplayStream::onStderrMsg(const std::string& line) { - string logmsg = utils::string::trim_copy(string(buffer, n)); - if (logmsg.empty()) + if (line.empty()) return; - LOG(INFO) << "(" << getName() << ") " << logmsg << "\n"; - if (logmsg.find("Is another Shairport Sync running on this device") != string::npos) + LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n"; + if (line.find("Is another Shairport Sync running on this device") != string::npos) { - LOG(ERROR) << "Seem there is another Shairport Sync runnig on port " << port_ << ", switching to port " << port_ + 1 << "\n"; + LOG(ERROR, LOG_TAG) << "Seem there is another Shairport Sync runnig on port " << port_ << ", switching to port " << port_ + 1 << "\n"; ++port_; params_ = params_wo_port_ + " --port=" + cpt::to_string(port_); } - else if (logmsg.find("Invalid audio output specified") != string::npos) + else if (line.find("Invalid audio output specified") != string::npos) { - LOG(ERROR) << "shairport sync compiled without stdout audio backend\n"; - LOG(ERROR) << "build with: \"./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata\"\n"; + LOG(ERROR, LOG_TAG) << "shairport sync compiled without stdout audio backend\n"; + LOG(ERROR, LOG_TAG) << "build with: \"./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata\"\n"; } } @@ -237,4 +244,7 @@ void XMLCALL AirplayStream::data(void* userdata, const char* content, int length string value(content, (size_t)length); self->buf_.append(value); } + #endif + +} // namespace streamreader diff --git a/server/streamreader/airplay_stream.hpp b/server/streamreader/airplay_stream.hpp index 8b6b6f3f..67e5a70a 100644 --- a/server/streamreader/airplay_stream.hpp +++ b/server/streamreader/airplay_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef AIRPLAY_STREAM_H -#define AIRPLAY_STREAM_H +#ifndef AIRPLAY_STREAM_HPP +#define AIRPLAY_STREAM_HPP #include "process_stream.hpp" @@ -29,6 +29,9 @@ #include #endif +namespace streamreader +{ + class TageEntry { public: @@ -74,7 +77,7 @@ protected: void push(); #endif - void onStderrMsg(const char* buffer, size_t n) override; + void onStderrMsg(const std::string& line) override; void initExeAndPath(const std::string& filename) override; size_t port_; std::string pipePath_; @@ -88,5 +91,6 @@ protected: #endif }; +} // namespace streamreader #endif diff --git a/server/streamreader/asio_stream.hpp b/server/streamreader/asio_stream.hpp index 7286f166..d625a29d 100644 --- a/server/streamreader/asio_stream.hpp +++ b/server/streamreader/asio_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,8 @@ #include #include - +namespace streamreader +{ template class AsioStream : public PcmStream, public std::enable_shared_from_this> @@ -36,9 +37,12 @@ public: void start() override; void stop() override; + virtual void connect(); + virtual void disconnect(); + protected: - virtual void connect() = 0; - virtual void disconnect() = 0; + virtual void do_connect() = 0; + virtual void do_disconnect() = 0; virtual void on_connect(); virtual void do_read(); void check_state(); @@ -83,7 +87,7 @@ void AsioStream::check_state() state_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) { if (!ec) { - LOG(DEBUG) << "check state last: " << last_read << ", read: " << bytes_read_ << "\n"; + LOG(DEBUG, "AsioStream") << "check state last: " << last_read << ", read: " << bytes_read_ << "\n"; if (bytes_read_ != last_read) setState(ReaderState::kPlaying); else @@ -104,6 +108,20 @@ void AsioStream::start() } +template +void AsioStream::connect() +{ + do_connect(); +} + + +template +void AsioStream::disconnect() +{ + do_disconnect(); +} + + template void AsioStream::stop() { @@ -126,72 +144,72 @@ void AsioStream::on_connect() template void AsioStream::do_read() { - // LOG(DEBUG) << "do_read\n"; + // LOG(DEBUG, "AsioStream") << "do_read\n"; auto self = this->shared_from_this(); - boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), - [this, self](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR) << "Error reading message: " << ec.message() << ", length: " << length << "\n"; - connect(); - return; - } + boost::asio::async_read( + *stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), [this, self](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n"; + connect(); + return; + } - bytes_read_ += length; - // LOG(DEBUG) << "Read: " << length << " bytes\n"; - // First read after connect. Set the initial read timestamp - // the timestamp will be incremented after encoding, - // since we do not know how much the encoder actually encoded + bytes_read_ += length; + // LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n"; + // First read after connect. Set the initial read timestamp + // the timestamp will be incremented after encoding, + // since we do not know how much the encoder actually encoded - if (!first_) - { - timeval now; - chronos::systemtimeofday(&now); - auto stream2systime_diff = chronos::diff(now, tvEncodedChunk_); - if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_)) - { - LOG(WARNING) << "Stream and system time out of sync: " << stream2systime_diff.count() << "ms, resetting stream time.\n"; - first_ = true; - } - } - if (first_) - { - first_ = false; - chronos::systemtimeofday(&tvEncodedChunk_); - nextTick_ = chronos::getTickCount() + buffer_ms_; - } + if (!first_) + { + timeval now; + chronos::systemtimeofday(&now); + auto stream2systime_diff = chronos::diff(now, tvEncodedChunk_); + if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_)) + { + LOG(WARNING, "AsioStream") << "Stream and system time out of sync: " << stream2systime_diff.count() << "ms, resetting stream time.\n"; + first_ = true; + } + } + if (first_) + { + first_ = false; + chronos::systemtimeofday(&tvEncodedChunk_); + nextTick_ = chronos::getTickCount() + buffer_ms_; + } - encoder_->encode(chunk_.get()); - nextTick_ += chunk_ms_; - long currentTick = chronos::getTickCount(); + encoder_->encode(chunk_.get()); + nextTick_ += chunk_ms_; + long currentTick = chronos::getTickCount(); - // Synchronize read to chunk_ms_ - if (nextTick_ >= currentTick) - { - read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick)); - read_timer_.async_wait([self, this](const boost::system::error_code& ec) { - if (ec) - { - LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; - } - else - { - do_read(); - } - }); - return; - } - // Read took longer, wait for the buffer to fill up - else - { - pcmListener_->onResync(this, currentTick - nextTick_); - nextTick_ = currentTick + buffer_ms_; - first_ = true; - do_read(); - } - }); + // Synchronize read to chunk_ms_ + if (nextTick_ >= currentTick) + { + read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick)); + read_timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n"; + } + else + { + do_read(); + } + }); + return; + } + // Read took longer, wait for the buffer to fill up + else + { + pcmListener_->onResync(this, currentTick - nextTick_); + nextTick_ = currentTick + buffer_ms_; + first_ = true; + do_read(); + } + }); } - +} // namespace streamreader #endif diff --git a/server/streamreader/file_stream.cpp b/server/streamreader/file_stream.cpp index 252a9f3f..c82cf5c1 100644 --- a/server/streamreader/file_stream.cpp +++ b/server/streamreader/file_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,85 +28,32 @@ using namespace std; - - -FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri) +namespace streamreader { - ifs.open(uri_.path.c_str(), std::ifstream::in | std::ifstream::binary); - if (!ifs.good()) + +static constexpr auto LOG_TAG = "FileStream"; + + +FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri) +{ + struct stat buffer; + if (stat(uri_.path.c_str(), &buffer) != 0) { - LOG(ERROR) << "failed to open PCM file: \"" + uri_.path + "\"\n"; - throw SnapException("failed to open PCM file: \"" + uri_.path + "\""); + throw SnapException("Failed to open PCM file: \"" + uri_.path + "\""); + } + else if ((buffer.st_mode & S_IFMT) != S_IFREG) + { + throw SnapException("Not a regular file: \"" + uri_.path + "\""); } } -FileStream::~FileStream() +void FileStream::do_connect() { - ifs.close(); + LOG(DEBUG, LOG_TAG) << "connect\n"; + int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); + stream_ = std::make_unique(ioc_, fd); + on_connect(); } - -void FileStream::worker() -{ - timeval tvChunk; - std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, chunk_ms_)); - - ifs.seekg(0, ifs.end); - size_t length = ifs.tellg(); - ifs.seekg(0, ifs.beg); - - setState(ReaderState::kPlaying); - - while (active_) - { - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - long nextTick = chronos::getTickCount(); - try - { - while (active_) - { - chunk->timestamp.sec = tvChunk.tv_sec; - chunk->timestamp.usec = tvChunk.tv_usec; - size_t toRead = chunk->payloadSize; - size_t count = 0; - - size_t pos = ifs.tellg(); - size_t left = length - pos; - if (left < toRead) - { - ifs.read(chunk->payload, left); - ifs.seekg(0, ifs.beg); - count = left; - } - ifs.read(chunk->payload + count, toRead - count); - - encoder_->encode(chunk.get()); - if (!active_) - break; - nextTick += chunk_ms_; - chronos::addUs(tvChunk, chunk_ms_ * 1000); - long currentTick = chronos::getTickCount(); - - if (nextTick >= currentTick) - { - // LOG(INFO) << "sleep: " << nextTick - currentTick << "\n"; - if (!sleep(nextTick - currentTick)) - break; - } - else - { - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - pcmListener_->onResync(this, currentTick - nextTick); - nextTick = currentTick; - } - } - } - catch (const std::exception& e) - { - LOG(ERROR) << "(FileStream) Exception: " << e.what() << std::endl; - } - } -} +} // namespace streamreader diff --git a/server/streamreader/file_stream.hpp b/server/streamreader/file_stream.hpp index 7ddc5f59..1a7ffbe2 100644 --- a/server/streamreader/file_stream.hpp +++ b/server/streamreader/file_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,12 +16,13 @@ along with this program. If not, see . ***/ -#ifndef FILE_STREAM_H -#define FILE_STREAM_H +#ifndef FILE_STREAM_HPP +#define FILE_STREAM_HPP -#include "pcm_stream.hpp" -#include +#include "posix_stream.hpp" +namespace streamreader +{ /// Reads and decodes PCM data from a file /** @@ -29,17 +30,16 @@ * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class FileStream : public PcmStream +class FileStream : public PosixStream { public: /// ctor. Encoded PCM data is passed to the PipeListener FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); - ~FileStream() override; protected: - void worker() override; - std::ifstream ifs; + void do_connect() override; }; +} // namespace streamreader #endif diff --git a/server/streamreader/librespot_stream.cpp b/server/streamreader/librespot_stream.cpp index 97993474..2f9f61a7 100644 --- a/server/streamreader/librespot_stream.cpp +++ b/server/streamreader/librespot_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -26,20 +26,26 @@ using namespace std; +namespace streamreader +{ + +static constexpr auto LOG_TAG = "LibrespotStream"; LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri) { sampleFormat_ = SampleFormat("44100:16:2"); uri_.query["sampleformat"] = sampleFormat_.getFormat(); + wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "7800")); ///< 130min string username = uri_.getQuery("username", ""); string password = uri_.getQuery("password", ""); string cache = uri_.getQuery("cache", ""); - string volume = uri_.getQuery("volume", ""); + string volume = uri_.getQuery("volume", "100"); string bitrate = uri_.getQuery("bitrate", "320"); string devicename = uri_.getQuery("devicename", "Snapcast"); string onevent = uri_.getQuery("onevent", ""); + bool normalize = (uri_.getQuery("normalize", "false") == "true"); if (username.empty() != password.empty()) throw SnapException("missing parameter \"username\" or \"password\" (must provide both, or neither)"); @@ -54,18 +60,18 @@ LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_conte params_ += " --initial-volume \"" + volume + "\""; if (!onevent.empty()) params_ += " --onevent \"" + onevent + "\""; + if (normalize) + params_ += " --enable-volume-normalisation"; + params_ += " --verbose"; if (uri_.query.find("username") != uri_.query.end()) uri_.query["username"] = "xxx"; if (uri_.query.find("password") != uri_.query.end()) uri_.query["password"] = "xxx"; - // LOG(INFO) << "params: " << params << "\n"; + // LOG(INFO, LOG_TAG) << "params: " << params << "\n"; } -LibrespotStream::~LibrespotStream() = default; - - void LibrespotStream::initExeAndPath(const std::string& filename) { path_ = ""; @@ -88,7 +94,7 @@ void LibrespotStream::initExeAndPath(const std::string& filename) } -void LibrespotStream::onStderrMsg(const char* buffer, size_t n) +void LibrespotStream::onStderrMsg(const std::string& line) { static bool libreelec_patched = false; smatch m; @@ -113,13 +119,10 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n) // 2016-11-03 09-00-18 [out] INFO:librespot::main_helper: librespot 6fa4e4d (2016-09-21). Built on 2016-10-27. // 2016-11-03 09-00-18 [out] INFO:librespot::session: Connecting to AP lon3-accesspoint-a34.ap.spotify.com:443 // 2016-11-03 09-00-18 [out] INFO:librespot::session: Authenticated ! - watchdog_->trigger(); - string logmsg = utils::string::trim_copy(string(buffer, n)); - if ((logmsg.find("allocated stream") == string::npos) && (logmsg.find("Got channel") == string::npos) && (logmsg.find('\0') == string::npos) && - (logmsg.size() > 4)) + if ((line.find("allocated stream") == string::npos) && (line.find("Got channel") == string::npos) && (line.find('\0') == string::npos) && (line.size() > 4)) { - LOG(INFO) << "(" << getName() << ") " << logmsg << "\n"; + LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n"; } // Librespot patch: @@ -132,9 +135,9 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n) if (!libreelec_patched) { static regex re_nonpatched("Track \"(.*)\" loaded"); - if (regex_search(logmsg, m, re_nonpatched)) + if (regex_search(line, m, re_nonpatched)) { - LOG(INFO) << "metadata: <" << m[1] << ">\n"; + LOG(INFO, LOG_TAG) << "metadata: <" << m[1] << ">\n"; json jtag = {{"TITLE", (string)m[1]}}; setMeta(jtag); @@ -143,28 +146,13 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n) // Parse the patched version static regex re_patched("metadata:(.*)"); - if (regex_search(logmsg, m, re_patched)) + if (regex_search(line, m, re_patched)) { - LOG(INFO) << "metadata: <" << m[1] << ">\n"; + LOG(INFO, LOG_TAG) << "metadata: <" << m[1] << ">\n"; setMeta(json::parse(m[1].str())); libreelec_patched = true; } } - -void LibrespotStream::stderrReader() -{ - watchdog_.reset(new Watchdog(this)); - /// 130min - watchdog_->start(130 * 60 * 1000); - ProcessStream::stderrReader(); -} - - -void LibrespotStream::onTimeout(const Watchdog* /*watchdog*/, size_t ms) -{ - LOG(ERROR) << "Spotify timeout: " << ms / 1000 << "\n"; - if (process_) - process_->kill(); -} +} // namespace streamreader diff --git a/server/streamreader/librespot_stream.hpp b/server/streamreader/librespot_stream.hpp index b7e6a11b..3f05ecc0 100644 --- a/server/streamreader/librespot_stream.hpp +++ b/server/streamreader/librespot_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,11 +16,13 @@ along with this program. If not, see . ***/ -#ifndef SPOTIFY_STREAM_H -#define SPOTIFY_STREAM_H +#ifndef SPOTIFY_STREAM_HPP +#define SPOTIFY_STREAM_HPP #include "process_stream.hpp" -#include "watchdog.h" + +namespace streamreader +{ /// Starts librespot and reads PCM data from stdout /** @@ -31,22 +33,17 @@ * snapserver -s "spotify:///librespot?name=Spotify&username=&password=[&devicename=Snapcast][&bitrate=320][&volume=][&cache=]" */ -class LibrespotStream : public ProcessStream, WatchdogListener +class LibrespotStream : public ProcessStream { public: /// ctor. Encoded PCM data is passed to the PipeListener LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); - ~LibrespotStream() override; protected: - std::unique_ptr watchdog_; - - void stderrReader() override; - void onStderrMsg(const char* buffer, size_t n) override; + void onStderrMsg(const std::string& line) override; void initExeAndPath(const std::string& filename) override; - - void onTimeout(const Watchdog* watchdog, size_t ms) override; }; +} // namespace streamreader #endif diff --git a/server/streamreader/pcm_stream.cpp b/server/streamreader/pcm_stream.cpp index 588ac9b3..6f796c98 100644 --- a/server/streamreader/pcm_stream.cpp +++ b/server/streamreader/pcm_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -29,6 +29,8 @@ using namespace std; +namespace streamreader +{ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) @@ -96,28 +98,12 @@ void PcmStream::start() LOG(DEBUG) << "PcmStream start: " << sampleFormat_.getFormat() << "\n"; encoder_->init(this, sampleFormat_); active_ = true; - thread_ = thread(&PcmStream::worker, this); } void PcmStream::stop() { - if (!active_ && !thread_.joinable()) - return; - active_ = false; - cv_.notify_one(); - if (thread_.joinable()) - thread_.join(); -} - - -bool PcmStream::sleep(int32_t ms) -{ - if (ms < 0) - return true; - std::unique_lock lck(mtx_); - return (!cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !active_; })); } @@ -188,3 +174,5 @@ void PcmStream::setMeta(const json& jtag) if (pcmListener_) pcmListener_->onMetaChanged(this); } + +} // namespace streamreader diff --git a/server/streamreader/pcm_stream.hpp b/server/streamreader/pcm_stream.hpp index 10a611ab..73f9963c 100644 --- a/server/streamreader/pcm_stream.hpp +++ b/server/streamreader/pcm_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef PCM_STREAM_H -#define PCM_STREAM_H +#ifndef PCM_STREAM_HPP +#define PCM_STREAM_HPP #include "common/json.hpp" #include "common/sample_format.hpp" @@ -34,6 +34,9 @@ #include +namespace streamreader +{ + class PcmStream; enum class ReaderState @@ -98,13 +101,8 @@ public: protected: - std::condition_variable cv_; - std::mutex mtx_; - std::thread thread_; std::atomic active_; - virtual void worker(){}; - virtual bool sleep(int32_t ms); void setState(const ReaderState& newState); timeval tvEncodedChunk_; @@ -119,5 +117,6 @@ protected: boost::asio::io_context& ioc_; }; +} // namespace streamreader #endif diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index bb7040ed..be433b5f 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -30,14 +30,18 @@ using namespace std; +namespace streamreader +{ + +static constexpr auto LOG_TAG = "PipeStream"; -PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) +PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri) { umask(0); string mode = uri_.getQuery("mode", "create"); - LOG(INFO) << "PipeStream mode: " << mode << "\n"; + LOG(INFO, LOG_TAG) << "PipeStream mode: " << mode << "\n"; if ((mode != "read") && (mode != "create")) throw SnapException("create mode for fifo must be \"read\" or \"create\""); @@ -46,101 +50,14 @@ PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, c if ((mkfifo(uri_.path.c_str(), 0666) != 0) && (errno != EEXIST)) throw SnapException("failed to make fifo \"" + uri_.path + "\": " + cpt::to_string(errno)); } - chunk_ = make_unique(sampleFormat_, chunk_ms_); } -void PipeStream::connect() +void PipeStream::do_connect() { - LOG(DEBUG) << "connect\n"; - auto self = shared_from_this(); - fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); - stream_ = std::make_unique(ioc_, fd_); + LOG(DEBUG, LOG_TAG) << "connect\n"; + int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); + stream_ = std::make_unique(ioc_, fd); on_connect(); } - - -void PipeStream::disconnect() -{ - stream_->close(); -} - - -void PipeStream::do_read() -{ - auto self = this->shared_from_this(); - try - { - if (fd_ == -1) - throw SnapException("failed to open fifo: \"" + uri_.path + "\""); - - int toRead = chunk_->payloadSize; - int len = 0; - do - { - int count = read(fd_, chunk_->payload + len, toRead - len); - if (count < 0) - { - LOG(DEBUG) << "count < 0: " << errno - << " && idleBytes < maxIdleBytes, ms: " << 1000 * chunk_->payloadSize / (sampleFormat_.rate * sampleFormat_.frameSize) << "\n"; - memset(chunk_->payload + len, 0, toRead - len); - len += toRead - len; - break; - } - else if (count == 0) - { - throw SnapException("end of file"); - } - else - { - // LOG(DEBUG) << "count: " << count << "\n"; - len += count; - bytes_read_ += len; - } - } while (len < toRead); - - if (first_) - { - first_ = false; - chronos::systemtimeofday(&tvEncodedChunk_); - nextTick_ = chronos::getTickCount() + buffer_ms_; - } - encoder_->encode(chunk_.get()); - nextTick_ += chunk_ms_; - long currentTick = chronos::getTickCount(); - - if (nextTick_ >= currentTick) - { - read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick)); - read_timer_.async_wait([self, this](const boost::system::error_code& ec) { - if (ec) - { - LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; - } - else - { - do_read(); - } - }); - return; - } - else - { - pcmListener_->onResync(this, currentTick - nextTick_); - nextTick_ = currentTick + buffer_ms_; - first_ = true; - do_read(); - } - - lastException_ = ""; - } - catch (const std::exception& e) - { - if (lastException_ != e.what()) - { - LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl; - lastException_ = e.what(); - } - connect(); - } } diff --git a/server/streamreader/pipe_stream.hpp b/server/streamreader/pipe_stream.hpp index 2ba70fff..4c4b590b 100644 --- a/server/streamreader/pipe_stream.hpp +++ b/server/streamreader/pipe_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,10 +16,13 @@ along with this program. If not, see . ***/ -#ifndef PIPE_STREAM_H -#define PIPE_STREAM_H +#ifndef PIPE_STREAM_HPP +#define PIPE_STREAM_HPP -#include "asio_stream.hpp" +#include "posix_stream.hpp" + +namespace streamreader +{ using boost::asio::posix::stream_descriptor; @@ -30,19 +33,16 @@ using boost::asio::posix::stream_descriptor; * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class PipeStream : public AsioStream +class PipeStream : public PosixStream { public: /// ctor. Encoded PCM data is passed to the PipeListener PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); protected: - void connect() override; - void disconnect() override; - void do_read() override; - std::string lastException_; - int fd_; + void do_connect() override; }; +} // namespace streamreader #endif diff --git a/server/streamreader/posix_stream.cpp b/server/streamreader/posix_stream.cpp new file mode 100644 index 00000000..9bfe7da5 --- /dev/null +++ b/server/streamreader/posix_stream.cpp @@ -0,0 +1,164 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2019 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include +#include +#include +#include +#include + +#include "common/aixlog.hpp" +#include "common/snap_exception.hpp" +#include "common/str_compat.hpp" +#include "posix_stream.hpp" + + +using namespace std; + +namespace streamreader +{ + +static constexpr auto LOG_TAG = "PosixStream"; + + +PosixStream::PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) +{ + if (uri_.query.find("dryout_ms") != uri_.query.end()) + dryout_ms_ = cpt::stoul(uri_.query["dryout_ms"]); + else + dryout_ms_ = 2000; +} + + +void PosixStream::connect() +{ + if (!active_) + return; + + try + { + do_connect(); + } + catch (const std::exception& e) + { + LOG(ERROR, LOG_TAG) << "Connect exception: " << e.what() << "\n"; + auto self = this->shared_from_this(); + read_timer_.expires_after(std::chrono::milliseconds(100)); + read_timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n"; + } + else + { + connect(); + } + }); + } +} + + +void PosixStream::do_disconnect() +{ + if (stream_->is_open()) + stream_->close(); +} + + +void PosixStream::do_read() +{ + try + { + if (!stream_->is_open()) + throw SnapException("failed to open stream: \"" + uri_.path + "\""); + + int toRead = chunk_->payloadSize; + int len = 0; + do + { + int count = read(stream_->native_handle(), chunk_->payload + len, toRead - len); + if (count < 0) + { + LOG(DEBUG, LOG_TAG) << "count < 0: " << errno + << " && idleBytes < maxIdleBytes, ms: " << 1000 * chunk_->payloadSize / (sampleFormat_.rate * sampleFormat_.frameSize) + << "\n"; + memset(chunk_->payload + len, 0, toRead - len); + len += toRead - len; + break; + } + else if (count == 0) + { + throw SnapException("end of file"); + } + else + { + // LOG(DEBUG) << "count: " << count << "\n"; + len += count; + bytes_read_ += len; + } + } while (len < toRead); + + if (first_) + { + first_ = false; + chronos::systemtimeofday(&tvEncodedChunk_); + nextTick_ = chronos::getTickCount() + buffer_ms_; + } + encoder_->encode(chunk_.get()); + nextTick_ += chunk_ms_; + long currentTick = chronos::getTickCount(); + + if (nextTick_ >= currentTick) + { + auto self = this->shared_from_this(); + read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick)); + read_timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n"; + } + else + { + do_read(); + } + }); + return; + } + else + { + pcmListener_->onResync(this, currentTick - nextTick_); + nextTick_ = currentTick + buffer_ms_; + first_ = true; + do_read(); + } + + lastException_ = ""; + } + catch (const std::exception& e) + { + if (lastException_ != e.what()) + { + LOG(ERROR, LOG_TAG) << "Exception: " << e.what() << std::endl; + lastException_ = e.what(); + } + disconnect(); + connect(); + } +} + +} // namespace streamreader diff --git a/server/streamreader/posix_stream.hpp b/server/streamreader/posix_stream.hpp new file mode 100644 index 00000000..4e6fc0f1 --- /dev/null +++ b/server/streamreader/posix_stream.hpp @@ -0,0 +1,52 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef POSIX_STREAM_HPP +#define POSIX_STREAM_HPP + +#include "asio_stream.hpp" + +namespace streamreader +{ + +using boost::asio::posix::stream_descriptor; + + +/// Reads and decodes PCM data from a file descriptor +/** + * Reads PCM from a file descriptor and passes the data to an encoder. + * Implements EncoderListener to get the encoded data. + * Data is passed to the PcmListener + */ +class PosixStream : public AsioStream +{ +public: + /// ctor. Encoded PCM data is passed to the PipeListener + PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); + +protected: + void connect() override; + void do_disconnect() override; + void do_read() override; + std::string lastException_; + size_t dryout_ms_; +}; + +} // namespace streamreader + +#endif diff --git a/server/streamreader/process.hpp b/server/streamreader/process.hpp index 46d2a825..4f6ac622 100644 --- a/server/streamreader/process.hpp +++ b/server/streamreader/process.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or @@ -27,6 +27,9 @@ // Copyright (c) 2015-2016 Ole Christian Eidheim // Thanks, Christian :-) +namespace streamreader +{ + /// Create a new process given command and run path. /// Thus, at the moment, if read_stdout==nullptr, read_stderr==nullptr and open_stdin==false, /// the stdout, stderr and stdin are sent to the parent process instead. @@ -239,4 +242,6 @@ private: } }; +} // namespace streamreader + #endif // TINY_PROCESS_LIBRARY_HPP_ diff --git a/server/streamreader/process_stream.cpp b/server/streamreader/process_stream.cpp index 93f7e1f5..a291cdb5 100644 --- a/server/streamreader/process_stream.cpp +++ b/server/streamreader/process_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -29,25 +29,19 @@ using namespace std; +namespace streamreader +{ + +static constexpr auto LOG_TAG = "ProcessStream"; ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) - : PcmStream(pcmListener, ioc, uri), path_(""), process_(nullptr) + : PosixStream(pcmListener, ioc, uri), path_(""), process_(nullptr) { params_ = uri_.getQuery("params"); - logStderr_ = (uri_.getQuery("logStderr", "false") == "true"); - - if (uri_.query.find("dryout_ms") != uri_.query.end()) - dryoutMs_ = cpt::stoul(uri_.query["dryout_ms"]); - else - dryoutMs_ = 2000; -} - - -ProcessStream::~ProcessStream() -{ - if (process_) - process_->kill(); + wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0")); + LOG(DEBUG, LOG_TAG) << "Watchdog timeout: " << wd_timeout_sec_ << "\n"; + logStderr_ = (uri_.getQuery("log_stderr", "false") == "true"); } @@ -103,140 +97,81 @@ void ProcessStream::initExeAndPath(const std::string& filename) } -void ProcessStream::start() +void ProcessStream::do_connect() { + if (!active_) + return; initExeAndPath(uri_.path); - PcmStream::start(); + LOG(DEBUG, LOG_TAG) << "Launching: '" << path_ + exe_ << "', with params: '" << params_ << "', in path: '" << path_ << "'\n"; + process_.reset(new Process(path_ + exe_ + " " + params_, path_)); + int flags = fcntl(process_->getStdout(), F_GETFL, 0); + fcntl(process_->getStdout(), F_SETFL, flags | O_NONBLOCK); + stream_ = make_unique(ioc_, process_->getStdout()); + stream_stderr_ = make_unique(ioc_, process_->getStderr()); + on_connect(); + if (wd_timeout_sec_ > 0) + { + watchdog_ = make_unique(ioc_, this); + watchdog_->start(std::chrono::seconds(wd_timeout_sec_)); + } + else + { + watchdog_ = nullptr; + } + stderrReadLine(); } -void ProcessStream::stop() +void ProcessStream::do_disconnect() { if (process_) process_->kill(); - PcmStream::stop(); - - /// thread is detached, so it is not joinable - if (stderrReaderThread_.joinable()) - stderrReaderThread_.join(); } -void ProcessStream::onStderrMsg(const char* buffer, size_t n) +void ProcessStream::onStderrMsg(const std::string& line) { if (logStderr_) { - string line = utils::string::trim_copy(string(buffer, n)); - if ((line.find('\0') == string::npos) && !line.empty()) - LOG(INFO) << "(" << getName() << ") " << line << "\n"; + LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n"; } } -void ProcessStream::stderrReader() +void ProcessStream::stderrReadLine() { - size_t buffer_size = 8192; - auto buffer = std::unique_ptr(new char[buffer_size]); - ssize_t n; - stringstream message; - while (active_ && (n = read(process_->getStderr(), buffer.get(), buffer_size)) > 0) - onStderrMsg(buffer.get(), n); + const std::string delimiter = "\n"; + auto self(shared_from_this()); + boost::asio::async_read_until( + *stream_stderr_, streambuf_stderr_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n"; + return; + } + + if (watchdog_) + watchdog_->trigger(); + + // Extract up to the first delimiter. + std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()}; + if (!line.empty()) + { + if (line.back() == '\r') + line.resize(line.size() - 1); + onStderrMsg(line); + } + streambuf_stderr_.consume(bytes_transferred); + stderrReadLine(); + }); } -void ProcessStream::worker() +void ProcessStream::onTimeout(const Watchdog& /*watchdog*/, std::chrono::milliseconds ms) { - timeval tvChunk; - std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, chunk_ms_)); - setState(ReaderState::kPlaying); - string lastException = ""; - - while (active_) - { - process_.reset(new Process(path_ + exe_ + " " + params_, path_)); - int flags = fcntl(process_->getStdout(), F_GETFL, 0); - fcntl(process_->getStdout(), F_SETFL, flags | O_NONBLOCK); - - stderrReaderThread_ = thread(&ProcessStream::stderrReader, this); - stderrReaderThread_.detach(); - - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - long nextTick = chronos::getTickCount(); - int idleBytes = 0; - int maxIdleBytes = sampleFormat_.rate * sampleFormat_.frameSize * dryoutMs_ / 1000; - try - { - while (active_) - { - chunk->timestamp.sec = tvChunk.tv_sec; - chunk->timestamp.usec = tvChunk.tv_usec; - int toRead = chunk->payloadSize; - int len = 0; - do - { - int count = read(process_->getStdout(), chunk->payload + len, toRead - len); - if (count < 0 && idleBytes < maxIdleBytes) - { - memset(chunk->payload + len, 0, toRead - len); - idleBytes += toRead - len; - len += toRead - len; - continue; - } - if (count < 0) - { - setState(ReaderState::kIdle); - if (!sleep(100)) - break; - } - else if (count == 0) - throw SnapException("end of file"); - else - { - len += count; - idleBytes = 0; - } - } while ((len < toRead) && active_); - - if (!active_) - break; - - encoder_->encode(chunk.get()); - - if (!active_) - break; - - nextTick += chunk_ms_; - chronos::addUs(tvChunk, chunk_ms_ * 1000); - long currentTick = chronos::getTickCount(); - - if (nextTick >= currentTick) - { - setState(ReaderState::kPlaying); - if (!sleep(nextTick - currentTick)) - break; - } - else - { - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - pcmListener_->onResync(this, currentTick - nextTick); - nextTick = currentTick; - } - - lastException = ""; - } - } - catch (const std::exception& e) - { - if (lastException != e.what()) - { - LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl; - lastException = e.what(); - } - process_->kill(); - if (!sleep(30000)) - break; - } - } + LOG(ERROR, LOG_TAG) << "Watchdog timeout: " << ms.count() / 1000 << "s\n"; + if (process_) + process_->kill(); } + +} // namespace streamreader diff --git a/server/streamreader/process_stream.hpp b/server/streamreader/process_stream.hpp index 53ec18e3..5cdea921 100644 --- a/server/streamreader/process_stream.hpp +++ b/server/streamreader/process_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,16 +16,18 @@ along with this program. If not, see . ***/ -#ifndef PROCESS_STREAM_H -#define PROCESS_STREAM_H +#ifndef PROCESS_STREAM_HPP +#define PROCESS_STREAM_HPP #include #include -#include "pcm_stream.hpp" +#include "posix_stream.hpp" #include "process.hpp" +#include "watchdog.hpp" -// TODO: switch to AsioStream, maybe use boost::process library +namespace streamreader +{ /// Starts an external process and reads and PCM data from stdout /** @@ -33,33 +35,39 @@ * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class ProcessStream : public PcmStream +class ProcessStream : public PosixStream, public WatchdogListener { public: /// ctor. Encoded PCM data is passed to the PipeListener ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); - ~ProcessStream() override; - - void start() override; - void stop() override; + ~ProcessStream() override = default; protected: + void do_connect() override; + void do_disconnect() override; + std::string exe_; std::string path_; std::string params_; std::unique_ptr process_; - std::thread stderrReaderThread_; - bool logStderr_; - size_t dryoutMs_; - void worker() override; - virtual void stderrReader(); - virtual void onStderrMsg(const char* buffer, size_t n); + bool logStderr_; + boost::asio::streambuf streambuf_stderr_; + std::unique_ptr stream_stderr_; + + // void worker() override; + virtual void stderrReadLine(); + virtual void onStderrMsg(const std::string& line); virtual void initExeAndPath(const std::string& filename); bool fileExists(const std::string& filename); std::string findExe(const std::string& filename); + + size_t wd_timeout_sec_; + std::unique_ptr watchdog_; + void onTimeout(const Watchdog& watchdog, std::chrono::milliseconds ms) override; }; +} // namespace streamreader #endif diff --git a/server/streamreader/stream_manager.cpp b/server/streamreader/stream_manager.cpp index f0516596..d1491890 100644 --- a/server/streamreader/stream_manager.cpp +++ b/server/streamreader/stream_manager.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,13 +25,14 @@ #include "file_stream.hpp" #include "librespot_stream.hpp" #include "pipe_stream.hpp" -// #include "pipe_stream_old.hpp" #include "process_stream.hpp" #include "tcp_stream.hpp" using namespace std; +namespace streamreader +{ StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultChunkBufferMs) @@ -64,10 +65,6 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri) { stream = make_shared(pcmListener_, ioc_, streamUri); } - // else if (streamUri.scheme == "pipe.old") - // { - // stream = make_shared(pcmListener_, ioc_, streamUri); - // } else if (streamUri.scheme == "file") { stream = make_shared(pcmListener_, ioc_, streamUri); @@ -95,7 +92,7 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri) if (stream) { - for (auto s : streams_) + for (const auto& s : streams_) { if (s->getName() == stream->getName()) throw SnapException("Stream with name \"" + stream->getName() + "\" already exists"); @@ -165,3 +162,5 @@ json StreamManager::toJson() const result.push_back(stream->toJson()); return result; } + +} // namespace streamreader diff --git a/server/streamreader/stream_manager.hpp b/server/streamreader/stream_manager.hpp index 0c13fc00..292b7850 100644 --- a/server/streamreader/stream_manager.hpp +++ b/server/streamreader/stream_manager.hpp @@ -1,5 +1,23 @@ -#ifndef PCM_READER_FACTORY_H -#define PCM_READER_FACTORY_H +/*** + This file is part of snapcast + Copyright (C) 2014-2020 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef STREAM_MANAGER_HPP +#define STREAM_MANAGER_HPP #include "pcm_stream.hpp" #include @@ -7,6 +25,9 @@ #include #include +namespace streamreader +{ + typedef std::shared_ptr PcmStreamPtr; class StreamManager @@ -33,5 +54,6 @@ private: boost::asio::io_context& ioc_; }; +} // namespace streamreader #endif diff --git a/server/streamreader/stream_uri.cpp b/server/streamreader/stream_uri.cpp index 0f611ff7..da2280e4 100644 --- a/server/streamreader/stream_uri.cpp +++ b/server/streamreader/stream_uri.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,6 +25,8 @@ using namespace std; namespace strutils = utils::string; +namespace streamreader +{ StreamUri::StreamUri(const std::string& uri) { @@ -32,7 +34,6 @@ StreamUri::StreamUri(const std::string& uri) } - void StreamUri::parse(const std::string& streamUri) { // https://en.wikipedia.org/wiki/Uniform_Resource_Identifier @@ -146,3 +147,4 @@ std::string StreamUri::getQuery(const std::string& key, const std::string& def) return iter->second; return def; } +} diff --git a/server/streamreader/stream_uri.hpp b/server/streamreader/stream_uri.hpp index b8875549..bc4fd97a 100644 --- a/server/streamreader/stream_uri.hpp +++ b/server/streamreader/stream_uri.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef READER_URI_H -#define READER_URI_H +#ifndef STREAM_URI_HPP +#define STREAM_URI_HPP #include #include @@ -27,6 +27,8 @@ using json = nlohmann::json; +namespace streamreader +{ // scheme:[//[user:password@]host[:port]][/]path[?query][#fragment] struct StreamUri @@ -56,5 +58,6 @@ struct StreamUri std::string toString() const; }; +} // namespace streamreader #endif diff --git a/server/streamreader/tcp_stream.cpp b/server/streamreader/tcp_stream.cpp index 6409db23..c30b7ecc 100644 --- a/server/streamreader/tcp_stream.cpp +++ b/server/streamreader/tcp_stream.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -32,7 +32,8 @@ using namespace std; - +namespace streamreader +{ TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri), reconnect_timer_(ioc) @@ -62,7 +63,7 @@ TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con } -void TcpStream::connect() +void TcpStream::do_connect() { if (!active_) return; @@ -108,7 +109,7 @@ void TcpStream::connect() } -void TcpStream::disconnect() +void TcpStream::do_disconnect() { if (stream_) stream_->close(); @@ -116,3 +117,4 @@ void TcpStream::disconnect() acceptor_->cancel(); reconnect_timer_.cancel(); } +} diff --git a/server/streamreader/tcp_stream.hpp b/server/streamreader/tcp_stream.hpp index 01cc6602..b5a599ea 100644 --- a/server/streamreader/tcp_stream.hpp +++ b/server/streamreader/tcp_stream.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,13 +16,15 @@ along with this program. If not, see . ***/ -#ifndef TCP_STREAM_H -#define TCP_STREAM_H +#ifndef TCP_STREAM_HPP +#define TCP_STREAM_HPP #include "asio_stream.hpp" using boost::asio::ip::tcp; +namespace streamreader +{ /// Reads and decodes PCM data from a named pipe /** @@ -37,8 +39,8 @@ public: TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); protected: - void connect() override; - void disconnect() override; + void do_connect() override; + void do_disconnect() override; std::unique_ptr acceptor_; std::string host_; size_t port_; @@ -46,5 +48,6 @@ protected: boost::asio::steady_timer reconnect_timer_; }; +} // namespace streamreader #endif diff --git a/server/streamreader/watchdog.cpp b/server/streamreader/watchdog.cpp index 44ed5dd4..a194fdc7 100644 --- a/server/streamreader/watchdog.cpp +++ b/server/streamreader/watchdog.cpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,14 +16,16 @@ along with this program. If not, see . ***/ -#include "watchdog.h" +#include "watchdog.hpp" #include using namespace std; +namespace streamreader +{ -Watchdog::Watchdog(WatchdogListener* listener) : listener_(listener), thread_(nullptr), active_(false) +Watchdog::Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener) : timer_(ioc), listener_(listener) { } @@ -34,49 +36,29 @@ Watchdog::~Watchdog() } -void Watchdog::start(size_t timeoutMs) +void Watchdog::start(const std::chrono::milliseconds& timeout) { - timeoutMs_ = timeoutMs; - if (!thread_ || !active_) - { - active_ = true; - thread_.reset(new thread(&Watchdog::worker, this)); - } - else - trigger(); + timeout_ms_ = timeout; + trigger(); } void Watchdog::stop() { - active_ = false; - trigger(); - if (thread_ && thread_->joinable()) - thread_->join(); - thread_ = nullptr; + timer_.cancel(); } void Watchdog::trigger() { - // std::unique_lock lck(mtx_); - cv_.notify_one(); -} - - -void Watchdog::worker() -{ - while (active_) - { - std::unique_lock lck(mtx_); - if (cv_.wait_for(lck, std::chrono::milliseconds(timeoutMs_)) == std::cv_status::timeout) + timer_.cancel(); + timer_.expires_after(timeout_ms_); + timer_.async_wait([this](const boost::system::error_code& ec) { + if (!ec) { - if (listener_) - { - listener_->onTimeout(this, timeoutMs_); - break; - } + listener_->onTimeout(*this, timeout_ms_); } - } - active_ = false; + }); } + +} // namespace streamreader diff --git a/server/streamreader/watchdog.h b/server/streamreader/watchdog.hpp similarity index 65% rename from server/streamreader/watchdog.h rename to server/streamreader/watchdog.hpp index 528cced9..09b15aa5 100644 --- a/server/streamreader/watchdog.h +++ b/server/streamreader/watchdog.hpp @@ -1,6 +1,6 @@ /*** This file is part of snapcast - Copyright (C) 2014-2019 Johannes Pohl + Copyright (C) 2014-2020 Johannes Pohl This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,15 +16,14 @@ along with this program. If not, see . ***/ -#ifndef WATCH_DOG_H -#define WATCH_DOG_H +#ifndef WATCH_DOG_HPP +#define WATCH_DOG_HPP -#include -#include +#include #include -#include -#include +namespace streamreader +{ class Watchdog; @@ -32,7 +31,7 @@ class Watchdog; class WatchdogListener { public: - virtual void onTimeout(const Watchdog* watchdog, size_t ms) = 0; + virtual void onTimeout(const Watchdog& watchdog, std::chrono::milliseconds ms) = 0; }; @@ -40,23 +39,19 @@ public: class Watchdog { public: - Watchdog(WatchdogListener* listener = nullptr); + Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener = nullptr); virtual ~Watchdog(); - void start(size_t timeoutMs); + void start(const std::chrono::milliseconds& timeout); void stop(); void trigger(); private: + boost::asio::steady_timer timer_; WatchdogListener* listener_; - std::condition_variable cv_; - std::mutex mtx_; - std::unique_ptr thread_; - size_t timeoutMs_; - std::atomic active_; - - void worker(); + std::chrono::milliseconds timeout_ms_; }; +} // namespace streamreader #endif