Switch stream readers to use asio event loop

This commit is contained in:
badaix 2020-01-03 22:40:34 +01:00
parent 3eab397543
commit 6d7e25e9af
30 changed files with 630 additions and 583 deletions

View file

@ -159,7 +159,7 @@ Snapserver supports [librespot](https://github.com/librespot-org/librespot) with
### Process
Snapserver can start any process and read PCM data from the stdout of the process:
Configure snapserver with `stream = process:///path/to/process?name=Process[&params=<--my list --of params>][&logStderr=false]`
Configure snapserver with `stream = process:///path/to/process?name=Process[&params=<--my list --of params>][&log_stderr=false]`
### Line-in

View file

@ -14,6 +14,7 @@ set(SERVER_SOURCES
streamreader/pcm_stream.cpp
streamreader/tcp_stream.cpp
streamreader/pipe_stream.cpp
streamreader/posix_stream.cpp
streamreader/file_stream.cpp
streamreader/airplay_stream.cpp
streamreader/librespot_stream.cpp

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -25,6 +25,7 @@
#include <iostream>
using namespace std;
using namespace streamreader;
using json = nlohmann::json;

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef STREAM_SERVER_H
#define STREAM_SERVER_H
#ifndef STREAM_SERVER_HPP
#define STREAM_SERVER_HPP
#include <boost/asio.hpp>
#include <memory>
@ -38,6 +38,7 @@
#include "stream_session.hpp"
#include "streamreader/stream_manager.hpp"
using namespace streamreader;
using boost::asio::ip::tcp;
using acceptor_ptr = std::unique_ptr<tcp::acceptor>;

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -23,6 +23,10 @@
#include <iostream>
using namespace std;
using namespace streamreader;
static constexpr auto LOG_TAG = "StreamSession";
StreamSession::StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket)
@ -48,7 +52,7 @@ void StreamSession::read_next()
}
catch (const std::bad_weak_ptr& e)
{
LOG(ERROR) << "read_next: Error getting shared from this\n";
LOG(ERROR, LOG_TAG) << "read_next: Error getting shared from this\n";
return;
}
@ -56,23 +60,23 @@ void StreamSession::read_next()
boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
baseMessage_.deserialize(buffer_.data());
LOG(DEBUG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id
<< ", refers: " << baseMessage_.refersTo << "\n";
LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id
<< ", refers: " << baseMessage_.refersTo << "\n";
if (baseMessage_.type > message_type::kLast)
{
LOG(ERROR) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n";
LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
else if (baseMessage_.size > msg::max_size)
{
LOG(ERROR) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n";
LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
@ -85,7 +89,7 @@ void StreamSession::read_next()
boost::asio::bind_executor(strand_, [this, self](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
@ -121,15 +125,15 @@ void StreamSession::start()
void StreamSession::stop()
{
LOG(DEBUG) << "StreamSession::stop\n";
LOG(DEBUG, LOG_TAG) << "StreamSession::stop\n";
boost::system::error_code ec;
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec)
LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << "\n";
socket_.close(ec);
if (ec)
LOG(ERROR) << "Error in socket close: " << ec.message() << "\n";
LOG(DEBUG) << "StreamSession stopped\n";
LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n";
LOG(DEBUG, LOG_TAG) << "StreamSession stopped\n";
}
@ -142,7 +146,7 @@ void StreamSession::send_next()
}
catch (const std::bad_weak_ptr& e)
{
LOG(ERROR) << "send_next: Error getting shared from this\n";
LOG(ERROR, LOG_TAG) << "send_next: Error getting shared from this\n";
return;
}
@ -152,7 +156,7 @@ void StreamSession::send_next()
messages_.pop_front();
if (ec)
{
LOG(ERROR) << "StreamSession write error (msg lenght: " << length << "): " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "StreamSession write error (msg lenght: " << length << "): " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
@ -172,7 +176,7 @@ void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now)
messages_.push_back(const_buf);
if (messages_.size() > 1)
{
LOG(DEBUG) << "outstanding async_write\n";
LOG(DEBUG, LOG_TAG) << "outstanding async_write\n";
return;
}
send_next();

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef STREAM_SESSION_H
#define STREAM_SESSION_H
#ifndef STREAM_SESSION_HPP
#define STREAM_SESSION_HPP
#include "common/queue.h"
#include "message/message.hpp"
@ -121,8 +121,8 @@ public:
return socket_.remote_endpoint().address().to_string();
}
void setPcmStream(PcmStreamPtr pcmStream);
const PcmStreamPtr pcmStream() const;
void setPcmStream(streamreader::PcmStreamPtr pcmStream);
const streamreader::PcmStreamPtr pcmStream() const;
protected:
void read_next();
@ -134,7 +134,7 @@ protected:
tcp::socket socket_;
MessageReceiver* messageReceiver_;
size_t bufferMs_;
PcmStreamPtr pcmStream_;
streamreader::PcmStreamPtr pcmStream_;
boost::asio::io_context::strand strand_;
std::deque<shared_const_buffer> messages_;
};

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -25,13 +25,21 @@
using namespace std;
static string hex2str(string input)
namespace streamreader
{
static constexpr auto LOG_TAG = "AirplayStream";
namespace
{
string hex2str(string input)
{
typedef unsigned char byte;
unsigned long x = strtoul(input.c_str(), nullptr, 16);
byte a[] = {byte(x >> 24), byte(x >> 16), byte(x >> 8), byte(x), 0};
return string((char*)a);
}
} // namespace
/*
* Expat is used in metadata parsing from Shairport-sync.
@ -115,7 +123,7 @@ void AirplayStream::push()
if (entry_->type == "ssnc" && entry_->code == "mden")
{
// LOG(INFO) << "metadata=" << jtag_.dump(4) << "\n";
// LOG(INFO, LOG_TAG) << "metadata=" << jtag_.dump(4) << "\n";
setMeta(jtag_);
}
}
@ -167,22 +175,21 @@ void AirplayStream::initExeAndPath(const string& filename)
}
void AirplayStream::onStderrMsg(const char* buffer, size_t n)
void AirplayStream::onStderrMsg(const std::string& line)
{
string logmsg = utils::string::trim_copy(string(buffer, n));
if (logmsg.empty())
if (line.empty())
return;
LOG(INFO) << "(" << getName() << ") " << logmsg << "\n";
if (logmsg.find("Is another Shairport Sync running on this device") != string::npos)
LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n";
if (line.find("Is another Shairport Sync running on this device") != string::npos)
{
LOG(ERROR) << "Seem there is another Shairport Sync runnig on port " << port_ << ", switching to port " << port_ + 1 << "\n";
LOG(ERROR, LOG_TAG) << "Seem there is another Shairport Sync runnig on port " << port_ << ", switching to port " << port_ + 1 << "\n";
++port_;
params_ = params_wo_port_ + " --port=" + cpt::to_string(port_);
}
else if (logmsg.find("Invalid audio output specified") != string::npos)
else if (line.find("Invalid audio output specified") != string::npos)
{
LOG(ERROR) << "shairport sync compiled without stdout audio backend\n";
LOG(ERROR) << "build with: \"./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata\"\n";
LOG(ERROR, LOG_TAG) << "shairport sync compiled without stdout audio backend\n";
LOG(ERROR, LOG_TAG) << "build with: \"./configure --with-stdout --with-avahi --with-ssl=openssl --with-metadata\"\n";
}
}
@ -237,4 +244,7 @@ void XMLCALL AirplayStream::data(void* userdata, const char* content, int length
string value(content, (size_t)length);
self->buf_.append(value);
}
#endif
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef AIRPLAY_STREAM_H
#define AIRPLAY_STREAM_H
#ifndef AIRPLAY_STREAM_HPP
#define AIRPLAY_STREAM_HPP
#include "process_stream.hpp"
@ -29,6 +29,9 @@
#include <expat.h>
#endif
namespace streamreader
{
class TageEntry
{
public:
@ -74,7 +77,7 @@ protected:
void push();
#endif
void onStderrMsg(const char* buffer, size_t n) override;
void onStderrMsg(const std::string& line) override;
void initExeAndPath(const std::string& filename) override;
size_t port_;
std::string pipePath_;
@ -88,5 +91,6 @@ protected:
#endif
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -24,7 +24,8 @@
#include <atomic>
#include <boost/asio.hpp>
namespace streamreader
{
template <typename ReadStream>
class AsioStream : public PcmStream, public std::enable_shared_from_this<AsioStream<ReadStream>>
@ -36,9 +37,12 @@ public:
void start() override;
void stop() override;
virtual void connect();
virtual void disconnect();
protected:
virtual void connect() = 0;
virtual void disconnect() = 0;
virtual void do_connect() = 0;
virtual void do_disconnect() = 0;
virtual void on_connect();
virtual void do_read();
void check_state();
@ -83,7 +87,7 @@ void AsioStream<ReadStream>::check_state()
state_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) {
if (!ec)
{
LOG(DEBUG) << "check state last: " << last_read << ", read: " << bytes_read_ << "\n";
LOG(DEBUG, "AsioStream") << "check state last: " << last_read << ", read: " << bytes_read_ << "\n";
if (bytes_read_ != last_read)
setState(ReaderState::kPlaying);
else
@ -104,6 +108,20 @@ void AsioStream<ReadStream>::start()
}
template <typename ReadStream>
void AsioStream<ReadStream>::connect()
{
do_connect();
}
template <typename ReadStream>
void AsioStream<ReadStream>::disconnect()
{
do_disconnect();
}
template <typename ReadStream>
void AsioStream<ReadStream>::stop()
{
@ -126,72 +144,72 @@ void AsioStream<ReadStream>::on_connect()
template <typename ReadStream>
void AsioStream<ReadStream>::do_read()
{
// LOG(DEBUG) << "do_read\n";
// LOG(DEBUG, "AsioStream") << "do_read\n";
auto self = this->shared_from_this();
boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize),
[this, self](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR) << "Error reading message: " << ec.message() << ", length: " << length << "\n";
connect();
return;
}
boost::asio::async_read(
*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), [this, self](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n";
connect();
return;
}
bytes_read_ += length;
// LOG(DEBUG) << "Read: " << length << " bytes\n";
// First read after connect. Set the initial read timestamp
// the timestamp will be incremented after encoding,
// since we do not know how much the encoder actually encoded
bytes_read_ += length;
// LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n";
// First read after connect. Set the initial read timestamp
// the timestamp will be incremented after encoding,
// since we do not know how much the encoder actually encoded
if (!first_)
{
timeval now;
chronos::systemtimeofday(&now);
auto stream2systime_diff = chronos::diff<std::chrono::milliseconds>(now, tvEncodedChunk_);
if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_))
{
LOG(WARNING) << "Stream and system time out of sync: " << stream2systime_diff.count() << "ms, resetting stream time.\n";
first_ = true;
}
}
if (first_)
{
first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_;
}
if (!first_)
{
timeval now;
chronos::systemtimeofday(&now);
auto stream2systime_diff = chronos::diff<std::chrono::milliseconds>(now, tvEncodedChunk_);
if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_))
{
LOG(WARNING, "AsioStream") << "Stream and system time out of sync: " << stream2systime_diff.count() << "ms, resetting stream time.\n";
first_ = true;
}
}
if (first_)
{
first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_;
}
encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_;
long currentTick = chronos::getTickCount();
encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_;
long currentTick = chronos::getTickCount();
// Synchronize read to chunk_ms_
if (nextTick_ >= currentTick)
{
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR) << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
// Read took longer, wait for the buffer to fill up
else
{
pcmListener_->onResync(this, currentTick - nextTick_);
nextTick_ = currentTick + buffer_ms_;
first_ = true;
do_read();
}
});
// Synchronize read to chunk_ms_
if (nextTick_ >= currentTick)
{
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
// Read took longer, wait for the buffer to fill up
else
{
pcmListener_->onResync(this, currentTick - nextTick_);
nextTick_ = currentTick + buffer_ms_;
first_ = true;
do_read();
}
});
}
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -28,85 +28,32 @@
using namespace std;
FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri)
namespace streamreader
{
ifs.open(uri_.path.c_str(), std::ifstream::in | std::ifstream::binary);
if (!ifs.good())
static constexpr auto LOG_TAG = "FileStream";
FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri)
{
struct stat buffer;
if (stat(uri_.path.c_str(), &buffer) != 0)
{
LOG(ERROR) << "failed to open PCM file: \"" + uri_.path + "\"\n";
throw SnapException("failed to open PCM file: \"" + uri_.path + "\"");
throw SnapException("Failed to open PCM file: \"" + uri_.path + "\"");
}
else if ((buffer.st_mode & S_IFMT) != S_IFREG)
{
throw SnapException("Not a regular file: \"" + uri_.path + "\"");
}
}
FileStream::~FileStream()
void FileStream::do_connect()
{
ifs.close();
LOG(DEBUG, LOG_TAG) << "connect\n";
int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);
stream_ = std::make_unique<boost::asio::posix::stream_descriptor>(ioc_, fd);
on_connect();
}
void FileStream::worker()
{
timeval tvChunk;
std::unique_ptr<msg::PcmChunk> chunk(new msg::PcmChunk(sampleFormat_, chunk_ms_));
ifs.seekg(0, ifs.end);
size_t length = ifs.tellg();
ifs.seekg(0, ifs.beg);
setState(ReaderState::kPlaying);
while (active_)
{
chronos::systemtimeofday(&tvChunk);
tvEncodedChunk_ = tvChunk;
long nextTick = chronos::getTickCount();
try
{
while (active_)
{
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
size_t toRead = chunk->payloadSize;
size_t count = 0;
size_t pos = ifs.tellg();
size_t left = length - pos;
if (left < toRead)
{
ifs.read(chunk->payload, left);
ifs.seekg(0, ifs.beg);
count = left;
}
ifs.read(chunk->payload + count, toRead - count);
encoder_->encode(chunk.get());
if (!active_)
break;
nextTick += chunk_ms_;
chronos::addUs(tvChunk, chunk_ms_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
// LOG(INFO) << "sleep: " << nextTick - currentTick << "\n";
if (!sleep(nextTick - currentTick))
break;
}
else
{
chronos::systemtimeofday(&tvChunk);
tvEncodedChunk_ = tvChunk;
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
}
}
catch (const std::exception& e)
{
LOG(ERROR) << "(FileStream) Exception: " << e.what() << std::endl;
}
}
}
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,12 +16,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef FILE_STREAM_H
#define FILE_STREAM_H
#ifndef FILE_STREAM_HPP
#define FILE_STREAM_HPP
#include "pcm_stream.hpp"
#include <fstream>
#include "posix_stream.hpp"
namespace streamreader
{
/// Reads and decodes PCM data from a file
/**
@ -29,17 +30,16 @@
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class FileStream : public PcmStream
class FileStream : public PosixStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~FileStream() override;
protected:
void worker() override;
std::ifstream ifs;
void do_connect() override;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -26,20 +26,26 @@
using namespace std;
namespace streamreader
{
static constexpr auto LOG_TAG = "LibrespotStream";
LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri)
{
sampleFormat_ = SampleFormat("44100:16:2");
uri_.query["sampleformat"] = sampleFormat_.getFormat();
wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "7800")); ///< 130min
string username = uri_.getQuery("username", "");
string password = uri_.getQuery("password", "");
string cache = uri_.getQuery("cache", "");
string volume = uri_.getQuery("volume", "");
string volume = uri_.getQuery("volume", "100");
string bitrate = uri_.getQuery("bitrate", "320");
string devicename = uri_.getQuery("devicename", "Snapcast");
string onevent = uri_.getQuery("onevent", "");
bool normalize = (uri_.getQuery("normalize", "false") == "true");
if (username.empty() != password.empty())
throw SnapException("missing parameter \"username\" or \"password\" (must provide both, or neither)");
@ -54,18 +60,18 @@ LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_conte
params_ += " --initial-volume \"" + volume + "\"";
if (!onevent.empty())
params_ += " --onevent \"" + onevent + "\"";
if (normalize)
params_ += " --enable-volume-normalisation";
params_ += " --verbose";
if (uri_.query.find("username") != uri_.query.end())
uri_.query["username"] = "xxx";
if (uri_.query.find("password") != uri_.query.end())
uri_.query["password"] = "xxx";
// LOG(INFO) << "params: " << params << "\n";
// LOG(INFO, LOG_TAG) << "params: " << params << "\n";
}
LibrespotStream::~LibrespotStream() = default;
void LibrespotStream::initExeAndPath(const std::string& filename)
{
path_ = "";
@ -88,7 +94,7 @@ void LibrespotStream::initExeAndPath(const std::string& filename)
}
void LibrespotStream::onStderrMsg(const char* buffer, size_t n)
void LibrespotStream::onStderrMsg(const std::string& line)
{
static bool libreelec_patched = false;
smatch m;
@ -113,13 +119,10 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n)
// 2016-11-03 09-00-18 [out] INFO:librespot::main_helper: librespot 6fa4e4d (2016-09-21). Built on 2016-10-27.
// 2016-11-03 09-00-18 [out] INFO:librespot::session: Connecting to AP lon3-accesspoint-a34.ap.spotify.com:443
// 2016-11-03 09-00-18 [out] INFO:librespot::session: Authenticated !
watchdog_->trigger();
string logmsg = utils::string::trim_copy(string(buffer, n));
if ((logmsg.find("allocated stream") == string::npos) && (logmsg.find("Got channel") == string::npos) && (logmsg.find('\0') == string::npos) &&
(logmsg.size() > 4))
if ((line.find("allocated stream") == string::npos) && (line.find("Got channel") == string::npos) && (line.find('\0') == string::npos) && (line.size() > 4))
{
LOG(INFO) << "(" << getName() << ") " << logmsg << "\n";
LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n";
}
// Librespot patch:
@ -132,9 +135,9 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n)
if (!libreelec_patched)
{
static regex re_nonpatched("Track \"(.*)\" loaded");
if (regex_search(logmsg, m, re_nonpatched))
if (regex_search(line, m, re_nonpatched))
{
LOG(INFO) << "metadata: <" << m[1] << ">\n";
LOG(INFO, LOG_TAG) << "metadata: <" << m[1] << ">\n";
json jtag = {{"TITLE", (string)m[1]}};
setMeta(jtag);
@ -143,28 +146,13 @@ void LibrespotStream::onStderrMsg(const char* buffer, size_t n)
// Parse the patched version
static regex re_patched("metadata:(.*)");
if (regex_search(logmsg, m, re_patched))
if (regex_search(line, m, re_patched))
{
LOG(INFO) << "metadata: <" << m[1] << ">\n";
LOG(INFO, LOG_TAG) << "metadata: <" << m[1] << ">\n";
setMeta(json::parse(m[1].str()));
libreelec_patched = true;
}
}
void LibrespotStream::stderrReader()
{
watchdog_.reset(new Watchdog(this));
/// 130min
watchdog_->start(130 * 60 * 1000);
ProcessStream::stderrReader();
}
void LibrespotStream::onTimeout(const Watchdog* /*watchdog*/, size_t ms)
{
LOG(ERROR) << "Spotify timeout: " << ms / 1000 << "\n";
if (process_)
process_->kill();
}
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,11 +16,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef SPOTIFY_STREAM_H
#define SPOTIFY_STREAM_H
#ifndef SPOTIFY_STREAM_HPP
#define SPOTIFY_STREAM_HPP
#include "process_stream.hpp"
#include "watchdog.h"
namespace streamreader
{
/// Starts librespot and reads PCM data from stdout
/**
@ -31,22 +33,17 @@
* snapserver -s "spotify:///librespot?name=Spotify&username=<my username>&password=<my password>[&devicename=Snapcast][&bitrate=320][&volume=<volume in
* percent>][&cache=<cache dir>]"
*/
class LibrespotStream : public ProcessStream, WatchdogListener
class LibrespotStream : public ProcessStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~LibrespotStream() override;
protected:
std::unique_ptr<Watchdog> watchdog_;
void stderrReader() override;
void onStderrMsg(const char* buffer, size_t n) override;
void onStderrMsg(const std::string& line) override;
void initExeAndPath(const std::string& filename) override;
void onTimeout(const Watchdog* watchdog, size_t ms) override;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -29,6 +29,8 @@
using namespace std;
namespace streamreader
{
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
@ -96,28 +98,12 @@ void PcmStream::start()
LOG(DEBUG) << "PcmStream start: " << sampleFormat_.getFormat() << "\n";
encoder_->init(this, sampleFormat_);
active_ = true;
thread_ = thread(&PcmStream::worker, this);
}
void PcmStream::stop()
{
if (!active_ && !thread_.joinable())
return;
active_ = false;
cv_.notify_one();
if (thread_.joinable())
thread_.join();
}
bool PcmStream::sleep(int32_t ms)
{
if (ms < 0)
return true;
std::unique_lock<std::mutex> lck(mtx_);
return (!cv_.wait_for(lck, std::chrono::milliseconds(ms), [this] { return !active_; }));
}
@ -188,3 +174,5 @@ void PcmStream::setMeta(const json& jtag)
if (pcmListener_)
pcmListener_->onMetaChanged(this);
}
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PCM_STREAM_H
#define PCM_STREAM_H
#ifndef PCM_STREAM_HPP
#define PCM_STREAM_HPP
#include "common/json.hpp"
#include "common/sample_format.hpp"
@ -34,6 +34,9 @@
#include <thread>
namespace streamreader
{
class PcmStream;
enum class ReaderState
@ -98,13 +101,8 @@ public:
protected:
std::condition_variable cv_;
std::mutex mtx_;
std::thread thread_;
std::atomic<bool> active_;
virtual void worker(){};
virtual bool sleep(int32_t ms);
void setState(const ReaderState& newState);
timeval tvEncodedChunk_;
@ -119,5 +117,6 @@ protected:
boost::asio::io_context& ioc_;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -30,14 +30,18 @@
using namespace std;
namespace streamreader
{
static constexpr auto LOG_TAG = "PipeStream";
PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream<stream_descriptor>(pcmListener, ioc, uri)
PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri)
{
umask(0);
string mode = uri_.getQuery("mode", "create");
LOG(INFO) << "PipeStream mode: " << mode << "\n";
LOG(INFO, LOG_TAG) << "PipeStream mode: " << mode << "\n";
if ((mode != "read") && (mode != "create"))
throw SnapException("create mode for fifo must be \"read\" or \"create\"");
@ -46,101 +50,14 @@ PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, c
if ((mkfifo(uri_.path.c_str(), 0666) != 0) && (errno != EEXIST))
throw SnapException("failed to make fifo \"" + uri_.path + "\": " + cpt::to_string(errno));
}
chunk_ = make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
}
void PipeStream::connect()
void PipeStream::do_connect()
{
LOG(DEBUG) << "connect\n";
auto self = shared_from_this();
fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);
stream_ = std::make_unique<boost::asio::posix::stream_descriptor>(ioc_, fd_);
LOG(DEBUG, LOG_TAG) << "connect\n";
int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);
stream_ = std::make_unique<boost::asio::posix::stream_descriptor>(ioc_, fd);
on_connect();
}
void PipeStream::disconnect()
{
stream_->close();
}
void PipeStream::do_read()
{
auto self = this->shared_from_this();
try
{
if (fd_ == -1)
throw SnapException("failed to open fifo: \"" + uri_.path + "\"");
int toRead = chunk_->payloadSize;
int len = 0;
do
{
int count = read(fd_, chunk_->payload + len, toRead - len);
if (count < 0)
{
LOG(DEBUG) << "count < 0: " << errno
<< " && idleBytes < maxIdleBytes, ms: " << 1000 * chunk_->payloadSize / (sampleFormat_.rate * sampleFormat_.frameSize) << "\n";
memset(chunk_->payload + len, 0, toRead - len);
len += toRead - len;
break;
}
else if (count == 0)
{
throw SnapException("end of file");
}
else
{
// LOG(DEBUG) << "count: " << count << "\n";
len += count;
bytes_read_ += len;
}
} while (len < toRead);
if (first_)
{
first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_;
}
encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_;
long currentTick = chronos::getTickCount();
if (nextTick_ >= currentTick)
{
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR) << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
else
{
pcmListener_->onResync(this, currentTick - nextTick_);
nextTick_ = currentTick + buffer_ms_;
first_ = true;
do_read();
}
lastException_ = "";
}
catch (const std::exception& e)
{
if (lastException_ != e.what())
{
LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl;
lastException_ = e.what();
}
connect();
}
}

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,10 +16,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PIPE_STREAM_H
#define PIPE_STREAM_H
#ifndef PIPE_STREAM_HPP
#define PIPE_STREAM_HPP
#include "asio_stream.hpp"
#include "posix_stream.hpp"
namespace streamreader
{
using boost::asio::posix::stream_descriptor;
@ -30,19 +33,16 @@ using boost::asio::posix::stream_descriptor;
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PipeStream : public AsioStream<stream_descriptor>
class PipeStream : public PosixStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
protected:
void connect() override;
void disconnect() override;
void do_read() override;
std::string lastException_;
int fd_;
void do_connect() override;
};
} // namespace streamreader
#endif

