Remove PosixStream layer

This commit is contained in:
badaix 2024-03-13 20:27:13 +01:00
parent 86cd4b2b63
commit 275a53a845
21 changed files with 201 additions and 162 deletions

View file

@ -71,7 +71,7 @@ log filter <tag>:<level>[,<tag>:<level>]* with tag = * or <log tag> and level =
\fI/etc/default/snapclient\fR \fI/etc/default/snapclient\fR
the daemon default configuration file the daemon default configuration file
.SH "COPYRIGHT" .SH "COPYRIGHT"
Copyright (C) 2014-2022 Johannes Pohl (snapcast@badaix.de). Copyright (C) 2014-2024 Johannes Pohl (snapcast@badaix.de).
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>. License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>.
This is free software: you are free to change and redistribute it. This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. There is NO WARRANTY, to the extent permitted by law.

View file

@ -35,7 +35,7 @@ Supported parameters for all source types:
- `codec`: Override the global codec - `codec`: Override the global codec
- `sampleformat`: Override the global sample format - `sampleformat`: Override the global sample format
- `chunk_ms`: Override the global `chunk_ms` - `chunk_ms`: Override the global `chunk_ms`
- `dryout_ms`: Supported by non-blocking sourced: when no new data is read from the source, send silence to the clients - `dryout_ms`: Supported by blocking sources: when no new data is read from the source, send silence to the clients
- `controlscript`: Script to control the stream source and read and provide meta data, see [stream_plugin.md](json_rpc_api/stream_plugin.md) - `controlscript`: Script to control the stream source and read and provide meta data, see [stream_plugin.md](json_rpc_api/stream_plugin.md)
- `controlscriptparams`: Control script command line arguments, must be url-encoded (use `%20` instead of a space " "), e.g. `--mopidy-host=192.168.42.23%20--debug` - `controlscriptparams`: Control script command line arguments, must be url-encoded (use `%20` instead of a space " "), e.g. `--mopidy-host=192.168.42.23%20--debug`

View file

@ -21,7 +21,6 @@ set(SERVER_SOURCES
streamreader/pcm_stream.cpp streamreader/pcm_stream.cpp
streamreader/tcp_stream.cpp streamreader/tcp_stream.cpp
streamreader/pipe_stream.cpp streamreader/pipe_stream.cpp
streamreader/posix_stream.cpp
streamreader/file_stream.cpp streamreader/file_stream.cpp
streamreader/airplay_stream.cpp streamreader/airplay_stream.cpp
streamreader/librespot_stream.cpp streamreader/librespot_stream.cpp

View file

@ -118,7 +118,7 @@ doc_root = /usr/share/snapserver/snapweb
# parameters have the form "key=value", they are concatenated with an "&" character # parameters have the form "key=value", they are concatenated with an "&" character
# parameter "name" is mandatory for all sources, while codec, sampleformat and chunk_ms are optional # parameter "name" is mandatory for all sources, while codec, sampleformat and chunk_ms are optional
# and will override the default codec, sampleformat or chunk_ms settings # and will override the default codec, sampleformat or chunk_ms settings
# Non blocking sources support the dryout_ms parameter: when no new data is read from the source, send silence to the clients # Blocking sources support the dryout_ms parameter: when no new data is read from the source, send silence to the clients
# Available types are: # Available types are:
# pipe: pipe:///<path/to/pipe>?name=<name>[&mode=create][&dryout_ms=2000], mode can be "create" or "read" # pipe: pipe:///<path/to/pipe>?name=<name>[&mode=create][&dryout_ms=2000], mode can be "create" or "read"
# librespot: librespot:///<path/to/librespot>?name=<name>[&dryout_ms=2000][&username=<my username>&password=<my password>][&devicename=Snapcast][&bitrate=320][&wd_timeout=7800][&volume=100][&onevent=""][&nomalize=false][&autoplay=false][&params=<generic librepsot process arguments>] # librespot: librespot:///<path/to/librespot>?name=<name>[&dryout_ms=2000][&username=<my username>&password=<my password>][&devicename=Snapcast][&bitrate=320][&wd_timeout=7800][&volume=100][&onevent=""][&nomalize=false][&autoplay=false][&params=<generic librepsot process arguments>]

View file

@ -41,7 +41,7 @@ the snapserver configuration file
\fI~/.config/snapcast/server.json\fR or (if $HOME is not set) \fI/var/lib/snapcast/server.json\fR \fI~/.config/snapcast/server.json\fR or (if $HOME is not set) \fI/var/lib/snapcast/server.json\fR
persistent server data file persistent server data file
.SH "COPYRIGHT" .SH "COPYRIGHT"
Copyright (C) 2014-2022 Johannes Pohl (snapcast@badaix.de). Copyright (C) 2014-2024 Johannes Pohl (snapcast@badaix.de).
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>. License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>.
This is free software: you are free to change and redistribute it. This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. There is NO WARRANTY, to the extent permitted by law.

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -209,16 +209,16 @@ void AirplayStream::setParamsAndPipePathFromPort()
} }
void AirplayStream::do_connect() void AirplayStream::connect()
{ {
ProcessStream::do_connect(); ProcessStream::connect();
pipeReadLine(); pipeReadLine();
} }
void AirplayStream::do_disconnect() void AirplayStream::disconnect()
{ {
ProcessStream::do_disconnect(); ProcessStream::disconnect();
// Shairpot-sync created but does not remove the pipe // Shairpot-sync created but does not remove the pipe
if (utils::file::exists(pipePath_) && (remove(pipePath_.c_str()) != 0)) if (utils::file::exists(pipePath_) && (remove(pipePath_.c_str()) != 0))
LOG(INFO, LOG_TAG) << "Failed to remove metadata pipe \"" << pipePath_ << "\": " << errno << "\n"; LOG(INFO, LOG_TAG) << "Failed to remove metadata pipe \"" << pipePath_ << "\": " << errno << "\n";

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -84,8 +84,8 @@ protected:
void setParamsAndPipePathFromPort(); void setParamsAndPipePathFromPort();
void do_connect() override; void connect() override;
void do_disconnect() override; void disconnect() override;
void onStderrMsg(const std::string& line) override; void onStderrMsg(const std::string& line) override;
void initExeAndPath(const std::string& filename) override; void initExeAndPath(const std::string& filename) override;

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -76,18 +76,6 @@ AlsaStream::AlsaStream(PcmStream::Listener* pcmListener, boost::asio::io_context
device_ = uri_.getQuery("device", "hw:0"); device_ = uri_.getQuery("device", "hw:0");
send_silence_ = (uri_.getQuery("send_silence", "false") == "true"); send_silence_ = (uri_.getQuery("send_silence", "false") == "true");
idle_threshold_ = std::chrono::milliseconds(std::max(cpt::stoi(uri_.getQuery("idle_threshold", "100")), 10)); idle_threshold_ = std::chrono::milliseconds(std::max(cpt::stoi(uri_.getQuery("idle_threshold", "100")), 10));
double silence_threshold_percent = 0.;
try
{
silence_threshold_percent = cpt::stod(uri_.getQuery("silence_threshold_percent", "0"));
}
catch (...)
{
}
int32_t max_amplitude = std::pow(2, sampleFormat_.bits() - 1) - 1;
silence_threshold_ = max_amplitude * (silence_threshold_percent / 100.);
LOG(DEBUG, LOG_TAG) << "Device: " << device_ << ", silence threshold percent: " << silence_threshold_percent
<< ", silence threshold amplitude: " << silence_threshold_ << "\n";
} }
@ -99,10 +87,6 @@ void AlsaStream::start()
// max_idle_bytes_ = sampleFormat_.rate() * sampleFormat_.frameSize() * dryout_ms_ / 1000; // max_idle_bytes_ = sampleFormat_.rate() * sampleFormat_.frameSize() * dryout_ms_ / 1000;
initAlsa(); initAlsa();
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
silent_chunk_ = std::vector<char>(chunk_->payloadSize, 0);
LOG(DEBUG, LOG_TAG) << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize
<< "\n";
first_ = true; first_ = true;
tvEncodedChunk_ = std::chrono::steady_clock::now(); tvEncodedChunk_ = std::chrono::steady_clock::now();
PcmStream::start(); PcmStream::start();
@ -205,41 +189,6 @@ void AlsaStream::uninitAlsa()
} }
bool AlsaStream::isSilent(const msg::PcmChunk& chunk) const
{
if (silence_threshold_ == 0)
return (std::memcmp(chunk.payload, silent_chunk_.data(), silent_chunk_.size()) == 0);
if (sampleFormat_.sampleSize() == 1)
{
auto payload = chunk.getPayload<int8_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
else if (sampleFormat_.sampleSize() == 2)
{
auto payload = chunk.getPayload<int16_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
else if (sampleFormat_.sampleSize() == 4)
{
auto payload = chunk.getPayload<int32_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
return true;
}
void AlsaStream::do_read() void AlsaStream::do_read()
{ {
try try

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2022 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -52,16 +52,12 @@ protected:
void initAlsa(); void initAlsa();
void uninitAlsa(); void uninitAlsa();
/// check if the chunk's volume is below the silence threshold
bool isSilent(const msg::PcmChunk& chunk) const;
snd_pcm_t* handle_; snd_pcm_t* handle_;
std::unique_ptr<msg::PcmChunk> chunk_;
bool first_; bool first_;
std::chrono::time_point<std::chrono::steady_clock> nextTick_; std::chrono::time_point<std::chrono::steady_clock> nextTick_;
boost::asio::steady_timer read_timer_; boost::asio::steady_timer read_timer_;
std::string device_; std::string device_;
std::vector<char> silent_chunk_;
std::chrono::microseconds silence_; std::chrono::microseconds silence_;
std::string lastException_; std::string lastException_;
@ -69,7 +65,6 @@ protected:
bool send_silence_; bool send_silence_;
/// silence duration before switching the stream to idle /// silence duration before switching the stream to idle
std::chrono::milliseconds idle_threshold_; std::chrono::milliseconds idle_threshold_;
int32_t silence_threshold_ = 0;
}; };
} // namespace streamreader } // namespace streamreader

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2022 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -36,6 +36,8 @@
namespace streamreader namespace streamreader
{ {
using namespace std::chrono_literals;
template <typename ReadStream> template <typename ReadStream>
class AsioStream : public PcmStream class AsioStream : public PcmStream
{ {
@ -46,20 +48,21 @@ public:
void start() override; void start() override;
void stop() override; void stop() override;
virtual void connect(); protected:
virtual void connect() = 0;
virtual void disconnect(); virtual void disconnect();
protected:
virtual void do_connect() = 0;
virtual void do_disconnect() = 0;
virtual void on_connect(); virtual void on_connect();
virtual void do_read(); virtual void do_read();
void check_state(); /// Start a timer that will change the stream state to idle after \p duration
void check_state(const std::chrono::steady_clock::duration& duration);
/// Use Timer \p timer to call \p handler after \p duration
template <typename Timer, typename Rep, typename Period> template <typename Timer, typename Rep, typename Period>
void wait(Timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler); void wait(Timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler);
std::unique_ptr<msg::PcmChunk> chunk_; /// Cache last exception to avoid repeated error logging
std::string lastException_;
timeval tv_chunk_; timeval tv_chunk_;
bool first_; bool first_;
std::chrono::time_point<std::chrono::steady_clock> nextTick_; std::chrono::time_point<std::chrono::steady_clock> nextTick_;
@ -67,7 +70,11 @@ protected:
boost::asio::steady_timer read_timer_; boost::asio::steady_timer read_timer_;
boost::asio::steady_timer state_timer_; boost::asio::steady_timer state_timer_;
std::unique_ptr<ReadStream> stream_; std::unique_ptr<ReadStream> stream_;
std::atomic<std::uint64_t> bytes_read_;
/// duration of the current silence period
std::chrono::microseconds silence_{0ms};
/// silence duration before switching the stream to idle
std::chrono::milliseconds idle_threshold_;
}; };
@ -95,11 +102,11 @@ template <typename ReadStream>
AsioStream<ReadStream>::AsioStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) AsioStream<ReadStream>::AsioStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(strand_), state_timer_(strand_) : PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(strand_), state_timer_(strand_)
{ {
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
LOG(DEBUG, "AsioStream") << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize LOG(DEBUG, "AsioStream") << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize
<< "\n"; << "\n";
bytes_read_ = 0; idle_threshold_ = std::chrono::milliseconds(std::max(cpt::stoi(uri_.getQuery("idle_threshold", "100")), 10));
buffer_ms_ = 50; buffer_ms_ = 50;
try try
@ -113,18 +120,19 @@ AsioStream<ReadStream>::AsioStream(PcmStream::Listener* pcmListener, boost::asio
template <typename ReadStream> template <typename ReadStream>
void AsioStream<ReadStream>::check_state() void AsioStream<ReadStream>::check_state(const std::chrono::steady_clock::duration& duration)
{ {
uint64_t last_read = bytes_read_; state_timer_.expires_after(duration);
wait(state_timer_, std::chrono::milliseconds(500 + chunk_ms_), state_timer_.async_wait(
[this, last_read] [this, duration](const boost::system::error_code& ec)
{ {
LOG(TRACE, "AsioStream") << "check state last: " << last_read << ", read: " << bytes_read_ << "\n"; if (!ec)
if (bytes_read_ != last_read) {
setState(ReaderState::kPlaying);
else LOG(INFO, "AsioStream") << "No data since " << std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()
<< " ms, switchung to idle\n";
setState(ReaderState::kIdle); setState(ReaderState::kIdle);
check_state(); }
}); });
} }
@ -133,32 +141,25 @@ template <typename ReadStream>
void AsioStream<ReadStream>::start() void AsioStream<ReadStream>::start()
{ {
PcmStream::start(); PcmStream::start();
check_state();
connect(); connect();
} }
template <typename ReadStream>
void AsioStream<ReadStream>::connect()
{
do_connect();
}
template <typename ReadStream>
void AsioStream<ReadStream>::disconnect()
{
do_disconnect();
}
template <typename ReadStream> template <typename ReadStream>
void AsioStream<ReadStream>::stop() void AsioStream<ReadStream>::stop()
{ {
active_ = false;
read_timer_.cancel(); read_timer_.cancel();
state_timer_.cancel();
disconnect(); disconnect();
PcmStream::stop();
}
template <typename ReadStream>
void AsioStream<ReadStream>::disconnect()
{
if (stream_ && stream_->is_open())
stream_->close();
setState(ReaderState::kIdle);
} }
@ -174,35 +175,60 @@ void AsioStream<ReadStream>::on_connect()
template <typename ReadStream> template <typename ReadStream>
void AsioStream<ReadStream>::do_read() void AsioStream<ReadStream>::do_read()
{ {
// LOG(DEBUG, "AsioStream") << "do_read\n"; // Reset the silence timer
check_state(idle_threshold_ + std::chrono::milliseconds(chunk_ms_));
boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize),
[this](boost::system::error_code ec, std::size_t length) mutable [this](boost::system::error_code ec, std::size_t length) mutable
{ {
state_timer_.cancel();
if (ec) if (ec)
{ {
LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n"; if (lastException_ != ec.message())
connect(); {
LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << ", ec: " << ec << "\n";
lastException_ = ec.message();
}
disconnect();
wait(read_timer_, 100ms, [this] { connect(); });
return; return;
} }
bytes_read_ += length; lastException_.clear();
if (isSilent(*chunk_))
{
silence_ += chunk_->duration<std::chrono::microseconds>();
if (silence_ >= idle_threshold_)
{
setState(ReaderState::kIdle);
// Avoid overflow
silence_ = idle_threshold_;
}
}
else
{
silence_ = 0ms;
setState(ReaderState::kPlaying);
}
// LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n"; // LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n";
// First read after connect. Set the initial read timestamp // First read after connect. Set the initial read timestamp
// the timestamp will be incremented after encoding, // the timestamp will be incremented after encoding,
// since we do not know how much the encoder actually encoded // since we do not know how much the encoder actually encoded
if (!first_) // if (!first_)
{ // {
auto now = std::chrono::steady_clock::now(); // auto now = std::chrono::steady_clock::now();
auto stream2systime_diff = now - tvEncodedChunk_; // auto stream2systime_diff = now - tvEncodedChunk_;
if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_)) // if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_))
{ // {
LOG(WARNING, "AsioStream") << "Stream and system time out of sync: " // LOG(WARNING, "AsioStream") << "Stream and system time out of sync: "
<< std::chrono::duration_cast<std::chrono::microseconds>(stream2systime_diff).count() / 1000. // << std::chrono::duration_cast<std::chrono::microseconds>(stream2systime_diff).count() / 1000.
<< " ms, resetting stream time.\n"; // << " ms, resetting stream time.\n";
first_ = true; // first_ = true;
} // }
} // }
if (first_) if (first_)
{ {
first_ = false; first_ = false;
@ -236,7 +262,6 @@ void AsioStream<ReadStream>::do_read()
else else
{ {
resync(std::chrono::duration_cast<std::chrono::nanoseconds>(currentTick - nextTick_)); resync(std::chrono::duration_cast<std::chrono::nanoseconds>(currentTick - nextTick_));
nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true; first_ = true;
do_read(); do_read();
} }

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2023 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -40,7 +40,7 @@ static constexpr auto LOG_TAG = "FileStream";
FileStream::FileStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) FileStream::FileStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri) : AsioStream<stream_descriptor>(pcmListener, ioc, server_settings, uri)
{ {
struct stat buffer; struct stat buffer;
if (stat(uri_.path.c_str(), &buffer) != 0) if (stat(uri_.path.c_str(), &buffer) != 0)
@ -60,7 +60,7 @@ FileStream::FileStream(PcmStream::Listener* pcmListener, boost::asio::io_context
} }
void FileStream::do_connect() void FileStream::connect()
{ {
LOG(DEBUG, LOG_TAG) << "connect\n"; LOG(DEBUG, LOG_TAG) << "connect\n";
int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -20,25 +20,27 @@
#define FILE_STREAM_HPP #define FILE_STREAM_HPP
// local headers // local headers
#include "posix_stream.hpp" #include "asio_stream.hpp"
namespace streamreader namespace streamreader
{ {
using boost::asio::posix::stream_descriptor;
/// Reads and decodes PCM data from a file /// Reads and decodes PCM data from a file
/** /**
* Reads PCM from a file and passes the data to an encoder. * Reads PCM from a file and passes the data to an encoder.
* Implements EncoderListener to get the encoded data. * Implements EncoderListener to get the encoded data.
* Data is passed to the PcmStream::Listener * Data is passed to the PcmStream::Listener
*/ */
class FileStream : public PosixStream class FileStream : public AsioStream<stream_descriptor>
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
FileStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri); FileStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void connect() override;
}; };
} // namespace streamreader } // namespace streamreader

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2023 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -21,7 +21,7 @@
// local headers // local headers
#include "common/resampler.hpp" #include "common/resampler.hpp"
#include "posix_stream.hpp" #include "pcm_stream.hpp"
// standard headers // standard headers
#include <memory> #include <memory>
@ -29,6 +29,8 @@
namespace streamreader namespace streamreader
{ {
// Mixing digital audio:
// https://www.vttoth.com/CMS/technical-notes/?view=article&id=68
/// Reads and decodes PCM data /// Reads and decodes PCM data
/** /**

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2022 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -60,6 +60,10 @@ PcmStream::PcmStream(PcmStream::Listener* pcmListener, boost::asio::io_context&
if (uri_.query.find(kUriSampleFormat) == uri_.query.end()) if (uri_.query.find(kUriSampleFormat) == uri_.query.end())
throw SnapException("Stream URI must have a sampleformat"); throw SnapException("Stream URI must have a sampleformat");
sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]); sampleFormat_ = SampleFormat(uri_.query[kUriSampleFormat]);
chunk_ = std::make_unique<msg::PcmChunk>(sampleFormat_, chunk_ms_);
silent_chunk_ = std::vector<char>(chunk_->payloadSize, 0);
LOG(DEBUG, LOG_TAG) << "Chunk duration: " << chunk_->durationMs() << " ms, frames: " << chunk_->getFrameCount() << ", size: " << chunk_->payloadSize
<< "\n";
LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n"; LOG(INFO, LOG_TAG) << "PcmStream: " << name_ << ", sampleFormat: " << sampleFormat_.toString() << "\n";
if (uri_.query.find(kControlScript) != uri_.query.end()) if (uri_.query.find(kControlScript) != uri_.query.end())
@ -72,6 +76,18 @@ PcmStream::PcmStream(PcmStream::Listener* pcmListener, boost::asio::io_context&
if (uri_.query.find(kUriChunkMs) != uri_.query.end()) if (uri_.query.find(kUriChunkMs) != uri_.query.end())
chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]); chunk_ms_ = cpt::stoul(uri_.query[kUriChunkMs]);
double silence_threshold_percent = 0.;
try
{
silence_threshold_percent = cpt::stod(uri_.getQuery("silence_threshold_percent", "0"));
}
catch (...)
{
}
int32_t max_amplitude = std::pow(2, sampleFormat_.bits() - 1) - 1;
silence_threshold_ = max_amplitude * (silence_threshold_percent / 100.);
LOG(DEBUG, LOG_TAG) << "Silence threshold percent: " << silence_threshold_percent << ", silence threshold amplitude: " << silence_threshold_ << "\n";
} }
@ -216,7 +232,6 @@ void PcmStream::start()
<< "\n"; << "\n";
encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) { chunkEncoded(encoder, chunk, duration); }, encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) { chunkEncoded(encoder, chunk, duration); },
sampleFormat_); sampleFormat_);
active_ = true;
if (stream_ctrl_) if (stream_ctrl_)
{ {
@ -224,12 +239,51 @@ void PcmStream::start()
getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { onControlNotification(notification); }, getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { onControlNotification(notification); },
[this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); }); [this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); });
} }
active_ = true;
} }
void PcmStream::stop() void PcmStream::stop()
{ {
active_ = false; active_ = false;
setState(ReaderState::kIdle);
}
bool PcmStream::isSilent(const msg::PcmChunk& chunk) const
{
if (silence_threshold_ == 0)
return (std::memcmp(chunk.payload, silent_chunk_.data(), silent_chunk_.size()) == 0);
if (sampleFormat_.sampleSize() == 1)
{
auto payload = chunk.getPayload<int8_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
else if (sampleFormat_.sampleSize() == 2)
{
auto payload = chunk.getPayload<int16_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
else if (sampleFormat_.sampleSize() == 4)
{
auto payload = chunk.getPayload<int32_t>();
for (size_t n = 0; n < payload.second; ++n)
{
if (abs(payload.first[n]) > silence_threshold_)
return false;
}
}
return true;
} }

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2023 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -162,6 +162,9 @@ public:
protected: protected:
std::atomic<bool> active_; std::atomic<bool> active_;
/// check if the volume of the \p chunk is below the silence threshold
bool isSilent(const msg::PcmChunk& chunk) const;
void setState(ReaderState newState); void setState(ReaderState newState);
void chunkRead(const msg::PcmChunk& chunk); void chunkRead(const msg::PcmChunk& chunk);
void resync(const std::chrono::nanoseconds& duration); void resync(const std::chrono::nanoseconds& duration);
@ -196,6 +199,12 @@ protected:
std::atomic<int> req_id_; std::atomic<int> req_id_;
boost::asio::steady_timer property_timer_; boost::asio::steady_timer property_timer_;
mutable std::recursive_mutex mutex_; mutable std::recursive_mutex mutex_;
/// If a chunk's max amplitude is below the threshold, it is considered silent
int32_t silence_threshold_ = 0;
/// Current chunk
std::unique_ptr<msg::PcmChunk> chunk_;
/// Silent chunk (all 0), for fast silence detection (memcmp)
std::vector<char> silent_chunk_;
}; };

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -38,7 +38,7 @@ static constexpr auto LOG_TAG = "PipeStream";
PipeStream::PipeStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) PipeStream::PipeStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri) : AsioStream<stream_descriptor>(pcmListener, ioc, server_settings, uri)
{ {
umask(0); umask(0);
string mode = uri_.getQuery("mode", "create"); string mode = uri_.getQuery("mode", "create");
@ -55,7 +55,7 @@ PipeStream::PipeStream(PcmStream::Listener* pcmListener, boost::asio::io_context
} }
void PipeStream::do_connect() void PipeStream::connect()
{ {
int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); int fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);
if (fd < 0) if (fd < 0)

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -20,7 +20,7 @@
#define PIPE_STREAM_HPP #define PIPE_STREAM_HPP
// local headers // local headers
#include "posix_stream.hpp" #include "asio_stream.hpp"
namespace streamreader namespace streamreader
{ {
@ -34,14 +34,14 @@ using boost::asio::posix::stream_descriptor;
* Implements EncoderListener to get the encoded data. * Implements EncoderListener to get the encoded data.
* Data is passed to the PcmStream::Listener * Data is passed to the PcmStream::Listener
*/ */
class PipeStream : public PosixStream class PipeStream : public AsioStream<stream_descriptor>
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
PipeStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri); PipeStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void connect() override;
}; };
} // namespace streamreader } // namespace streamreader

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -41,7 +41,7 @@ static constexpr auto LOG_TAG = "ProcessStream";
ProcessStream::ProcessStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) ProcessStream::ProcessStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: PosixStream(pcmListener, ioc, server_settings, uri) : AsioStream<stream_descriptor>(pcmListener, ioc, server_settings, uri)
{ {
params_ = uri_.getQuery("params"); params_ = uri_.getQuery("params");
wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0")); wd_timeout_sec_ = cpt::stoul(uri_.getQuery("wd_timeout", "0"));
@ -95,7 +95,7 @@ void ProcessStream::initExeAndPath(const std::string& filename)
} }
void ProcessStream::do_connect() void ProcessStream::connect()
{ {
if (!active_) if (!active_)
return; return;
@ -127,10 +127,11 @@ void ProcessStream::do_connect()
} }
void ProcessStream::do_disconnect() void ProcessStream::disconnect()
{ {
if (process_.running()) if (process_.running())
::kill(-process_.native_handle(), SIGINT); ::kill(-process_.native_handle(), SIGINT);
AsioStream<stream_descriptor>::disconnect();
} }

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -20,7 +20,7 @@
#define PROCESS_STREAM_HPP #define PROCESS_STREAM_HPP
// local headers // local headers
#include "posix_stream.hpp" #include "asio_stream.hpp"
#include "watchdog.hpp" #include "watchdog.hpp"
// standard headers // standard headers
@ -35,13 +35,15 @@ namespace bp = boost::process;
namespace streamreader namespace streamreader
{ {
using boost::asio::posix::stream_descriptor;
/// Starts an external process and reads and PCM data from stdout /// Starts an external process and reads and PCM data from stdout
/** /**
* Starts an external process, reads PCM data from stdout, and passes the data to an encoder. * Starts an external process, reads PCM data from stdout, and passes the data to an encoder.
* Implements EncoderListener to get the encoded data. * Implements EncoderListener to get the encoded data.
* Data is passed to the PcmStream::Listener * Data is passed to the PcmStream::Listener
*/ */
class ProcessStream : public PosixStream, public WatchdogListener class ProcessStream : public AsioStream<stream_descriptor>, public WatchdogListener
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
@ -49,8 +51,8 @@ public:
~ProcessStream() override = default; ~ProcessStream() override = default;
protected: protected:
void do_connect() override; void connect() override;
void do_disconnect() override; void disconnect() override;
std::string exe_; std::string exe_;
std::string path_; std::string path_;

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2021 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -68,7 +68,7 @@ TcpStream::TcpStream(PcmStream::Listener* pcmListener, boost::asio::io_context&
} }
void TcpStream::do_connect() void TcpStream::connect()
{ {
if (!active_) if (!active_)
return; return;
@ -112,12 +112,13 @@ void TcpStream::do_connect()
} }
void TcpStream::do_disconnect() void TcpStream::disconnect()
{ {
if (stream_) reconnect_timer_.cancel();
stream_->close();
if (acceptor_) if (acceptor_)
acceptor_->cancel(); acceptor_->cancel();
reconnect_timer_.cancel(); AsioStream<tcp::socket>::disconnect();
} }
} // namespace streamreader } // namespace streamreader

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2022 Johannes Pohl Copyright (C) 2014-2024 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -43,8 +43,8 @@ public:
TcpStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri); TcpStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri);
protected: protected:
void do_connect() override; void connect() override;
void do_disconnect() override; void disconnect() override;
std::unique_ptr<tcp::acceptor> acceptor_; std::unique_ptr<tcp::acceptor> acceptor_;
std::string host_; std::string host_;
size_t port_; size_t port_;