Make latency configurable

This commit is contained in:
badaix 2020-11-21 19:29:40 +01:00
parent 577e524f88
commit 9ea1f9cc86
3 changed files with 125 additions and 101 deletions

View file

@ -28,12 +28,22 @@
using namespace std; using namespace std;
static constexpr auto LOG_TAG = "PulsePlayer"; static constexpr auto LOG_TAG = "PulsePlayer";
static constexpr auto kDefaultBuffer = 50ms;
// Example code:
// https://code.qt.io/cgit/qt/qtmultimedia.git/tree/src/plugins/pulseaudio/qaudioinput_pulse.cpp?h=dev
// http://www.videolan.org/developers/vlc/modules/audio_output/pulse.c
// https://www.freedesktop.org/wiki/Software/PulseAudio/Documentation/Developer/Clients/Samples/AsyncPlayback/
PulsePlayer::PulsePlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream) PulsePlayer::PulsePlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream), latency_(100000), pa_ml_(nullptr), pa_ctx_(nullptr), playstream_(nullptr) : Player(io_context, settings, stream), latency_(100ms), pa_ml_(nullptr), pa_ctx_(nullptr), playstream_(nullptr)
{ {
auto params = utils::string::split_pairs(settings.parameter, ',', '=');
if (params.find("latency") != params.end())
{
latency_ = std::chrono::milliseconds(std::max(cpt::stoi(params["latency"]), 10));
}
LOG(INFO, LOG_TAG) << "Using latency: " << latency_.count() / 1000 << " ms\n";
} }
@ -52,8 +62,6 @@ bool PulsePlayer::needsThread() const
void PulsePlayer::worker() void PulsePlayer::worker()
{ {
// Run the mainloop until pa_mainloop_quit() is called
// (this example never calls it, so the mainloop runs forever).
pa_mainloop_run(pa_ml_, nullptr); pa_mainloop_run(pa_ml_, nullptr);
} }
@ -83,37 +91,36 @@ bool PulsePlayer::getHardwareVolume(double& volume, bool& muted)
void PulsePlayer::triggerVolumeUpdate() void PulsePlayer::triggerVolumeUpdate()
{ {
pa_context_get_sink_input_info( pa_context_get_sink_input_info(pa_ctx_, pa_stream_get_index(playstream_),
pa_ctx_, pa_stream_get_index(playstream_), [](pa_context* ctx, const pa_sink_input_info* info, int eol, void* userdata) {
[](pa_context* ctx, const pa_sink_input_info* info, int eol, void* userdata) { std::ignore = ctx;
std::ignore = ctx; LOG(DEBUG, LOG_TAG) << "pa_context_get_sink_info_by_index info: " << (info != nullptr) << ", eol: " << eol << "\n";
LOG(DEBUG, LOG_TAG) << "pa_context_get_sink_info_by_index info: " << (info != nullptr) << ", eol: " << eol << "\n"; if (info)
if (info) {
{ auto self = static_cast<PulsePlayer*>(userdata);
auto self = static_cast<PulsePlayer*>(userdata); auto volume = (double)pa_cvolume_avg(&(info->volume)) / (double)PA_VOLUME_NORM;
auto volume = (double)pa_cvolume_avg(&(info->volume)) / (double)PA_VOLUME_NORM; bool muted = (info->mute != 0);
bool muted = (info->mute != 0); LOG(DEBUG, LOG_TAG) << "volume changed: " << volume << ", muted: " << muted << "\n";
LOG(DEBUG, LOG_TAG) << "volume changed: " << volume << ", muted: " << muted << "\n";
auto now = std::chrono::steady_clock::now(); auto now = std::chrono::steady_clock::now();
if (now - self->last_change_ < 1s) if (now - self->last_change_ < 1s)
{ {
LOG(DEBUG, LOG_TAG) << "Last volume change by server: " LOG(DEBUG, LOG_TAG) << "Last volume change by server: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(now - self->last_change_).count() << std::chrono::duration_cast<std::chrono::milliseconds>(now - self->last_change_).count()
<< " ms => ignoring volume change\n"; << " ms => ignoring volume change\n";
return; return;
} }
self->notifyVolumeChange(volume, muted); self->notifyVolumeChange(volume, muted);
} }
}, },
this); this);
} }
void PulsePlayer::subscribeCallback(pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx) void PulsePlayer::subscribeCallback(pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx)
{ {
std::ignore = ctx; std::ignore = ctx;
LOG(DEBUG, LOG_TAG) << "subscribeCallback, event type: " << event_type << ", idx: " << idx << "\n"; LOG(TRACE, LOG_TAG) << "subscribeCallback, event type: " << event_type << ", idx: " << idx << "\n";
unsigned facility = event_type & PA_SUBSCRIPTION_EVENT_FACILITY_MASK; unsigned facility = event_type & PA_SUBSCRIPTION_EVENT_FACILITY_MASK;
event_type = static_cast<pa_subscription_event_type_t>(static_cast<int>(event_type) & PA_SUBSCRIPTION_EVENT_TYPE_MASK); event_type = static_cast<pa_subscription_event_type_t>(static_cast<int>(event_type) & PA_SUBSCRIPTION_EVENT_TYPE_MASK);
if (facility == PA_SUBSCRIPTION_EVENT_SINK_INPUT) if (facility == PA_SUBSCRIPTION_EVENT_SINK_INPUT)
@ -130,40 +137,56 @@ void PulsePlayer::underflowCallback(pa_stream* stream)
// We increase the latency by 50% if we get 6 underflows and latency is under 2s // We increase the latency by 50% if we get 6 underflows and latency is under 2s
// This is very useful for over the network playback that can't handle low latencies // This is very useful for over the network playback that can't handle low latencies
underflows_++; underflows_++;
LOG(INFO, LOG_TAG) << "undeflow #" << underflows_ << ", latency: " << latency_ / 1000 << " ms\n"; LOG(INFO, LOG_TAG) << "undeflow #" << underflows_ << ", latency: " << latency_.count() / 1000 << " ms\n";
if (underflows_ >= 6 && latency_ < 2000000) if (underflows_ >= 6 && latency_ < 500ms)
{ {
latency_ = (latency_ * 3) / 2; latency_ = (latency_ * 3) / 2;
bufattr_.maxlength = pa_usec_to_bytes(latency_, &pa_ss_); bufattr_.maxlength = pa_usec_to_bytes(latency_.count(), &pa_ss_);
bufattr_.tlength = pa_usec_to_bytes(latency_, &pa_ss_); bufattr_.tlength = pa_usec_to_bytes(latency_.count(), &pa_ss_);
pa_stream_set_buffer_attr(stream, &bufattr_, nullptr, nullptr); pa_stream_set_buffer_attr(stream, &bufattr_, nullptr, nullptr);
underflows_ = 0; underflows_ = 0;
LOG(INFO, LOG_TAG) << "latency increased to " << latency_ << " ms\n"; LOG(INFO, LOG_TAG) << "latency increased to " << latency_.count() / 1000 << " ms\n";
} }
} }
void PulsePlayer::stateCallback(pa_context* ctx) void PulsePlayer::stateCallback(pa_context* ctx)
{ {
pa_context_state_t state; pa_context_state_t state = pa_context_get_state(ctx);
state = pa_context_get_state(ctx); string str_state = "unknown";
pa_ready_ = 0;
switch (state) switch (state)
{ {
// These are just here for reference // These are just here for reference
case PA_CONTEXT_UNCONNECTED: case PA_CONTEXT_UNCONNECTED:
str_state = "unconnected";
break;
case PA_CONTEXT_CONNECTING: case PA_CONTEXT_CONNECTING:
str_state = "connecting";
break;
case PA_CONTEXT_AUTHORIZING: case PA_CONTEXT_AUTHORIZING:
str_state = "authorizing";
break;
case PA_CONTEXT_SETTING_NAME: case PA_CONTEXT_SETTING_NAME:
str_state = "setting name";
break;
default: default:
str_state = "unknown";
break; break;
case PA_CONTEXT_FAILED: case PA_CONTEXT_FAILED:
str_state = "failed";
pa_ready_ = 2;
break;
case PA_CONTEXT_TERMINATED: case PA_CONTEXT_TERMINATED:
str_state = "terminated";
pa_ready_ = 2; pa_ready_ = 2;
break; break;
case PA_CONTEXT_READY: case PA_CONTEXT_READY:
str_state = "ready";
pa_ready_ = 1; pa_ready_ = 1;
break; break;
} }
LOG(DEBUG, LOG_TAG) << "State changed " << state << ": " << str_state << "\n";
} }
@ -179,7 +202,7 @@ void PulsePlayer::writeCallback(pa_stream* stream, size_t nbytes)
// LOG(TRACE, LOG_TAG) << "writeCallback latency " << usec << " us, frames: " << numFrames << "\n"; // LOG(TRACE, LOG_TAG) << "writeCallback latency " << usec << " us, frames: " << numFrames << "\n";
if (!stream_->getPlayerChunk(buffer_.data(), std::chrono::microseconds(usec), numFrames)) if (!stream_->getPlayerChunk(buffer_.data(), std::chrono::microseconds(usec), numFrames))
{ {
LOG(INFO, LOG_TAG) << "Failed to get chunk. Playing silence.\n"; // LOG(INFO, LOG_TAG) << "Failed to get chunk. Playing silence.\n";
memset(buffer_.data(), 0, numFrames); memset(buffer_.data(), 0, numFrames);
} }
else else
@ -193,33 +216,6 @@ void PulsePlayer::writeCallback(pa_stream* stream, size_t nbytes)
void PulsePlayer::start() void PulsePlayer::start()
{ {
// Create a mainloop API and connection to the default server
pa_ml_ = pa_mainloop_new();
pa_mainloop_api* pa_mlapi = pa_mainloop_get_api(pa_ml_);
pa_ctx_ = pa_context_new(pa_mlapi, "Snapcast");
pa_context_connect(pa_ctx_, nullptr, PA_CONTEXT_NOFLAGS, nullptr);
// This function defines a callback so the server will tell us it's state.
// Our callback will wait for the state to be ready. The callback will
// modify the variable to 1 so we know when we have a connection and it's
// ready.
// If there's an error, the callback will set pa_ready to 2
pa_context_set_state_callback(
pa_ctx_,
[](pa_context* c, void* userdata) {
auto self = static_cast<PulsePlayer*>(userdata);
self->stateCallback(c);
},
this);
// We can't do anything until PA is ready, so just iterate the mainloop
// and continue
while (pa_ready_ == 0)
pa_mainloop_iterate(pa_ml_, 1, nullptr);
if (pa_ready_ == 2)
throw SnapException("PulseAudio is not ready");
const SampleFormat& format = stream_->getFormat(); const SampleFormat& format = stream_->getFormat();
pa_ss_.rate = format.rate(); pa_ss_.rate = format.rate();
pa_ss_.channels = format.channels(); pa_ss_.channels = format.channels();
@ -236,55 +232,78 @@ void PulsePlayer::start()
else else
throw SnapException("Unsupported sample format: " + cpt::to_string(format.bits())); throw SnapException("Unsupported sample format: " + cpt::to_string(format.bits()));
// Create a mainloop API and connection to the default server
pa_ready_ = 0;
pa_ml_ = pa_mainloop_new();
pa_mainloop_api* pa_mlapi = pa_mainloop_get_api(pa_ml_);
pa_ctx_ = pa_context_new(pa_mlapi, "Snapcast");
pa_context_connect(pa_ctx_, nullptr, PA_CONTEXT_NOFLAGS, nullptr);
// This function defines a callback so the server will tell us it's state.
// Our callback will wait for the state to be ready. The callback will
// modify the variable to 1 so we know when we have a connection and it's
// ready.
// If there's an error, the callback will set pa_ready to 2
pa_context_set_state_callback(pa_ctx_,
[](pa_context* c, void* userdata) {
auto self = static_cast<PulsePlayer*>(userdata);
self->stateCallback(c);
},
this);
// We can't do anything until PA is ready, so just iterate the mainloop
// and continue
while (pa_ready_ == 0)
pa_mainloop_iterate(pa_ml_, 1, nullptr);
if (pa_ready_ == 2)
throw SnapException("PulseAudio is not ready");
playstream_ = pa_stream_new(pa_ctx_, "Playback", &pa_ss_, nullptr); playstream_ = pa_stream_new(pa_ctx_, "Playback", &pa_ss_, nullptr);
if (!playstream_) if (!playstream_)
throw SnapException("Failed to create PulseAudio stream"); throw SnapException("Failed to create PulseAudio stream");
if (settings_.mixer.mode == ClientSettings::Mixer::Mode::hardware) if (settings_.mixer.mode == ClientSettings::Mixer::Mode::hardware)
{ {
pa_context_set_subscribe_callback( pa_context_set_subscribe_callback(pa_ctx_,
pa_ctx_, [](pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx, void* userdata) {
[](pa_context* ctx, pa_subscription_event_type_t event_type, uint32_t idx, void* userdata) { auto self = static_cast<PulsePlayer*>(userdata);
auto self = static_cast<PulsePlayer*>(userdata); self->subscribeCallback(ctx, event_type, idx);
self->subscribeCallback(ctx, event_type, idx); },
}, this);
this);
const pa_subscription_mask_t mask = static_cast<pa_subscription_mask_t>(PA_SUBSCRIPTION_MASK_SINK_INPUT); const pa_subscription_mask_t mask = static_cast<pa_subscription_mask_t>(PA_SUBSCRIPTION_MASK_SINK_INPUT);
pa_context_subscribe( pa_context_subscribe(pa_ctx_, mask,
pa_ctx_, mask, [](pa_context* ctx, int success, void* userdata) {
[](pa_context* ctx, int success, void* userdata) { std::ignore = ctx;
std::ignore = ctx; if (success)
if (success) {
{ auto self = static_cast<PulsePlayer*>(userdata);
auto self = static_cast<PulsePlayer*>(userdata); self->triggerVolumeUpdate();
self->triggerVolumeUpdate(); }
} },
}, this);
this);
} }
pa_stream_set_write_callback( pa_stream_set_write_callback(playstream_,
playstream_, [](pa_stream* stream, size_t length, void* userdata) {
[](pa_stream* stream, size_t length, void* userdata) { auto self = static_cast<PulsePlayer*>(userdata);
auto self = static_cast<PulsePlayer*>(userdata); self->writeCallback(stream, length);
self->writeCallback(stream, length); },
}, this);
this);
pa_stream_set_underflow_callback( pa_stream_set_underflow_callback(playstream_,
playstream_, [](pa_stream* stream, void* userdata) {
[](pa_stream* stream, void* userdata) { auto self = static_cast<PulsePlayer*>(userdata);
auto self = static_cast<PulsePlayer*>(userdata); self->underflowCallback(stream);
self->underflowCallback(stream); },
}, this);
this);
bufattr_.fragsize = (uint32_t)-1; bufattr_.fragsize = (uint32_t)-1;
bufattr_.maxlength = pa_usec_to_bytes(latency_, &pa_ss_); bufattr_.maxlength = pa_usec_to_bytes(latency_.count(), &pa_ss_);
bufattr_.minreq = pa_usec_to_bytes(0, &pa_ss_); bufattr_.minreq = pa_usec_to_bytes(0, &pa_ss_);
bufattr_.prebuf = (uint32_t)-1; bufattr_.prebuf = (uint32_t)-1;
bufattr_.tlength = pa_usec_to_bytes(latency_, &pa_ss_); bufattr_.tlength = pa_usec_to_bytes(latency_.count(), &pa_ss_);
int result = pa_stream_connect_playback( int result = pa_stream_connect_playback(
playstream_, nullptr, &bufattr_, static_cast<pa_stream_flags>(PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE), playstream_, nullptr, &bufattr_, static_cast<pa_stream_flags>(PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE),

View file

@ -52,7 +52,7 @@ protected:
std::vector<char> buffer_; std::vector<char> buffer_;
int latency_; //< start latency in micro seconds std::chrono::microseconds latency_;
int underflows_ = 0; int underflows_ = 0;
int pa_ready_ = 0; int pa_ready_ = 0;

View file

@ -329,8 +329,13 @@ int main(int argc, char** argv)
if (settings.player.player_name == "file") if (settings.player.player_name == "file")
{ {
cout << "Options are a comma separated list of:\n" cout << "Options are a comma separated list of:\n"
<< " \"filename:<filename>\" - with <filename> = \"stdout\", \"stderr\" or a filename\n" << " \"filename=<filename>\" - with <filename> = \"stdout\", \"stderr\" or a filename\n"
<< " \"mode:[w|a]\" - w: write (discarding the content), a: append (keeping the content)\n"; << " \"mode=[w|a]\" - w: write (discarding the content), a: append (keeping the content)\n";
}
else if (settings.player.player_name == "pulse")
{
cout << "Options are a comma separated list of:\n"
<< " \"latency=<device latency [ms]>\" - default 100, min 10\n";
} }
else else
{ {