Pass io_context to stream readers

This commit is contained in:
badaix 2019-11-23 12:09:23 +01:00
parent 7a58526736
commit 6e138ff49f
15 changed files with 30 additions and 22 deletions

View file

@ -801,7 +801,7 @@ void StreamServer::start()
controlServer_.reset(new ControlServer(io_context_, settings_.tcp, settings_.http, this)); controlServer_.reset(new ControlServer(io_context_, settings_.tcp, settings_.http, this));
controlServer_->start(); controlServer_->start();
streamManager_.reset(new StreamManager(this, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamReadMs)); streamManager_.reset(new StreamManager(this, io_context_, settings_.stream.sampleFormat, settings_.stream.codec, settings_.stream.streamReadMs));
// throw SnapException("xxx"); // throw SnapException("xxx");
for (const auto& streamUri : settings_.stream.pcmStreams) for (const auto& streamUri : settings_.stream.pcmStreams)
{ {

View file

@ -41,7 +41,7 @@ static string hex2str(string input)
* move to Makefile? * move to Makefile?
*/ */
AirplayStream::AirplayStream(PcmListener* pcmListener, const StreamUri& uri) : ProcessStream(pcmListener, uri), port_(5000) AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri), port_(5000)
{ {
logStderr_ = true; logStderr_ = true;

View file

@ -56,7 +56,7 @@ class AirplayStream : public ProcessStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
AirplayStream(PcmListener* pcmListener, const StreamUri& uri); AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~AirplayStream() override; ~AirplayStream() override;
protected: protected:

View file

@ -30,7 +30,7 @@ using namespace std;
FileStream::FileStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri) FileStream::FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri)
{ {
ifs.open(uri_.path.c_str(), std::ifstream::in | std::ifstream::binary); ifs.open(uri_.path.c_str(), std::ifstream::in | std::ifstream::binary);
if (!ifs.good()) if (!ifs.good())

View file

@ -33,7 +33,7 @@ class FileStream : public PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
FileStream(PcmListener* pcmListener, const StreamUri& uri); FileStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~FileStream() override; ~FileStream() override;
protected: protected:

View file

@ -28,7 +28,7 @@ using namespace std;
LibrespotStream::LibrespotStream(PcmListener* pcmListener, const StreamUri& uri) : ProcessStream(pcmListener, uri) LibrespotStream::LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri)
{ {
sampleFormat_ = SampleFormat("44100:16:2"); sampleFormat_ = SampleFormat("44100:16:2");
uri_.query["sampleformat"] = sampleFormat_.getFormat(); uri_.query["sampleformat"] = sampleFormat_.getFormat();

View file

@ -35,7 +35,7 @@ class LibrespotStream : public ProcessStream, WatchdogListener
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
LibrespotStream(PcmListener* pcmListener, const StreamUri& uri); LibrespotStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~LibrespotStream() override; ~LibrespotStream() override;
protected: protected:

View file

@ -31,7 +31,8 @@ using namespace std;
PcmStream::PcmStream(PcmListener* pcmListener, const StreamUri& uri) : active_(false), pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(kIdle) PcmStream::PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: active_(false), pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(kIdle), ioc_(ioc)
{ {
encoder::EncoderFactory encoderFactory; encoder::EncoderFactory encoderFactory;
if (uri_.query.find("codec") == uri_.query.end()) if (uri_.query.find("codec") == uri_.query.end())
@ -169,7 +170,9 @@ json PcmStream::toJson() const
state = "disabled"; state = "disabled";
json j = { json j = {
{"uri", uri_.toJson()}, {"id", getId()}, {"status", state}, {"uri", uri_.toJson()},
{"id", getId()},
{"status", state},
}; };
if (meta_) if (meta_)

View file

@ -26,6 +26,7 @@
#include "message/stream_tags.hpp" #include "message/stream_tags.hpp"
#include "stream_uri.hpp" #include "stream_uri.hpp"
#include <atomic> #include <atomic>
#include <boost/asio/io_context.hpp>
#include <condition_variable> #include <condition_variable>
#include <map> #include <map>
#include <mutex> #include <mutex>
@ -68,7 +69,7 @@ class PcmStream : public encoder::EncoderListener
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PcmListener /// ctor. Encoded PCM data is passed to the PcmListener
PcmStream(PcmListener* pcmListener, const StreamUri& uri); PcmStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
virtual ~PcmStream(); virtual ~PcmStream();
virtual void start(); virtual void start();
@ -110,6 +111,7 @@ protected:
std::string name_; std::string name_;
ReaderState state_; ReaderState state_;
std::shared_ptr<msg::StreamTags> meta_; std::shared_ptr<msg::StreamTags> meta_;
boost::asio::io_context& ioc_;
}; };

View file

@ -33,7 +33,7 @@ using namespace std;
PipeStream::PipeStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), fd_(-1) PipeStream::PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : PcmStream(pcmListener, ioc, uri), fd_(-1)
{ {
umask(0); umask(0);
string mode = uri_.getQuery("mode", "create"); string mode = uri_.getQuery("mode", "create");

View file

@ -33,7 +33,7 @@ class PipeStream : public PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
PipeStream(PcmListener* pcmListener, const StreamUri& uri); PipeStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~PipeStream() override; ~PipeStream() override;
protected: protected:

View file

@ -31,7 +31,8 @@ using namespace std;
ProcessStream::ProcessStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), path_(""), process_(nullptr) ProcessStream::ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: PcmStream(pcmListener, ioc, uri), path_(""), process_(nullptr)
{ {
params_ = uri_.getQuery("params"); params_ = uri_.getQuery("params");
logStderr_ = (uri_.getQuery("logStderr", "false") == "true"); logStderr_ = (uri_.getQuery("logStderr", "false") == "true");

View file

@ -36,7 +36,7 @@ class ProcessStream : public PcmStream
{ {
public: public:
/// ctor. Encoded PCM data is passed to the PipeListener /// ctor. Encoded PCM data is passed to the PipeListener
ProcessStream(PcmListener* pcmListener, const StreamUri& uri); ProcessStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri);
~ProcessStream() override; ~ProcessStream() override;
void start() override; void start() override;

View file

@ -31,8 +31,8 @@
using namespace std; using namespace std;
StreamManager::StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs) StreamManager::StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs)
: pcmListener_(pcmListener), sampleFormat_(defaultSampleFormat), codec_(defaultCodec), readBufferMs_(defaultReadBufferMs) : pcmListener_(pcmListener), sampleFormat_(defaultSampleFormat), codec_(defaultCodec), readBufferMs_(defaultReadBufferMs), ioc_(ioc)
{ {
} }
@ -59,23 +59,23 @@ PcmStreamPtr StreamManager::addStream(const std::string& uri)
if (streamUri.scheme == "pipe") if (streamUri.scheme == "pipe")
{ {
stream = make_shared<PipeStream>(pcmListener_, streamUri); stream = make_shared<PipeStream>(pcmListener_, ioc_, streamUri);
} }
else if (streamUri.scheme == "file") else if (streamUri.scheme == "file")
{ {
stream = make_shared<FileStream>(pcmListener_, streamUri); stream = make_shared<FileStream>(pcmListener_, ioc_, streamUri);
} }
else if (streamUri.scheme == "process") else if (streamUri.scheme == "process")
{ {
stream = make_shared<ProcessStream>(pcmListener_, streamUri); stream = make_shared<ProcessStream>(pcmListener_, ioc_, streamUri);
} }
else if ((streamUri.scheme == "spotify") || (streamUri.scheme == "librespot")) else if ((streamUri.scheme == "spotify") || (streamUri.scheme == "librespot"))
{ {
stream = make_shared<LibrespotStream>(pcmListener_, streamUri); stream = make_shared<LibrespotStream>(pcmListener_, ioc_, streamUri);
} }
else if (streamUri.scheme == "airplay") else if (streamUri.scheme == "airplay")
{ {
stream = make_shared<AirplayStream>(pcmListener_, streamUri); stream = make_shared<AirplayStream>(pcmListener_, ioc_, streamUri);
} }
else else
{ {

View file

@ -5,13 +5,14 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
#include <boost/asio/io_context.hpp>
typedef std::shared_ptr<PcmStream> PcmStreamPtr; typedef std::shared_ptr<PcmStream> PcmStreamPtr;
class StreamManager class StreamManager
{ {
public: public:
StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20); StreamManager(PcmListener* pcmListener, boost::asio::io_context& ioc, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20);
PcmStreamPtr addStream(const std::string& uri); PcmStreamPtr addStream(const std::string& uri);
void removeStream(const std::string& name); void removeStream(const std::string& name);
@ -28,6 +29,7 @@ private:
std::string sampleFormat_; std::string sampleFormat_;
std::string codec_; std::string codec_;
size_t readBufferMs_; size_t readBufferMs_;
boost::asio::io_context& ioc_;
}; };