View file

@ -0,0 +1,164 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <cerrno>
#include <fcntl.h>
#include <memory>
#include <sys/stat.h>
#include <unistd.h>
#include "common/aixlog.hpp"
#include "common/snap_exception.hpp"
#include "common/str_compat.hpp"
#include "posix_stream.hpp"
using namespace std;
namespace streamreader
{
static constexpr auto LOG_TAG = "PosixStream";
PosixStream::PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream<stream_descriptor>(pcmListener, ioc, uri)
{
if (uri_.query.find("dryout_ms") != uri_.query.end())
dryout_ms_ = cpt::stoul(uri_.query["dryout_ms"]);
else
dryout_ms_ = 2000;
}
void PosixStream::connect()
{
if (!active_)
return;
try
{
do_connect();
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Connect exception: " << e.what() << "\n";
auto self = this->shared_from_this();
read_timer_.expires_after(std::chrono::milliseconds(100));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
connect();
}
});
}
}
void PosixStream::do_disconnect()
{
if (stream_->is_open())
stream_->close();
}
void PosixStream::do_read()
{
try
{
if (!stream_->is_open())
throw SnapException("failed to open stream: \"" + uri_.path + "\"");
int toRead = chunk_->payloadSize;
int len = 0;
do
{
int count = read(stream_->native_handle(), chunk_->payload + len, toRead - len);
if (count < 0)
{
LOG(DEBUG, LOG_TAG) << "count < 0: " << errno
<< " && idleBytes < maxIdleBytes, ms: " << 1000 * chunk_->payloadSize / (sampleFormat_.rate * sampleFormat_.frameSize)
<< "\n";
memset(chunk_->payload + len, 0, toRead - len);
len += toRead - len;
break;
}
else if (count == 0)
{
throw SnapException("end of file");
}
else
{
// LOG(DEBUG) << "count: " << count << "\n";
len += count;
bytes_read_ += len;
}
} while (len < toRead);
if (first_)
{
first_ = false;
chronos::systemtimeofday(&tvEncodedChunk_);
nextTick_ = chronos::getTickCount() + buffer_ms_;
}
encoder_->encode(chunk_.get());
nextTick_ += chunk_ms_;
long currentTick = chronos::getTickCount();
if (nextTick_ >= currentTick)
{
auto self = this->shared_from_this();
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
else
{
pcmListener_->onResync(this, currentTick - nextTick_);
nextTick_ = currentTick + buffer_ms_;
first_ = true;
do_read();
}
lastException_ = "";
}
catch (const std::exception& e)
{
if (lastException_ != e.what())
{
LOG(ERROR, LOG_TAG) << "Exception: " << e.what() << std::endl;
lastException_ = e.what();
}
disconnect();
connect();
}
}
} // namespace streamreader

