Make metadata thread safe, poll properties

This commit is contained in:
badaix 2021-06-23 20:48:25 +02:00
parent 1f51befbad
commit 0853c7c701
10 changed files with 124 additions and 100 deletions

View file

@ -49,13 +49,12 @@ void Server::onNewSession(std::shared_ptr<StreamSession> session)
}
void Server::onMetadataChanged(const PcmStream* pcmStream)
void Server::onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata)
{
// clang-format off
// Notification: {"jsonrpc":"2.0","method":"Stream.OnMetadata","params":{"id":"stream 1", "metadata": {"album": "some album", "artist": "some artist", "track": "some track"...}}}
// clang-format on
const auto metadata = pcmStream->getMetadata();
LOG(DEBUG, LOG_TAG) << "Metadata changed, stream: " << pcmStream->getName() << ", meta: " << metadata.toJson().dump(3) << "\n";
// streamServer_->onMetadataChanged(pcmStream, meta);
@ -67,13 +66,13 @@ void Server::onMetadataChanged(const PcmStream* pcmStream)
}
void Server::onPropertiesChanged(const PcmStream* pcmStream)
void Server::onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties)
{
const auto props = pcmStream->getProperties();
LOG(DEBUG, LOG_TAG) << "Properties changed, stream: " << pcmStream->getName() << ", properties: " << props.toJson().dump(3) << "\n";
LOG(DEBUG, LOG_TAG) << "Properties changed, stream: " << pcmStream->getName() << ", properties: " << properties.toJson().dump(3) << "\n";
// Send propeties to all connected control clients
json notification = jsonrpcpp::Notification("Stream.OnProperties", jsonrpcpp::Parameter("id", pcmStream->getId(), "properties", props.toJson())).to_json();
json notification =
jsonrpcpp::Notification("Stream.OnProperties", jsonrpcpp::Parameter("id", pcmStream->getId(), "properties", properties.toJson())).to_json();
controlServer_->send(notification.dump(), nullptr);
}

View file

@ -78,8 +78,8 @@ private:
void onNewSession(std::shared_ptr<StreamSession> session) override;
/// Implementation of PcmListener
void onMetadataChanged(const PcmStream* pcmStream) override;
void onPropertiesChanged(const PcmStream* pcmStream) override;
void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) override;
void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override;
void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;

View file

@ -170,10 +170,10 @@ void LibrespotStream::onStderrMsg(const std::string& line)
meta.title = string(m[1]);
meta.duration = cpt::stod(m[2]) / 1000.;
setMetadata(meta);
Properties props;
// props.can_seek = true;
// props.can_control = true;
setProperties(props);
Properties properties;
// properties.can_seek = true;
// properties.can_control = true;
setProperties(properties);
}
}

View file

@ -84,8 +84,9 @@ void MetaStream::stop()
}
void MetaStream::onMetadataChanged(const PcmStream* pcmStream)
void MetaStream::onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata)
{
std::ignore = metadata;
LOG(DEBUG, LOG_TAG) << "onMetadataChanged: " << pcmStream->getName() << "\n";
std::lock_guard<std::mutex> lock(mutex_);
if (pcmStream != active_stream_.get())
@ -93,8 +94,9 @@ void MetaStream::onMetadataChanged(const PcmStream* pcmStream)
}
void MetaStream::onPropertiesChanged(const PcmStream* pcmStream)
void MetaStream::onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties)
{
std::ignore = properties;
LOG(DEBUG, LOG_TAG) << "onPropertiesChanged: " << pcmStream->getName() << "\n";
}

View file

@ -46,8 +46,8 @@ public:
protected:
/// Implementation of PcmListener
void onMetadataChanged(const PcmStream* pcmStream) override;
void onPropertiesChanged(const PcmStream* pcmStream) override;
void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) override;
void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) override;
void onStateChanged(const PcmStream* pcmStream, ReaderState state) override;
void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) override;
void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) override;

View file

