configurable pipe read buffer

This commit is contained in:
badaix 2015-08-15 11:22:41 +02:00
parent 2756ec4897
commit 239e93de43
8 changed files with 39 additions and 16 deletions

View file

@ -23,8 +23,11 @@
#include "sampleFormat.h"
#include "common/log.h"
#include <sstream>
using namespace std;
namespace msg
{
@ -45,6 +48,14 @@ SampleFormat::SampleFormat(uint32_t sampleRate, uint16_t bitsPerSample, uint16_t
}
string SampleFormat::getFormat() const
{
stringstream ss;
ss << rate << ":" << bits << ":" << channels;
return ss.str();
}
void SampleFormat::setFormat(const std::string& format)
{
std::vector<std::string> strs;

View file

@ -43,6 +43,8 @@ public:
SampleFormat(const std::string& format);
SampleFormat(uint32_t rate, uint16_t bits, uint16_t channels);
std::string getFormat() const;
void setFormat(const std::string& format);
void setFormat(uint32_t rate, uint16_t bits, uint16_t channels);

View file

@ -62,9 +62,9 @@ void ControlServer::send(const msg::BaseMessage* message)
}
void ControlServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk)
void ControlServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkRead " << chunk->duration<chronos::msec>().count() << "ms\n";
// logO << "onChunkRead " << duration << "ms\n";
send(chunk);
}
@ -155,7 +155,7 @@ void ControlServer::handleAccept(socket_ptr socket)
void ControlServer::start()
{
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName);
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs);
pipeReader_->start();
acceptor_ = make_shared<tcp::acceptor>(io_service_, tcp::endpoint(tcp::v4(), settings_.port));
startAccept();

View file

@ -42,11 +42,21 @@ typedef std::shared_ptr<tcp::socket> socket_ptr;
struct ControlServerSettings
{
ControlServerSettings() :
port(98765),
fifoName("/tmp/snapfifo"),
codec("flac"),
bufferMs(1000),
sampleFormat("44100:16:2"),
pipeReadMs(20)
{
}
size_t port;
std::string fifoName;
std::string codec;
int32_t bufferMs;
msg::SampleFormat sampleFormat;
size_t pipeReadMs;
};
@ -73,7 +83,7 @@ public:
virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer);
/// Implementation of PipeListener
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk);
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration);
virtual void onResync(const PipeReader* pipeReader, double ms);
private:

View file

@ -58,7 +58,7 @@ std::string FlacEncoder::getAvailableOptions() const
std::string FlacEncoder::getDefaultOptions() const
{
return "2";
return "5";
}

View file

@ -32,14 +32,13 @@ using namespace std;
PipeReader::PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName) : pipeListener_(pipeListener), sampleFormat_(sampleFormat)
PipeReader::PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs) : pipeListener_(pipeListener), sampleFormat_(sampleFormat), pcmReadMs_(pcmReadMs)
{
umask(0);
mkfifo(fifoName.c_str(), 0666);
fd_ = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK);
if (fd_ == -1)
throw SnapException("failed to open fifo: \"" + fifoName + "\"");
pcmReadMs_ = 20;
EncoderFactory encoderFactory;
encoder_.reset(encoderFactory.createEncoder(codec));
}
@ -85,7 +84,7 @@ void PipeReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, do
chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000);
pipeListener_->onChunkRead(this, chunk);
pipeListener_->onChunkRead(this, chunk, duration);
}

View file

@ -37,7 +37,7 @@ class PipeReader;
class PipeListener
{
public:
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk) = 0;
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration) = 0;
virtual void onResync(const PipeReader* pipeReader, double ms) = 0;
};
@ -53,7 +53,7 @@ class PipeReader : public EncoderListener
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName);
PipeReader(PipeListener* pipeListener, const msg::SampleFormat& sampleFormat, const std::string& codec, const std::string& fifoName, size_t pcmReadMs = 20);
virtual ~PipeReader();
void start();
@ -66,12 +66,12 @@ public:
protected:
void worker();
int fd_;
size_t pcmReadMs_;
timeval tvEncodedChunk_;
std::atomic<bool> active_;
std::thread readerThread_;
PipeListener* pipeListener_;
msg::SampleFormat sampleFormat_;
size_t pcmReadMs_;
std::unique_ptr<Encoder> encoder_;
};

View file

@ -54,12 +54,13 @@ int main(int argc, char* argv[])
desc.add_options()
("help,h", "produce help message")
("version,v", "show version number")
("port,p", po::value<size_t>(&settings.port)->default_value(98765), "server port")
("sampleformat,s", po::value<string>(&sampleFormat)->default_value("44100:16:2"), "sample format")
("codec,c", po::value<string>(&settings.codec)->default_value("flac"), "transport codec [flac|ogg|pcm][:options]. Type codec:? to get codec specific options")
("fifo,f", po::value<string>(&settings.fifoName)->default_value("/tmp/snapfifo"), "name of the input fifo file")
("port,p", po::value<size_t>(&settings.port)->default_value(settings.port), "server port")
("sampleformat,s", po::value<string>(&sampleFormat)->default_value(settings.sampleFormat.getFormat()), "sample format")
("codec,c", po::value<string>(&settings.codec)->default_value(settings.codec), "transport codec [flac|ogg|pcm][:options]. Type codec:? to get codec specific options")
("fifo,f", po::value<string>(&settings.fifoName)->default_value(settings.fifoName), "name of the input fifo file")
("daemon,d", po::value<int>(&runAsDaemon)->implicit_value(-3), "daemonize, optional process priority [-20..19]")
("buffer,b", po::value<int32_t>(&settings.bufferMs)->default_value(1000), "buffer [ms]")
("buffer,b", po::value<int32_t>(&settings.bufferMs)->default_value(settings.bufferMs), "buffer [ms]")
("pipeReadBuffer", po::value<size_t>(&settings.pipeReadMs)->default_value(settings.pipeReadMs), "pipe read buffer [ms]")
;
po::variables_map vm;