diff --git a/server/publishZeroConf/publish_avahi.cpp b/server/publishZeroConf/publish_avahi.cpp index da6aa397..c46cbc8b 100644 --- a/server/publishZeroConf/publish_avahi.cpp +++ b/server/publishZeroConf/publish_avahi.cpp @@ -27,7 +27,7 @@ static AvahiEntryGroup* group; static AvahiSimplePoll* simple_poll; static char* name; -PublishAvahi::PublishAvahi(const std::string& serviceName) : PublishmDNS(serviceName), client_(nullptr), active_(false) +PublishAvahi::PublishAvahi(const std::string& serviceName, boost::asio::io_context& ioc) : PublishmDNS(serviceName, ioc), client_(nullptr), timer_(ioc) { group = nullptr; simple_poll = nullptr; @@ -56,22 +56,24 @@ void PublishAvahi::publish(const std::vector& services) LOG(ERROR) << "Failed to create client: " << avahi_strerror(error) << "\n"; } - active_ = true; - pollThread_ = std::thread(&PublishAvahi::worker, this); + poll(); } -void PublishAvahi::worker() +void PublishAvahi::poll() { - while (active_ && (avahi_simple_poll_iterate(simple_poll, 100) == 0)) - ; + auto self = shared_from_this(); + timer_.expires_from_now(boost::posix_time::milliseconds(50)); + timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (!ec && (avahi_simple_poll_iterate(simple_poll, 0) == 0)) + poll(); + }); } PublishAvahi::~PublishAvahi() { - active_ = false; - pollThread_.join(); + timer_.cancel(); if (client_) avahi_client_free(client_); diff --git a/server/publishZeroConf/publish_avahi.hpp b/server/publishZeroConf/publish_avahi.hpp index 98f7ab25..ff29d792 100644 --- a/server/publishZeroConf/publish_avahi.hpp +++ b/server/publishZeroConf/publish_avahi.hpp @@ -37,10 +37,10 @@ class PublishAvahi; #include "publish_mdns.hpp" -class PublishAvahi : public PublishmDNS +class PublishAvahi : public PublishmDNS, public std::enable_shared_from_this { public: - PublishAvahi(const std::string& serviceName); + PublishAvahi(const std::string& serviceName, boost::asio::io_context& ioc); ~PublishAvahi() override; void publish(const std::vector& services) override; @@ -48,11 +48,10 @@ private: static void entry_group_callback(AvahiEntryGroup* g, AvahiEntryGroupState state, AVAHI_GCC_UNUSED void* userdata); static void client_callback(AvahiClient* c, AvahiClientState state, AVAHI_GCC_UNUSED void* userdata); void create_services(AvahiClient* c); - void worker(); + void poll(); AvahiClient* client_; - std::thread pollThread_; - std::atomic active_; std::vector services_; + boost::asio::deadline_timer timer_; }; diff --git a/server/publishZeroConf/publish_bonjour.cpp b/server/publishZeroConf/publish_bonjour.cpp index f7aedf27..7a0c1207 100644 --- a/server/publishZeroConf/publish_bonjour.cpp +++ b/server/publishZeroConf/publish_bonjour.cpp @@ -28,7 +28,7 @@ typedef union { } Opaque16; -PublishBonjour::PublishBonjour(const std::string& serviceName) : PublishmDNS(serviceName), active_(false) +PublishBonjour::PublishBonjour(const std::string& serviceName, boost::asio::io_context& ioc) : PublishmDNS(serviceName, ioc), active_(false) { /// dns-sd -R Snapcast _snapcast._tcp local 1704 /// dns-sd -R Snapcast _snapcast-jsonrpc._tcp local 1705 diff --git a/server/publishZeroConf/publish_bonjour.hpp b/server/publishZeroConf/publish_bonjour.hpp index 51cfbccd..6d82cac2 100644 --- a/server/publishZeroConf/publish_bonjour.hpp +++ b/server/publishZeroConf/publish_bonjour.hpp @@ -30,7 +30,7 @@ class PublishBonjour; class PublishBonjour : public PublishmDNS { public: - PublishBonjour(const std::string& serviceName); + PublishBonjour(const std::string& serviceName, boost::asio::io_context& ioc); virtual ~PublishBonjour(); virtual void publish(const std::vector& services); diff --git a/server/publishZeroConf/publish_mdns.hpp b/server/publishZeroConf/publish_mdns.hpp index d3922f89..35de2e95 100644 --- a/server/publishZeroConf/publish_mdns.hpp +++ b/server/publishZeroConf/publish_mdns.hpp @@ -1,6 +1,7 @@ #ifndef PUBLISH_MDNS_H #define PUBLISH_MDNS_H +#include #include #include @@ -19,7 +20,7 @@ struct mDNSService class PublishmDNS { public: - PublishmDNS(const std::string& serviceName) : serviceName_(serviceName) + PublishmDNS(const std::string& serviceName, boost::asio::io_context& ioc) : serviceName_(serviceName), ioc_(ioc) { } @@ -29,6 +30,7 @@ public: protected: std::string serviceName_; + boost::asio::io_context& ioc_; }; #if defined(HAS_AVAHI) diff --git a/server/snapserver.cpp b/server/snapserver.cpp index c760ae99..c7a43b52 100644 --- a/server/snapserver.cpp +++ b/server/snapserver.cpp @@ -235,9 +235,9 @@ int main(int argc, char* argv[]) Config::instance().init(); #endif - + boost::asio::io_context io_context; #if defined(HAS_AVAHI) || defined(HAS_BONJOUR) - PublishZeroConf publishZeroConfg("Snapcast"); + auto publishZeroConfg = std::make_shared("Snapcast", io_context); vector dns_services; dns_services.emplace_back("_snapcast._tcp", settings.stream.port); dns_services.emplace_back("_snapcast-stream._tcp", settings.stream.port); @@ -250,7 +250,7 @@ int main(int argc, char* argv[]) { dns_services.emplace_back("_snapcast-http._tcp", settings.http.port); } - publishZeroConfg.publish(dns_services); + publishZeroConfg->publish(dns_services); #endif if (settings.stream.streamReadMs < 10) { @@ -264,7 +264,6 @@ int main(int argc, char* argv[]) settings.stream.bufferMs = 400; } - boost::asio::io_context io_context; std::unique_ptr streamServer(new StreamServer(io_context, settings)); streamServer->start(); diff --git a/server/streamreader/pipe_stream.cpp b/server/streamreader/pipe_stream.cpp index eb14ba17..d8ed163f 100644 --- a/server/streamreader/pipe_stream.cpp +++ b/server/streamreader/pipe_stream.cpp @@ -33,7 +33,7 @@ using namespace std; -PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), fd_(-1) +PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), timer_(ioc) { umask(0); string mode = uri_.getQuery("mode", "create"); @@ -47,107 +47,187 @@ PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, c if ((mkfifo(uri_.path.c_str(), 0666) != 0) && (errno != EEXIST)) throw SnapException("failed to make fifo \"" + uri_.path + "\": " + cpt::to_string(errno)); } + chunk_ = make_unique(sampleFormat_, pcmReadMs_); } PipeStream::~PipeStream() { - if (fd_ != -1) - close(fd_); + fifo_->close(); } -void PipeStream::worker() +void PipeStream::start() { - timeval tvChunk; - std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_)); - string lastException = ""; - - while (active_) - { - if (fd_ != -1) - close(fd_); - fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - long nextTick = chronos::getTickCount(); - int idleBytes = 0; - int maxIdleBytes = sampleFormat_.rate * sampleFormat_.frameSize * dryoutMs_ / 1000; - try - { - if (fd_ == -1) - throw SnapException("failed to open fifo: \"" + uri_.path + "\""); - - while (active_) - { - chunk->timestamp.sec = tvChunk.tv_sec; - chunk->timestamp.usec = tvChunk.tv_usec; - int toRead = chunk->payloadSize; - int len = 0; - do - { - int count = read(fd_, chunk->payload + len, toRead - len); - if (count < 0 && idleBytes < maxIdleBytes) - { - memset(chunk->payload + len, 0, toRead - len); - idleBytes += toRead - len; - len += toRead - len; - continue; - } - if (count < 0) - { - setState(kIdle); - if (!sleep(100)) - break; - } - else if (count == 0) - throw SnapException("end of file"); - else - { - len += count; - idleBytes = 0; - } - } while ((len < toRead) && active_); - - if (!active_) - break; - - /// TODO: use less raw pointers, make this encoding more transparent - encoder_->encode(chunk.get()); - - if (!active_) - break; - - nextTick += pcmReadMs_; - chronos::addUs(tvChunk, pcmReadMs_ * 1000); - long currentTick = chronos::getTickCount(); - - if (nextTick >= currentTick) - { - setState(kPlaying); - if (!sleep(nextTick - currentTick)) - break; - } - else - { - chronos::systemtimeofday(&tvChunk); - tvEncodedChunk_ = tvChunk; - pcmListener_->onResync(this, currentTick - nextTick); - nextTick = currentTick; - } - - lastException = ""; - } - } - catch (const std::exception& e) - { - if (lastException != e.what()) - { - LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl; - lastException = e.what(); - } - if (!sleep(100)) - break; - } - } + encoder_->init(this, sampleFormat_); + active_ = true; + do_accept(); } + + +void PipeStream::do_accept() +{ + LOG(DEBUG) << "do_accept\n"; + auto self = shared_from_this(); + auto fd = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); + fifo_ = std::make_unique(ioc_, fd); + chronos::systemtimeofday(&tv_chunk_); + tvEncodedChunk_ = tv_chunk_; + nextTick_ = chronos::getTickCount(); + first_ = true; + do_read(); +} + + +void PipeStream::do_read() +{ + // LOG(DEBUG) << "do_read\n"; + auto self = shared_from_this(); + chunk_->timestamp.sec = tv_chunk_.tv_sec; + chunk_->timestamp.usec = tv_chunk_.tv_usec; + boost::asio::async_read(*fifo_, boost::asio::buffer(chunk_->payload, chunk_->payloadSize), + [this, self](boost::system::error_code ec, std::size_t length) mutable { + if (ec) + { + LOG(ERROR) << "Error reading message: " << ec.message() << ", length: " << length << "\n"; + do_accept(); + return; + } + // LOG(DEBUG) << "Read: " << length << " bytes\n"; + if (first_) + { + first_ = false; + chronos::systemtimeofday(&tv_chunk_); + chunk_->timestamp.sec = tv_chunk_.tv_sec; + chunk_->timestamp.usec = tv_chunk_.tv_usec; + tvEncodedChunk_ = tv_chunk_; + nextTick_ = chronos::getTickCount(); + } + encoder_->encode(chunk_.get()); + nextTick_ += pcmReadMs_; + chronos::addUs(tv_chunk_, pcmReadMs_ * 1000); + long currentTick = chronos::getTickCount(); + + if (nextTick_ >= currentTick) + { + setState(kPlaying); + timer_.expires_from_now(boost::posix_time::milliseconds(nextTick_ - currentTick)); + timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (ec) + { + LOG(ERROR) << "Error during async wait: " << ec.message() << "\n"; + } + else + { + do_read(); + } + }); + return; + } + else + { + chronos::systemtimeofday(&tv_chunk_); + tvEncodedChunk_ = tv_chunk_; + pcmListener_->onResync(this, currentTick - nextTick_); + nextTick_ = currentTick; + do_read(); + } + }); +} + + + +// void PipeStream::worker() +// { +// timeval tvChunk; +// std::unique_ptr chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_)); +// string lastException = ""; + +// while (active_) +// { +// if (fd_ != -1) +// close(fd_); +// fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK); +// chronos::systemtimeofday(&tvChunk); +// tvEncodedChunk_ = tvChunk; +// long nextTick = chronos::getTickCount(); +// int idleBytes = 0; +// int maxIdleBytes = sampleFormat_.rate * sampleFormat_.frameSize * dryoutMs_ / 1000; +// try +// { +// if (fd_ == -1) +// throw SnapException("failed to open fifo: \"" + uri_.path + "\""); + +// while (active_) +// { +// chunk->timestamp.sec = tvChunk.tv_sec; +// chunk->timestamp.usec = tvChunk.tv_usec; +// int toRead = chunk->payloadSize; +// int len = 0; +// do +// { +// int count = read(fd_, chunk->payload + len, toRead - len); +// if (count < 0 && idleBytes < maxIdleBytes) +// { +// memset(chunk->payload + len, 0, toRead - len); +// idleBytes += toRead - len; +// len += toRead - len; +// continue; +// } +// if (count < 0) +// { +// setState(kIdle); +// if (!sleep(100)) +// break; +// } +// else if (count == 0) +// throw SnapException("end of file"); +// else +// { +// len += count; +// idleBytes = 0; +// } +// } while ((len < toRead) && active_); + +// if (!active_) +// break; + +// /// TODO: use less raw pointers, make this encoding more transparent +// encoder_->encode(chunk.get()); + +// if (!active_) +// break; + +// nextTick += pcmReadMs_; +// chronos::addUs(tvChunk, pcmReadMs_ * 1000); +// long currentTick = chronos::getTickCount(); + +// if (nextTick >= currentTick) +// { +// setState(kPlaying); +// if (!sleep(nextTick - currentTick)) +// break; +// } +// else +// { +// chronos::systemtimeofday(&tvChunk); +// tvEncodedChunk_ = tvChunk; +// pcmListener_->onResync(this, currentTick - nextTick); +// nextTick = currentTick; +// } + +// lastException = ""; +// } +// } +// catch (const std::exception& e) +// { +// if (lastException != e.what()) +// { +// LOG(ERROR) << "(PipeStream) Exception: " << e.what() << std::endl; +// lastException = e.what(); +// } +// if (!sleep(100)) +// break; +// } +// } +// } diff --git a/server/streamreader/pipe_stream.hpp b/server/streamreader/pipe_stream.hpp index e5fbc43f..623a7565 100644 --- a/server/streamreader/pipe_stream.hpp +++ b/server/streamreader/pipe_stream.hpp @@ -20,7 +20,7 @@ #define PIPE_STREAM_H #include "pcm_stream.hpp" - +#include /// Reads and decodes PCM data from a named pipe @@ -29,16 +29,24 @@ * Implements EncoderListener to get the encoded data. * Data is passed to the PcmListener */ -class PipeStream : public PcmStream +class PipeStream : public PcmStream, public std::enable_shared_from_this { public: /// ctor. Encoded PCM data is passed to the PipeListener PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri); ~PipeStream() override; + void start() override; + protected: - void worker() override; - int fd_; + void do_accept(); + void do_read(); + std::unique_ptr chunk_; + timeval tv_chunk_; + bool first_; + long nextTick_; + boost::asio::deadline_timer timer_; + std::unique_ptr fifo_; };