added PCM reader base class

This commit is contained in:
badaix 2015-11-04 19:47:59 +01:00
parent 6d1cc2d081
commit 2036477446
8 changed files with 166 additions and 80 deletions

View file

@ -6,7 +6,7 @@ CXX = /usr/bin/g++
CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o pcmreader/pcmReader.o pcmreader/pipeReader.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o
BIN = snapserver
all: $(TARGET)

View file

@ -28,7 +28,6 @@
#include <mutex>
#include "controlSession.h"
#include "pipeReader.h"
#include "common/queue.h"
#include "message/message.h"
#include "message/header.h"

View file

@ -0,0 +1,84 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "pcmReader.h"
#include "../encoder/encoderFactory.h"
#include "common/log.h"
#include "common/snapException.h"
using namespace std;
PcmReader::PcmReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : pcmListener_(pcmListener), sampleFormat_(sampleFormat), pcmReadMs_(pcmReadMs)
{
EncoderFactory encoderFactory;
encoder_.reset(encoderFactory.createEncoder(codec));
}
PcmReader::~PcmReader()
{
stop();
close(fd_);
}
msg::Header* PcmReader::getHeader()
{
return encoder_->getHeader();
}
void PcmReader::start()
{
encoder_->init(this, sampleFormat_);
active_ = true;
readerThread_ = thread(&PcmReader::worker, this);
}
void PcmReader::stop()
{
if (active_)
{
active_ = false;
readerThread_.join();
}
}
void PcmReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkEncoded: " << duration << " us\n";
if (duration <= 0)
return;
chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000);
pcmListener_->onChunkRead(this, chunk, duration);
}

View file

@ -16,60 +16,60 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PIPE_READER_H
#define PIPE_READER_H
#ifndef PCM_READER_H
#define PCM_READER_H
#include <thread>
#include <atomic>
#include <string>
#include "encoder/encoder.h"
#include "../encoder/encoder.h"
#include "message/sampleFormat.h"
#include "message/header.h"
class PipeReader;
class PcmReader;
/// Callback interface for users of PipeReader
/// Callback interface for users of PcmReader
/**
* Users of PipeReader should implement this to get the data
* Users of PcmReader should implement this to get the data
*/
class PipeListener
class PcmListener
{
public:
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration) = 0;
virtual void onResync(const PipeReader* pipeReader, double ms) = 0;
virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) = 0;
virtual void onResync(const PcmReader* pcmReader, double ms) = 0;
};
/// Reads and decodes PCM data from a named pipe
/// Reads and decodes PCM data
/**
* Reads PCM from a named pipe and passes the data to an encoder.
* Reads PCM and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PipeListener
* Data is passed to the PcmListener
*/
class PipeReader : public EncoderListener
class PcmReader : public EncoderListener
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20);
virtual ~PipeReader();
/// ctor. Encoded PCM data is passed to the PcmListener
PcmReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20);
virtual ~PcmReader();
void start();
void stop();
virtual void start();
virtual void stop();
/// Implementation of EncoderListener::onChunkEncoded
virtual void onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration);
msg::Header* getHeader();
virtual msg::Header* getHeader();
protected:
void worker();
virtual void worker() = 0;
int fd_;
timeval tvEncodedChunk_;
std::atomic<bool> active_;
std::thread readerThread_;
PipeListener* pipeListener_;
PcmListener* pcmListener_;
msg::SampleFormat sampleFormat_;
size_t pcmReadMs_;
std::unique_ptr<Encoder> encoder_;

View file

