added StreamManager

This commit is contained in:
badaix 2016-01-24 22:45:02 +01:00
parent f89ae0d501
commit 7bec5b8744
7 changed files with 88 additions and 33 deletions

View file

@ -13,7 +13,7 @@ CXX = /usr/bin/g++
CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DASIO_STANDALONE -DVERSION=\"$(VERSION)\" -I.. -I../externals/asio/asio/include -I../externals/popl/include CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DASIO_STANDALONE -DVERSION=\"$(VERSION)\" -I.. -I../externals/asio/asio/include -I../externals/popl/include
LDFLAGS = -lrt -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common LDFLAGS = -lrt -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o pcmreader/pcmReaderFactory.o pcmreader/pcmReader.o pcmreader/pipeReader.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o pcmreader/streamManager.o pcmreader/pcmReader.o pcmreader/pipeReader.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o
BIN = snapserver BIN = snapserver
all: $(TARGET) all: $(TARGET)

View file

@ -1,14 +0,0 @@
#ifndef PCM_READER_FACTORY_H
#define PCM_READER_FACTORY_H
#include <string>
#include "pcmReader.h"
class PcmReaderFactory
{
public:
static PcmReader* createPcmReader(PcmListener* pcmListener, const std::string& uri, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20);
};
#endif

View file

@ -17,7 +17,7 @@
***/ ***/
#include "common/utils.h" #include "common/utils.h"
#include "pcmReaderFactory.h" #include "streamManager.h"
#include "pipeReader.h" #include "pipeReader.h"
#include "common/log.h" #include "common/log.h"
@ -25,15 +25,23 @@
using namespace std; using namespace std;
PcmReader* PcmReaderFactory::createPcmReader(PcmListener* pcmListener, const std::string& uri, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs) StreamManager::StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs) : pcmListener_(pcmListener), sampleFormat_(defaultSampleFormat), codec_(defaultCodec), readBufferMs_(defaultReadBufferMs)
{
}
PcmReader* StreamManager::addStream(const std::string& uri)
{ {
ReaderUri readerUri(uri); ReaderUri readerUri(uri);
if (readerUri.query.find("sampleformat") == readerUri.query.end()) if (readerUri.query.find("sampleformat") == readerUri.query.end())
readerUri.query["sampleformat"] = defaultSampleFormat; readerUri.query["sampleformat"] = sampleFormat_;
if (readerUri.query.find("codec") == readerUri.query.end()) if (readerUri.query.find("codec") == readerUri.query.end())
readerUri.query["codec"] = defaultCodec; readerUri.query["codec"] = codec_;
if (readerUri.query.find("buffer_ms") == readerUri.query.end())
readerUri.query["buffer_ms"] = to_string(readBufferMs_);
logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: " logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: "
<< readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n"; << readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n";
@ -42,10 +50,42 @@ PcmReader* PcmReaderFactory::createPcmReader(PcmListener* pcmListener, const std
logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n"; logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n";
if (readerUri.scheme == "pipe") if (readerUri.scheme == "pipe")
return new PipeReader(pcmListener, readerUri);//, sampleFormat, codec, pcmReadMs); {
streams_.push_back(make_shared<PipeReader>(pcmListener_, readerUri));//, sampleFormat, codec, pcmReadMs);
return streams_.back().get();
}
return NULL; return NULL;
} }
const std::vector<PcmReaderPtr>& StreamManager::getStreams()
{
return streams_;
}
const PcmReaderPtr StreamManager::getDefaultStream()
{
if (streams_.empty())
return nullptr;
return streams_.front();
}
void StreamManager::start()
{
for (auto stream: streams_)
stream->start();
}
void StreamManager::stop()
{
for (auto stream: streams_)
stream->stop();
}

View file

@ -0,0 +1,31 @@
#ifndef PCM_READER_FACTORY_H
#define PCM_READER_FACTORY_H
#include <string>
#include <vector>
#include <memory>
#include "pcmReader.h"
typedef std::shared_ptr<PcmReader> PcmReaderPtr;
class StreamManager
{
public:
StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20);
PcmReader* addStream(const std::string& uri);
void start();
void stop();
const std::vector<PcmReaderPtr>& getStreams();
const PcmReaderPtr getDefaultStream();
private:
std::vector<PcmReaderPtr> streams_;
PcmListener* pcmListener_;
std::string sampleFormat_;
std::string codec_;
size_t readBufferMs_;
};
#endif

