mirror of
https://github.com/badaix/snapcast.git
synced 2025-06-10 06:41:42 +02:00
Protect request response with a strand
This commit is contained in:
parent
fdcc609bbb
commit
799ed8ae66
2 changed files with 23 additions and 9 deletions
|
@ -62,7 +62,7 @@ void ClientConnection::connect(const ResultHandler& handler)
|
||||||
{
|
{
|
||||||
tcp::resolver::query query(server_.host, cpt::to_string(server_.port), boost::asio::ip::resolver_query_base::numeric_service);
|
tcp::resolver::query query(server_.host, cpt::to_string(server_.port), boost::asio::ip::resolver_query_base::numeric_service);
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
LOG(INFO, LOG_TAG) << "Resolving host IP\n";
|
LOG(INFO, LOG_TAG) << "Resolving host IP for: " << server_.host << "\n";
|
||||||
auto iterator = resolver_.resolve(query, ec);
|
auto iterator = resolver_.resolve(query, ec);
|
||||||
if (ec)
|
if (ec)
|
||||||
{
|
{
|
||||||
|
@ -81,7 +81,6 @@ void ClientConnection::connect(const ResultHandler& handler)
|
||||||
}
|
}
|
||||||
LOG(NOTICE, LOG_TAG) << "Connected to " << socket_.remote_endpoint().address().to_string() << endl;
|
LOG(NOTICE, LOG_TAG) << "Connected to " << socket_.remote_endpoint().address().to_string() << endl;
|
||||||
handler(ec);
|
handler(ec);
|
||||||
// getNextMessage();
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
resolver_.async_resolve(query, host_, cpt::to_string(port_), [this, handler](const boost::system::error_code& ec, tcp::resolver::results_type results) {
|
resolver_.async_resolve(query, host_, cpt::to_string(port_), [this, handler](const boost::system::error_code& ec, tcp::resolver::results_type results) {
|
||||||
|
@ -206,8 +205,8 @@ void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& ha
|
||||||
base_message_.deserialize(buffer_.data());
|
base_message_.deserialize(buffer_.data());
|
||||||
tv t;
|
tv t;
|
||||||
base_message_.received = t;
|
base_message_.received = t;
|
||||||
// LOG(DEBUG, LOG_TAG) << "getNextMessage: " << base_message_.type << ", size: " << base_message_.size
|
LOG(TRACE, LOG_TAG) << "getNextMessage: " << base_message_.type << ", size: " << base_message_.size
|
||||||
// << ", id: " << base_message_.id << ", refers: " << base_message_.refersTo << "\n";
|
<< ", id: " << base_message_.id << ", refers: " << base_message_.refersTo << "\n";
|
||||||
if (base_message_.type > message_type::kLast)
|
if (base_message_.type > message_type::kLast)
|
||||||
{
|
{
|
||||||
LOG(ERROR, LOG_TAG) << "unknown message type received: " << base_message_.type << ", size: " << base_message_.size << "\n";
|
LOG(ERROR, LOG_TAG) << "unknown message type received: " << base_message_.type << ", size: " << base_message_.size << "\n";
|
||||||
|
@ -238,13 +237,15 @@ void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& ha
|
||||||
}
|
}
|
||||||
|
|
||||||
auto response = msg::factory::createMessage(base_message_, buffer_.data());
|
auto response = msg::factory::createMessage(base_message_, buffer_.data());
|
||||||
for (const auto& request : pendingRequests_)
|
for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter)
|
||||||
{
|
{
|
||||||
|
auto request = *iter;
|
||||||
if (auto req = request.lock())
|
if (auto req = request.lock())
|
||||||
{
|
{
|
||||||
if (req->id() == base_message_.refersTo)
|
if (req->id() == base_message_.refersTo)
|
||||||
{
|
{
|
||||||
req->setValue(std::move(response));
|
req->setValue(std::move(response));
|
||||||
|
pendingRequests_.erase(iter);
|
||||||
getNextMessage(handler);
|
getNextMessage(handler);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,34 +57,47 @@ public:
|
||||||
timer_.cancel();
|
timer_.cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the response for the pending request and passes it to the handler
|
||||||
|
/// @param value the response message
|
||||||
void setValue(std::unique_ptr<msg::BaseMessage> value)
|
void setValue(std::unique_ptr<msg::BaseMessage> value)
|
||||||
{
|
{
|
||||||
|
boost::asio::post(strand_, [this, self = shared_from_this(), val = std::move(value)]() mutable {
|
||||||
timer_.cancel();
|
timer_.cancel();
|
||||||
if (handler_)
|
if (handler_)
|
||||||
handler_({}, std::move(value));
|
handler_({}, std::move(val));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// @return the id of the request
|
||||||
uint16_t id() const
|
uint16_t id() const
|
||||||
{
|
{
|
||||||
return id_;
|
return id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Start the timer for the request
|
||||||
|
/// @param timeout the timeout to wait for the reception of the response
|
||||||
void startTimer(const chronos::usec& timeout)
|
void startTimer(const chronos::usec& timeout)
|
||||||
{
|
{
|
||||||
timer_.expires_after(timeout);
|
timer_.expires_after(timeout);
|
||||||
timer_.async_wait(boost::asio::bind_executor(strand_, [ this, self = shared_from_this() ](boost::system::error_code ec) {
|
timer_.async_wait(boost::asio::bind_executor(strand_, [this, self = shared_from_this()](boost::system::error_code ec) {
|
||||||
if (!handler_)
|
if (!handler_)
|
||||||
return;
|
return;
|
||||||
if (!ec)
|
if (!ec)
|
||||||
{
|
{
|
||||||
|
// !ec => expired => timeout
|
||||||
handler_(boost::asio::error::timed_out, nullptr);
|
handler_(boost::asio::error::timed_out, nullptr);
|
||||||
handler_ = nullptr;
|
handler_ = nullptr;
|
||||||
}
|
}
|
||||||
else if (ec != boost::asio::error::operation_aborted)
|
else if (ec != boost::asio::error::operation_aborted)
|
||||||
|
{
|
||||||
|
// ec != aborted => not cancelled (in setValue)
|
||||||
|
// => should not happen, but who knows => pass the error to the handler
|
||||||
handler_(ec, nullptr);
|
handler_(ec, nullptr);
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Needed to put the requests in a container
|
||||||
bool operator<(const PendingRequest& other) const
|
bool operator<(const PendingRequest& other) const
|
||||||
{
|
{
|
||||||
return (id_ < other.id());
|
return (id_ < other.id());
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue