Add meta stream source

This commit is contained in:
badaix 2020-09-26 12:44:33 +02:00
parent f1e672d375
commit 27a9e710a2
20 changed files with 367 additions and 64 deletions

View file

@ -22,6 +22,7 @@ set(SERVER_SOURCES
streamreader/file_stream.cpp
streamreader/airplay_stream.cpp
streamreader/librespot_stream.cpp
streamreader/meta_stream.cpp
streamreader/watchdog.cpp
streamreader/process_stream.cpp)

View file

@ -44,7 +44,7 @@ endif
CXXFLAGS += $(ADD_CFLAGS) -std=c++14 -Wall -Wextra -Wpedantic -Wno-unused-function -DBOOST_ERROR_CODE_HEADER_ONLY -DHAS_FLAC -DHAS_OGG -DHAS_VORBIS -DHAS_VORBIS_ENC -DHAS_OPUS -DVERSION=\"$(VERSION)\" -I. -I.. -I../common
LDFLAGS += $(ADD_LDFLAGS) -lvorbis -lvorbisenc -logg -lFLAC -lopus -lsoxr
OBJ = snapserver.o server.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o ../common/resampler.o
OBJ = snapserver.o server.o config.o control_server.o control_session_tcp.o control_session_http.o control_session_ws.o stream_server.o stream_session.o stream_session_tcp.o stream_session_ws.o streamreader/stream_uri.o streamreader/base64.o streamreader/stream_manager.o streamreader/pcm_stream.o streamreader/posix_stream.o streamreader/pipe_stream.o streamreader/file_stream.o streamreader/tcp_stream.o streamreader/process_stream.o streamreader/airplay_stream.o streamreader/meta_stream.o streamreader/librespot_stream.o streamreader/watchdog.o encoder/encoder_factory.o encoder/flac_encoder.o encoder/opus_encoder.o encoder/pcm_encoder.o encoder/ogg_encoder.o ../common/sample_format.o ../common/resampler.o
ifneq (,$(TARGET))
CXXFLAGS += -D$(TARGET)

View file