View file

@ -47,7 +47,7 @@ int main(int argc, char* argv[])
{ {
StreamServerSettings settings; StreamServerSettings settings;
std::string pcmStream = "pipe:///tmp/snapfifo?name=default"; std::string pcmStream = "pipe:///tmp/snapfifo?name=default";
int processPriority(-3); int processPriority(0);
Switch helpSwitch("h", "help", "Produce help message"); Switch helpSwitch("h", "help", "Produce help message");
Switch versionSwitch("v", "version", "Show version number"); Switch versionSwitch("v", "version", "Show version number");

View file

@ -77,7 +77,7 @@ void StreamServer::send(const msg::BaseMessage* message)
void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration)
{ {
logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n"; logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n";
bool isDefaultStream(pcmReader == pcmReader_.front().get()); bool isDefaultStream(pcmReader == streamManager_->getDefaultStream().get());
std::shared_ptr<const msg::BaseMessage> shared_message(chunk); std::shared_ptr<const msg::BaseMessage> shared_message(chunk);
for (auto s : sessions_) for (auto s : sessions_)
@ -85,7 +85,6 @@ void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk*
if (isDefaultStream)//->getName() == "default") if (isDefaultStream)//->getName() == "default")
s->add(shared_message); s->add(shared_message);
} }
// send(chunk);
} }
@ -273,7 +272,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
//TODO: use the correct stream //TODO: use the correct stream
msg::Header* headerChunk = pcmReader_.front()->getHeader(); msg::Header* headerChunk = streamManager_->getDefaultStream()->getHeader();
headerChunk->refersTo = requestMsg.id; headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk); connection->send(headerChunk);
} }
@ -341,12 +340,11 @@ void StreamServer::start()
controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this)); controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this));
controlServer_->start(); controlServer_->start();
streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs));
for (auto& streamUri: settings_.pcmStreams) for (auto& streamUri: settings_.pcmStreams)
{ streamManager_->addStream(streamUri);
shared_ptr<PcmReader> reader(PcmReaderFactory::createPcmReader(this, streamUri, settings_.sampleFormat, settings_.codec, settings_.streamReadMs));
pcmReader_.push_back(reader); streamManager_->start();
pcmReader_.back()->start();
}
acceptor_ = make_shared<tcp::acceptor>(*io_service_, tcp::endpoint(tcp::v4(), settings_.port)); acceptor_ = make_shared<tcp::acceptor>(*io_service_, tcp::endpoint(tcp::v4(), settings_.port));
startAccept(); startAccept();
@ -357,8 +355,8 @@ void StreamServer::stop()
{ {
controlServer_->stop(); controlServer_->stop();
acceptor_->cancel(); acceptor_->cancel();
for (auto pcmReader: pcmReader_)
pcmReader->stop(); streamManager_->stop();
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it) for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)

View file

@ -28,7 +28,7 @@
#include <mutex> #include <mutex>
#include "streamSession.h" #include "streamSession.h"
#include "pcmreader/pcmReaderFactory.h" #include "pcmreader/streamManager.h"
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
#include "message/header.h" #include "message/header.h"
@ -97,7 +97,6 @@ private:
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
StreamSession* getStreamSession(const std::string& mac); StreamSession* getStreamSession(const std::string& mac);
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::vector<std::shared_ptr<PcmReader>> pcmReader_;
std::set<std::shared_ptr<StreamSession>> sessions_; std::set<std::shared_ptr<StreamSession>> sessions_;
asio::io_service* io_service_; asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_; std::shared_ptr<tcp::acceptor> acceptor_;
@ -105,6 +104,7 @@ private:
StreamServerSettings settings_; StreamServerSettings settings_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_; Queue<std::shared_ptr<msg::BaseMessage>> messages_;
std::unique_ptr<ControlServer> controlServer_; std::unique_ptr<ControlServer> controlServer_;
std::unique_ptr<StreamManager> streamManager_;
}; };