From b638d81b664610546fe0dacc41f0cf4efd68233d Mon Sep 17 00:00:00 2001 From: Jerome <7315817+jdecourval@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:38:52 -0400 Subject: [PATCH 1/2] Lazily load streams in meta streams --- server/streamreader/meta_stream.cpp | 137 +++++++++++++++++++------ server/streamreader/meta_stream.hpp | 6 ++ server/streamreader/stream_manager.cpp | 9 ++ 3 files changed, 118 insertions(+), 34 deletions(-) diff --git a/server/streamreader/meta_stream.cpp b/server/streamreader/meta_stream.cpp index e36beffd..f4ad894d 100644 --- a/server/streamreader/meta_stream.cpp +++ b/server/streamreader/meta_stream.cpp @@ -39,33 +39,86 @@ MetaStream::MetaStream(PcmStream::Listener* pcmListener, const std::vectorgetName() == component) - { - streams_.push_back(stream); - stream->addListener(this); - found = true; - break; - } + return true; } - if (!found) - throw SnapException("Unknown stream: \"" + component + "\""); } - if (streams_.empty()) - throw SnapException("Meta stream '" + getName() + "' must contain at least one stream"); - - active_stream_ = streams_.front(); - resampler_ = make_unique(active_stream_->getSampleFormat(), sampleFormat_); + return false; } +void MetaStream::addStream(std::shared_ptr stream) +{ + if (isAllowed(*stream)) + { + stream->addListener(this); + streams_.push_back(std::move(stream)); + updateActiveStream(); + } +} + +void MetaStream::removeStream(const PcmStream& stream) +{ + auto iter = std::find_if(streams_.begin(), streams_.end(), [id = stream.getId()](const auto& s) { return s->getId() == id; }); + if (iter != streams_.end()) + { + streams_.erase(iter); + updateActiveStream(); + } +} + +void MetaStream::updateActiveStream() +{ + auto compareStreamOrder = [this](const std::shared_ptr& first, const std::shared_ptr& second) + { + if (first->getName() == second->getName()) + return false; + + auto path_components = utils::string::split(uri_.path, '/'); + for (const auto& component : path_components) + { + if (component == first->getName()) + return true; + if (component == second->getName()) + return false; + } + return false; + }; + + std::lock_guard lock(active_mutex_); + if (!streams_.empty()) + { + auto new_active = std::min_element(streams_.begin(), streams_.end(), compareStreamOrder); + if (!active_stream_ || active_stream_->getId() != ((*new_active)->getId())) + { + active_stream_ = *streams_.begin(); + resampler_ = make_unique(active_stream_->getSampleFormat(), sampleFormat_); + } + } + else + { + active_stream_ = nullptr; + resampler_ = nullptr; + } +} MetaStream::~MetaStream() { @@ -133,7 +186,8 @@ void MetaStream::onStateChanged(const PcmStream* pcmStream, ReaderState state) } } - switch_stream(streams_.front()); + if (!streams_.empty()) + switch_stream(*streams_.begin()); setState(ReaderState::kIdle); } @@ -212,31 +266,36 @@ void MetaStream::onResync(const PcmStream* pcmStream, double ms) void MetaStream::setShuffle(bool shuffle, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setShuffle(shuffle, std::move(handler)); + if (active_stream_) + active_stream_->setShuffle(shuffle, std::move(handler)); } void MetaStream::setLoopStatus(LoopStatus status, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setLoopStatus(status, std::move(handler)); + if (active_stream_) + active_stream_->setLoopStatus(status, std::move(handler)); } void MetaStream::setVolume(uint16_t volume, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setVolume(volume, std::move(handler)); + if (active_stream_) + active_stream_->setVolume(volume, std::move(handler)); } void MetaStream::setMute(bool mute, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setMute(mute, std::move(handler)); + if (active_stream_) + active_stream_->setMute(mute, std::move(handler)); } void MetaStream::setRate(float rate, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setRate(rate, std::move(handler)); + if (active_stream_) + active_stream_->setRate(rate, std::move(handler)); } @@ -244,54 +303,63 @@ void MetaStream::setRate(float rate, ResultHandler handler) void MetaStream::setPosition(std::chrono::milliseconds position, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->setPosition(position, std::move(handler)); + if (active_stream_) + active_stream_->setPosition(position, std::move(handler)); } void MetaStream::seek(std::chrono::milliseconds offset, ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->seek(offset, std::move(handler)); + if (active_stream_) + active_stream_->seek(offset, std::move(handler)); } void MetaStream::next(ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->next(std::move(handler)); + if (active_stream_) + active_stream_->next(std::move(handler)); } void MetaStream::previous(ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->previous(std::move(handler)); + if (active_stream_) + active_stream_->previous(std::move(handler)); } void MetaStream::pause(ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->pause(std::move(handler)); + if (active_stream_) + active_stream_->pause(std::move(handler)); } void MetaStream::playPause(ResultHandler handler) { LOG(DEBUG, LOG_TAG) << "PlayPause\n"; std::lock_guard lock(mutex_); - if (active_stream_->getState() == ReaderState::kIdle) - play(handler); - else - active_stream_->playPause(std::move(handler)); + if (active_stream_) + { + if (active_stream_->getState() == ReaderState::kIdle) + play(handler); + else + active_stream_->playPause(std::move(handler)); + } } void MetaStream::stop(ResultHandler handler) { std::lock_guard lock(mutex_); - active_stream_->stop(std::move(handler)); + if (active_stream_) + active_stream_->stop(std::move(handler)); } void MetaStream::play(ResultHandler handler) { LOG(DEBUG, LOG_TAG) << "Play\n"; std::lock_guard lock(mutex_); - if ((active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying)) + if ((active_stream_) && (active_stream_->getProperties().can_play) && (active_stream_->getProperties().playback_status != PlaybackStatus::kPlaying)) return active_stream_->play(std::move(handler)); for (const auto& stream : streams_) @@ -303,7 +371,8 @@ void MetaStream::play(ResultHandler handler) } // call play on the active stream to get the handler called - active_stream_->play(std::move(handler)); + if (active_stream_) + active_stream_->play(std::move(handler)); } diff --git a/server/streamreader/meta_stream.hpp b/server/streamreader/meta_stream.hpp index c87ed140..afbf587d 100644 --- a/server/streamreader/meta_stream.hpp +++ b/server/streamreader/meta_stream.hpp @@ -49,6 +49,9 @@ public: void start() override; void stop() override; + void addStream(std::shared_ptr stream); + void removeStream(const PcmStream& stream); + // Setter for properties void setShuffle(bool shuffle, ResultHandler handler) override; void setLoopStatus(LoopStatus status, ResultHandler handler) override; @@ -67,6 +70,9 @@ public: void play(ResultHandler handler) override; protected: + bool isAllowed(const PcmStream& stream) const; + void updateActiveStream(); + /// Implementation of PcmStream::Listener void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override; void onStateChanged(const PcmStream* pcmStream, ReaderState state) override; diff --git a/server/streamreader/stream_manager.cpp b/server/streamreader/stream_manager.cpp index cb4faa21..c7261ff3 100644 --- a/server/streamreader/stream_manager.cpp +++ b/server/streamreader/stream_manager.cpp @@ -146,6 +146,9 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri) { if (s->getName() == stream->getName()) throw SnapException("Stream with name \"" + stream->getName() + "\" already exists"); + + if (auto meta = dynamic_cast(s.get())) + meta->addStream(stream); } streams_.push_back(stream); } @@ -161,6 +164,12 @@ void StreamManager::removeStream(const std::string& name) { (*iter)->stop(); streams_.erase(iter); + + for (const auto& s : streams_) + { + if (auto meta = dynamic_cast(s.get())) + meta->removeStream(**iter); + } } } From cbdaeb3ea9991472f6d18dbb76453bb8d9a8d884 Mon Sep 17 00:00:00 2001 From: Jerome <7315817+jdecourval@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:39:33 -0400 Subject: [PATCH 2/2] Add wildcard (*) support to metastreams --- server/streamreader/meta_stream.cpp | 4 +++- server/streamreader/meta_stream.hpp | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/streamreader/meta_stream.cpp b/server/streamreader/meta_stream.cpp index f4ad894d..3cbe31fa 100644 --- a/server/streamreader/meta_stream.cpp +++ b/server/streamreader/meta_stream.cpp @@ -56,7 +56,7 @@ bool MetaStream::isAllowed(const PcmStream& stream) const if (component.empty()) continue; - if (stream.getName() == component) + if (component == WILDCARD || stream.getName() == component) { return true; } @@ -99,6 +99,8 @@ void MetaStream::updateActiveStream() return true; if (component == second->getName()) return false; + if (component == WILDCARD) + return false; } return false; }; diff --git a/server/streamreader/meta_stream.hpp b/server/streamreader/meta_stream.hpp index afbf587d..9feb60f3 100644 --- a/server/streamreader/meta_stream.hpp +++ b/server/streamreader/meta_stream.hpp @@ -40,6 +40,9 @@ namespace streamreader */ class MetaStream : public PcmStream, public PcmStream::Listener { +public: + static inline const std::string WILDCARD = "*"; + public: /// ctor. Encoded PCM data is passed to the PcmStream::Listener MetaStream(PcmStream::Listener* pcmListener, const std::vector>& streams, boost::asio::io_context& ioc,