@ -37,7 +37,8 @@ static constexpr auto LOG_TAG = "PcmStream";
PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings), req_id_(0)
: active_(false), pcmListeners_{pcmListener}, uri_(uri), chunk_ms_(20), state_(ReaderState::kIdle), ioc_(ioc), server_settings_(server_settings),
req_id_(0), property_timer_(ioc)
{
encoder::EncoderFactory encoderFactory;
if (uri_.query.find(kUriCodec) == uri_.query.end())
@ -66,6 +67,7 @@ PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, con
PcmStream::~PcmStream()
{
stop();
property_timer_.cancel();
}
@ -111,6 +113,23 @@ void PcmStream::onControlRequest(const jsonrpcpp::Request& request)
}
void PcmStream::pollProperties()
{
property_timer_.expires_after(10s);
property_timer_.async_wait([this](const boost::system::error_code& ec) {
if (!ec)
{
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) {
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
setProperties(response.result());
});
pollProperties();
}
});
}
void PcmStream::onControlNotification(const jsonrpcpp::Notification& notification)
{
try
@ -139,6 +158,7 @@ void PcmStream::onControlNotification(const jsonrpcpp::Notification& notificatio
if (response.error().code() == 0)
setMetadata(response.result());
});
pollProperties();
}
else if (notification.method() == "Plugin.Stream.Log")
{
@ -291,12 +311,14 @@ void PcmStream::addListener(PcmListener* pcmListener)
const Metatags& PcmStream::getMetadata() const
{
std::lock_guard<std::mutex> lock(mutex_);
return metadata_;
}
const Properties& PcmStream::getProperties() const
{
std::lock_guard<std::mutex> lock(mutex_);
return properties_;
}
@ -426,6 +448,7 @@ void PcmStream::control(const jsonrpcpp::Request& request, const StreamControl::
void PcmStream::setMetadata(const Metatags& metadata)
{
std::lock_guard<std::mutex> lock(mutex_);
if (metadata == metadata_)
{
LOG(DEBUG, LOG_TAG) << "setMetadata: Metadata did not change\n";
@ -439,27 +462,28 @@ void PcmStream::setMetadata(const Metatags& metadata)
for (auto* listener : pcmListeners_)
{
if (listener != nullptr)
listener->onMetadataChanged(this);
listener->onMetadataChanged(this, metadata_);
}
}
void PcmStream::setProperties(const Properties& props)
void PcmStream::setProperties(const Properties& properties)
{
if (props == properties_)
std::lock_guard<std::mutex> lock(mutex_);
if (properties == properties_)
{
LOG(DEBUG, LOG_TAG) << "setProperties: Properties did not change\n";
return;
}
properties_ = props;
properties_ = properties;
LOG(INFO, LOG_TAG) << "setProperties, stream: " << getId() << ", properties: " << properties_.toJson() << "\n";
// Trigger a stream update
for (auto* listener : pcmListeners_)
{
if (listener != nullptr)
listener->onPropertiesChanged(this);
listener->onPropertiesChanged(this, properties);
}
}

View file

@ -20,11 +20,13 @@
#define PCM_STREAM_HPP
#include <atomic>
#include <mutex>
#include <string>
#include <vector>
#include <boost/asio/io_context.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/optional.hpp>
#include "common/json.hpp"
@ -96,8 +98,8 @@ static constexpr auto kControlScript = "controlscript";
class PcmListener
{
public:
virtual void onMetadataChanged(const PcmStream* pcmStream) = 0;
virtual void onPropertiesChanged(const PcmStream* pcmStream) = 0;
virtual void onMetadataChanged(const PcmStream* pcmStream, const Metatags& metadata) = 0;
virtual void onPropertiesChanged(const PcmStream* pcmStream, const Properties& properties) = 0;
virtual void onStateChanged(const PcmStream* pcmStream, ReaderState state) = 0;
virtual void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk& chunk) = 0;
virtual void onChunkEncoded(const PcmStream* pcmStream, std::shared_ptr<msg::PcmChunk> chunk, double duration) = 0;
@ -154,7 +156,9 @@ protected:
void chunkEncoded(const encoder::Encoder& encoder, std::shared_ptr<msg::PcmChunk> chunk, double duration);
void setMetadata(const Metatags& metadata);
void setProperties(const Properties& props);
void setProperties(const Properties& properties);
void pollProperties();
std::chrono::time_point<std::chrono::steady_clock> tvEncodedChunk_;
std::vector<PcmListener*> pcmListeners_;
@ -169,7 +173,9 @@ protected:
boost::asio::io_context& ioc_;
ServerSettings server_settings_;
std::unique_ptr<StreamControl> stream_ctrl_;
int req_id_;
std::atomic<int> req_id_;
boost::asio::steady_timer property_timer_;
mutable std::mutex mutex_;
};
} // namespace streamreader

View file

@ -36,7 +36,7 @@ namespace streamreader
static constexpr auto LOG_TAG = "Script";
StreamControl::StreamControl(boost::asio::io_context& ioc) : ioc_(ioc)
StreamControl::StreamControl(boost::asio::io_context& ioc) : ioc_(ioc), strand_(ioc)
{
}
@ -60,10 +60,13 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se
void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler)
{
// use strand to serialize commands sent from different threads
boost::asio::post(strand_, [this, request, response_handler]() {
if (response_handler)
request_callbacks_[request.id()] = response_handler;
doCommand(request);
});
}
@ -72,31 +75,54 @@ void StreamControl::stop()
}
void StreamControl::onNotification(const jsonrpcpp::Notification& notification)
void StreamControl::onReceive(const std::string& json)
{
notification_handler_(notification);
jsonrpcpp::entity_ptr entity(nullptr);
try
{
entity = jsonrpcpp::Parser::do_parse(json);
if (!entity)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
}
void StreamControl::onRequest(const jsonrpcpp::Request& request)
else if (entity->is_notification())
{
request_handler_(request);
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
notification_handler_(*notification);
}
void StreamControl::onResponse(const jsonrpcpp::Response& response)
else if (entity->is_request())
{
LOG(INFO, LOG_TAG) << "Response: " << response.to_json() << ", id: " << response.id() << "\n";
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
request_handler_(*request);
}
else if (entity->is_response())
{
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
LOG(INFO, LOG_TAG) << "Response: " << response->to_json() << ", id: " << response->id() << "\n";
// TODO: call request_callbacks_ on timeout with error
auto iter = request_callbacks_.find(response.id());
auto iter = request_callbacks_.find(response->id());
if (iter != request_callbacks_.end())
{
iter->second(response);
iter->second(*response);
request_callbacks_.erase(iter);
}
else
{
LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response.id() << "\n";
LOG(WARNING, LOG_TAG) << "No request found for response with id: " << response->id() << "\n";
}
}
else
{
LOG(WARNING, LOG_TAG) << "Not handling message: " << json << "\n";
}
}
catch (const jsonrpcpp::ParseErrorException& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
}
@ -160,6 +186,7 @@ void ScriptStreamControl::stderrReadLine()
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stderr_.data()), buffers_begin(streambuf_stderr_.data()) + bytes_transferred - delimiter.length()};
onLog(std::move(line));
streambuf_stderr_.consume(bytes_transferred);
stderrReadLine();
});
@ -178,43 +205,7 @@ void ScriptStreamControl::stdoutReadLine()
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_stdout_.data()), buffers_begin(streambuf_stdout_.data()) + bytes_transferred - delimiter.length()};
jsonrpcpp::entity_ptr entity(nullptr);
try
{
entity = jsonrpcpp::Parser::do_parse(line);
if (!entity)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message\n";
}
if (entity->is_notification())
{
jsonrpcpp::notification_ptr notification = dynamic_pointer_cast<jsonrpcpp::Notification>(entity);
onNotification(*notification);
}
else if (entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
onRequest(*request);
}
else if (entity->is_response())
{
jsonrpcpp::response_ptr response = dynamic_pointer_cast<jsonrpcpp::Response>(entity);
onResponse(*response);
}
else
{
LOG(WARNING, LOG_TAG) << "Not handling message: " << line << "\n";
}
}
catch (const jsonrpcpp::ParseErrorException& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Failed to parse message: " << e.what() << "\n";
}
onReceive(line);
streambuf_stdout_.consume(bytes_transferred);
stdoutReadLine();

