Reformat code

This commit is contained in:
badaix 2022-12-29 19:10:38 +01:00
parent c6518a4709
commit e1c8250876
32 changed files with 505 additions and 351 deletions

View file

@ -179,11 +179,12 @@ bool BrowseBonjour::browse(const string& serviceName, mDNSResult& result, int /*
CHECKED(DNSServiceBrowse(
service.get(), 0, 0, serviceName.c_str(), "local.",
[](DNSServiceRef /*service*/, DNSServiceFlags /*flags*/, uint32_t /*interfaceIndex*/, DNSServiceErrorType errorCode, const char* serviceName,
const char* regtype, const char* replyDomain, void* context) {
auto replyCollection = static_cast<deque<mDNSReply>*>(context);
const char* regtype, const char* replyDomain, void* context)
{
auto replyCollection = static_cast<deque<mDNSReply>*>(context);
CHECKED(errorCode);
replyCollection->push_back(mDNSReply{string(serviceName), string(regtype), string(replyDomain)});
CHECKED(errorCode);
replyCollection->push_back(mDNSReply{string(serviceName), string(regtype), string(replyDomain)});
},
&replyCollection));
@ -198,11 +199,12 @@ bool BrowseBonjour::browse(const string& serviceName, mDNSResult& result, int /*
CHECKED(DNSServiceResolve(
service.get(), 0, 0, reply.name.c_str(), reply.regtype.c_str(), reply.domain.c_str(),
[](DNSServiceRef /*service*/, DNSServiceFlags /*flags*/, uint32_t /*interfaceIndex*/, DNSServiceErrorType errorCode, const char* /*fullName*/,
const char* hosttarget, uint16_t port, uint16_t /*txtLen*/, const unsigned char* /*txtRecord*/, void* context) {
auto resultCollection = static_cast<deque<mDNSResolve>*>(context);
const char* hosttarget, uint16_t port, uint16_t /*txtLen*/, const unsigned char* /*txtRecord*/, void* context)
{
auto resultCollection = static_cast<deque<mDNSResolve>*>(context);
CHECKED(errorCode);
resultCollection->push_back(mDNSResolve{string(hosttarget), ntohs(port)});
CHECKED(errorCode);
resultCollection->push_back(mDNSResolve{string(hosttarget), ntohs(port)});
},
&resolveCollection));
@ -220,20 +222,21 @@ bool BrowseBonjour::browse(const string& serviceName, mDNSResult& result, int /*
CHECKED(DNSServiceGetAddrInfo(
service.get(), kDNSServiceFlagsLongLivedQuery, 0, kDNSServiceProtocol_IPv4, resolve.fullName.c_str(),
[](DNSServiceRef /*service*/, DNSServiceFlags /*flags*/, uint32_t interfaceIndex, DNSServiceErrorType /*errorCode*/, const char* hostname,
const sockaddr* address, uint32_t /*ttl*/, void* context) {
auto result = static_cast<mDNSResult*>(context);
const sockaddr* address, uint32_t /*ttl*/, void* context)
{
auto result = static_cast<mDNSResult*>(context);
result->host = string(hostname);
result->ip_version = (address->sa_family == AF_INET) ? (IPVersion::IPv4) : (IPVersion::IPv6);
result->iface_idx = static_cast<int>(interfaceIndex);
result->host = string(hostname);
result->ip_version = (address->sa_family == AF_INET) ? (IPVersion::IPv4) : (IPVersion::IPv6);
result->iface_idx = static_cast<int>(interfaceIndex);
char hostIP[NI_MAXHOST];
char hostService[NI_MAXSERV];
if (getnameinfo(address, sizeof(*address), hostIP, sizeof(hostIP), hostService, sizeof(hostService), NI_NUMERICHOST | NI_NUMERICSERV) == 0)
result->ip = string(hostIP);
else
return;
result->valid = true;
char hostIP[NI_MAXHOST];
char hostService[NI_MAXSERV];
if (getnameinfo(address, sizeof(*address), hostIP, sizeof(hostIP), hostService, sizeof(hostService), NI_NUMERICHOST | NI_NUMERICSERV) == 0)
result->ip = string(hostIP);
else
return;
result->valid = true;
},
&resultCollection[i++]));
}

View file