@ -29,20 +29,6 @@
namespace encoder
{
class Encoder;
/// Callback interface for users of Encoder
/**
* Users of Encoder should implement this to get the encoded PCM data
*/
class EncoderListener
{
public:
virtual void onChunkEncoded(const Encoder* encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) = 0;
};
/// Abstract Encoder class
/**
* Stream encoder. PCM chunks are fed into the encoder.
@ -51,6 +37,8 @@ public:
class Encoder
{
public:
using OnEncodedCallback = std::function<void(const Encoder&, std::shared_ptr<msg::PcmChunk>, double)>;
/// ctor. Codec options (E.g. compression level) are passed as string and are codec dependend
Encoder(const std::string& codecOptions = "") : headerChunk_(nullptr), codecOptions_(codecOptions)
{
@ -59,11 +47,11 @@ public:
virtual ~Encoder() = default;
/// The listener will receive the encoded stream
virtual void init(EncoderListener* listener, const SampleFormat& format)
virtual void init(OnEncodedCallback callback, const SampleFormat& format)
{
if (codecOptions_ == "")
codecOptions_ = getDefaultOptions();
listener_ = listener;
encoded_callback_ = callback;
sampleFormat_ = format;
initEncoder();
}
@ -94,8 +82,8 @@ protected:
SampleFormat sampleFormat_;
std::shared_ptr<msg::CodecHeader> headerChunk_;
EncoderListener* listener_;
std::string codecOptions_;
OnEncodedCallback encoded_callback_;
};
} // namespace encoder

View file

@ -112,7 +112,7 @@ void FlacEncoder::encode(const msg::PcmChunk& chunk)
double resMs = static_cast<double>(encodedSamples_) / sampleFormat_.msRate();
// LOG(INFO, LOG_TAG) << "encoded: " << chunk->payloadSize << "\tframes: " << encodedSamples_ << "\tres: " << resMs << "\n";
encodedSamples_ = 0;
listener_->onChunkEncoded(this, flacChunk_, resMs);
encoded_callback_(*this, flacChunk_, resMs);
flacChunk_ = make_shared<msg::PcmChunk>(chunk.format, 0);
}
}

View file

@ -150,7 +150,7 @@ void OggEncoder::encode(const msg::PcmChunk& chunk)
// make oggChunk smaller
oggChunk->payload = (char*)realloc(oggChunk->payload, pos);
oggChunk->payloadSize = pos;
listener_->onChunkEncoded(this, oggChunk, res);
encoded_callback_(*this, oggChunk, res);
}
}

View file

@ -251,7 +251,7 @@ void OpusEncoder::encode(const SampleFormat& format, const char* data, size_t si
opusChunk->payloadSize = len;
opusChunk->payload = (char*)realloc(opusChunk->payload, opusChunk->payloadSize);
memcpy(opusChunk->payload, encoded_.data(), len);
listener_->onChunkEncoded(this, opusChunk, (double)samples_per_channel / sampleFormat_.msRate());
encoded_callback_(*this, opusChunk, (double)samples_per_channel / sampleFormat_.msRate());
}
else
{

View file

@ -51,7 +51,7 @@ void PcmEncoder::encode(const msg::PcmChunk& chunk)
{
// copy the chunk into a shared_ptr
auto pcmChunk = std::make_shared<msg::PcmChunk>(chunk);
listener_->onChunkEncoded(this, pcmChunk, pcmChunk->durationMs());
encoded_callback_(*this, pcmChunk, pcmChunk->durationMs());
}

View file

@ -68,7 +68,7 @@ void Server::onMetaChanged(const PcmStream* pcmStream)
}
void Server::onStateChanged(const PcmStream* pcmStream, const ReaderState& state)
void Server::onStateChanged(const PcmStream* pcmStream, ReaderState state)
{
// clang-format off
// Notification: {"jsonrpc":"2.0","method":"Stream.OnUpdate","params":{"id":"stream 1","stream":{"id":"stream 1","status":"idle","uri":{"fragment":"","host":"","path":"/tmp/snapfifo","query":{"chunk_ms":"20","codec":"flac","name":"stream 1","sampleformat":"48000:16:2"},"raw":"pipe:///tmp/snapfifo?name=stream 1","scheme":"pipe"}}}}
@ -81,9 +81,16 @@ void Server::onStateChanged(const PcmStream* pcmStream, const ReaderState& state
}
void Server::onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration)
void Server::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk)
{
streamServer_->onNewChunk(pcmStream, pcmStream == streamManager_->getDefaultStream().get(), chunk, duration);
std::ignore = pcmStream;
std::ignore = chunk;
}
void Server::onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration)
{
streamServer_->onChunkEncoded(pcmStream, pcmStream == streamManager_->getDefaultStream().get(), chunk, duration);
}
@ -692,9 +699,23 @@ void Server::start()
streamManager_ =
std::make_unique<StreamManager>(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamChunkMs);
// throw SnapException("xxx");
// Add normal sources first
for (const auto& sourceUri : settings_.stream.sources)
{
PcmStreamPtr stream = streamManager_->addStream(sourceUri);
StreamUri streamUri(sourceUri);
if (streamUri.scheme == "meta")
continue;
PcmStreamPtr stream = streamManager_->addStream(streamUri);
if (stream)
LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n";
}
// Add meta sources second
for (const auto& sourceUri : settings_.stream.sources)
{
StreamUri streamUri(sourceUri);
if (streamUri.scheme != "meta")
continue;
PcmStreamPtr stream = streamManager_->addStream(streamUri);
if (stream)
LOG(INFO, LOG_TAG) << "Stream: " << stream->getUri().toJson() << "\n";
}

View file

@ -76,8 +76,9 @@ private:
/// Implementation of PcmListener
void onMetaChanged(const PcmStream* pcmStream) override;
void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) override;
void onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override;
void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
void onResync(const PcmStream* pcmStream, double ms) override;
private:

View file

@ -86,9 +86,9 @@ void StreamServer::onMetaChanged(const PcmStream* pcmStream, std::shared_ptr<msg
}
void StreamServer::onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double /*duration*/)
void StreamServer::onChunkEncoded(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double /*duration*/)
{
// LOG(INFO, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
// LOG(TRACE, LOG_TAG) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
shared_const_buffer buffer(*chunk);
// make a copy of the sessions to avoid that a session get's deleted

View file

@ -65,7 +65,7 @@ public:
void addSession(const std::shared_ptr<StreamSession>& session);
void onMetaChanged(const PcmStream* pcmStream, std::shared_ptr<msg::StreamTags> meta);
void onNewChunk(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double duration);
void onChunkEncoded(const PcmStream* pcmStream, bool isDefaultStream, std::shared_ptr<msg::PcmChunk> chunk, double duration);
session_ptr getStreamSession(const std::string& mac) const;
session_ptr getStreamSession(StreamSession* session) const;

View file

@ -76,7 +76,6 @@ AlsaStream::AlsaStream(PcmListener* pcmListener, boost::asio::io_context& ioc, c
void AlsaStream::start()
{
LOG(DEBUG, LOG_TAG) << "Start, sampleformat: " << sampleFormat_.toString() << "\n";
encoder_->init(this, sampleFormat_);
// idle_bytes_ = 0;
// max_idle_bytes_ = sampleFormat_.rate() * sampleFormat_.frameSize() * dryout_ms_ / 1000;
@ -88,7 +87,7 @@ void AlsaStream::start()
first_ = true;
tvEncodedChunk_ = std::chrono::steady_clock::now();
initAlsa();
active_ = true;
PcmStream::start();
// wait(read_timer_, std::chrono::milliseconds(chunk_ms_), [this] { do_read(); });
do_read();
}
@ -225,7 +224,7 @@ void AlsaStream::do_read()
tvEncodedChunk_ = std::chrono::steady_clock::now() - duration;
}
onChunkRead(*chunk_);
chunkRead(*chunk_);
nextTick_ += duration;
auto currentTick = std::chrono::steady_clock::now();
@ -246,7 +245,7 @@ void AlsaStream::do_read()
else
{
// reading chunk_ms_ took longer than chunk_ms_
pcmListener_->onResync(this, std::chrono::duration_cast<std::chrono::milliseconds>(-next_read).count());
resync(-next_read);
first_ = true;
wait(read_timer_, nextTick_ - currentTick, [this] { do_read(); });
}

View file

@ -120,8 +120,7 @@ void AsioStream<ReadStream>::check_state()
template <typename ReadStream>
void AsioStream<ReadStream>::start()
{
encoder_->init(this, sampleFormat_);
active_ = true;
PcmStream::start();
check_state();
connect();
}
@ -198,7 +197,7 @@ void AsioStream<ReadStream>::do_read()
nextTick_ = std::chrono::steady_clock::now();
}
onChunkRead(*chunk_);
chunkRead(*chunk_);
nextTick_ += chunk_->duration<std::chrono::nanoseconds>();
auto currentTick = std::chrono::steady_clock::now();
@ -221,7 +220,7 @@ void AsioStream<ReadStream>::do_read()
// Read took longer, wait for the buffer to fill up
else
{
pcmListener_->onResync(this, std::chrono::duration_cast<std::chrono::milliseconds>(currentTick - nextTick_).count());
resync(std::chrono::duration_cast<std::chrono::nanoseconds>(currentTick - nextTick_));
nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true;
do_read();

View file

@ -0,0 +1,170 @@
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <fcntl.h>
#include <memory>
#include <sys/stat.h>
#include "common/aixlog.hpp"
#include "common/snap_exception.hpp"
#include "common/utils/string_utils.hpp"
#include "encoder/encoder_factory.hpp"
#include "meta_stream.hpp"
using namespace std;
namespace streamreader
{
static constexpr auto LOG_TAG = "MetaStream";
MetaStream::MetaStream(PcmListener* pcmListener, std::vector<std::shared_ptr<PcmStream>> streams, boost::asio::io_context& ioc, const StreamUri& uri)
: PcmStream(pcmListener, ioc, uri), first_read_(true)
{
auto path_components = utils::string::split(uri.path, '/');
for (const auto& component : path_components)
{
if (component.empty())
continue;
LOG(INFO, LOG_TAG) << "Stream: " << component << "\n";
bool found = false;
for (const auto stream : streams)
{
if (stream->getName() == component)
{
streams_.push_back(stream);
stream->addListener(this);
found = true;
break;
}
}
if (!found)
throw SnapException("Unknown stream: \"" + component + "\"");
}
for (const auto stream : streams_)
LOG(INFO, LOG_TAG) << "Stream: " << stream->getName() << ", " << stream->getUri().toString() << "\n";
if (!streams_.empty())
{
active_stream_ = streams_.front();
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
}
}
MetaStream::~MetaStream()
{
stop();
}
void MetaStream::start()
{
LOG(DEBUG, LOG_TAG) << "Start, sampleformat: " << sampleFormat_.toString() << "\n";
PcmStream::start();
}
void MetaStream::stop()
{
active_ = false;
}
void MetaStream::onMetaChanged(const PcmStream* pcmStream)
{
LOG(DEBUG, LOG_TAG) << "onMetaChanged: " << pcmStream->getName() << "\n";
std::lock_guard<std::mutex> lock(mutex_);
if (pcmStream != active_stream_.get())
return;
}
void MetaStream::onStateChanged(const PcmStream* pcmStream, ReaderState state)
{
LOG(DEBUG, LOG_TAG) << "onStateChanged: " << pcmStream->getName() << ", state: " << static_cast<int>(state) << "\n";
std::lock_guard<std::mutex> lock(mutex_);
for (const auto stream : streams_)
{
if (stream->getState() == ReaderState::kPlaying)
{
if ((state_ != ReaderState::kPlaying) || (active_stream_ != stream))
first_read_ = true;
if (active_stream_ != stream)
{
active_stream_ = stream;
resampler_ = make_unique<Resampler>(active_stream_->getSampleFormat(), sampleFormat_);
}
setState(ReaderState::kPlaying);
return;
}
}
active_stream_ = nullptr;
setState(ReaderState::kIdle);
}
void MetaStream::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk)
{
// LOG(TRACE, LOG_TAG) << "onChunkRead: " << pcmStream->getName() << ", duration: " << chunk.durationMs() << "\n";
std::lock_guard<std::mutex> lock(mutex_);
if (pcmStream != active_stream_.get())
return;
// active_stream_->sampleFormat_
// sampleFormat_
if (first_read_)
{
first_read_ = false;
LOG(INFO, LOG_TAG) << "first read, updating timestamp\n";
tvEncodedChunk_ = std::chrono::steady_clock::now() - chunk.duration<std::chrono::microseconds>();
}
auto resampled_chunk = resampler_->resample(std::make_shared<msg::PcmChunk>(chunk));
if (resampled_chunk)
chunkRead(*resampled_chunk);
// chunkRead(chunk);
}
void MetaStream::onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration)
{
std::ignore = pcmStream;
std::ignore = chunk;
std::ignore = duration;
// LOG(TRACE, LOG_TAG) << "onChunkEncoded: " << pcmStream->getName() << ", duration: " << duration << "\n";
// chunkEncoded(*encoder_, chunk, duration);
}
void MetaStream::onResync(const PcmStream* pcmStream, double ms)
{
LOG(DEBUG, LOG_TAG) << "onResync: " << pcmStream->getName() << ", duration: " << ms << " ms\n";
std::lock_guard<std::mutex> lock(mutex_);
if (pcmStream != active_stream_.get())
return;
resync(std::chrono::nanoseconds(static_cast<int64_t>(ms * 1000000)));
}
} // namespace streamreader