View file

@ -30,6 +30,7 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/strand.hpp>
#include "jsonrpcpp.hpp"
#include "server_settings.hpp"
@ -64,17 +65,18 @@ protected:
virtual void doCommand(const jsonrpcpp::Request& request) = 0;
virtual void doStart(const std::string& stream_id, const ServerSettings& server_setttings) = 0;
void onNotification(const jsonrpcpp::Notification& notification);
void onRequest(const jsonrpcpp::Request& request);
void onResponse(const jsonrpcpp::Response& response);
void onReceive(const std::string& json);
void onLog(std::string message);
boost::asio::io_context& ioc_;
private:
OnRequest request_handler_;
OnNotification notification_handler_;
OnLog log_handler_;
boost::asio::io_context& ioc_;
std::map<jsonrpcpp::Id, OnResponse> request_callbacks_;
boost::asio::io_context::strand strand_;
};

View file

@ -182,12 +182,12 @@ TEST_CASE("Properties")
)");
// std::cout << in_json.dump(4) << "\n";
Properties props(in_json);
std::cout << props.toJson().dump(4) << "\n";
Properties properties(in_json);
std::cout << properties.toJson().dump(4) << "\n";
REQUIRE(props.loop_status.has_value());
REQUIRE(properties.loop_status.has_value());
auto out_json = props.toJson();
auto out_json = properties.toJson();
// std::cout << out_json.dump(4) << "\n";
REQUIRE(in_json == out_json);
@ -198,12 +198,12 @@ TEST_CASE("Properties")
)");
// std::cout << in_json.dump(4) << "\n";
props.fromJson(in_json);
// std::cout << props.toJson().dump(4) << "\n";
properties.fromJson(in_json);
// std::cout << properties.toJson().dump(4) << "\n";
REQUIRE(!props.loop_status.has_value());
REQUIRE(!properties.loop_status.has_value());
out_json = props.toJson();
out_json = properties.toJson();
// std::cout << out_json.dump(4) << "\n";
REQUIRE(in_json == out_json);
}