@ -52,7 +52,9 @@ PendingRequest::~PendingRequest()
void PendingRequest::setValue(std::unique_ptr<msg::BaseMessage> value)
{
boost::asio::post(strand_, [this, self = shared_from_this(), val = std::move(value)]() mutable {
boost::asio::post(strand_,
[this, self = shared_from_this(), val = std::move(value)]() mutable
{
timer_.cancel();
if (handler_)
handler_({}, std::move(val));
@ -67,7 +69,9 @@ uint16_t PendingRequest::id() const
void PendingRequest::startTimer(const chronos::usec& timeout)
{
timer_.expires_after(timeout);
timer_.async_wait([this, self = shared_from_this()](boost::system::error_code ec) {
timer_.async_wait(
[this, self = shared_from_this()](boost::system::error_code ec)
{
if (!handler_)
return;
if (!ec)
@ -202,7 +206,9 @@ void ClientConnection::sendNext()
message.msg->serialize(stream);
auto handler = message.handler;
boost::asio::async_write(socket_, streambuf, [this, handler](boost::system::error_code ec, std::size_t length) {
boost::asio::async_write(socket_, streambuf,
[this, handler](boost::system::error_code ec, std::size_t length)
{
if (ec)
LOG(ERROR, LOG_TAG) << "Failed to send message, error: " << ec.message() << "\n";
else
@ -220,7 +226,9 @@ void ClientConnection::sendNext()
void ClientConnection::send(const msg::message_ptr& message, const ResultHandler& handler)
{
boost::asio::post(strand_, [this, message, handler]() {
boost::asio::post(strand_,
[this, message, handler]()
{
messages_.emplace_back(message, handler);
if (messages_.size() > 1)
{
@ -234,7 +242,9 @@ void ClientConnection::send(const msg::message_ptr& message, const ResultHandler
void ClientConnection::sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler)
{
boost::asio::post(strand_, [this, message, timeout, handler]() {
boost::asio::post(strand_,
[this, message, timeout, handler]()
{
pendingRequests_.erase(
std::remove_if(pendingRequests_.begin(), pendingRequests_.end(), [](std::weak_ptr<PendingRequest> request) { return request.expired(); }),
pendingRequests_.end());
@ -245,7 +255,9 @@ void ClientConnection::sendRequest(const msg::message_ptr& message, const chrono
auto request = make_shared<PendingRequest>(strand_, reqId_, handler);
pendingRequests_.push_back(request);
request->startTimer(timeout);
send(message, [handler](const boost::system::error_code& ec) {
send(message,
[handler](const boost::system::error_code& ec)
{
if (ec)
handler(ec, nullptr);
});
@ -255,7 +267,9 @@ void ClientConnection::sendRequest(const msg::message_ptr& message, const chrono
void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& handler)
{
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, handler](boost::system::error_code ec, std::size_t length) mutable {
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_),
[this, handler](boost::system::error_code ec, std::size_t length) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
@ -288,35 +302,36 @@ void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& ha
buffer_.resize(base_message_.size);
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_message_.size),
[this, handler](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
if (handler)
handler(ec, nullptr);
return;
}
[this, handler](boost::system::error_code ec, std::size_t length) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
if (handler)
handler(ec, nullptr);
return;
}
auto response = msg::factory::createMessage(base_message_, buffer_.data());
if (!response)
LOG(WARNING, LOG_TAG) << "Failed to deserialize message of type: " << base_message_.type << "\n";
for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter)
{
auto request = *iter;
if (auto req = request.lock())
{
if (req->id() == base_message_.refersTo)
{
req->setValue(std::move(response));
pendingRequests_.erase(iter);
getNextMessage(handler);
return;
}
}
}
auto response = msg::factory::createMessage(base_message_, buffer_.data());
if (!response)
LOG(WARNING, LOG_TAG) << "Failed to deserialize message of type: " << base_message_.type << "\n";
for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter)
{
auto request = *iter;
if (auto req = request.lock())
{
if (req->id() == base_message_.refersTo)
{
req->setValue(std::move(response));
pendingRequests_.erase(iter);
getNextMessage(handler);
return;
}
}
}
if (handler)
handler(ec, std::move(response));
});
if (handler)
handler(ec, std::move(response));
});
});
}

View file

@ -117,7 +117,9 @@ public:
template <typename Message>
void sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<Message>& handler)
{
sendRequest(message, timeout, [handler](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
sendRequest(message, timeout,
[handler](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response)
{
if (ec)
handler(ec, nullptr);
else if (auto casted_response = msg::message_cast<Message>(std::move(response)))

View file

@ -121,7 +121,9 @@ std::vector<std::string> Controller::getSupportedPlayerNames()
void Controller::getNextMessage()
{
clientConnection_->getNextMessage([this](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
clientConnection_->getNextMessage(
[this](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response)
{
if (ec)
{
reconnect();
@ -222,7 +224,9 @@ void Controller::getNextMessage()
if (!player_)
throw SnapException("No audio player support" + (settings_.player.player_name.empty() ? "" : " for: " + settings_.player.player_name));
player_->setVolumeCallback([this](double volume, bool muted) {
player_->setVolumeCallback(
[this](double volume, bool muted)
{
static double last_volume(-1);
static bool last_muted(true);
if ((volume != last_volume) || (last_muted != muted))
@ -232,7 +236,9 @@ void Controller::getNextMessage()
auto info = std::make_shared<msg::ClientInfo>();
info->setVolume(static_cast<uint16_t>(volume * 100.));
info->setMuted(muted);
clientConnection_->send(info, [this](const boost::system::error_code& ec) {
clientConnection_->send(info,
[this](const boost::system::error_code& ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to send client info, error: " << ec.message() << "\n";
@ -270,35 +276,38 @@ void Controller::getNextMessage()
void Controller::sendTimeSyncMessage(int quick_syncs)
{
auto timeReq = std::make_shared<msg::Time>();
clientConnection_->sendRequest<msg::Time>(
timeReq, 2s, [this, quick_syncs](const boost::system::error_code& ec, const std::unique_ptr<msg::Time>& response) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Time sync request failed: " << ec.message() << "\n";
reconnect();
return;
}
else
{
TimeProvider::getInstance().setDiff(response->latency, response->received - response->sent);
}
clientConnection_->sendRequest<msg::Time>(timeReq, 2s,
[this, quick_syncs](const boost::system::error_code& ec, const std::unique_ptr<msg::Time>& response) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Time sync request failed: " << ec.message() << "\n";
reconnect();
return;
}
else
{
TimeProvider::getInstance().setDiff(response->latency, response->received - response->sent);
}
std::chrono::microseconds next = TIME_SYNC_INTERVAL;
if (quick_syncs > 0)
std::chrono::microseconds next = TIME_SYNC_INTERVAL;
if (quick_syncs > 0)
{
if (--quick_syncs == 0)
LOG(INFO, LOG_TAG) << "diff to server [ms]: "
<< static_cast<float>(TimeProvider::getInstance().getDiffToServer<chronos::usec>().count()) / 1000.f << "\n";
next = 100us;
}
timer_.expires_after(next);
timer_.async_wait(
[this, quick_syncs](const boost::system::error_code& ec)
{
if (--quick_syncs == 0)
LOG(INFO, LOG_TAG) << "diff to server [ms]: "
<< static_cast<float>(TimeProvider::getInstance().getDiffToServer<chronos::usec>().count()) / 1000.f << "\n";
next = 100us;
if (!ec)
{
sendTimeSyncMessage(quick_syncs);
}
timer_.expires_after(next);
timer_.async_wait([this, quick_syncs](const boost::system::error_code& ec) {
if (!ec)
{
sendTimeSyncMessage(quick_syncs);
}
});
});
});
}
void Controller::browseMdns(const MdnsHandler& handler)
@ -324,7 +333,9 @@ void Controller::browseMdns(const MdnsHandler& handler)
}
timer_.expires_after(500ms);
timer_.async_wait([this, handler](const boost::system::error_code& ec) {
timer_.async_wait(
[this, handler](const boost::system::error_code& ec)
{
if (!ec)
{
browseMdns(handler);
@ -343,7 +354,9 @@ void Controller::start()
{
if (settings_.server.host.empty())
{
browseMdns([this](const boost::system::error_code& ec, const std::string& host, uint16_t port) {
browseMdns(
[this](const boost::system::error_code& ec, const std::string& host, uint16_t port)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to browse MDNS, error: " << ec.message() << "\n";
@ -380,7 +393,9 @@ void Controller::reconnect()
stream_.reset();
decoder_.reset();
timer_.expires_after(1s);
timer_.async_wait([this](const boost::system::error_code& ec) {
timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (!ec)
{
worker();
@ -390,7 +405,9 @@ void Controller::reconnect()
void Controller::worker()
{
clientConnection_->connect([this](const boost::system::error_code& ec) {
clientConnection_->connect(
[this](const boost::system::error_code& ec)
{
if (!ec)
{
// LOG(INFO, LOG_TAG) << "Connected!\n";
@ -401,19 +418,21 @@ void Controller::worker()
// Say hello to the server
auto hello = std::make_shared<msg::Hello>(macAddress, settings_.host_id, settings_.instance);
clientConnection_->sendRequest<msg::ServerSettings>(
hello, 2s, [this](const boost::system::error_code& ec, std::unique_ptr<msg::ServerSettings> response) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to send hello request, error: " << ec.message() << "\n";
reconnect();
return;
}
else
{
serverSettings_ = std::move(response);
LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
<< ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
}
hello, 2s,
[this](const boost::system::error_code& ec, std::unique_ptr<msg::ServerSettings> response) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to send hello request, error: " << ec.message() << "\n";
reconnect();
return;
}
else
{
serverSettings_ = std::move(response);
LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
<< ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
}
});
// Do initial time sync with the server

View file

@ -188,7 +188,9 @@ bool AlsaPlayer::getHardwareVolume(double& volume, bool& muted)
void AlsaPlayer::waitForEvent()
{
sd_.async_wait(boost::asio::posix::stream_descriptor::wait_read, [this](const boost::system::error_code& ec) {
sd_.async_wait(boost::asio::posix::stream_descriptor::wait_read,
[this](const boost::system::error_code& ec)
{
if (ec)
{
// TODO: fd is "Bad" after unplugging/plugging USB DAC, i.e. after init/uninit/init cycle
@ -221,7 +223,9 @@ void AlsaPlayer::waitForEvent()
// As workaround we defer getting the volume by 20ms.
timer_.cancel();
timer_.expires_after(20ms);
timer_.async_wait([this](const boost::system::error_code& ec) {
timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (!ec)
{
if (getHardwareVolume(volume_, muted_))
@ -249,7 +253,9 @@ void AlsaPlayer::initMixer()
throw SnapException("Can't open control for " + mixer_device_ + ", error: " + snd_strerror(err));
if ((err = snd_ctl_subscribe_events(ctl_, 1)) < 0)
throw SnapException("Can't subscribe for events for " + mixer_device_ + ", error: " + snd_strerror(err));
fd_ = std::unique_ptr<pollfd, std::function<void(pollfd*)>>(new pollfd(), [](pollfd* p) {
fd_ = std::unique_ptr<pollfd, std::function<void(pollfd*)>>(new pollfd(),
[](pollfd* p)
{
close(p->fd);
delete p;
});

View file

@ -131,7 +131,9 @@ void FilePlayer::loop()
next_request_ = now + 1ms;
timer_.expires_at(next_request_);
timer_.async_wait([this](boost::system::error_code ec) {
timer_.async_wait(
[this](boost::system::error_code ec)
{
if (ec)
return;
requestAudio();

View file

@ -71,7 +71,8 @@ Player::Player(boost::asio::io_context& io_context, const ClientSettings::Player
break;
}
auto not_empty = [](const std::string& value) -> std::string {
auto not_empty = [](const std::string& value) -> std::string
{
if (!value.empty())
return value;
else

View file

@ -54,7 +54,9 @@ vector<PcmDevice> PulsePlayer::pcm_list(const std::string& parameter)
{
auto pa_ml = std::shared_ptr<pa_mainloop>(pa_mainloop_new(), [](pa_mainloop* pa_ml) { pa_mainloop_free(pa_ml); });
pa_mainloop_api* pa_mlapi = pa_mainloop_get_api(pa_ml.get());
auto pa_ctx = std::shared_ptr<pa_context>(pa_context_new(pa_mlapi, "Snapcast"), [](pa_context* pa_ctx) {
auto pa_ctx = std::shared_ptr<pa_context>(pa_context_new(pa_mlapi, "Snapcast"),
[](pa_context* pa_ctx)
{
pa_context_disconnect(pa_ctx);
pa_context_unref(pa_ctx);
});
@ -70,21 +72,22 @@ vector<PcmDevice> PulsePlayer::pcm_list(const std::string& parameter)
static int pa_ready = 0;
pa_context_set_state_callback(
pa_ctx.get(),
[](pa_context* c, void* userdata) {
std::ignore = userdata;
pa_context_state_t state = pa_context_get_state(c);
switch (state)
{
case PA_CONTEXT_FAILED:
case PA_CONTEXT_TERMINATED:
pa_ready = 2;
break;
case PA_CONTEXT_READY:
pa_ready = 1;
break;
default:
break;
}
[](pa_context* c, void* userdata)
{
std::ignore = userdata;
pa_context_state_t state = pa_context_get_state(c);
switch (state)
{
case PA_CONTEXT_FAILED:
case PA_CONTEXT_TERMINATED:
pa_ready = 2;
break;
case PA_CONTEXT_READY:
pa_ready = 1;
break;
default:
break;
}
},
nullptr);
@ -107,13 +110,14 @@ vector<PcmDevice> PulsePlayer::pcm_list(const std::string& parameter)
static std::vector<PcmDevice> devices;
auto* op = pa_context_get_sink_info_list(
pa_ctx.get(),
[](pa_context* ctx, const pa_sink_info* i, int eol, void* userdata) mutable {
std::ignore = ctx;
std::ignore = userdata;
// auto self = static_cast<PulsePlayer*>(userdata);
// If eol is set to a positive number, you're at the end of the list
if (eol <= 0)
devices.emplace_back(i->index, i->name, i->description);
[](pa_context* ctx, const pa_sink_info* i, int eol, void* userdata) mutable
{
std::ignore = ctx;
std::ignore = userdata;
// auto self = static_cast<PulsePlayer*>(userdata);
// If eol is set to a positive number, you're at the end of the list
if (eol <= 0)
devices.emplace_back(i->index, i->name, i->description);
},
nullptr);
@ -246,26 +250,27 @@ void PulsePlayer::triggerVolumeUpdate()
{
pa_context_get_sink_input_info(
pa_ctx_, pa_stream_get_index(playstream_),
[](pa_context* ctx, const pa_sink_input_info* info, int eol, void* userdata) {
std::ignore = ctx;
LOG(DEBUG, LOG_TAG) << "pa_context_get_sink_info_by_index info: " << (info != nullptr) << ", eol: " << eol << "\n";
if (info != nullptr)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->volume_ = static_cast<double>(pa_cvolume_avg(&(info->volume))) / static_cast<double>(PA_VOLUME_NORM);
self->muted_ = (info->mute != 0);
LOG(DEBUG, LOG_TAG) << "Volume changed: " << self->volume_ << ", muted: " << self->muted_ << "\n";
[](pa_context* ctx, const pa_sink_input_info* info, int eol, void* userdata)
{
std::ignore = ctx;
LOG(DEBUG, LOG_TAG) << "pa_context_get_sink_info_by_index info: " << (info != nullptr) << ", eol: " << eol << "\n";
if (info != nullptr)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->volume_ = static_cast<double>(pa_cvolume_avg(&(info->volume))) / static_cast<double>(PA_VOLUME_NORM);
self->muted_ = (info->mute != 0);
LOG(DEBUG, LOG_TAG) << "Volume changed: " << self->volume_ << ", muted: " << self->muted_ << "\n";
auto now = std::chrono::steady_clock::now();
if (now - self->last_change_ < 1s)
{
LOG(DEBUG, LOG_TAG) << "Last volume change by server: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(now - self->last_change_).count()
<< " ms => ignoring volume change\n";
return;
}
self->notifyVolumeChange(self->volume_, self->muted_);
auto now = std::chrono::steady_clock::now();
if (now - self->last_change_ < 1s)
{
LOG(DEBUG, LOG_TAG) << "Last volume change by server: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(now - self->last_change_).count()
<< " ms => ignoring volume change\n";
return;
}
self->notifyVolumeChange(self->volume_, self->muted_);
}
},
this);
}
@ -432,9 +437,10 @@ void PulsePlayer::connect()
// If there's an error, the callback will set pa_ready to 2
pa_context_set_state_callback(
pa_ctx_,
[](pa_context* c, void* userdata) {
auto* self = static_cast<PulsePlayer*>(userdata);
self->stateCallback(c);
[](pa_context* c, void* userdata)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->stateCallback(c);
},
this);
@ -462,39 +468,43 @@ void PulsePlayer::connect()
{
pa_context_set_subscribe_callback(
pa_ctx_,
[](pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx, void* userdata) {
auto* self = static_cast<PulsePlayer*>(userdata);
self->subscribeCallback(ctx, event_type, idx);
[](pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx, void* userdata)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->subscribeCallback(ctx, event_type, idx);
},
this);
const auto mask = static_cast<pa_subscription_mask_t>(PA_SUBSCRIPTION_MASK_SINK_INPUT);
pa_context_subscribe(
pa_ctx_, mask,
[](pa_context* ctx, int success, void* userdata) {
std::ignore = ctx;
if (success != 0)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->triggerVolumeUpdate();
}
[](pa_context* ctx, int success, void* userdata)
{
std::ignore = ctx;
if (success != 0)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->triggerVolumeUpdate();
}
},
this);
}
pa_stream_set_write_callback(
playstream_,
[](pa_stream* stream, size_t length, void* userdata) {
auto* self = static_cast<PulsePlayer*>(userdata);
self->writeCallback(stream, length);
[](pa_stream* stream, size_t length, void* userdata)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->writeCallback(stream, length);
},
this);
pa_stream_set_underflow_callback(
playstream_,
[](pa_stream* stream, void* userdata) {
auto* self = static_cast<PulsePlayer*>(userdata);
self->underflowCallback(stream);
[](pa_stream* stream, void* userdata)
{
auto* self = static_cast<PulsePlayer*>(userdata);
self->underflowCallback(stream);
},
this);

View file

@ -134,7 +134,8 @@ inline PcmDevice convertToDevice(int idx, IMMDevicePtr& device)
desc.idx = idx;
// Convert a wide Unicode string to an UTF8 string
auto utf8_encode = [](const std::wstring& wstr) {
auto utf8_encode = [](const std::wstring& wstr)
{
if (wstr.empty())
return std::string();
int size_needed = WideCharToMultiByte(CP_UTF8, 0, &wstr[0], (int)wstr.size(), NULL, 0, NULL, NULL);

View file

@ -418,7 +418,9 @@ int main(int argc, char** argv)
boost::asio::io_context io_context;
// Construct a signal set registered for process termination.
boost::asio::signal_set signals(io_context, SIGHUP, SIGINT, SIGTERM);
signals.async_wait([&](const boost::system::error_code& ec, int signal) {
signals.async_wait(
[&](const boost::system::error_code& ec, int signal)
{
if (!ec)
LOG(INFO, LOG_TAG) << "Received signal " << signal << ": " << strsignal(signal) << "\n";
else

View file

@ -945,18 +945,21 @@ inline void OptionParser::parse(const std::string& ini_filename)
std::ifstream file(ini_filename.c_str());
std::string line;
auto trim = [](std::string& s) {
auto trim = [](std::string& s)
{
s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](int ch) { return !std::isspace(ch); }));
s.erase(std::find_if(s.rbegin(), s.rend(), [](int ch) { return !std::isspace(ch); }).base(), s.end());
return s;
};
auto trim_copy = [trim](const std::string& s) {
auto trim_copy = [trim](const std::string& s)
{
std::string copy(s);
return trim(copy);
};
auto split = [trim_copy](const std::string& s) -> std::pair<std::string, std::string> {
auto split = [trim_copy](const std::string& s) -> std::pair<std::string, std::string>
{
size_t pos = s.find('=');
if (pos == std::string::npos)
return {"", ""};

View file

@ -72,7 +72,9 @@ namespace strutils = utils::string;
#ifndef WINDOWS
static std::string execGetOutput(const std::string& cmd)
{
std::shared_ptr<::FILE> pipe(popen((cmd + " 2> /dev/null").c_str(), "r"), [](::FILE* stream) {
std::shared_ptr<::FILE> pipe(popen((cmd + " 2> /dev/null").c_str(), "r"),
[](::FILE* stream)
{
if (stream != nullptr)
pclose(stream);
});

View file

@ -106,14 +106,16 @@ void ControlServer::onNewSession(std::shared_ptr<StreamSession> session)
void ControlServer::startAccept()
{
auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
auto accept_handler_tcp = [this](error_code ec, tcp::socket socket)
{
if (!ec)
handleAccept<ControlSessionTcp>(std::move(socket));
else
LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
};
auto accept_handler_http = [this](error_code ec, tcp::socket socket) {
auto accept_handler_http = [this](error_code ec, tcp::socket socket)
{
if (!ec)
handleAccept<ControlSessionHttp>(std::move(socket), http_settings_);
else

View file

@ -71,7 +71,8 @@ namespace
boost::beast::string_view mime_type(boost::beast::string_view path)
{
using boost::beast::iequals;
auto const ext = [&path] {
auto const ext = [&path]
{
auto const pos = path.rfind(".");
if (pos == boost::beast::string_view::npos)
return boost::beast::string_view{};
@ -165,7 +166,8 @@ template <class Body, class Allocator, class Send>
void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<Allocator>>&& req, Send&& send)
{
// Returns a bad request response
auto const bad_request = [&req](boost::beast::string_view why) {
auto const bad_request = [&req](boost::beast::string_view why)
{
http::response<http::string_body> res{http::status::bad_request, req.version()};
// TODO: Server: Snapcast/VERSION
res.set(http::field::server, HTTP_SERVER_NAME);
@ -177,7 +179,8 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
};
// Returns a not found response
auto const not_found = [&req](boost::beast::string_view target) {
auto const not_found = [&req](boost::beast::string_view target)
{
http::response<http::string_body> res{http::status::not_found, req.version()};
res.set(http::field::server, HTTP_SERVER_NAME);
res.set(http::field::content_type, "text/html");
@ -188,7 +191,8 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
};
// Returns a configuration help
auto const unconfigured = [&req]() {
auto const unconfigured = [&req]()
{
http::response<http::string_body> res{http::status::ok, req.version()};
res.set(http::field::server, HTTP_SERVER_NAME);
res.set(http::field::content_type, "text/html");
@ -199,7 +203,8 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
};
// Returns a server error response
auto const server_error = [&req](boost::beast::string_view what) {
auto const server_error = [&req](boost::beast::string_view what)
{
http::response<http::string_body> res{http::status::internal_server_error, req.version()};
res.set(http::field::server, HTTP_SERVER_NAME);
res.set(http::field::content_type, "text/html");
@ -220,7 +225,9 @@ void ControlSessionHttp::handle_request(http::request<Body, http::basic_fields<A
return send(bad_request("Illegal request-target"));
std::string request = req.body();
return message_receiver_->onMessageReceived(shared_from_this(), request, [req = std::move(req), send = std::move(send)](const std::string& response) {
return message_receiver_->onMessageReceived(shared_from_this(), request,
[req = std::move(req), send = std::move(send)](const std::string& response)
{
http::response<http::string_body> res{http::status::ok, req.version()};
res.set(http::field::server, HTTP_SERVER_NAME);
res.set(http::field::content_type, "application/json");
@ -340,7 +347,9 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
// Create a WebSocket session by transferring the socket
// std::make_shared<websocket_session>(std::move(socket_), state_)->run(std::move(req_));
auto ws = std::make_shared<websocket::stream<beast::tcp_stream>>(std::move(socket_));
ws->async_accept(req_, [this, ws, self = shared_from_this()](beast::error_code ec) {
ws->async_accept(req_,
[this, ws, self = shared_from_this()](beast::error_code ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during WebSocket handshake (control): " << ec.message() << "\n";
@ -357,7 +366,9 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
// Create a WebSocket session by transferring the socket
// std::make_shared<websocket_session>(std::move(socket_), state_)->run(std::move(req_));
auto ws = std::make_shared<websocket::stream<beast::tcp_stream>>(std::move(socket_));
ws->async_accept(req_, [this, ws, self = shared_from_this()](beast::error_code ec) {
ws->async_accept(req_,
[this, ws, self = shared_from_this()](beast::error_code ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during WebSocket handshake (stream): " << ec.message() << "\n";
@ -373,7 +384,9 @@ void ControlSessionHttp::on_read(beast::error_code ec, std::size_t bytes_transfe
}
// Send the response
handle_request(std::move(req_), [this](auto&& response) {
handle_request(std::move(req_),
[this](auto&& response)
{
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.

View file

@ -51,32 +51,35 @@ ControlSessionTcp::~ControlSessionTcp()
void ControlSessionTcp::do_read()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(
socket_, streambuf_, delimiter, [this, self = shared_from_this(), delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
boost::asio::async_read_until(socket_, streambuf_, delimiter,
[this, self = shared_from_this(), delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
// LOG(DEBUG, LOG_TAG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
// LOG(DEBUG, LOG_TAG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) {
if (!response.empty())
sendAsync(response);
});
}
message_receiver_->onMessageReceived(shared_from_this(), line,
[this](const std::string& response)
{
if (!response.empty())
sendAsync(response);
});
}
streambuf_.consume(bytes_transferred);
do_read();
});
}
streambuf_.consume(bytes_transferred);
do_read();
});
}
@ -102,7 +105,9 @@ void ControlSessionTcp::stop()
void ControlSessionTcp::sendAsync(const std::string& message)
{
boost::asio::post(strand_, [this, self = shared_from_this(), message]() {
boost::asio::post(strand_,
[this, self = shared_from_this(), message]()
{
messages_.emplace_back(message + "\r\n");
if (messages_.size() > 1)
{
@ -115,7 +120,9 @@ void ControlSessionTcp::sendAsync(const std::string& message)
void ControlSessionTcp::send_next()
{
boost::asio::async_write(socket_, boost::asio::buffer(messages_.front()), [this, self = shared_from_this()](std::error_code ec, std::size_t length) {
boost::asio::async_write(socket_, boost::asio::buffer(messages_.front()),
[this, self = shared_from_this()](std::error_code ec, std::size_t length)
{
messages_.pop_front();
if (ec)
{

View file

@ -68,7 +68,9 @@ void ControlSessionWebsocket::stop()
void ControlSessionWebsocket::sendAsync(const std::string& message)
{
boost::asio::post(strand_, [this, self = shared_from_this(), msg = message]() {
boost::asio::post(strand_,
[this, self = shared_from_this(), msg = message]()
{
messages_.push_back(std::move(msg));
if (messages_.size() > 1)
{
@ -83,7 +85,9 @@ void ControlSessionWebsocket::sendAsync(const std::string& message)
void ControlSessionWebsocket::send_next()
{
const std::string& message = messages_.front();
ws_.async_write(boost::asio::buffer(message), [this, self = shared_from_this()](std::error_code ec, std::size_t length) {
ws_.async_write(boost::asio::buffer(message),
[this, self = shared_from_this()](std::error_code ec, std::size_t length)
{
messages_.pop_front();
if (ec)
{
@ -126,7 +130,9 @@ void ControlSessionWebsocket::on_read_ws(beast::error_code ec, std::size_t bytes
// LOG(DEBUG, LOG_TAG) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
{
message_receiver_->onMessageReceived(shared_from_this(), line, [this](const std::string& response) {
message_receiver_->onMessageReceived(shared_from_this(), line,
[this](const std::string& response)
{
if (!response.empty())
{
sendAsync(response);

View file

@ -85,7 +85,8 @@ void FlacEncoder::encode(const msg::PcmChunk& chunk)
pcmBuffer_ = static_cast<FLAC__int32*>(realloc(pcmBuffer_, pcmBufferSize_ * sizeof(FLAC__int32)));
}
auto clip = [](int32_t min, int32_t max, int32_t value) -> int32_t {
auto clip = [](int32_t min, int32_t max, int32_t value) -> int32_t
{
if (value < min)
return min;
if (value > max)

View file

@ -71,7 +71,9 @@ void PublishAvahi::publish(const std::vector<mDNSService>& services)
void PublishAvahi::poll()
{
timer_.expires_after(std::chrono::milliseconds(50));
timer_.async_wait([this](const boost::system::error_code& ec) {
timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (!ec && (avahi_simple_poll_iterate(simple_poll, 0) == 0))
poll();
});

View file

@ -452,7 +452,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, const OnRespon
auto command = request->params().get<string>("command");
auto handle_response = [request, on_response, command](const snapcast::ErrorCode& ec) {
auto handle_response = [request, on_response, command](const snapcast::ErrorCode& ec)
{
auto log_level = AixLog::Severity::debug;
if (ec)
log_level = AixLog::Severity::error;
@ -533,7 +534,8 @@ void Server::processRequest(const jsonrpcpp::request_ptr request, const OnRespon
auto value = request->params().get("value");
LOG(INFO, LOG_TAG) << "Stream '" << streamId << "' set property: " << name << " = " << value << "\n";
auto handle_response = [request, on_response](const snapcast::ErrorCode& ec) {
auto handle_response = [request, on_response](const snapcast::ErrorCode& ec)
{
LOG(ERROR, LOG_TAG) << "SetShuffle: " << ec << ", message: " << ec.detailed_message() << ", msg: " << ec.message()
<< ", category: " << ec.category().name() << "\n";
std::shared_ptr<jsonrpcpp::Response> response;
@ -663,7 +665,9 @@ void Server::onMessageReceived(std::shared_ptr<ControlSession> controlSession, c
if (entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(entity);
processRequest(request, [this, controlSession, response_handler](jsonrpcpp::entity_ptr response, jsonrpcpp::notification_ptr notification) {
processRequest(request,
[this, controlSession, response_handler](jsonrpcpp::entity_ptr response, jsonrpcpp::notification_ptr notification)
{
saveConfig();
////cout << "Request: " << request->to_json().dump() << "\n";
if (notification)
@ -693,8 +697,10 @@ void Server::onMessageReceived(std::shared_ptr<ControlSession> controlSession, c
if (batch_entity->is_request())
{
jsonrpcpp::request_ptr request = dynamic_pointer_cast<jsonrpcpp::Request>(batch_entity);
processRequest(request, [controlSession, response_handler, &responseBatch, &notificationBatch](jsonrpcpp::entity_ptr response,
jsonrpcpp::notification_ptr notification) {
processRequest(request,
[controlSession, response_handler, &responseBatch, &notificationBatch](jsonrpcpp::entity_ptr response,
jsonrpcpp::notification_ptr notification)
{
if (response != nullptr)
responseBatch.add_ptr(response);
if (notification != nullptr)
@ -848,7 +854,9 @@ void Server::saveConfig(const std::chrono::milliseconds& deferred)
std::lock_guard<std::mutex> lock(mutex);
config_timer_.cancel();
config_timer_.expires_after(deferred);
config_timer_.async_wait([](const boost::system::error_code& ec) {
config_timer_.async_wait(
[](const boost::system::error_code& ec)
{
if (!ec)
{
LOG(DEBUG, LOG_TAG) << "Saving config\n";

View file

@ -321,7 +321,9 @@ int main(int argc, char* argv[])
// Construct a signal set registered for process termination.
boost::asio::signal_set signals(io_context, SIGHUP, SIGINT, SIGTERM);
signals.async_wait([&io_context](const boost::system::error_code& ec, int signal) {
signals.async_wait(
[&io_context](const boost::system::error_code& ec, int signal)
{
if (!ec)
LOG(INFO, LOG_TAG) << "Received signal " << signal << ": " << strsignal(signal) << "\n";
else

View file

@ -142,10 +142,11 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
LOG(INFO, LOG_TAG) << "onDisconnect: " << session->clientId << "\n";
LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n";
sessions_.erase(std::remove_if(sessions_.begin(), sessions_.end(),
[streamSession](std::weak_ptr<StreamSession> session) {
auto s = session.lock();
return s.get() == streamSession;
}),
[streamSession](std::weak_ptr<StreamSession> session)
{
auto s = session.lock();
return s.get() == streamSession;
}),
sessions_.end());
LOG(DEBUG, LOG_TAG) << "sessions: " << sessions_.size() << "\n";
if (messageReceiver_ != nullptr)
@ -184,7 +185,8 @@ session_ptr StreamServer::getStreamSession(const std::string& clientId) const
void StreamServer::startAccept()
{
auto accept_handler = [this](error_code ec, tcp::socket socket) {
auto accept_handler = [this](error_code ec, tcp::socket socket)
{
if (!ec)
handleAccept(std::move(socket));
else

View file

@ -60,8 +60,12 @@ void StreamSession::send_next()
{
auto& buffer = messages_.front();
buffer.on_air = true;
boost::asio::post(strand_, [this, self = shared_from_this(), buffer]() {
sendAsync(buffer, [this](boost::system::error_code ec, std::size_t length) {
boost::asio::post(strand_,
[this, self = shared_from_this(), buffer]()
{
sendAsync(buffer,
[this](boost::system::error_code ec, std::size_t length)
{
messages_.pop_front();
if (ec)
{
@ -78,16 +82,19 @@ void StreamSession::send_next()
void StreamSession::send(shared_const_buffer const_buf)
{
boost::asio::post(strand_, [this, self = shared_from_this(), const_buf]() {
boost::asio::post(strand_,
[this, self = shared_from_this(), const_buf]()
{
// delete PCM chunks that are older than the overall buffer duration
messages_.erase(std::remove_if(messages_.begin(), messages_.end(),
[this](const shared_const_buffer& buffer) {
const auto& msg = buffer.message();
if (!msg.is_pcm_chunk || buffer.on_air)
return false;
auto age = chronos::clk::now() - msg.rec_time;
return (age > std::chrono::milliseconds(bufferMs_) + 100ms);
}),
[this](const shared_const_buffer& buffer)
{
const auto& msg = buffer.message();
if (!msg.is_pcm_chunk || buffer.on_air)
return false;
auto age = chronos::clk::now() - msg.rec_time;
return (age > std::chrono::milliseconds(bufferMs_) + 100ms);
}),
messages_.end());
messages_.push_back(const_buf);

View file

@ -86,50 +86,52 @@ std::string StreamSessionTcp::getIP()
void StreamSessionTcp::read_next()
{
boost::asio::async_read(
socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, self = shared_from_this()](boost::system::error_code ec, std::size_t length) mutable {
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_),
[this, self = shared_from_this()](boost::system::error_code ec, std::size_t length) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
baseMessage_.deserialize(buffer_.data());
LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id
<< ", refers: " << baseMessage_.refersTo << "\n";
if (baseMessage_.type > message_type::kLast)
{
LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
else if (baseMessage_.size > msg::max_size)
{
LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
if (baseMessage_.size > buffer_.size())
buffer_.resize(baseMessage_.size);
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, baseMessage_.size),
[this, self](boost::system::error_code ec, std::size_t length) mutable
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
baseMessage_.deserialize(buffer_.data());
LOG(DEBUG, LOG_TAG) << "getNextMessage: " << baseMessage_.type << ", size: " << baseMessage_.size << ", id: " << baseMessage_.id
<< ", refers: " << baseMessage_.refersTo << "\n";
if (baseMessage_.type > message_type::kLast)
{
LOG(ERROR, LOG_TAG) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
else if (baseMessage_.size > msg::max_size)
{
LOG(ERROR, LOG_TAG) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n";
messageReceiver_->onDisconnect(this);
return;
}
if (baseMessage_.size > buffer_.size())
buffer_.resize(baseMessage_.size);
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, baseMessage_.size),
[this, self](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
messageReceiver_->onDisconnect(this);
return;
}
tv t;
baseMessage_.received = t;
if (messageReceiver_ != nullptr)
messageReceiver_->onMessageReceived(this, baseMessage_, buffer_.data());
read_next();
});
tv t;
baseMessage_.received = t;
if (messageReceiver_ != nullptr)
messageReceiver_->onMessageReceived(this, baseMessage_, buffer_.data());
read_next();
});
});
}

View file

@ -245,7 +245,9 @@ void AirplayStream::pipeReadLine()
}
const std::string delimiter = "\n";
boost::asio::async_read_until(*pipe_fd_, streambuf_pipe_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
boost::asio::async_read_until(*pipe_fd_, streambuf_pipe_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
if ((ec.value() == boost::asio::error::eof) || (ec.value() == boost::asio::error::bad_descriptor))

View file

@ -54,7 +54,9 @@ template <typename Rep, typename Period>
void wait(boost::asio::steady_timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler)
{
timer.expires_after(duration);
timer.async_wait([handler = std::move(handler)](const boost::system::error_code& ec) {
timer.async_wait(
[handler = std::move(handler)](const boost::system::error_code& ec)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";

View file

@ -76,7 +76,9 @@ template <typename Timer, typename Rep, typename Period>
void AsioStream<ReadStream>::wait(Timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler)
{
timer.expires_after(duration);
timer.async_wait([handler = std::move(handler)](const boost::system::error_code& ec) {
timer.async_wait(
[handler = std::move(handler)](const boost::system::error_code& ec)
{
if (ec)
{
LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n";
@ -114,7 +116,9 @@ template <typename ReadStream>
void AsioStream<ReadStream>::check_state()
{
uint64_t last_read = bytes_read_;
wait(state_timer_, std::chrono::milliseconds(500 + chunk_ms_), [this, last_read] {
wait(state_timer_, std::chrono::milliseconds(500 + chunk_ms_),
[this, last_read]
{
LOG(TRACE, "AsioStream") << "check state last: " << last_read << ", read: " << bytes_read_ << "\n";
if (bytes_read_ != last_read)
setState(ReaderState::kPlaying);
@ -172,68 +176,71 @@ void AsioStream<ReadStream>::do_read()
{
// LOG(DEBUG, "AsioStream") << "do_read\n";
boost::asio::async_read(*stream_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize),
[this](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n";
connect();
return;
}
[this](boost::system::error_code ec, std::size_t length) mutable
{
if (ec)
{
LOG(ERROR, "AsioStream") << "Error reading message: " << ec.message() << ", length: " << length << "\n";
connect();
return;
}
bytes_read_ += length;
// LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n";
// First read after connect. Set the initial read timestamp
// the timestamp will be incremented after encoding,
// since we do not know how much the encoder actually encoded
bytes_read_ += length;
// LOG(DEBUG, "AsioStream") << "Read: " << length << " bytes\n";
// First read after connect. Set the initial read timestamp
// the timestamp will be incremented after encoding,
// since we do not know how much the encoder actually encoded
if (!first_)
{
auto now = std::chrono::steady_clock::now();
auto stream2systime_diff = now - tvEncodedChunk_;
if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_))
{
LOG(WARNING, "AsioStream") << "Stream and system time out of sync: "
<< std::chrono::duration_cast<std::chrono::microseconds>(stream2systime_diff).count() / 1000.
<< " ms, resetting stream time.\n";
first_ = true;
}
}
if (first_)
{
first_ = false;
tvEncodedChunk_ = std::chrono::steady_clock::now() - chunk_->duration<std::chrono::nanoseconds>();
nextTick_ = std::chrono::steady_clock::now();
}
if (!first_)
{
auto now = std::chrono::steady_clock::now();
auto stream2systime_diff = now - tvEncodedChunk_;
if (stream2systime_diff > chronos::sec(5) + chronos::msec(chunk_ms_))
{
LOG(WARNING, "AsioStream") << "Stream and system time out of sync: "
<< std::chrono::duration_cast<std::chrono::microseconds>(stream2systime_diff).count() / 1000.
<< " ms, resetting stream time.\n";
first_ = true;
}
}
if (first_)
{
first_ = false;
tvEncodedChunk_ = std::chrono::steady_clock::now() - chunk_->duration<std::chrono::nanoseconds>();
nextTick_ = std::chrono::steady_clock::now();
}
chunkRead(*chunk_);
nextTick_ += chunk_->duration<std::chrono::nanoseconds>();
auto currentTick = std::chrono::steady_clock::now();
chunkRead(*chunk_);
nextTick_ += chunk_->duration<std::chrono::nanoseconds>();
auto currentTick = std::chrono::steady_clock::now();
// Synchronize read to chunk_ms_
if (nextTick_ >= currentTick)
{
read_timer_.expires_after(nextTick_ - currentTick);
read_timer_.async_wait([this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
// Read took longer, wait for the buffer to fill up
else
{
resync(std::chrono::duration_cast<std::chrono::nanoseconds>(currentTick - nextTick_));
nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true;
do_read();
}
});
// Synchronize read to chunk_ms_
if (nextTick_ >= currentTick)
{
read_timer_.expires_after(nextTick_ - currentTick);
read_timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (ec)
{
LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
return;
}
// Read took longer, wait for the buffer to fill up
else
{
resync(std::chrono::duration_cast<std::chrono::nanoseconds>(currentTick - nextTick_));
nextTick_ = currentTick + std::chrono::milliseconds(buffer_ms_);
first_ = true;
do_read();
}
});
}
} // namespace streamreader

View file

@ -106,7 +106,8 @@ void MetaStream::onStateChanged(const PcmStream* pcmStream, ReaderState state)
// if (active_stream_->getProperties().playback_status == PlaybackStatus::kPaused)
// return;
auto switch_stream = [this](std::shared_ptr<PcmStream> new_stream) {
auto switch_stream = [this](std::shared_ptr<PcmStream> new_stream)
{
if (new_stream == active_stream_)
return;
LOG(INFO, LOG_TAG) << "Stream: " << name_ << ", switching active stream: " << (active_stream_ ? active_stream_->getName() : "<null>") << " => "

View file

@ -127,10 +127,14 @@ void PcmStream::onControlRequest(const jsonrpcpp::Request& request)
void PcmStream::pollProperties()
{
property_timer_.expires_after(10s);
property_timer_.async_wait([this](const boost::system::error_code& ec) {
property_timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (!ec)
{
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) {
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"},
[this](const jsonrpcpp::Response& response)
{
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
setProperties(response.result());
@ -154,7 +158,9 @@ void PcmStream::onControlNotification(const jsonrpcpp::Notification& notificatio
else if (notification.method() == "Plugin.Stream.Ready")
{
LOG(DEBUG, LOG_TAG) << "Plugin is ready\n";
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"}, [this](const jsonrpcpp::Response& response) {
stream_ctrl_->command({++req_id_, "Plugin.Stream.Player.GetProperties"},
[this](const jsonrpcpp::Response& response)
{
LOG(INFO, LOG_TAG) << "Response for Plugin.Stream.Player.GetProperties: " << response.to_json() << "\n";
if (response.error().code() == 0)
setProperties(response.result());
@ -453,7 +459,9 @@ void PcmStream::sendRequest(const std::string& method, const jsonrpcpp::Paramete
return handler({ControlErrc::can_not_control});
jsonrpcpp::Request req(++req_id_, method, params);
stream_ctrl_->command(req, [handler](const jsonrpcpp::Response& response) {
stream_ctrl_->command(req,
[handler](const jsonrpcpp::Response& response)
{
if (response.error().code() != 0)
handler({static_cast<ControlErrc>(response.error().code()), response.error().data()});
else

View file

@ -146,7 +146,9 @@ void ProcessStream::onStderrMsg(const std::string& line)
void ProcessStream::stderrReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";

View file

@ -67,7 +67,9 @@ void StreamControl::start(const std::string& stream_id, const ServerSettings& se
void StreamControl::command(const jsonrpcpp::Request& request, const OnResponse& response_handler)
{
// use strand to serialize commands sent from different threads
boost::asio::post(executor_, [this, request, response_handler]() {
boost::asio::post(executor_,
[this, request, response_handler]()
{
if (response_handler)
request_callbacks_[request.id()] = response_handler;
@ -172,11 +174,13 @@ void ScriptStreamControl::doStart(const std::string& stream_id, const ServerSett
{
process_ = bp::child(
script_ + params.str(), bp::std_out > pipe_stdout_, bp::std_err > pipe_stderr_, bp::std_in < in_,
bp::on_exit = [](int exit, const std::error_code& ec_in) {
auto severity = AixLog::Severity::debug;
if (exit != 0)
severity = AixLog::Severity::error;
LOG(severity, LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n";
bp::on_exit =
[](int exit, const std::error_code& ec_in)
{
auto severity = AixLog::Severity::debug;
if (exit != 0)
severity = AixLog::Severity::error;
LOG(severity, LOG_TAG) << "Exit code: " << exit << ", message: " << ec_in.message() << "\n";
});
}
catch (const std::exception& e)
@ -203,7 +207,9 @@ void ScriptStreamControl::doCommand(const jsonrpcpp::Request& request)
void ScriptStreamControl::stderrReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
boost::asio::async_read_until(*stream_stderr_, streambuf_stderr_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stderr: " << ec.message() << "\n";
@ -223,7 +229,9 @@ void ScriptStreamControl::stderrReadLine()
void ScriptStreamControl::stdoutReadLine()
{
const std::string delimiter = "\n";
boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter, [this, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
boost::asio::async_read_until(*stream_stdout_, streambuf_stdout_, delimiter,
[this, delimiter](const std::error_code& ec, std::size_t bytes_transferred)
{
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from stdout: " << ec.message() << "\n";

View file

@ -75,7 +75,9 @@ void TcpStream::do_connect()
if (is_server_)
{
acceptor_->async_accept([this](boost::system::error_code ec, tcp::socket socket) {
acceptor_->async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
LOG(DEBUG, LOG_TAG) << "New client connection\n";
@ -92,7 +94,9 @@ void TcpStream::do_connect()
{
stream_ = make_unique<tcp::socket>(strand_);
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host_), port_);
stream_->async_connect(endpoint, [this](const boost::system::error_code& ec) {
stream_->async_connect(endpoint,
[this](const boost::system::error_code& ec)
{
if (!ec)
{
LOG(DEBUG, LOG_TAG) << "Connected\n";

View file

@ -65,7 +65,9 @@ void Watchdog::trigger()
{
timer_.cancel();
timer_.expires_after(timeout_ms_);
timer_.async_wait([this](const boost::system::error_code& ec) {
timer_.async_wait(
[this](const boost::system::error_code& ec)
{
if (!ec)
{
LOG(INFO, LOG_TAG) << "Timed out: " << std::chrono::duration_cast<std::chrono::seconds>(timeout_ms_).count() << "s\n";