View file

@ -0,0 +1,64 @@
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef META_STREAM_HPP
#define META_STREAM_HPP
#include "posix_stream.hpp"
#include "resampler.hpp"
#include <memory>
namespace streamreader
{
/// Reads and decodes PCM data
/**
* Reads PCM and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class MetaStream : public PcmStream, public PcmListener
{
public:
/// ctor. Encoded PCM data is passed to the PcmListener
MetaStream(PcmListener* pcmListener, std::vector<std::shared_ptr<PcmStream>> streams, boost::asio::io_context& ioc, const StreamUri& uri);
virtual ~MetaStream();
virtual void start();
virtual void stop();
protected:
/// Implementation of PcmListener
void onMetaChanged(const PcmStream* pcmStream) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override;
void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
void onResync(const PcmStream* pcmStream, double ms) override;
protected:
std::vector<std::shared_ptr<PcmStream>> streams_;
std::shared_ptr<PcmStream> active_stream_;
std::mutex mutex_;
std::unique_ptr<Resampler> resampler_;
bool first_read_;
};
} // namespace streamreader
#endif

View file

@ -36,7 +36,7 @@ static constexpr auto LOG_TAG = "PcmStream";
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: active_(false), pcmListener_(pcmListener), uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc)
{
encoder::EncoderFactory encoderFactory;
if (uri_.query.find(kUriCodec) == uri_.query.end())
@ -98,7 +98,8 @@ const SampleFormat& PcmStream::getSampleFormat() const
void PcmStream::start()
{
LOG(DEBUG, LOG_TAG) << "Start, sampleformat: " << sampleFormat_.toString() << "\n";
encoder_->init(this, sampleFormat_);
encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) { chunkEncoded(encoder, chunk, duration); },
sampleFormat_);
active_ = true;
}
@ -115,21 +116,26 @@ ReaderState PcmStream::getState() const
}
void PcmStream::setState(const ReaderState& newState)
void PcmStream::setState(ReaderState newState)
{
if (newState != state_)
{
LOG(INFO, LOG_TAG) << "State changed: " << static_cast<int>(state_) << " => " << static_cast<int>(newState) << "\n";
state_ = newState;
if (pcmListener_)
pcmListener_->onStateChanged(this, newState);
for (auto* listener : pcmListeners_)
{
if (listener)
listener->onStateChanged(this, newState);
}
}
}
void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, std::shared_ptr<msg::PcmChunk> chunk, double duration)
void PcmStream::chunkEncoded(const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration)
{
// LOG(TRACE, LOG_TAG) << "onChunkEncoded: " << duration << " ms, compression ratio: " << 100 - ceil(100 * (chunk->durationMs() / duration)) << "%\n";
std::ignore = encoder;
// LOG(TRACE, LOG_TAG) << "onChunkEncoded: " << getName() << ", duration: " << duration << " ms, compression ratio: " << 100 - ceil(100 *
// (chunk->durationMs() / duration)) << "%\n";
if (duration <= 0)
return;
@ -140,17 +146,35 @@ void PcmStream::onChunkEncoded(const encoder::Encoder* /*encoder*/, std::shared_
// update tvEncodedChunk_ to the next chunk start by adding the current chunk duration
tvEncodedChunk_ += std::chrono::nanoseconds(static_cast<std::chrono::nanoseconds::rep>(duration * 1000000));
if (pcmListener_)
pcmListener_->onNewChunk(this, chunk, duration);
for (auto* listener : pcmListeners_)
{
if (listener)
listener->onChunkEncoded(this, chunk, duration);
}
}
void PcmStream::onChunkRead(const msg::PcmChunk& chunk)
void PcmStream::chunkRead(const msg::PcmChunk& chunk)
{
for (auto* listener : pcmListeners_)
{
if (listener)
listener->onChunkRead(this, chunk);
}
encoder_->encode(chunk);
}
void PcmStream::resync(const std::chrono::nanoseconds& duration)
{
for (auto* listener : pcmListeners_)
{
if (listener)
listener->onResync(this, duration.count() / 1000000.);
}
}
json PcmStream::toJson() const
{
string state("unknown");
@ -171,11 +195,19 @@ json PcmStream::toJson() const
return j;
}
void PcmStream::addListener(PcmListener* pcmListener)
{
pcmListeners_.push_back(pcmListener);
}
std::shared_ptr<msg::StreamTags> PcmStream::getMeta() const
{
return meta_;
}
void PcmStream::setMeta(const json& jtag)
{
meta_.reset(new msg::StreamTags(jtag));
@ -183,8 +215,11 @@ void PcmStream::setMeta(const json& jtag)
LOG(INFO, LOG_TAG) << "metadata=" << meta_->msg.dump(4) << "\n";
// Trigger a stream update
if (pcmListener_)
pcmListener_->onMetaChanged(this);
for (auto* listener : pcmListeners_)
{
if (listener)
listener->onMetaChanged(this);
}
}
} // namespace streamreader

