diff --git a/server/streamreader/asio_stream.hpp b/server/streamreader/asio_stream.hpp new file mode 100644 index 00000000..617fb0e1 --- /dev/null +++ b/server/streamreader/asio_stream.hpp @@ -0,0 +1,144 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2019 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef ASIO_STREAM_HPP +#define ASIO_STREAM_HPP + +#include "pcm_stream.hpp" +#include + + +template +class AsioStream : public PcmStream, public std::enable_shared_from_this> +{ +public: + /// ctor. Encoded PCM data is passed to the PipeListener + AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); + + void start() override; + void stop() override; + +protected: + virtual void connect() = 0; + virtual void disconnect() = 0; + virtual void on_connect(); + virtual void do_read(); + std::unique_ptr chunk_; + timeval tv_chunk_; + bool first_; + long nextTick_; + boost::asio::deadline_timer timer_; + std::unique_ptr stream_; +}; + + + +template +AsioStream::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), timer_(ioc) +{ + chunk_ = std::make_unique(sampleFormat_, pcmReadMs_); +} + + +template +void AsioStream::start() +{ + encoder_->init(this, sampleFormat_); + connect(); +} + + +template +void AsioStream::stop() +{ + timer_.cancel(); + disconnect(); +} + + +template +void AsioStream::on_connect() +{ + chronos::systemtimeofday(&tv_chunk_); + tvEncodedChunk_ = tv_chunk_; + nextTick_ = chronos::getTickCount(); + first_ = true; + do_read(); +} + + +template +void AsioStream::do_read() +{ + // LOG(DEBUG) << "do_read\n"; + auto self = this->shared_from_this(); + chunk_->timestamp.sec = tv_chunk_.tv_sec; + chunk_->timestamp.usec = tv_chunk_.tv_usec; + boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), + [this, self](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR) << "Error reading message: " << ec.message() << ", length: " << length << "\n"; + connect(); + return; + } + // LOG(DEBUG) << "Read: " << length << " bytes\n"; + if (first_) + { + first_ = false; + chronos::systemtimeofday(&tv_chunk_); + chunk_->timestamp.sec = tv_chunk_.tv_sec; + chunk_->timestamp.usec = tv_chunk_.tv_usec; + tvEncodedChunk_ = tv_chunk_; + nextTick_ = chronos::getTickCount(); + } + encoder_->encode(chunk_.get()); + nextTick_ += pcmReadMs_; + chronos::addUs(tv_chunk_, pcmReadMs_ * 1000); + long currentTick = chronos::getTickCount(); + + if (nextTick_ >= currentTick) + { + setState(kPlaying); + timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); + timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; + } + else + { + do_read(); + } + }); + return; + } + else + { + chronos::systemtimeofday(&tv_chunk_); + tvEncodedChunk_ = tv_chunk_; + pcmListener_->onResync(this, currentTick - nextTick_); + nextTick_ = currentTick; + do_read(); + } + }); +} + + + +#endif diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index d8ed163f..17da72de 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -33,7 +33,8 @@ using namespace std; -PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), timer_(ioc) +PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) + : AsioStream(pcmListener, ioc, uri) { umask(0); string mode = uri_.getQuery("mode", "create"); @@ -51,92 +52,21 @@ PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, c } -PipeStream::~PipeStream() +void PipeStream::connect() { - fifo_->close(); -} - - -void PipeStream::start() -{ - encoder_->init(this, sampleFormat_); - active_ = true; - do_accept(); -} - - -void PipeStream::do_accept() -{ - LOG(DEBUG) << "do_accept\n"; + LOG(DEBUG) << "connect\n"; auto self = shared_from_this(); auto fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); - fifo_ = std::make_unique(ioc_, fd); - chronos::systemtimeofday(&tv_chunk_); - tvEncodedChunk_ = tv_chunk_; - nextTick_ = chronos::getTickCount(); - first_ = true; - do_read(); + stream_ = std::make_unique(ioc_, fd); + on_connect(); } -void PipeStream::do_read() +void PipeStream::disconnect() { - // LOG(DEBUG) << "do_read\n"; - auto self = shared_from_this(); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; - boost::asio::async_read(*fifo_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), - [this, self](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR) << "Error reading message: " << ec.message() << ", length: " << length << "\n"; - do_accept(); - return; - } - // LOG(DEBUG) << "Read: " << length << " bytes\n"; - if (first_) - { - first_ = false; - chronos::systemtimeofday(&tv_chunk_); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; - tvEncodedChunk_ = tv_chunk_; - nextTick_ = chronos::getTickCount(); - } - encoder_->encode(chunk_.get()); - nextTick_ += pcmReadMs_; - chronos::addUs(tv_chunk_, pcmReadMs_ * 1000); - long currentTick = chronos::getTickCount(); - - if (nextTick_ >= currentTick) - { - setState(kPlaying); - timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); - timer_.async_wait([self, this](const boost::system::error_code& ec) { - if (ec) - { - LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; - } - else - { - do_read(); - } - }); - return; - } - else - { - chronos::systemtimeofday(&tv_chunk_); - tvEncodedChunk_ = tv_chunk_; - pcmListener_->onResync(this, currentTick - nextTick_); - nextTick_ = currentTick; - do_read(); - } - }); + stream_->close(); } - - // void PipeStream::worker() // { // timeval tvChunk; diff --git a/server/streamreader/pipe_stream.hpp b/server/streamreader/pipe_stream.hpp index 623a7565..b5934446 100644 --- a/server/streamreader/pipe_stream.hpp +++ b/server/streamreader/pipe_stream.hpp @@ -19,8 +19,9 @@ #ifndef PIPE_STREAM_H #define PIPE_STREAM_H -#include "pcm_stream.hpp" -#include +#include "asio_stream.hpp" + +using boost::asio::posix::stream_descriptor; /// Reads and decodes PCM data from a named pipe @@ -29,24 +30,15 @@ * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class PipeStream : public PcmStream, public std::enable_shared_from_this +class PipeStream : public AsioStream { public: /// ctor. Encoded PCM data is passed to the PipeListener PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); - ~PipeStream() override; - - void start() override; protected: - void do_accept(); - void do_read(); - std::unique_ptr chunk_; - timeval tv_chunk_; - bool first_; - long nextTick_; - boost::asio::deadline_timer timer_; - std::unique_ptr fifo_; + void connect() override; + void disconnect() override; }; diff --git a/server/streamreader/tcp_stream.cpp b/server/streamreader/tcp_stream.cpp index a84a2c8c..1a75ad9f 100644 --- a/server/streamreader/tcp_stream.cpp +++ b/server/streamreader/tcp_stream.cpp @@ -33,7 +33,7 @@ using namespace std; -TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), timer_(ioc) +TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) { size_t port = 4953; try @@ -46,35 +46,18 @@ TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con LOG(INFO) << "TcpStream port: " << port << "\n"; acceptor_ = make_unique(ioc_, tcp::endpoint(tcp::v4(), port)); - chunk_ = make_unique(sampleFormat_, pcmReadMs_); - LOG(DEBUG) << "Chunk size: " << chunk_->payloadSize << "\n"; } -TcpStream::~TcpStream() -{ -} - - -void TcpStream::start() -{ - encoder_->init(this, sampleFormat_); - active_ = true; - do_accept(); -} - - -void TcpStream::do_accept() +void TcpStream::connect() { auto self = shared_from_this(); acceptor_->async_accept([this, self](boost::system::error_code ec, tcp::socket socket) { if (!ec) { LOG(DEBUG) << "New client connection\n"; - socket_ = make_unique(move(socket)); - tv_chunk_.tv_sec = 0; - first_ = true; - do_read(); + stream_ = make_unique(move(socket)); + on_connect(); } else { @@ -84,57 +67,7 @@ void TcpStream::do_accept() } -void TcpStream::do_read() +void TcpStream::disconnect() { - auto self = shared_from_this(); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; - boost::asio::async_read(*socket_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), - [this, self](boost::system::error_code ec, std::size_t length) mutable { - if (ec) - { - LOG(ERROR) << "Error reading message: " << ec.message() << "\n"; - do_accept(); - return; - } - LOG(DEBUG) << "Read: " << length << " bytes\n"; - if (first_) - { - first_ = false; - chronos::systemtimeofday(&tv_chunk_); - chunk_->timestamp.sec = tv_chunk_.tv_sec; - chunk_->timestamp.usec = tv_chunk_.tv_usec; - tvEncodedChunk_ = tv_chunk_; - nextTick_ = chronos::getTickCount(); - } - encoder_->encode(chunk_.get()); - nextTick_ += pcmReadMs_; - chronos::addUs(tv_chunk_, pcmReadMs_ * 1000); - long currentTick = chronos::getTickCount(); - - if (nextTick_ >= currentTick) - { - setState(kPlaying); - timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); - timer_.async_wait([self, this](const boost::system::error_code& ec) { - if (ec) - { - LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; - } - else - { - do_read(); - } - }); - return; - } - else - { - chronos::systemtimeofday(&tv_chunk_); - tvEncodedChunk_ = tv_chunk_; - pcmListener_->onResync(this, currentTick - nextTick_); - nextTick_ = currentTick; - do_read(); - } - }); + stream_->close(); } diff --git a/server/streamreader/tcp_stream.hpp b/server/streamreader/tcp_stream.hpp index e4c64b6c..57b72fe0 100644 --- a/server/streamreader/tcp_stream.hpp +++ b/server/streamreader/tcp_stream.hpp @@ -19,9 +19,7 @@ #ifndef TCP_STREAM_H #define TCP_STREAM_H -#include "pcm_stream.hpp" -#include -#include +#include "asio_stream.hpp" using boost::asio::ip::tcp; @@ -32,25 +30,16 @@ using boost::asio::ip::tcp; * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class TcpStream : public PcmStream, public std::enable_shared_from_this +class TcpStream : public AsioStream { public: /// ctor. Encoded PCM data is passed to the PipeListener TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); - ~TcpStream() override; - - void start() override; protected: - void do_accept(); - void do_read(); + void connect() override; + void disconnect() override; std::unique_ptr acceptor_; - std::unique_ptr socket_; - std::unique_ptr chunk_; - timeval tv_chunk_; - bool first_; - long nextTick_; - boost::asio::deadline_timer timer_; };