Fix crash while calling Stream.RemoveStream

This commit is contained in:
badaix 2025-01-19 22:46:24 +01:00
parent 61bda79158
commit 780d8f3f1a
6 changed files with 20 additions and 19 deletions

View file

@ -121,7 +121,7 @@ template <typename ReadStream>
void AsioStream<ReadStream>::check_state(const std::chrono::steady_clock::duration& duration) void AsioStream<ReadStream>::check_state(const std::chrono::steady_clock::duration& duration)
{ {
state_timer_.expires_after(duration); state_timer_.expires_after(duration);
state_timer_.async_wait([this, duration](const boost::system::error_code& ec) state_timer_.async_wait([this, self = shared_from_this(), duration](const boost::system::error_code& ec)
{ {
if (!ec) if (!ec)
{ {
@ -174,7 +174,7 @@ void AsioStream<ReadStream>::do_read()
// Reset the silence timer // Reset the silence timer
check_state(idle_threshold_ + std::chrono::milliseconds(chunk_ms_)); check_state(idle_threshold_ + std::chrono::milliseconds(chunk_ms_));
boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize),
[this](boost::system::error_code ec, std::size_t length) mutable [this, self = shared_from_this()](boost::system::error_code ec, std::size_t length) mutable
{ {
state_timer_.cancel(); state_timer_.cancel();
@ -186,7 +186,7 @@ void AsioStream<ReadStream>::do_read()
lastException_ = ec.message(); lastException_ = ec.message();
} }
disconnect(); disconnect();
wait(read_timer_, 100ms, [this] { connect(); }); wait(read_timer_, 100ms, [this, self = shared_from_this()] { connect(); });
return; return;
} }
@ -240,7 +240,7 @@ void AsioStream<ReadStream>::do_read()
if (nextTick_ >= currentTick) if (nextTick_ >= currentTick)
{ {
read_timer_.expires_after(nextTick_ - currentTick); read_timer_.expires_after(nextTick_ - currentTick);
read_timer_.async_wait([this](const boost::system::error_code& ec) read_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec)
{ {
if (ec) if (ec)
{ {

View file

@ -144,11 +144,11 @@ void PcmStream::onControlRequest(const jsonrpcpp::Request& request)
void PcmStream::pollProperties() void PcmStream::pollProperties()
{ {
property_timer_.expires_after(10s); property_timer_.expires_after(10s);
property_timer_.async_wait([this](const boost::system::error_code& ec) property_timer_.async_wait([this, self = shared_from_this()](const boost::system::error_code& ec)
{ {
if (!ec) if (!ec)
{ {
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this, self = shared_from_this()](const jsonrpcpp::Response& response)
{ {
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n"; LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0) if (response.error().code() == 0)
@ -173,7 +173,7 @@ void PcmStream::onControlNotification(const jsonrpcpp::Notification& notificatio
else if (notification.method() == "Plugin.Stream.Ready") else if (notification.method() == "Plugin.Stream.Ready")
{ {
LOG(DEBUG, LOG_TAG) << "Plugin is ready\n"; LOG(DEBUG, LOG_TAG) << "Plugin is ready\n";
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this, self = shared_from_this()](const jsonrpcpp::Response& response)
{ {
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n"; LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0) if (response.error().code() == 0)
@ -228,14 +228,14 @@ void PcmStream::start()
{ {
LOG(DEBUG, LOG_TAG) << "Start: " << name_ << ", type: " << uri_.scheme << ", sampleformat: " << sampleFormat_.toString() << ", codec: " << getCodec() LOG(DEBUG, LOG_TAG) << "Start: " << name_ << ", type: " << uri_.scheme << ", sampleformat: " << sampleFormat_.toString() << ", codec: " << getCodec()
<< "\n"; << "\n";
encoder_->init([this](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration) encoder_->init([this, self = shared_from_this()](const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration)
{ chunkEncoded(encoder, std::move(chunk), duration); }, sampleFormat_); { chunkEncoded(encoder, std::move(chunk), duration); }, sampleFormat_);
if (stream_ctrl_) if (stream_ctrl_)
{ {
stream_ctrl_->start(getId(), server_settings_, [this](const jsonrpcpp::Notification& notification) { stream_ctrl_->start(getId(), server_settings_, [this, self = shared_from_this()](const jsonrpcpp::Notification& notification)
onControlNotification(notification); { onControlNotification(notification); }, [this, self = shared_from_this()](const jsonrpcpp::Request& request) { onControlRequest(request); },
}, [this](const jsonrpcpp::Request& request) { onControlRequest(request); }, [this](std::string message) { onControlLog(std::move(message)); }); [this, self = shared_from_this()](std::string message) { onControlLog(std::move(message)); });
} }
active_ = true; active_ = true;

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2024 Johannes Pohl Copyright (C) 2014-2025 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -100,7 +100,7 @@ static constexpr auto kControlScriptParams = "controlscriptparams";
* Implements EncoderListener to get the encoded data. * Implements EncoderListener to get the encoded data.
* Data is passed to the PcmStream::Listener * Data is passed to the PcmStream::Listener
*/ */
class PcmStream class PcmStream : public std::enable_shared_from_this<PcmStream>
{ {
public: public:
/// Callback interface for users of PcmStream /// Callback interface for users of PcmStream

View file

@ -1,6 +1,6 @@
/*** /***
This file is part of snapcast This file is part of snapcast
Copyright (C) 2014-2024 Johannes Pohl Copyright (C) 2014-2025 Johannes Pohl
This program is free software: you can redistribute it and/or modify This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by it under the terms of the GNU General Public License as published by
@ -65,7 +65,7 @@ void PipeStream::connect()
if (errno == static_cast<int>(std::errc::no_such_file_or_directory)) if (errno == static_cast<int>(std::errc::no_such_file_or_directory))
{ {
LOG(ERROR, LOG_TAG) << error << "\n"; LOG(ERROR, LOG_TAG) << error << "\n";
wait(read_timer_, 200ms, [this] { connect(); }); wait(read_timer_, 200ms, [this, self = shared_from_this()] { connect(); });
return; return;
} }
throw SnapException(error); throw SnapException(error);

View file

@ -145,7 +145,8 @@ void ProcessStream::onStderrMsg(const std::string& line)
void ProcessStream::stderrReadLine() void ProcessStream::stderrReadLine()
{ {
const std::string delimiter = "\n"; const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter,
[this, self = shared_from_this(), delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{ {
if (ec) if (ec)
{ {

View file

@ -73,7 +73,7 @@ void TcpStream::connect()
if (is_server_) if (is_server_)
{ {
acceptor_->async_accept([this](boost::system::error_code ec, tcp::socket socket) acceptor_->async_accept([this, self = shared_from_this()](boost::system::error_code ec, tcp::socket socket)
{ {
if (!ec) if (!ec)
{ {
@ -91,7 +91,7 @@ void TcpStream::connect()
{ {
stream_ = make_unique<tcp::socket>(strand_); stream_ = make_unique<tcp::socket>(strand_);
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::make_address(host_), port_); boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::make_address(host_), port_);
stream_->async_connect(endpoint, [this](const boost::system::error_code& ec) stream_->async_connect(endpoint, [this, self = shared_from_this()](const boost::system::error_code& ec)
{ {
if (!ec) if (!ec)
{ {
@ -101,7 +101,7 @@ void TcpStream::connect()
else else
{ {
LOG(DEBUG, LOG_TAG) << "Connect failed: " << ec.message() << "\n"; LOG(DEBUG, LOG_TAG) << "Connect failed: " << ec.message() << "\n";
wait(reconnect_timer_, 1s, [this] { connect(); }); wait(reconnect_timer_, 1s, [this, self = shared_from_this()] { connect(); });
} }
}); });
} }