View file

@ -30,6 +30,7 @@
#include <condition_variable>
#include <map>
#include <string>
#include <vector>
namespace streamreader
@ -60,8 +61,9 @@ class PcmListener
{
public:
virtual void onMetaChanged(const PcmStream* pcmStream) = 0;
virtual void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) = 0;
virtual void onNewChunk(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) = 0;
virtual void onStateChanged(const PcmStream* pcmStream, ReaderState state) = 0;
virtual void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) = 0;
virtual void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) = 0;
virtual void onResync(const PcmStream* pcmStream, double ms) = 0;
};
@ -72,7 +74,7 @@ public:
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PcmStream : public encoder::EncoderListener
class PcmStream
{
public:
/// ctor. Encoded PCM data is passed to the PcmListener
@ -82,8 +84,6 @@ public:
virtual void start();
virtual void stop();
/// Implementation of EncoderListener::onChunkEncoded
void onChunkEncoded(const encoder::Encoder* encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;
virtual std::shared_ptr<msg::CodecHeader> getHeader();
virtual const StreamUri& getUri() const;
@ -97,15 +97,18 @@ public:
virtual ReaderState getState() const;
virtual json toJson() const;
void addListener(PcmListener* pcmListener);
protected:
std::atomic<bool> active_;
void setState(const ReaderState& newState);
virtual void onChunkRead(const msg::PcmChunk& chunk);
void setState(ReaderState newState);
void chunkRead(const msg::PcmChunk& chunk);
void resync(const std::chrono::nanoseconds& duration);
void chunkEncoded(const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration);
std::chrono::time_point<std::chrono::steady_clock> tvEncodedChunk_;
PcmListener* pcmListener_;
std::vector<PcmListener*> pcmListeners_;
StreamUri uri_;
SampleFormat sampleFormat_;
size_t chunk_ms_;

View file

@ -123,7 +123,7 @@ void PosixStream::do_read()
if ((idle_bytes_ == 0) || (idle_bytes_ <= max_idle_bytes_))
{
// the encoder will update the tvEncodedChunk when a chunk is encoded
onChunkRead(*chunk_);
chunkRead(*chunk_);
}
else
{
@ -150,7 +150,7 @@ void PosixStream::do_read()
else
{
// reading chunk_ms_ took longer than chunk_ms_
pcmListener_->onResync(this, std::chrono::duration_cast<std::chrono::milliseconds>(-next_read).count());
resync(-next_read);
first_ = true;
wait(read_timer_, duration + kResyncTolerance, [this] { do_read(); });
}

View file

@ -27,6 +27,7 @@
#include "common/utils.hpp"
#include "file_stream.hpp"
#include "librespot_stream.hpp"
#include "meta_stream.hpp"
#include "pipe_stream.hpp"
#include "process_stream.hpp"
#include "tcp_stream.hpp"
@ -47,7 +48,12 @@ StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context&
PcmStreamPtr StreamManager::addStream(const std::string& uri)
{
StreamUri streamUri(uri);
return addStream(uri);
}
PcmStreamPtr StreamManager::addStream(StreamUri& streamUri)
{
if (streamUri.query.find(kUriSampleFormat) == streamUri.query.end())
streamUri.query[kUriSampleFormat] = sampleFormat_;
@ -102,6 +108,10 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri)
{
stream = make_shared<TcpStream>(pcmListener_, ioc_, streamUri);
}
else if (streamUri.scheme == "meta")
{
stream = make_shared<MetaStream>(pcmListener_, streams_, ioc_, streamUri);
}
else
{
throw SnapException("Unknown stream type: " + streamUri.scheme);
@ -160,15 +170,26 @@ const PcmStreamPtr StreamManager::getStream(const std::string& id)
void StreamManager::start()
{
// Start meta streams first
for (const auto& stream : streams_)
stream->start();
if (stream->getUri().scheme == "meta")
stream->start();
// Start normal streams second
for (const auto& stream : streams_)
if (stream->getUri().scheme != "meta")
stream->start();
}
void StreamManager::stop()
{
// Stop normal streams first
for (const auto& stream : streams_)
if (stream)
if (stream && (stream->getUri().scheme != "meta"))
stream->stop();
// Stop meta streams second
for (const auto& stream : streams_)
if (stream && (stream->getUri().scheme == "meta"))
stream->stop();
}

View file

@ -28,7 +28,7 @@
namespace streamreader
{
typedef std::shared_ptr<PcmStream> PcmStreamPtr;
using PcmStreamPtr = std::shared_ptr<PcmStream>;
class StreamManager
{
@ -37,6 +37,7 @@ public:
size_t defaultChunkBufferMs = 20);
PcmStreamPtr addStream(const std::string& uri);
PcmStreamPtr addStream(StreamUri& streamUri);
void removeStream(const std::string& name);
void start();
void stop();