Add control_script to stream sources

New parameter "control_script", starting a script on stream start.
Parameters "--snapcast-port" and "--stream" are passed to the script
This commit is contained in:
badaix 2021-05-23 12:39:47 +02:00
parent ec73bfb6d6
commit 2e64d81f79
25 changed files with 265 additions and 95 deletions

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#ifndef STRING_UTILS_H #ifndef STRING_UTILS_HPP
#define STRING_UTILS_H #define STRING_UTILS_HPP
#include <algorithm> #include <algorithm>
#include <map> #include <map>
@ -172,6 +172,20 @@ static std::map<std::string, std::string> split_pairs(const std::string& s, char
} }
static inline std::string& tolower(std::string& s)
{
std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
return s;
}
static inline std::string tolower_copy(const std::string& s)
{
std::string str(s);
return tolower(str);
}
} // namespace string } // namespace string
} // namespace utils } // namespace utils

View file

@ -693,9 +693,8 @@ void Server::start()
{ {
controlServer_ = std::make_unique<ControlServer>(io_context_, settings_.tcp, settings_.http, this); controlServer_ = std::make_unique<ControlServer>(io_context_, settings_.tcp, settings_.http, this);
streamServer_ = std::make_unique<StreamServer>(io_context_, settings_, this); streamServer_ = std::make_unique<StreamServer>(io_context_, settings_, this);
streamManager_ = streamManager_ = std::make_unique<StreamManager>(this, io_context_, settings_);
std::make_unique<StreamManager>(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs);
// throw SnapException("xxx");
// Add normal sources first // Add normal sources first
for (const auto& sourceUri : settings_.stream.sources) for (const auto& sourceUri : settings_.stream.sources)
{ {

View file

@ -47,8 +47,8 @@ string hex2str(const string& input)
* Without HAS_EXPAT defined no parsing will occur. * Without HAS_EXPAT defined no parsing will occur.
*/ */
AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: ProcessStream(pcmListener, ioc, uri), port_(5000), pipe_open_timer_(ioc) : ProcessStream(pcmListener, ioc, server_settings, uri), port_(5000), pipe_open_timer_(ioc)
{ {
logStderr_ = true; logStderr_ = true;

View file

@ -59,7 +59,7 @@ class AirplayStream : public ProcessStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
~AirplayStream() override; ~AirplayStream() override;
protected: protected:

View file

@ -65,8 +65,8 @@ void wait(boost::asio::steady_timer& timer, const std::chrono::duration<Rep, Per
} // namespace } // namespace
AlsaStream::AlsaStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) AlsaStream::AlsaStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PcmStream(pcmListener, ioc, uri), handle_(nullptr), read_timer_(ioc), silence_(0ms) : PcmStream(pcmListener, ioc, server_settings, uri), handle_(nullptr), read_timer_(ioc), silence_(0ms)
{ {
device_ = uri_.getQuery("device", "hw:0"); device_ = uri_.getQuery("device", "hw:0");
send_silence_ = (uri_.getQuery("send_silence", "false") == "true"); send_silence_ = (uri_.getQuery("send_silence", "false") == "true");

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,7 +37,7 @@ class AlsaStream : public PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
AlsaStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); AlsaStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
void start() override; void start() override;
void stop() override; void stop() override;

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -33,7 +33,7 @@ class AsioStream : public PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
void start() override; void start() override;
void stop() override; void stop() override;
@ -82,8 +82,8 @@ void AsioStream<ReadStream>::wait(Timer& timer, const std::chrono::duration<Rep,
template <typename ReadStream> template <typename ReadStream>
AsioStream<ReadStream>::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) AsioStream<ReadStream>::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PcmStream(pcmListener, ioc, uri), read_timer_(ioc), state_timer_(ioc) : PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(ioc), state_timer_(ioc)
{ {
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_); chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
LOG(DEBUG, "AsioStream") << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize LOG(DEBUG, "AsioStream") << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -34,7 +34,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "FileStream"; static constexpr auto LOG_TAG = "FileStream";
FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri) FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri)
{ {
struct stat buffer; struct stat buffer;
if (stat(uri_.path.c_str(), &buffer) != 0) if (stat(uri_.path.c_str(), &buffer) != 0)

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -34,7 +34,7 @@ class FileStream : public PosixStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void do_connect() override;

View file

@ -32,7 +32,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "LibrespotStream"; static constexpr auto LOG_TAG = "LibrespotStream";
LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri) LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: ProcessStream(pcmListener, ioc, server_settings, uri)
{ {
wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "7800")); ///< 130min wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "7800")); ///< 130min

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,7 +37,7 @@ class LibrespotStream : public ProcessStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
bool killall_; bool killall_;

View file

@ -32,8 +32,9 @@ static constexpr auto LOG_TAG = "MetaStream";
// static constexpr auto kResyncTolerance = 50ms; // static constexpr auto kResyncTolerance = 50ms;
MetaStream::MetaStream(PcmListener* pcmListener, const std::vector<std::shared_ptr<PcmStream>>& streams, boost::asio::io_context& ioc, const StreamUri& uri) MetaStream::MetaStream(PcmListener* pcmListener, const std::vector<std::shared_ptr<PcmStream>>& streams, boost::asio::io_context& ioc,
: PcmStream(pcmListener, ioc, uri), first_read_(true) const ServerSettings& server_settings, const StreamUri& uri)
: PcmStream(pcmListener, ioc, server_settings, uri), first_read_(true)
{ {
auto path_components = utils::string::split(uri.path, '/'); auto path_components = utils::string::split(uri.path, '/');
for (const auto& component : path_components) for (const auto& component : path_components)

View file

@ -37,7 +37,8 @@ class MetaStream : public PcmStream, public PcmListener
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PcmListener /// ctor. Encoded PCM data is passed to the PcmListener
MetaStream(PcmListener* pcmListener, const std::vector<std::shared_ptr<PcmStream>>& streams, boost::asio::io_context& ioc, const StreamUri& uri); MetaStream(PcmListener* pcmListener, const std::vector<std::shared_ptr<PcmStream>>& streams, boost::asio::io_context& ioc,
const ServerSettings& server_settings, const StreamUri& uri);
virtual ~MetaStream(); virtual ~MetaStream();
void start() override; void start() override;

View file

@ -23,6 +23,7 @@
#include "common/aixlog.hpp" #include "common/aixlog.hpp"
#include "common/snap_exception.hpp" #include "common/snap_exception.hpp"
#include "common/str_compat.hpp" #include "common/str_compat.hpp"
#include "common/utils/string_utils.hpp"
#include "encoder/encoder_factory.hpp" #include "encoder/encoder_factory.hpp"
#include "pcm_stream.hpp" #include "pcm_stream.hpp"
@ -33,10 +34,118 @@ namespace streamreader
{ {
static constexpr auto LOG_TAG = "PcmStream"; static constexpr auto LOG_TAG = "PcmStream";
static constexpr auto SCRIPT_LOG_TAG = "Script";
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) CtrlScript::CtrlScript(boost::asio::io_context& ioc, const std::string& script) : ioc_(ioc), script_(script)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc) {
}
CtrlScript::~CtrlScript()
{
stop();
}
void CtrlScript::start(const std::string& stream_id, const ServerSettings& server_setttings)
{
pipe_stderr_ = bp::pipe();
pipe_stdout_ = bp::pipe();
stringstream params;
params << " \"--stream=" + stream_id + "\"";
if (server_setttings.http.enabled)
params << " --snapcast-port=" << server_setttings.http.port;
process_ = bp::child(script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_);
stream_stdout_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stdout_.native_source());
stream_stderr_ = make_unique<boost::asio::posix::stream_descriptor>(ioc_, pipe_stderr_.native_source());
stderrReadLine();
stdoutReadLine();
}
void CtrlScript::logScript(const std::string& source, std::string line)
{
if (line.empty())
return;
std::ignore = source;
if (line.back() == '\r')
line.resize(line.size() - 1);
auto tmp = utils::string::tolower_copy(line);
AixLog::Severity severity = AixLog::Severity::info;
if (tmp.find(" trace") != string::npos)
severity = AixLog::Severity::trace;
else if (tmp.find(" debug") != string::npos)
severity = AixLog::Severity::debug;
else if (tmp.find(" info") != string::npos)
severity = AixLog::Severity::info;
else if (tmp.find(" warning") != string::npos)
severity = AixLog::Severity::warning;
else if (tmp.find(" error") != string::npos)
severity = AixLog::Severity::error;
else if ((tmp.find(" fatal") != string::npos) || (tmp.find(" critical") != string::npos))
severity = AixLog::Severity::fatal;
LOG(severity, SCRIPT_LOG_TAG) << line << "\n";
}
void CtrlScript::stderrReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(
*stream_stderr_, streambuf_stderr_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
logScript("stderr", std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
}
void CtrlScript::stdoutReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(
*stream_stdout_, streambuf_stdout_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
logScript("stdout", std::move(line));
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();
});
}
void CtrlScript::stop()
{
if (process_.running())
{
::kill(-process_.native_handle(), SIGINT);
}
}
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings)
{ {
encoder::EncoderFactory encoderFactory; encoder::EncoderFactory encoderFactory;
if (uri_.query.find(kUriCodec) == uri_.query.end()) if (uri_.query.find(kUriCodec) == uri_.query.end())
@ -52,6 +161,9 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]); sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]);
LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n"; LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n";
if (uri_.query.find(kControlScript) != uri_.query.end())
ctrl_script_ = std::make_unique<CtrlScript>(ioc, uri_.query[kControlScript]);
if (uri_.query.find(kUriChunkMs) != uri_.query.end()) if (uri_.query.find(kUriChunkMs) != uri_.query.end())
chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]); chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]);
@ -108,6 +220,9 @@ void PcmStream::start()
encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) { chunkEncoded(encoder, chunk, duration); }, encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) { chunkEncoded(encoder, chunk, duration); },
sampleFormat_); sampleFormat_);
active_ = true; active_ = true;
if (ctrl_script_)
ctrl_script_->start(getId(), server_settings_);
} }
@ -184,18 +299,10 @@ void PcmStream::resync(const std::chrono::nanoseconds& duration)
json PcmStream::toJson() const json PcmStream::toJson() const
{ {
string state("unknown");
if (state_ == ReaderState::kIdle)
state = "idle";
else if (state_ == ReaderState::kPlaying)
state = "playing";
else if (state_ == ReaderState::kDisabled)
state = "disabled";
json j = { json j = {
{"uri", uri_.toJson()}, {"uri", uri_.toJson()},
{"id", getId()}, {"id", getId()},
{"status", state}, {"status", to_string(state_)},
}; };
if (meta_) if (meta_)

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -19,18 +19,32 @@
#ifndef PCM_STREAM_HPP #ifndef PCM_STREAM_HPP
#define PCM_STREAM_HPP #define PCM_STREAM_HPP
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wmissing-braces"
#include <boost/process.hpp>
#pragma GCC diagnostic pop
#include <atomic>
#include <condition_variable>
#include <map>
#include <string>
#include <vector>
#include <boost/asio/io_context.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/optional.hpp>
#include "common/json.hpp" #include "common/json.hpp"
#include "common/sample_format.hpp" #include "common/sample_format.hpp"
#include "encoder/encoder.hpp" #include "encoder/encoder.hpp"
#include "message/codec_header.hpp" #include "message/codec_header.hpp"
#include "message/stream_tags.hpp" #include "message/stream_tags.hpp"
#include "server_settings.hpp"
#include "stream_uri.hpp" #include "stream_uri.hpp"
#include <atomic>
#include <boost/asio/io_context.hpp>
#include <condition_variable> namespace bp = boost::process;
#include <map>
#include <string>
#include <vector>
namespace streamreader namespace streamreader
@ -46,30 +60,37 @@ enum class ReaderState
kDisabled = 3 kDisabled = 3
}; };
static std::ostream& operator<<(std::ostream& os, const ReaderState& reader_state)
static std::string to_string(const ReaderState& reader_state)
{ {
switch (reader_state) switch (reader_state)
{ {
case ReaderState::kIdle: case ReaderState::kIdle:
os << "idle"; return "idle";
break;
case ReaderState::kPlaying: case ReaderState::kPlaying:
os << "playing"; return "playing";
break;
case ReaderState::kDisabled: case ReaderState::kDisabled:
os << "disabled"; return "disabled";
break;
case ReaderState::kUnknown: case ReaderState::kUnknown:
default: default:
os << "unknown"; return "unknown";
} }
}
static std::ostream& operator<<(std::ostream& os, const ReaderState& reader_state)
{
os << to_string(reader_state);
return os; return os;
} }
static constexpr auto kUriCodec = "codec"; static constexpr auto kUriCodec = "codec";
static constexpr auto kUriName = "name"; static constexpr auto kUriName = "name";
static constexpr auto kUriSampleFormat = "sampleformat"; static constexpr auto kUriSampleFormat = "sampleformat";
static constexpr auto kUriChunkMs = "chunk_ms"; static constexpr auto kUriChunkMs = "chunk_ms";
static constexpr auto kControlScript = "controlscript";
/// Callback interface for users of PcmStream /// Callback interface for users of PcmStream
@ -87,6 +108,33 @@ public:
}; };
class CtrlScript
{
public:
CtrlScript(boost::asio::io_context& ioc, const std::string& script);
virtual ~CtrlScript();
void start(const std::string& stream_id, const ServerSettings& server_setttings);
void stop();
private:
void stderrReadLine();
void stdoutReadLine();
void logScript(const std::string& source, std::string line);
bp::child process_;
bp::pipe pipe_stdout_;
bp::pipe pipe_stderr_;
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_stdout_;
std::unique_ptr<boost::asio::posix::stream_descriptor> stream_stderr_;
boost::asio::streambuf streambuf_stdout_;
boost::asio::streambuf streambuf_stderr_;
boost::asio::io_context& ioc_;
std::string script_;
};
/// Reads and decodes PCM data /// Reads and decodes PCM data
/** /**
* Reads PCM and passes the data to an encoder. * Reads PCM and passes the data to an encoder.
@ -97,7 +145,7 @@ class PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PcmListener /// ctor. Encoded PCM data is passed to the PcmListener
PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
virtual ~PcmStream(); virtual ~PcmStream();
virtual void start(); virtual void start();
@ -137,6 +185,8 @@ protected:
ReaderState state_; ReaderState state_;
std::shared_ptr<msg::StreamTags> meta_; std::shared_ptr<msg::StreamTags> meta_;
boost::asio::io_context& ioc_; boost::asio::io_context& ioc_;
ServerSettings server_settings_;
std::unique_ptr<CtrlScript> ctrl_script_;
}; };
} // namespace streamreader } // namespace streamreader

View file

@ -36,7 +36,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "PipeStream"; static constexpr auto LOG_TAG = "PipeStream";
PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri) PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri)
{ {
umask(0); umask(0);
string mode = uri_.getQuery("mode", "create"); string mode = uri_.getQuery("mode", "create");

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,7 +37,7 @@ class PipeStream : public PosixStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void do_connect() override;

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,7 +37,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "PosixStream"; static constexpr auto LOG_TAG = "PosixStream";
static constexpr auto kResyncTolerance = 50ms; static constexpr auto kResyncTolerance = 50ms;
PosixStream::PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream<stream_descriptor>(pcmListener, ioc, uri) PosixStream::PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: AsioStream<stream_descriptor>(pcmListener, ioc, server_settings, uri)
{ {
if (uri_.query.find("dryout_ms") != uri_.query.end()) if (uri_.query.find("dryout_ms") != uri_.query.end())
dryout_ms_ = cpt::stoul(uri_.query["dryout_ms"]); dryout_ms_ = cpt::stoul(uri_.query["dryout_ms"]);

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,7 +37,7 @@ class PosixStream : public AsioStream<stream_descriptor>
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); PosixStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void connect() override; void connect() override;

View file

@ -35,7 +35,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "ProcessStream"; static constexpr auto LOG_TAG = "ProcessStream";
ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PosixStream(pcmListener, ioc, uri) ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri)
{ {
params_ = uri_.getQuery("params"); params_ = uri_.getQuery("params");
wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0")); wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0"));

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -19,12 +19,6 @@
#ifndef PROCESS_STREAM_HPP #ifndef PROCESS_STREAM_HPP
#define PROCESS_STREAM_HPP #define PROCESS_STREAM_HPP
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wmissing-braces"
#include <boost/process.hpp>
#pragma GCC diagnostic pop
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@ -49,7 +43,7 @@ class ProcessStream : public PosixStream, public WatchdogListener
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
~ProcessStream() override = default; ~ProcessStream() override = default;
protected: protected:

View file

@ -38,9 +38,9 @@ using namespace std;
namespace streamreader namespace streamreader
{ {
StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec, StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& settings)
size_t defaultChunkBufferMs) // const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultChunkBufferMs)
: pcmListener_(pcmListener), sampleFormat_(defaultSampleFormat), codec_(defaultCodec), chunkBufferMs_(defaultChunkBufferMs), ioc_(ioc) : pcmListener_(pcmListener), settings_(settings), ioc_(ioc)
{ {
} }
@ -55,13 +55,13 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri)
PcmStreamPtr StreamManager::addStream(StreamUri& streamUri) PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
{ {
if (streamUri.query.find(kUriSampleFormat) == streamUri.query.end()) if (streamUri.query.find(kUriSampleFormat) == streamUri.query.end())
streamUri.query[kUriSampleFormat] = sampleFormat_; streamUri.query[kUriSampleFormat] = settings_.stream.sampleFormat;
if (streamUri.query.find(kUriCodec) == streamUri.query.end()) if (streamUri.query.find(kUriCodec) == streamUri.query.end())
streamUri.query[kUriCodec] = codec_; streamUri.query[kUriCodec] = settings_.stream.codec;
if (streamUri.query.find(kUriChunkMs) == streamUri.query.end()) if (streamUri.query.find(kUriChunkMs) == streamUri.query.end())
streamUri.query[kUriChunkMs] = cpt::to_string(chunkBufferMs_); streamUri.query[kUriChunkMs] = cpt::to_string(settings_.stream.streamChunkMs);
// LOG(DEBUG) << "\nURI: " << streamUri.uri << "\nscheme: " << streamUri.scheme << "\nhost: " // LOG(DEBUG) << "\nURI: " << streamUri.uri << "\nscheme: " << streamUri.scheme << "\nhost: "
// << streamUri.host << "\npath: " << streamUri.path << "\nfragment: " << streamUri.fragment << "\n"; // << streamUri.host << "\npath: " << streamUri.path << "\nfragment: " << streamUri.fragment << "\n";
@ -72,20 +72,20 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
if (streamUri.scheme == "pipe") if (streamUri.scheme == "pipe")
{ {
stream = make_shared<PipeStream>(pcmListener_, ioc_, streamUri); stream = make_shared<PipeStream>(pcmListener_, ioc_, settings_, streamUri);
} }
else if (streamUri.scheme == "file") else if (streamUri.scheme == "file")
{ {
stream = make_shared<FileStream>(pcmListener_, ioc_, streamUri); stream = make_shared<FileStream>(pcmListener_, ioc_, settings_, streamUri);
} }
else if (streamUri.scheme == "process") else if (streamUri.scheme == "process")
{ {
stream = make_shared<ProcessStream>(pcmListener_, ioc_, streamUri); stream = make_shared<ProcessStream>(pcmListener_, ioc_, settings_, streamUri);
} }
#ifdef HAS_ALSA #ifdef HAS_ALSA
else if (streamUri.scheme == "alsa") else if (streamUri.scheme == "alsa")
{ {
stream = make_shared<AlsaStream>(pcmListener_, ioc_, streamUri); stream = make_shared<AlsaStream>(pcmListener_, ioc_, settings_, streamUri);
} }
#endif #endif
else if ((streamUri.scheme == "spotify") || (streamUri.scheme == "librespot")) else if ((streamUri.scheme == "spotify") || (streamUri.scheme == "librespot"))
@ -94,7 +94,7 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
// that all constructors of all parent classes also use the overwritten sample // that all constructors of all parent classes also use the overwritten sample
// format. // format.
streamUri.query[kUriSampleFormat] = "44100:16:2"; streamUri.query[kUriSampleFormat] = "44100:16:2";
stream = make_shared<LibrespotStream>(pcmListener_, ioc_, streamUri); stream = make_shared<LibrespotStream>(pcmListener_, ioc_, settings_, streamUri);
} }
else if (streamUri.scheme == "airplay") else if (streamUri.scheme == "airplay")
{ {
@ -102,15 +102,15 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
// that all constructors of all parent classes also use the overwritten sample // that all constructors of all parent classes also use the overwritten sample
// format. // format.
streamUri.query[kUriSampleFormat] = "44100:16:2"; streamUri.query[kUriSampleFormat] = "44100:16:2";
stream = make_shared<AirplayStream>(pcmListener_, ioc_, streamUri); stream = make_shared<AirplayStream>(pcmListener_, ioc_, settings_, streamUri);
} }
else if (streamUri.scheme == "tcp") else if (streamUri.scheme == "tcp")
{ {
stream = make_shared<TcpStream>(pcmListener_, ioc_, streamUri); stream = make_shared<TcpStream>(pcmListener_, ioc_, settings_, streamUri);
} }
else if (streamUri.scheme == "meta") else if (streamUri.scheme == "meta")
{ {
stream = make_shared<MetaStream>(pcmListener_, streams_, ioc_, streamUri); stream = make_shared<MetaStream>(pcmListener_, streams_, ioc_, settings_, streamUri);
} }
else else
{ {

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -20,6 +20,8 @@
#define STREAM_MANAGER_HPP #define STREAM_MANAGER_HPP
#include "pcm_stream.hpp" #include "pcm_stream.hpp"
#include "server_settings.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <memory> #include <memory>
#include <string> #include <string>
@ -33,8 +35,7 @@ using PcmStreamPtr = std::shared_ptr<PcmStream>;
class StreamManager class StreamManager
{ {
public: public:
StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec, StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& settings);
size_t defaultChunkBufferMs = 20);
PcmStreamPtr addStream(const std::string& uri); PcmStreamPtr addStream(const std::string& uri);
PcmStreamPtr addStream(StreamUri& streamUri); PcmStreamPtr addStream(StreamUri& streamUri);
@ -49,9 +50,7 @@ public:
private: private:
std::vector<PcmStreamPtr> streams_; std::vector<PcmStreamPtr> streams_;
PcmListener* pcmListener_; PcmListener* pcmListener_;
std::string sampleFormat_; ServerSettings settings_;
std::string codec_;
size_t chunkBufferMs_;
boost::asio::io_context& ioc_; boost::asio::io_context& ioc_;
}; };

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -37,8 +37,8 @@ namespace streamreader
static constexpr auto LOG_TAG = "TcpStream"; static constexpr auto LOG_TAG = "TcpStream";
TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: AsioStream<tcp::socket>(pcmListener, ioc, uri), reconnect_timer_(ioc) : AsioStream<tcp::socket>(pcmListener, ioc, server_settings, uri), reconnect_timer_(ioc)
{ {
host_ = uri_.host; host_ = uri_.host;
auto host_port = utils::string::split(host_, ':'); auto host_port = utils::string::split(host_, ':');

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl Copyright (C) 2014-2021 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -36,7 +36,7 @@ class TcpStream : public AsioStream<tcp::socket>
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void do_connect() override;