View file

@ -0,0 +1,52 @@
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef POSIX_STREAM_HPP
#define POSIX_STREAM_HPP
#include "asio_stream.hpp"
namespace streamreader
{
using boost::asio::posix::stream_descriptor;
/// Reads and decodes PCM data from a file descriptor
/**
* Reads PCM from a file descriptor and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PosixStream : public AsioStream<stream_descriptor>
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
protected:
void connect() override;
void do_disconnect() override;
void do_read() override;
std::string lastException_;
size_t dryout_ms_;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
@ -27,6 +27,9 @@
// Copyright (c) 2015-2016 Ole Christian Eidheim
// Thanks, Christian :-)
namespace streamreader
{
/// Create a new process given command and run path.
/// Thus, at the moment, if read_stdout==nullptr, read_stderr==nullptr and open_stdin==false,
/// the stdout, stderr and stdin are sent to the parent process instead.
@ -239,4 +242,6 @@ private:
}
};
} // namespace streamreader
#endif // TINY_PROCESS_LIBRARY_HPP_

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -29,25 +29,19 @@
using namespace std;
namespace streamreader
{
static constexpr auto LOG_TAG = "ProcessStream";
ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: PcmStream(pcmListener, ioc, uri), path_(""), process_(nullptr)
: PosixStream(pcmListener, ioc, uri), path_(""), process_(nullptr)
{
params_ = uri_.getQuery("params");
logStderr_ = (uri_.getQuery("logStderr", "false") == "true");
if (uri_.query.find("dryout_ms") != uri_.query.end())
dryoutMs_ = cpt::stoul(uri_.query["dryout_ms"]);
else
dryoutMs_ = 2000;
}
ProcessStream::~ProcessStream()
{
if (process_)
process_->kill();
wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0"));
LOG(DEBUG, LOG_TAG) << "Watchdog timeout: " << wd_timeout_sec_ << "\n";
logStderr_ = (uri_.getQuery("log_stderr", "false") == "true");
}
@ -103,140 +97,81 @@ void ProcessStream::initExeAndPath(const std::string& filename)
}
void ProcessStream::start()
void ProcessStream::do_connect()
{
if (!active_)
return;
initExeAndPath(uri_.path);
PcmStream::start();
LOG(DEBUG, LOG_TAG) << "Launching: '" << path_ + exe_ << "', with params: '" << params_ << "', in path: '" << path_ << "'\n";
process_.reset(new Process(path_ + exe_ + " " + params_, path_));
int flags = fcntl(process_->getStdout(), F_GETFL, 0);
fcntl(process_->getStdout(), F_SETFL, flags | O_NONBLOCK);
stream_ = make_unique<stream_descriptor>(ioc_, process_->getStdout());
stream_stderr_ = make_unique<stream_descriptor>(ioc_, process_->getStderr());
on_connect();
if (wd_timeout_sec_ > 0)
{
watchdog_ = make_unique<Watchdog>(ioc_, this);
watchdog_->start(std::chrono::seconds(wd_timeout_sec_));
}
else
{
watchdog_ = nullptr;
}
stderrReadLine();
}
void ProcessStream::stop()
void ProcessStream::do_disconnect()
{
if (process_)
process_->kill();
PcmStream::stop();
/// thread is detached, so it is not joinable
if (stderrReaderThread_.joinable())
stderrReaderThread_.join();
}
void ProcessStream::onStderrMsg(const char* buffer, size_t n)
void ProcessStream::onStderrMsg(const std::string& line)
{
if (logStderr_)
{
string line = utils::string::trim_copy(string(buffer, n));
if ((line.find('\0') == string::npos) && !line.empty())
LOG(INFO) << "(" << getName() << ") " << line << "\n";
LOG(INFO, LOG_TAG) << "(" << getName() << ") " << line << "\n";
}
}
void ProcessStream::stderrReader()
void ProcessStream::stderrReadLine()
{
size_t buffer_size = 8192;
auto buffer = std::unique_ptr<char[]>(new char[buffer_size]);
ssize_t n;
stringstream message;
while (active_ && (n = read(process_->getStderr(), buffer.get(), buffer_size)) > 0)
onStderrMsg(buffer.get(), n);
const std::string delimiter = "\n";
auto self(shared_from_this());
boost::asio::async_read_until(
*stream_stderr_, streambuf_stderr_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
if (watchdog_)
watchdog_->trigger();
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
onStderrMsg(line);
}
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
}
void ProcessStream::worker()
void ProcessStream::onTimeout(const Watchdog& /*watchdog*/, std::chrono::milliseconds ms)
{
timeval tvChunk;
std::unique_ptr<msg::PcmChunk> chunk(new msg::PcmChunk(sampleFormat_, chunk_ms_));
setState(ReaderState::kPlaying);
string lastException = "";
while (active_)
{
process_.reset(new Process(path_ + exe_ + " " + params_, path_));
int flags = fcntl(process_->getStdout(), F_GETFL, 0);
fcntl(process_->getStdout(), F_SETFL, flags | O_NONBLOCK);
stderrReaderThread_ = thread(&ProcessStream::stderrReader, this);
stderrReaderThread_.detach();
chronos::systemtimeofday(&tvChunk);
tvEncodedChunk_ = tvChunk;
long nextTick = chronos::getTickCount();
int idleBytes = 0;
int maxIdleBytes = sampleFormat_.rate * sampleFormat_.frameSize * dryoutMs_ / 1000;
try
{
while (active_)
{
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
int toRead = chunk->payloadSize;
int len = 0;
do
{
int count = read(process_->getStdout(), chunk->payload + len, toRead - len);
if (count < 0 && idleBytes < maxIdleBytes)
{
memset(chunk->payload + len, 0, toRead - len);
idleBytes += toRead - len;
len += toRead - len;
continue;
}
if (count < 0)
{
setState(ReaderState::kIdle);
if (!sleep(100))
break;
}
else if (count == 0)
throw SnapException("end of file");
else
{
len += count;
idleBytes = 0;
}
} while ((len < toRead) && active_);
if (!active_)
break;
encoder_->encode(chunk.get());
if (!active_)
break;
nextTick += chunk_ms_;
chronos::addUs(tvChunk, chunk_ms_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
setState(ReaderState::kPlaying);
if (!sleep(nextTick - currentTick))
break;
}
else
{
chronos::systemtimeofday(&tvChunk);
tvEncodedChunk_ = tvChunk;
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
lastException = "";
}
}
catch (const std::exception& e)
{
if (lastException != e.what())
{
LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl;
lastException = e.what();
}
process_->kill();
if (!sleep(30000))
break;
}
}
LOG(ERROR, LOG_TAG) << "Watchdog timeout: " << ms.count() / 1000 << "s\n";
if (process_)
process_->kill();
}
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,16 +16,18 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PROCESS_STREAM_H
#define PROCESS_STREAM_H
#ifndef PROCESS_STREAM_HPP
#define PROCESS_STREAM_HPP
#include <memory>
#include <string>
#include "pcm_stream.hpp"
#include "posix_stream.hpp"
#include "process.hpp"
#include "watchdog.hpp"
// TODO: switch to AsioStream, maybe use boost::process library
namespace streamreader
{
/// Starts an external process and reads and PCM data from stdout
/**
@ -33,33 +35,39 @@
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class ProcessStream : public PcmStream
class ProcessStream : public PosixStream, public WatchdogListener
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~ProcessStream() override;
void start() override;
void stop() override;
~ProcessStream() override = default;
protected:
void do_connect() override;
void do_disconnect() override;
std::string exe_;
std::string path_;
std::string params_;
std::unique_ptr<Process> process_;
std::thread stderrReaderThread_;
bool logStderr_;
size_t dryoutMs_;
void worker() override;
virtual void stderrReader();
virtual void onStderrMsg(const char* buffer, size_t n);
bool logStderr_;
boost::asio::streambuf streambuf_stderr_;
std::unique_ptr<stream_descriptor> stream_stderr_;
// void worker() override;
virtual void stderrReadLine();
virtual void onStderrMsg(const std::string& line);
virtual void initExeAndPath(const std::string& filename);
bool fileExists(const std::string& filename);
std::string findExe(const std::string& filename);
size_t wd_timeout_sec_;
std::unique_ptr<Watchdog> watchdog_;
void onTimeout(const Watchdog& watchdog, std::chrono::milliseconds ms) override;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -25,13 +25,14 @@
#include "file_stream.hpp"
#include "librespot_stream.hpp"
#include "pipe_stream.hpp"
// #include "pipe_stream_old.hpp"
#include "process_stream.hpp"
#include "tcp_stream.hpp"
using namespace std;
namespace streamreader
{
StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec,
size_t defaultChunkBufferMs)
@ -64,10 +65,6 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri)
{
stream = make_shared<PipeStream>(pcmListener_, ioc_, streamUri);
}
// else if (streamUri.scheme == "pipe.old")
// {
// stream = make_shared<PipeStreamOld>(pcmListener_, ioc_, streamUri);
// }
else if (streamUri.scheme == "file")
{
stream = make_shared<FileStream>(pcmListener_, ioc_, streamUri);
@ -95,7 +92,7 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri)
if (stream)
{
for (auto s : streams_)
for (const auto& s : streams_)
{
if (s->getName() == stream->getName())
throw SnapException("Stream with name \"" + stream->getName() + "\" already exists");
@ -165,3 +162,5 @@ json StreamManager::toJson() const
result.push_back(stream->toJson());
return result;
}
} // namespace streamreader

View file

@ -1,5 +1,23 @@
#ifndef PCM_READER_FACTORY_H
#define PCM_READER_FACTORY_H
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef STREAM_MANAGER_HPP
#define STREAM_MANAGER_HPP
#include "pcm_stream.hpp"
#include <boost/asio/io_context.hpp>
@ -7,6 +25,9 @@
#include <string>
#include <vector>
namespace streamreader
{
typedef std::shared_ptr<PcmStream> PcmStreamPtr;
class StreamManager
@ -33,5 +54,6 @@ private:
boost::asio::io_context& ioc_;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -25,6 +25,8 @@
using namespace std;
namespace strutils = utils::string;
namespace streamreader
{
StreamUri::StreamUri(const std::string& uri)
{
@ -32,7 +34,6 @@ StreamUri::StreamUri(const std::string& uri)
}
void StreamUri::parse(const std::string& streamUri)
{
// https://en.wikipedia.org/wiki/Uniform_Resource_Identifier
@ -146,3 +147,4 @@ std::string StreamUri::getQuery(const std::string& key, const std::string& def)
return iter->second;
return def;
}
}

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef READER_URI_H
#define READER_URI_H
#ifndef STREAM_URI_HPP
#define STREAM_URI_HPP
#include <map>
#include <string>
@ -27,6 +27,8 @@
using json = nlohmann::json;
namespace streamreader
{
// scheme:[//[user:password@]host[:port]][/]path[?query][#fragment]
struct StreamUri
@ -56,5 +58,6 @@ struct StreamUri
std::string toString() const;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -32,7 +32,8 @@
using namespace std;
namespace streamreader
{
TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: AsioStream<tcp::socket>(pcmListener, ioc, uri), reconnect_timer_(ioc)
@ -62,7 +63,7 @@ TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
}
void TcpStream::connect()
void TcpStream::do_connect()
{
if (!active_)
return;
@ -108,7 +109,7 @@ void TcpStream::connect()
}
void TcpStream::disconnect()
void TcpStream::do_disconnect()
{
if (stream_)
stream_->close();
@ -116,3 +117,4 @@ void TcpStream::disconnect()
acceptor_->cancel();
reconnect_timer_.cancel();
}
}

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,13 +16,15 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef TCP_STREAM_H
#define TCP_STREAM_H
#ifndef TCP_STREAM_HPP
#define TCP_STREAM_HPP
#include "asio_stream.hpp"
using boost::asio::ip::tcp;
namespace streamreader
{
/// Reads and decodes PCM data from a named pipe
/**
@ -37,8 +39,8 @@ public:
TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
protected:
void connect() override;
void disconnect() override;
void do_connect() override;
void do_disconnect() override;
std::unique_ptr<tcp::acceptor> acceptor_;
std::string host_;
size_t port_;
@ -46,5 +48,6 @@ protected:
boost::asio::steady_timer reconnect_timer_;
};
} // namespace streamreader
#endif

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,14 +16,16 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "watchdog.h"
#include "watchdog.hpp"
#include <chrono>
using namespace std;
namespace streamreader
{
Watchdog::Watchdog(WatchdogListener* listener) : listener_(listener), thread_(nullptr), active_(false)
Watchdog::Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener) : timer_(ioc), listener_(listener)
{
}
@ -34,49 +36,29 @@ Watchdog::~Watchdog()
}
void Watchdog::start(size_t timeoutMs)
void Watchdog::start(const std::chrono::milliseconds& timeout)
{
timeoutMs_ = timeoutMs;
if (!thread_ || !active_)
{
active_ = true;
thread_.reset(new thread(&Watchdog::worker, this));
}
else
trigger();
timeout_ms_ = timeout;
trigger();
}
void Watchdog::stop()
{
active_ = false;
trigger();
if (thread_ && thread_->joinable())
thread_->join();
thread_ = nullptr;
timer_.cancel();
}
void Watchdog::trigger()
{
// std::unique_lock<std::mutex> lck(mtx_);
cv_.notify_one();
}
void Watchdog::worker()
{
while (active_)
{
std::unique_lock<std::mutex> lck(mtx_);
if (cv_.wait_for(lck, std::chrono::milliseconds(timeoutMs_)) == std::cv_status::timeout)
timer_.cancel();
timer_.expires_after(timeout_ms_);
timer_.async_wait([this](const boost::system::error_code& ec) {
if (!ec)
{
if (listener_)
{
listener_->onTimeout(this, timeoutMs_);
break;
}
listener_->onTimeout(*this, timeout_ms_);
}
}
active_ = false;
});
}
} // namespace streamreader

View file

@ -1,6 +1,6 @@
/***
This file is part of snapcast
Copyright (C) 2014-2019 Johannes Pohl
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -16,15 +16,14 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef WATCH_DOG_H
#define WATCH_DOG_H
#ifndef WATCH_DOG_HPP
#define WATCH_DOG_HPP
#include <atomic>
#include <condition_variable>
#include <boost/asio.hpp>
#include <memory>
#include <mutex>
#include <thread>
namespace streamreader
{
class Watchdog;
@ -32,7 +31,7 @@ class Watchdog;
class WatchdogListener
{
public:
virtual void onTimeout(const Watchdog* watchdog, size_t ms) = 0;
virtual void onTimeout(const Watchdog& watchdog, std::chrono::milliseconds ms) = 0;
};
@ -40,23 +39,19 @@ public:
class Watchdog
{
public:
Watchdog(WatchdogListener* listener = nullptr);
Watchdog(boost::asio::io_context& ioc, WatchdogListener* listener = nullptr);
virtual ~Watchdog();
void start(size_t timeoutMs);
void start(const std::chrono::milliseconds& timeout);
void stop();
void trigger();
private:
boost::asio::steady_timer timer_;
WatchdogListener* listener_;
std::condition_variable cv_;
std::mutex mtx_;
std::unique_ptr<std::thread> thread_;
size_t timeoutMs_;
std::atomic<bool> active_;
void worker();
std::chrono::milliseconds timeout_ms_;
};
} // namespace streamreader
#endif