diff --git a/server/Makefile b/server/Makefile index 80c181c6..b40d49c4 100644 --- a/server/Makefile +++ b/server/Makefile @@ -6,7 +6,7 @@ CXX = /usr/bin/g++ CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common -OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o +OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o pcmreader/pcmReader.o pcmreader/pipeReader.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o BIN = snapserver all: $(TARGET) diff --git a/server/controlServer.h b/server/controlServer.h index ae0b8fc0..1ac9084c 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -28,7 +28,6 @@ #include #include "controlSession.h" -#include "pipeReader.h" #include "common/queue.h" #include "message/message.h" #include "message/header.h" diff --git a/server/pcmreader/pcmReader.cpp b/server/pcmreader/pcmReader.cpp new file mode 100644 index 00000000..83a22664 --- /dev/null +++ b/server/pcmreader/pcmReader.cpp @@ -0,0 +1,84 @@ +/*** + This file is part of snapcast + Copyright (C) 2015 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#include +#include +#include +#include + +#include "pcmReader.h" +#include "../encoder/encoderFactory.h" +#include "common/log.h" +#include "common/snapException.h" + + +using namespace std; + + + + +PcmReader::PcmReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : pcmListener_(pcmListener), sampleFormat_(sampleFormat), pcmReadMs_(pcmReadMs) +{ + EncoderFactory encoderFactory; + encoder_.reset(encoderFactory.createEncoder(codec)); +} + + +PcmReader::~PcmReader() +{ + stop(); + close(fd_); +} + + +msg::Header* PcmReader::getHeader() +{ + return encoder_->getHeader(); +} + + +void PcmReader::start() +{ + encoder_->init(this, sampleFormat_); + active_ = true; + readerThread_ = thread(&PcmReader::worker, this); +} + + +void PcmReader::stop() +{ + if (active_) + { + active_ = false; + readerThread_.join(); + } +} + + +void PcmReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration) +{ +// logO << "onChunkEncoded: " << duration << " us\n"; + if (duration <= 0) + return; + + chunk->timestamp.sec = tvEncodedChunk_.tv_sec; + chunk->timestamp.usec = tvEncodedChunk_.tv_usec; + chronos::addUs(tvEncodedChunk_, duration * 1000); + pcmListener_->onChunkRead(this, chunk, duration); +} + diff --git a/server/pipeReader.h b/server/pcmreader/pcmReader.h similarity index 58% rename from server/pipeReader.h rename to server/pcmreader/pcmReader.h index 1bea1226..cb8a9375 100644 --- a/server/pipeReader.h +++ b/server/pcmreader/pcmReader.h @@ -16,60 +16,60 @@ along with this program. If not, see . ***/ -#ifndef PIPE_READER_H -#define PIPE_READER_H +#ifndef PCM_READER_H +#define PCM_READER_H #include #include #include -#include "encoder/encoder.h" +#include "../encoder/encoder.h" #include "message/sampleFormat.h" #include "message/header.h" -class PipeReader; +class PcmReader; -/// Callback interface for users of PipeReader +/// Callback interface for users of PcmReader /** - * Users of PipeReader should implement this to get the data + * Users of PcmReader should implement this to get the data */ -class PipeListener +class PcmListener { public: - virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration) = 0; - virtual void onResync(const PipeReader* pipeReader, double ms) = 0; + virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) = 0; + virtual void onResync(const PcmReader* pcmReader, double ms) = 0; }; -/// Reads and decodes PCM data from a named pipe +/// Reads and decodes PCM data /** - * Reads PCM from a named pipe and passes the data to an encoder. + * Reads PCM and passes the data to an encoder. * Implements EncoderListener to get the encoded data. - * Data is passed to the PipeListener + * Data is passed to the PcmListener */ -class PipeReader : public EncoderListener +class PcmReader : public EncoderListener { public: - /// ctor. Encoded PCM data is passed to the PipeListener - PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20); - virtual ~PipeReader(); + /// ctor. Encoded PCM data is passed to the PcmListener + PcmReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20); + virtual ~PcmReader(); - void start(); - void stop(); + virtual void start(); + virtual void stop(); /// Implementation of EncoderListener::onChunkEncoded virtual void onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration); - msg::Header* getHeader(); + virtual msg::Header* getHeader(); protected: - void worker(); + virtual void worker() = 0; int fd_; timeval tvEncodedChunk_; std::atomic active_; std::thread readerThread_; - PipeListener* pipeListener_; + PcmListener* pcmListener_; msg::SampleFormat sampleFormat_; size_t pcmReadMs_; std::unique_ptr encoder_; diff --git a/server/pipeReader.cpp b/server/pcmreader/pipeReader.cpp similarity index 66% rename from server/pipeReader.cpp rename to server/pcmreader/pipeReader.cpp index 98ce54ab..6dd33b42 100644 --- a/server/pipeReader.cpp +++ b/server/pcmreader/pipeReader.cpp @@ -22,7 +22,7 @@ #include #include "pipeReader.h" -#include "encoder/encoderFactory.h" +#include "../encoder/encoderFactory.h" #include "common/log.h" #include "common/snapException.h" @@ -32,59 +32,18 @@ using namespace std; -PipeReader::PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : pipeListener_(pipeListener), sampleFormat_(sampleFormat), pcmReadMs_(pcmReadMs) +PipeReader::PipeReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : PcmReader(pcmListener, sampleFormat, codec, fifoName, pcmReadMs) { umask(0); mkfifo(fifoName.c_str(), 0666); fd_ = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK); if (fd_ == -1) throw SnapException("failed to open fifo: \"" + fifoName + "\""); - EncoderFactory encoderFactory; - encoder_.reset(encoderFactory.createEncoder(codec)); } PipeReader::~PipeReader() { - stop(); - close(fd_); -} - - -msg::Header* PipeReader::getHeader() -{ - return encoder_->getHeader(); -} - - -void PipeReader::start() -{ - encoder_->init(this, sampleFormat_); - active_ = true; - readerThread_ = thread(&PipeReader::worker, this); -} - - -void PipeReader::stop() -{ - if (active_) - { - active_ = false; - readerThread_.join(); - } -} - - -void PipeReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration) -{ -// logO << "onChunkEncoded: " << duration << " us\n"; - if (duration <= 0) - return; - - chunk->timestamp.sec = tvEncodedChunk_.tv_sec; - chunk->timestamp.usec = tvEncodedChunk_.tv_usec; - chronos::addUs(tvEncodedChunk_, duration * 1000); - pipeListener_->onChunkRead(this, chunk, duration); } @@ -131,7 +90,7 @@ void PipeReader::worker() { gettimeofday(&tvChunk, NULL); tvEncodedChunk_ = tvChunk; - pipeListener_->onResync(this, currentTick - nextTick); + pcmListener_->onResync(this, currentTick - nextTick); nextTick = currentTick; } } diff --git a/server/pcmreader/pipeReader.h b/server/pcmreader/pipeReader.h new file mode 100644 index 00000000..339dbc00 --- /dev/null +++ b/server/pcmreader/pipeReader.h @@ -0,0 +1,44 @@ +/*** + This file is part of snapcast + Copyright (C) 2015 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef PIPE_READER_H +#define PIPE_READER_H + +#include "pcmReader.h" + + + +/// Reads and decodes PCM data from a named pipe +/** + * Reads PCM from a named pipe and passes the data to an encoder. + * Implements EncoderListener to get the encoded data. + * Data is passed to the PcmListener + */ +class PipeReader : public PcmReader +{ +public: + /// ctor. Encoded PCM data is passed to the PipeListener + PipeReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20); + virtual ~PipeReader(); + +protected: + void worker(); +}; + + +#endif diff --git a/server/streamServer.cpp b/server/streamServer.cpp index f80da338..86465c18 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -76,14 +76,14 @@ void StreamServer::send(const msg::BaseMessage* message) } -void StreamServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration) +void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) { // logO << "onChunkRead " << duration << "ms\n"; send(chunk); } -void StreamServer::onResync(const PipeReader* pipeReader, double ms) +void StreamServer::onResync(const PcmReader* pcmReader, double ms) { logO << "onResync " << ms << "ms\n"; } @@ -248,7 +248,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM else if (requestMsg.request == kHeader) { std::unique_lock mlock(mutex_); - msg::Header* headerChunk = pipeReader_->getHeader(); + msg::Header* headerChunk = pcmReader_->getHeader(); headerChunk->refersTo = requestMsg.id; connection->send(headerChunk); } @@ -328,8 +328,8 @@ void StreamServer::start() controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this)); controlServer_->start(); - pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs); - pipeReader_->start(); + pcmReader_ .reset(new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs)); + pcmReader_->start(); acceptor_ = make_shared(*io_service_, tcp::endpoint(tcp::v4(), settings_.port)); startAccept(); } @@ -339,7 +339,7 @@ void StreamServer::stop() { controlServer_->stop(); acceptor_->cancel(); - pipeReader_->stop(); + pcmReader_->stop(); std::unique_lock mlock(mutex_); for (auto it = sessions_.begin(); it != sessions_.end(); ++it) (*it)->stop(); diff --git a/server/streamServer.h b/server/streamServer.h index 6448ef04..fcfee9f2 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -28,7 +28,7 @@ #include #include "streamSession.h" -#include "pipeReader.h" +#include "pcmreader/pipeReader.h" #include "common/queue.h" #include "message/message.h" #include "message/header.h" @@ -65,12 +65,12 @@ struct StreamServerSettings /// Forwars PCM data to the connected clients /** - * Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream. + * Reads PCM data using PipeReader, implements PcmListener to get the (encoded) PCM stream. * Accepts and holds client connections (StreamSession) * Receives (via the MessageReceiver interface) and answers messages from the clients * Forwards PCM data to the clients */ -class StreamServer : public MessageReceiver, ControlMessageReceiver, PipeListener +class StreamServer : public MessageReceiver, ControlMessageReceiver, PcmListener { public: StreamServer(boost::asio::io_service* io_service, const StreamServerSettings& streamServerSettings); @@ -89,16 +89,16 @@ public: /// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived virtual void onMessageReceived(ControlSession* connection, const std::string& message); - /// Implementation of PipeListener - virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration); - virtual void onResync(const PipeReader* pipeReader, double ms); + /// Implementation of PcmListener + virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration); + virtual void onResync(const PcmReader* pcmReader, double ms); private: void startAccept(); void handleAccept(socket_ptr socket); StreamSession* getStreamSession(const std::string& mac); mutable std::mutex mutex_; - PipeReader* pipeReader_; + std::unique_ptr pcmReader_; std::set> sessions_; boost::asio::io_service* io_service_; std::shared_ptr acceptor_;