@ -22,7 +22,7 @@
#include <unistd.h>
#include "pipeReader.h"
#include "encoder/encoderFactory.h"
#include "../encoder/encoderFactory.h"
#include "common/log.h"
#include "common/snapException.h"
@ -32,59 +32,18 @@ using namespace std;
PipeReader::PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : pipeListener_(pipeListener), sampleFormat_(sampleFormat), pcmReadMs_(pcmReadMs)
PipeReader::PipeReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : PcmReader(pcmListener, sampleFormat, codec, fifoName, pcmReadMs)
{
umask(0);
mkfifo(fifoName.c_str(), 0666);
fd_ = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK);
if (fd_ == -1)
throw SnapException("failed to open fifo: \"" + fifoName + "\"");
EncoderFactory encoderFactory;
encoder_.reset(encoderFactory.createEncoder(codec));
}
PipeReader::~PipeReader()
{
stop();
close(fd_);
}
msg::Header* PipeReader::getHeader()
{
return encoder_->getHeader();
}
void PipeReader::start()
{
encoder_->init(this, sampleFormat_);
active_ = true;
readerThread_ = thread(&PipeReader::worker, this);
}
void PipeReader::stop()
{
if (active_)
{
active_ = false;
readerThread_.join();
}
}
void PipeReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkEncoded: " << duration << " us\n";
if (duration <= 0)
return;
chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000);
pipeListener_->onChunkRead(this, chunk, duration);
}
@ -131,7 +90,7 @@ void PipeReader::worker()
{
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
pipeListener_->onResync(this, currentTick - nextTick);
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
}

View file

@ -0,0 +1,44 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PIPE_READER_H
#define PIPE_READER_H
#include "pcmReader.h"
/// Reads and decodes PCM data from a named pipe
/**
* Reads PCM from a named pipe and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PipeReader : public PcmReader
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PipeReader(PcmListener* pcmListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20);
virtual ~PipeReader();
protected:
void worker();
};
#endif

View file

@ -76,14 +76,14 @@ void StreamServer::send(const msg::BaseMessage* message)
}
void StreamServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration)
void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkRead " << duration << "ms\n";
send(chunk);
}
void StreamServer::onResync(const PipeReader* pipeReader, double ms)
void StreamServer::onResync(const PcmReader* pcmReader, double ms)
{
logO << "onResync " << ms << "ms\n";
}
@ -248,7 +248,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
else if (requestMsg.request == kHeader)
{
std::unique_lock<std::mutex> mlock(mutex_);
msg::Header* headerChunk = pipeReader_->getHeader();
msg::Header* headerChunk = pcmReader_->getHeader();
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
}
@ -328,8 +328,8 @@ void StreamServer::start()
controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this));
controlServer_->start();
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs);
pipeReader_->start();
pcmReader_ .reset(new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs));
pcmReader_->start();
acceptor_ = make_shared<tcp::acceptor>(*io_service_, tcp::endpoint(tcp::v4(), settings_.port));
startAccept();
}
@ -339,7 +339,7 @@ void StreamServer::stop()
{
controlServer_->stop();
acceptor_->cancel();
pipeReader_->stop();
pcmReader_->stop();
std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
(*it)->stop();

View file

@ -28,7 +28,7 @@
#include <mutex>
#include "streamSession.h"
#include "pipeReader.h"
#include "pcmreader/pipeReader.h"
#include "common/queue.h"
#include "message/message.h"
#include "message/header.h"
@ -65,12 +65,12 @@ struct StreamServerSettings
/// Forwars PCM data to the connected clients
/**
* Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream.
* Reads PCM data using PipeReader, implements PcmListener to get the (encoded) PCM stream.
* Accepts and holds client connections (StreamSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients
*/
class StreamServer : public MessageReceiver, ControlMessageReceiver, PipeListener
class StreamServer : public MessageReceiver, ControlMessageReceiver, PcmListener
{
public:
StreamServer(boost::asio::io_service* io_service, const StreamServerSettings& streamServerSettings);
@ -89,16 +89,16 @@ public:
/// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived
virtual void onMessageReceived(ControlSession* connection, const std::string& message);
/// Implementation of PipeListener
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration);
virtual void onResync(const PipeReader* pipeReader, double ms);
/// Implementation of PcmListener
virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration);
virtual void onResync(const PcmReader* pcmReader, double ms);
private:
void startAccept();
void handleAccept(socket_ptr socket);
StreamSession* getStreamSession(const std::string& mac);
mutable std::mutex mutex_;
PipeReader* pipeReader_;
std::unique_ptr<PcmReader> pcmReader_;
std::set<std::shared_ptr<StreamSession>> sessions_;
boost::asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_;