diff --git a/CMakeLists.txt b/CMakeLists.txt index 03848abc..edaaf064 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ option(BUILD_WITH_OPUS "Build with OPUS support" ON) option(BUILD_WITH_AVAHI "Build with AVAHI support" ON) option(BUILD_WITH_EXPAT "Build with EXPAT support" ON) option(BUILD_WITH_PULSE "Build with PulseAudio support" ON) +option(BUILD_WITH_JACK "Build with JACK support" ON) if(NOT BUILD_SHARED_LIBS AND NOT BUILD_STATIC_LIBS) message( @@ -200,6 +201,13 @@ if(NOT WIN32 AND NOT ANDROID) add_compile_definitions(HAS_ALSA) endif(ALSA_FOUND) + if(BUILD_WITH_JACK) + pkg_search_module(JACK jack) + if(JACK_FOUND) + add_compile_definitions(HAS_JACK) + endif(JACK_FOUND) + endif(BUILD_WITH_JACK) + if(BUILD_WITH_PULSE) pkg_search_module(PULSE libpulse) if(PULSE_FOUND) diff --git a/common/message/pcm_chunk.hpp b/common/message/pcm_chunk.hpp index 16ae3969..a1c6032e 100644 --- a/common/message/pcm_chunk.hpp +++ b/common/message/pcm_chunk.hpp @@ -128,6 +128,12 @@ public: // payloadSize = newSize; // } + void setFrameCount(int frameCount) { + auto newSize = format.frameSize() * frameCount; + payload = (char*)realloc(payload, newSize); + payloadSize = newSize; + } + double durationMs() const { return static_cast(getFrameCount()) / format.msRate(); diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 82f14c4d..bf02c392 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -96,6 +96,12 @@ else() list(APPEND SERVER_INCLUDE ${ALSA_INCLUDE_DIRS}) endif(ALSA_FOUND) + if(JACK_FOUND) + list(APPEND SERVER_SOURCES streamreader/jack_stream.cpp) + list(APPEND SERVER_LIBRARIES ${JACK_LIBRARIES}) + list(APPEND SERVER_INCLUDE ${JACK_INCLUDE_DIRS}) + endif(JACK_FOUND) + if(EXPAT_FOUND) list(APPEND SERVER_LIBRARIES ${EXPAT_LIBRARIES}) list(APPEND SERVER_INCLUDE ${EXPAT_INCLUDE_DIRS}) diff --git a/server/streamreader/jack_stream.cpp b/server/streamreader/jack_stream.cpp new file mode 100644 index 00000000..48fee923 --- /dev/null +++ b/server/streamreader/jack_stream.cpp @@ -0,0 +1,453 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2024 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +// prototype/interface header file +#include "jack_stream.hpp" + +#include + +// local headers +#include "common/aixlog.hpp" +#include "common/snap_exception.hpp" +#include "common/str_compat.hpp" + +// standard headers +#include +#include + +using namespace std; +using namespace std::chrono_literals; + +namespace streamreader +{ + +static constexpr auto LOG_TAG = "JackStream"; + + +void float_to_s32(char *dst, jack_default_audio_sample_t *src, unsigned long nsamples, unsigned long dst_skip) +{ + while (nsamples--) { + // float to S32 conversion + double clipped = fmin(1.0f, fmax((double)(*src), -1.0f)); + double scaled = clipped * 2147483647.0; + *(int32_t *)dst = lrint(scaled); + + dst += dst_skip; + src++; + } +} + +void float_to_s24(char *dst, jack_default_audio_sample_t *src, unsigned long nsamples, unsigned long dst_skip) +{ + int32_t tmp; + + while (nsamples--) { + + // float to S24 conversion + if (*src <= -1.0f) { + tmp = -8388607; + } else if (*src >= 1.0f) { + tmp = 8388607; + } else { + tmp = lrintf (*src * 8388607.0); + } + +#if __BYTE_ORDER == __LITTLE_ENDIAN + memcpy (dst, &tmp, 3); +#elif __BYTE_ORDER == __BIG_ENDIAN + memcpy (dst, (char *)&tmp + 1, 3); +#endif + dst += dst_skip; + src++; + } +} + +void float_to_s16(char *dst, jack_default_audio_sample_t *src, unsigned long nsamples, unsigned long dst_skip) +{ + while (nsamples--) { + + // float to S16 conversion + if (*src <= -1.0f) { + *((int16_t*) dst) = -32767; + } else if (*src >= 1.0f) { + *((int16_t*) dst) = 32767; + } else { + *((int16_t*) dst) = lrintf (*src * 32767.0); + } + + dst += dst_skip; + src++; + } +} + +namespace +{ +template +void wait(boost::asio::steady_timer& timer, const std::chrono::duration& duration, std::function handler) +{ + timer.expires_after(duration); + timer.async_wait( + [handler = std::move(handler)](const boost::system::error_code& ec) + { + if (ec) + { + LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n"; + } + else + { + handler(); + } + }); +} +} // namespace + +JackStream::JackStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri) + : PcmStream(pcmListener, ioc, server_settings, uri), read_timer_(strand_), silence_(0ms) +{ + send_silence_ = (uri_.getQuery("send_silence", "false") == "true"); + idle_threshold_ = std::chrono::milliseconds(std::max(cpt::stoi(uri_.getQuery("idle_threshold", "100")), 10)); + + doAutoConnect_ = (uri_.query.find("autoconnect") != uri_.query.end()); + autoConnectRegex_ = uri_.getQuery("autoconnect", ""); + autoConnectSkip_ = cpt::stoi(uri_.getQuery("autoconnect_skip", "0")); + + switch(sampleFormat_.bits()) + { + case 16: + interleave_func_ = float_to_s16; + break; + case 24: + interleave_func_ = float_to_s24; + break; + case 32: + interleave_func_ = float_to_s32; + break; + default: + throw SnapException("JackStreams only support 16, 24 and 32 bit sample formats"); + } + + jack_set_error_function(jackErrorMessage); + jack_set_info_function(jackInfoMessage); +} + + +void JackStream::start() +{ + LOG(TRACE, LOG_TAG) << "JackStream::start()\n"; + + boost::asio::post(strand_, [this] { tryConnect(); }); +} + + +void JackStream::stop() +{ + LOG(TRACE, LOG_TAG) << "JackStream::stop()\n"; + + PcmStream::stop(); + closeJackConnection(); +} + + +/** + * Try to connect to the jack server. If it fails, wait + * a little bit and try again. + */ +void JackStream::tryConnect() +{ + try { + if (!openJackConnection()) { + LOG(WARNING, LOG_TAG) << "Jack connection failed, trying again in 5 seconds\n"; + wait(read_timer_, 5s, [this] { tryConnect(); }); + return; + } + + if (!createJackPorts()) { + LOG(ERROR, LOG_TAG) << "Failed to create Jack ports, trying again in 5 seconds\n"; + closeJackConnection(); + wait(read_timer_, 5s, [this] { tryConnect(); }); + return; + } + + PcmStream::start(); + } + catch (exception& e) { + LOG(ERROR, LOG_TAG) << "Error during Jack connection: " << e.what() << "\n"; + stop(); + } +} + + +bool JackStream::openJackConnection() +{ + client_ = jack_client_open(name_.c_str(), JackNoStartServer, &status_, nullptr); + + if (client_ == NULL) { + return false; + } + + LOG(INFO, LOG_TAG) << "Connected Jack client " << jack_get_client_name(client_) << "\n"; + + jack_nframes_t jack_sample_rate = jack_get_sample_rate(client_); + if (jack_sample_rate != sampleFormat_.rate()) { + throw SnapException( + "Jack streams must match the sample rate of the Jack server. " + "The server sample rate is " + cpt::to_string(jack_sample_rate) + "." + ); + } + + jack_time_t connect_time = jack_get_time(); + jackConnectTime_ = std::chrono::steady_clock::now(); + jackTimeAdjust_ = chrono::duration_cast( + jackConnectTime_.time_since_epoch() + ).count() - connect_time; + + LOG(DEBUG, LOG_TAG) << name_ << ": Jack server time adjustment is " + << jackTimeAdjust_ << "\n"; + + jack_set_process_callback(client_, processCallback, this); + jack_on_shutdown(client_, jackShutdown, this); + jack_set_port_registration_callback(client_, jackPortRegistration, this); + + int err = jack_activate(client_); + if (err) + { + LOG(ERROR, LOG_TAG) << "Failed to activate Jack client " + << name_ << ": " << err << "\n"; + closeJackConnection(); + return false; + } + + return true; +} + +bool JackStream::createJackPorts() +{ + if (ports_.size() > 0) { + throw SnapException("Jack ports already created!"); + } + + int channelCount = sampleFormat_.channels(); + ports_.reserve(channelCount); + + // Register input ports + for (int i=0; i < channelCount; ++i) + { + std::string portName = "input_" + std::to_string(i); + jack_port_t *port = jack_port_register(client_, portName.c_str(), + JACK_DEFAULT_AUDIO_TYPE, JackPortIsInput, 0); + if (port == NULL) + { + LOG(ERROR, LOG_TAG) << name_ << ": failed to register port " << portName << "\n"; + return false; + } + ports_.push_back(port); + } + + return true; +} + +/** + * Reads the Jack port buffers, interlaces and converts the samples + * to the stream format and adds the resulting data to the + * ringbuffer, along with the jack timing information. + */ +int JackStream::readJackBuffers(jack_nframes_t nframes) +{ + jack_nframes_t current_frames; + jack_time_t current_usecs; + jack_time_t next_usecs; + float period_usecs; + + int err = jack_get_cycle_times(client_, ¤t_frames, ¤t_usecs, + &next_usecs, &period_usecs); + if (err) + { + LOG(ERROR, LOG_TAG) << "Unable to get Jack cycle times: " << err << "\n"; + return -1; + } + + int bytes_per_frame = sampleFormat_.sampleSize(); + int num_channels = sampleFormat_.channels(); + int payload_skip = bytes_per_frame * num_channels; + + // resize chunk payload buffer to match the Jack buffer size, if required + if (chunk_->getFrameCount() != nframes) + { + LOG(TRACE, LOG_TAG) << "Resizing chunk to " << nframes << " frames\n"; + chunk_->setFrameCount(nframes); + silent_chunk_ = std::vector(chunk_->payloadSize, 0); + } + + int connectedPorts = 0; + + for (size_t i=0; i < ports_.size(); i++) + { + int payload_offset = bytes_per_frame * i; + jack_port_t *port = ports_[i]; + + if (jack_port_connected(port)) + { + connectedPorts++; + } + + jack_default_audio_sample_t *buf = static_cast( + jack_port_get_buffer(port, nframes) + ); + + if (buf == NULL) + { + LOG(ERROR, LOG_TAG) << "Unable to get Jack port buffer!\n"; + return -1; + } + + interleave_func_(chunk_->payload + payload_offset, buf, nframes, payload_skip); + } + + if (connectedPorts == 0 || isSilent(*chunk_)) + { + silence_ += chunk_->duration(); + if (silence_ > idle_threshold_) + { + setState(ReaderState::kIdle); + } + } + else + { + silence_ = 0ms; + setState(ReaderState::kPlaying); + } + + if ((state_ == ReaderState::kPlaying) || ((state_ == ReaderState::kIdle) && send_silence_)) + { + // We use the (adjusted) Jack server time as chunk time, that way all Jack + // streams will play simultaneously (hopefully!) + tvEncodedChunk_ = std::chrono::time_point( + static_cast(current_usecs + jackTimeAdjust_) + ); + + // TODO: should we chunkRead() in a separate thread? + chunkRead(*chunk_); + } + + return 0; +} + +void JackStream::closeJackConnection() +{ + if (client_ == NULL) { + return; + } + + LOG(INFO, LOG_TAG) << "Closing Jack connection for " << name_ << "\n"; + + ports_.clear(); + + jack_client_close(client_); + client_ = NULL; +} + +void JackStream::onJackPortRegistration(jack_port_id_t port_id, int registered) +{ + if (!doAutoConnect_ || !registered) { + return; + } + + jack_port_t *port = jack_port_by_id(client_, port_id); + if (port == NULL) { + return; + } + + if (jack_port_is_mine(client_, port)) { + return; + } + + boost::asio::post(strand_, [this] { autoConnectPorts(); }); +} + +void JackStream::autoConnectPorts() +{ + const char **portNames = jack_get_ports(client_, autoConnectRegex_.c_str(), + NULL, JackPortIsOutput); + + if (portNames == NULL) { + return; + } + + size_t portIdx = 0; + int nameIdx = 0; + + while(portIdx < ports_.size() && portNames[nameIdx] != NULL) + { + if (nameIdx < autoConnectSkip_) + { + nameIdx++; + continue; + } + + if (!jack_port_connected_to(ports_[portIdx], portNames[nameIdx])) { + + const char *localPortName = jack_port_name(ports_[portIdx]); + int err = jack_connect(client_, portNames[nameIdx], localPortName); + if (err != 0) { + LOG(ERROR, LOG_TAG) << "Unable to autoconnect " << localPortName + << " to " << portNames[nameIdx] << "(Error: " << err << ")\n"; + } + } + portIdx++; + nameIdx++; + } + + jack_free(portNames); +} + +void JackStream::onJackShutdown() +{ + LOG(ERROR, LOG_TAG) << "Jack has shut down, trying to connect again!\n"; + PcmStream::stop(); + ports_.clear(); + client_ = NULL; + wait(read_timer_, 1000ms, [this] { tryConnect(); }); +} + +int JackStream::processCallback(jack_nframes_t nframes, void* arg) +{ + return static_cast(arg)->readJackBuffers(nframes); +} + +void JackStream::jackShutdown(void* arg) +{ + static_cast(arg)->onJackShutdown(); +} + +void JackStream::jackErrorMessage(const char* msg) +{ + //LOG(TRACE, LOG_TAG) << "Jack Error: " << msg << "\n"; +} + +void JackStream::jackInfoMessage(const char* msg) +{ + //LOG(TRACE, LOG_TAG) << msg << "\n"; +} + +void JackStream::jackPortRegistration(jack_port_id_t port_id, int registered, void* arg) +{ + return static_cast(arg)->onJackPortRegistration(port_id, registered); +} + +} // namespace streamreader + diff --git a/server/streamreader/jack_stream.hpp b/server/streamreader/jack_stream.hpp new file mode 100644 index 00000000..19015c8c --- /dev/null +++ b/server/streamreader/jack_stream.hpp @@ -0,0 +1,98 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2024 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#pragma once + +#include + +// local headers +#include "pcm_stream.hpp" +#include + +// 3rd party headers +#include +#include +#include + + +namespace streamreader +{ + + +/// Reads and decodes PCM data from a Jack server +/** + * Reads PCM from a Jack server and passes the data to an encoder. + * Implements EncoderListener to get the encoded data. + * Data is passed to the PcmStream::Listener + */ +class JackStream : public PcmStream +{ +public: + /// ctor. Encoded PCM data is passed to the PipeListener + JackStream(PcmStream::Listener* pcmListener, boost::asio::io_context& ioc, const ServerSettings& server_settings, const StreamUri& uri); + + void start() override; + void stop() override; + +protected: + bool openJackConnection(); + void closeJackConnection(); + bool createJackPorts(); + void tryConnect(); + int readJackBuffers(jack_nframes_t nframes); + + void onJackPortRegistration(jack_port_id_t port_id, int registered); + void onJackShutdown(); + + void autoConnectPorts(); + + jack_client_t *client_; + jack_status_t status_; + std::vector ports_; + jack_nframes_t jackConnectFrames_; + std::chrono::time_point jackConnectTime_; + jack_time_t jackTimeAdjust_; + SampleFormat jackSampleFormat_; + + std::string autoConnectRegex_; + bool doAutoConnect_ = false; + int autoConnectSkip_; + + void (*interleave_func_)(char *, jack_default_audio_sample_t *, unsigned long, unsigned long); + + bool first_; + std::chrono::time_point nextTick_; + boost::asio::steady_timer read_timer_; + std::chrono::microseconds silence_; + std::string lastException_; + + /// send silent chunks to clients + bool send_silence_; + /// silence duration before switching the stream to idle + std::chrono::milliseconds idle_threshold_; + + static int processCallback(jack_nframes_t nframes, void* arg); + static void jackShutdown(void* arg); + static void jackErrorMessage(const char* msg); + static void jackInfoMessage(const char* msg); + static void jackPortConnect(jack_port_id_t a, jack_port_id_t b, int connect, void* arg); + static void jackPortRegistration(jack_port_id_t port_id, int registered, void* arg); +}; + +} // namespace streamreader + diff --git a/server/streamreader/stream_manager.cpp b/server/streamreader/stream_manager.cpp index 961b87ba..cb4faa21 100644 --- a/server/streamreader/stream_manager.cpp +++ b/server/streamreader/stream_manager.cpp @@ -24,6 +24,9 @@ #ifdef HAS_ALSA #include "alsa_stream.hpp" #endif +#ifdef HAS_JACK +#include "jack_stream.hpp" +#endif #include "common/snap_exception.hpp" #include "common/str_compat.hpp" #include "file_stream.hpp" @@ -101,6 +104,12 @@ PcmStreamPtr StreamManager::addStream(StreamUri& streamUri) { stream = make_shared(listener, io_context_, settings_, streamUri); } +#endif +#ifdef HAS_JACK + else if (streamUri.scheme == "jack") + { + stream = make_shared(listener, io_context_, settings_, streamUri); + } #endif else if ((streamUri.scheme == "spotify") || (streamUri.scheme == "librespot")) {