renamed Reader => Stream

This commit is contained in:
badaix 2016-03-22 23:14:19 +01:00
parent 1f177e5119
commit 0eaa616ab6
15 changed files with 114 additions and 114 deletions

View file

@ -0,0 +1,111 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "fileStream.h"
#include "../encoder/encoderFactory.h"
#include "common/log.h"
#include "common/snapException.h"
using namespace std;
FileStream::FileStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri)
{
ifs.open(uri_.path.c_str(), std::ifstream::in|std::ifstream::binary);
if (!ifs.good())
{
logE << "failed to open PCM file: \"" + uri_.path + "\"\n";
throw SnapException("failed to open PCM file: \"" + uri_.path + "\"");
}
}
FileStream::~FileStream()
{
ifs.close();
}
void FileStream::worker()
{
timeval tvChunk;
std::unique_ptr<msg::PcmChunk> chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_));
ifs.seekg (0, ifs.end);
size_t length = ifs.tellg();
ifs.seekg (0, ifs.beg);
setState(kPlaying);
while (active_)
{
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
long nextTick = chronos::getTickCount();
try
{
while (active_)
{
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
size_t toRead = chunk->payloadSize;
size_t count = 0;
size_t pos = ifs.tellg();
size_t left = length - pos;
if (left < toRead)
{
ifs.read(chunk->payload, left);
ifs.seekg (0, ifs.beg);
count = left;
}
ifs.read(chunk->payload + count, toRead - count);
encoder_->encode(chunk.get());
nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
// logO << "sleep: " << nextTick - currentTick << "\n";
usleep((nextTick - currentTick) * 1000);
}
else
{
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
}
}
catch(const std::exception& e)
{
logE << "Exception: " << e.what() << std::endl;
}
}
}

View file

@ -0,0 +1,45 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef FILE_READER_H
#define FILE_READER_H
#include "pcmStream.h"
#include <fstream>
/// Reads and decodes PCM data from a file
/**
* Reads PCM from a file and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class FileStream : public PcmStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
FileStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~FileStream();
protected:
void worker();
std::ifstream ifs;
};
#endif

View file

@ -0,0 +1,153 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "../encoder/encoderFactory.h"
#include "common/snapException.h"
#include "common/compat.h"
#include "pcmStream.h"
#include "common/log.h"
using namespace std;
PcmStream::PcmStream(PcmListener* pcmListener, const StreamUri& uri) : pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(kIdle)
{
EncoderFactory encoderFactory;
if (uri_.query.find("codec") == uri_.query.end())
throw SnapException("Stream URI must have a codec");
encoder_.reset(encoderFactory.createEncoder(uri_.query["codec"]));
if (uri_.query.find("name") == uri_.query.end())
throw SnapException("Stream URI must have a name");
name_ = uri_.query["name"];
if (uri_.query.find("sampleformat") == uri_.query.end())
throw SnapException("Stream URI must have a sampleformat");
sampleFormat_ = SampleFormat(uri_.query["sampleformat"]);
logE << "PcmStream sampleFormat: " << sampleFormat_.getFormat() << "\n";
if (uri_.query.find("buffer_ms") != uri_.query.end())
pcmReadMs_ = cpt::stoul(uri_.query["buffer_ms"]);
}
PcmStream::~PcmStream()
{
stop();
}
std::shared_ptr<msg::Header> PcmStream::getHeader()
{
return encoder_->getHeader();
}
const StreamUri& PcmStream::getUri() const
{
return uri_;
}
const std::string& PcmStream::getName() const
{
return name_;
}
const SampleFormat& PcmStream::getSampleFormat() const
{
return sampleFormat_;
}
void PcmStream::start()
{
logE << "PcmStream start: " << sampleFormat_.getFormat() << "\n";
//TODO: wrong encoder settings leads to: terminate called after throwing an instance of 'std::system_error' what(): Invalid argument
encoder_->init(this, sampleFormat_);
active_ = true;
readerThread_ = thread(&PcmStream::worker, this);
}
void PcmStream::stop()
{
if (active_)
{
active_ = false;
readerThread_.join();
}
}
ReaderState PcmStream::getState() const
{
return state_;
}
void PcmStream::setState(const ReaderState& newState)
{
if (newState != state_)
{
state_ = newState;
pcmListener_->onStateChanged(this, newState);
}
}
void PcmStream::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkEncoded: " << duration << " us\n";
if (duration <= 0)
return;
chunk->timestamp.sec = tvEncodedChunk_.tv_sec;
chunk->timestamp.usec = tvEncodedChunk_.tv_usec;
chronos::addUs(tvEncodedChunk_, duration * 1000);
pcmListener_->onChunkRead(this, chunk, duration);
}
json PcmStream::toJson() const
{
string state("unknown");
if (state_ == kIdle)
state = "idle";
else if (state_ == kPlaying)
state = "playing";
else if (state_ == kDisabled)
state = "disabled";
json j = {
{"uri", uri_.toJson()},
{"id", uri_.id()},
{"status", state}
};
return j;
}

View file

@ -0,0 +1,102 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PCM_READER_H
#define PCM_READER_H
#include <thread>
#include <atomic>
#include <string>
#include <map>
#include "streamUri.h"
#include "../encoder/encoder.h"
#include "../json/json.hpp"
#include "message/sampleFormat.h"
#include "message/header.h"
class PcmStream;
enum ReaderState
{
kUnknown = 0,
kIdle = 1,
kPlaying = 2,
kDisabled = 3
};
/// Callback interface for users of PcmStream
/**
* Users of PcmStream should implement this to get the data
*/
class PcmListener
{
public:
virtual void onStateChanged(const PcmStream* pcmStream, const ReaderState& state) = 0;
virtual void onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk* chunk, double duration) = 0;
virtual void onResync(const PcmStream* pcmStream, double ms) = 0;
};
/// Reads and decodes PCM data
/**
* Reads PCM and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PcmStream : public EncoderListener
{
public:
/// ctor. Encoded PCM data is passed to the PcmListener
PcmStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~PcmStream();
virtual void start();
virtual void stop();
/// Implementation of EncoderListener::onChunkEncoded
virtual void onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration);
virtual std::shared_ptr<msg::Header> getHeader();
virtual const StreamUri& getUri() const;
virtual const std::string& getName() const;
virtual const SampleFormat& getSampleFormat() const;
virtual ReaderState getState() const;
virtual json toJson() const;
protected:
virtual void worker() = 0;
void setState(const ReaderState& newState);
timeval tvEncodedChunk_;
std::atomic<bool> active_;
std::thread readerThread_;
PcmListener* pcmListener_;
StreamUri uri_;
SampleFormat sampleFormat_;
size_t pcmReadMs_;
std::unique_ptr<Encoder> encoder_;
std::string name_;
ReaderState state_;
};
#endif

View file

@ -0,0 +1,116 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <memory>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "pipeStream.h"
#include "../encoder/encoderFactory.h"
#include "common/log.h"
#include "common/snapException.h"
#include "common/compat.h"
using namespace std;
PipeStream::PipeStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), fd_(-1)
{
umask(0);
if ((mkfifo(uri_.path.c_str(), 0666) != 0) && (errno != EEXIST))
throw SnapException("failed to make fifo \"" + uri_.path + "\": " + cpt::to_string(errno));
}
PipeStream::~PipeStream()
{
if (fd_ != -1)
close(fd_);
}
void PipeStream::worker()
{
timeval tvChunk;
std::unique_ptr<msg::PcmChunk> chunk(new msg::PcmChunk(sampleFormat_, pcmReadMs_));
while (active_)
{
if (fd_ != -1)
close(fd_);
fd_ = open(uri_.path.c_str(), O_RDONLY | O_NONBLOCK);
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
long nextTick = chronos::getTickCount();
try
{
if (fd_ == -1)
throw SnapException("failed to open fifo: \"" + uri_.path + "\"");
while (active_)
{
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
int toRead = chunk->payloadSize;
int len = 0;
do
{
int count = read(fd_, chunk->payload + len, toRead - len);
if (count < 0)
{
setState(kIdle);
usleep(100*1000);
}
else if (count == 0)
throw SnapException("end of file");
else
len += count;
}
while ((len < toRead) && active_);
encoder_->encode(chunk.get());
nextTick += pcmReadMs_;
chronos::addUs(tvChunk, pcmReadMs_ * 1000);
long currentTick = chronos::getTickCount();
if (nextTick >= currentTick)
{
// logO << "sleep: " << nextTick - currentTick << "\n";
setState(kPlaying);
usleep((nextTick - currentTick) * 1000);
}
else
{
gettimeofday(&tvChunk, NULL);
tvEncodedChunk_ = tvChunk;
pcmListener_->onResync(this, currentTick - nextTick);
nextTick = currentTick;
}
}
}
catch(const std::exception& e)
{
logE << "Exception: " << e.what() << std::endl;
usleep(100*1000);
}
}
}

View file

@ -0,0 +1,45 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef PIPE_STREAM_H
#define PIPE_STREAM_H
#include "pcmStream.h"
/// Reads and decodes PCM data from a named pipe
/**
* Reads PCM from a named pipe and passes the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PcmListener
*/
class PipeStream : public PcmStream
{
public:
/// ctor. Encoded PCM data is passed to the PipeListener
PipeStream(PcmListener* pcmListener, const StreamUri& uri);
virtual ~PipeStream();
protected:
void worker();
int fd_;
};
#endif

View file

@ -0,0 +1,121 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "common/utils.h"
#include "streamManager.h"
#include "pipeStream.h"
#include "fileStream.h"
#include "common/log.h"
#include "common/snapException.h"
using namespace std;
StreamManager::StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs) : pcmListener_(pcmListener), sampleFormat_(defaultSampleFormat), codec_(defaultCodec), readBufferMs_(defaultReadBufferMs)
{
}
PcmStream* StreamManager::addStream(const std::string& uri)
{
StreamUri streamUri(uri);
if (streamUri.query.find("sampleformat") == streamUri.query.end())
streamUri.query["sampleformat"] = sampleFormat_;
if (streamUri.query.find("codec") == streamUri.query.end())
streamUri.query["codec"] = codec_;
if (streamUri.query.find("buffer_ms") == streamUri.query.end())
streamUri.query["buffer_ms"] = to_string(readBufferMs_);
// logE << "\nURI: " << streamUri.uri << "\nscheme: " << streamUri.scheme << "\nhost: "
// << streamUri.host << "\npath: " << streamUri.path << "\nfragment: " << streamUri.fragment << "\n";
// for (auto kv: streamUri.query)
// logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n";
if (streamUri.scheme == "pipe")
{
streams_.push_back(make_shared<PipeStream>(pcmListener_, streamUri));
return streams_.back().get();
}
else if (streamUri.scheme == "file")
{
streams_.push_back(make_shared<FileStream>(pcmListener_, streamUri));
return streams_.back().get();
}
else
{
throw SnapException("Unknown stream type: " + streamUri.scheme);
}
return NULL;
}
const std::vector<PcmStreamPtr>& StreamManager::getStreams()
{
return streams_;
}
const PcmStreamPtr StreamManager::getDefaultStream()
{
if (streams_.empty())
return nullptr;
return streams_.front();
}
const PcmStreamPtr StreamManager::getStream(const std::string& id)
{
for (auto stream: streams_)
{
if (stream->getUri().id() == id)
return stream;
}
return nullptr;
}
void StreamManager::start()
{
for (auto stream: streams_)
stream->start();
}
void StreamManager::stop()
{
for (auto stream: streams_)
stream->stop();
}
json StreamManager::toJson() const
{
json result = json::array();
for (auto stream: streams_)
result.push_back(stream->toJson());
return result;
}

View file

@ -0,0 +1,33 @@
#ifndef PCM_READER_FACTORY_H
#define PCM_READER_FACTORY_H
#include <string>
#include <vector>
#include <memory>
#include "pcmStream.h"
typedef std::shared_ptr<PcmStream> PcmStreamPtr;
class StreamManager
{
public:
StreamManager(PcmListener* pcmListener, const std::string& defaultSampleFormat, const std::string& defaultCodec, size_t defaultReadBufferMs = 20);
PcmStream* addStream(const std::string& uri);
void start();
void stop();
const std::vector<PcmStreamPtr>& getStreams();
const PcmStreamPtr getDefaultStream();
const PcmStreamPtr getStream(const std::string& id);
json toJson() const;
private:
std::vector<PcmStreamPtr> streams_;
PcmListener* pcmListener_;
std::string sampleFormat_;
std::string codec_;
size_t readBufferMs_;
};
#endif

View file

@ -0,0 +1,126 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <common/utils.h>
#include <common/compat.h>
#include <common/log.h>
#include "streamUri.h"
using namespace std;
StreamUri::StreamUri(const std::string& streamUri)
{
// https://en.wikipedia.org/wiki/Uniform_Resource_Identifier
// scheme:[//[user:password@]host[:port]][/]path[?query][#fragment]
// would be more elegant with regex. Not yet supported on my dev machine's gcc 4.8 :(
logE << "StreamUri: " << streamUri << "\n";
size_t pos;
uri = trim_copy(streamUri);
while (!uri.empty() && ((uri[0] == '\'') || (uri[0] == '"')))
uri = uri.substr(1);
while (!uri.empty() && ((uri[uri.length()-1] == '\'') || (uri[uri.length()-1] == '"')))
uri = uri.substr(0, this->uri.length()-1);
string decodedUri = uriDecode(uri);
logD << "StreamUri: " << decodedUri << "\n";
id_ = decodedUri;
pos = id_.find('?');
if (pos != string::npos)
id_ = id_.substr(0, pos);
pos = id_.find('#');
if (pos != string::npos)
id_ = id_.substr(0, pos);
logD << "id: '" << id_ << "'\n";
string tmp(decodedUri);
pos = tmp.find(':');
if (pos == string::npos)
throw invalid_argument("missing ':'");
scheme = trim_copy(tmp.substr(0, pos));
tmp = tmp.substr(pos + 1);
logD << "scheme: '" << scheme << "' tmp: '" << tmp << "'\n";
if (tmp.find("//") != 0)
throw invalid_argument("missing host separator: '//'");
tmp = tmp.substr(2);
pos = tmp.find('/');
if (pos == string::npos)
throw invalid_argument("missing path separator: '/'");
host = trim_copy(tmp.substr(0, pos));
tmp = tmp.substr(pos);
path = tmp;
logD << "host: '" << host << "' tmp: '" << tmp << "' path: '" << path << "'\n";
pos = tmp.find('?');
if (pos == string::npos)
return;
path = trim_copy(tmp.substr(0, pos));
tmp = tmp.substr(pos + 1);
string queryStr = tmp;
logD << "path: '" << path << "' tmp: '" << tmp << "' query: '" << queryStr << "'\n";
pos = tmp.find('#');
if (pos != string::npos)
{
queryStr = tmp.substr(0, pos);
tmp = tmp.substr(pos + 1);
fragment = trim_copy(tmp);
logD << "query: '" << queryStr << "' fragment: '" << fragment << "' tmp: '" << tmp << "'\n";
}
vector<string> keyValueList = split(queryStr, '&');
for (auto& kv: keyValueList)
{
pos = kv.find('=');
if (pos != string::npos)
{
string key = trim_copy(kv.substr(0, pos));
string value = trim_copy(kv.substr(pos+1));
query[key] = value;
if (key == "id")
id_ = value;
}
}
}
json StreamUri::toJson() const
{
json j = {
{"raw", uri},
{"scheme", scheme},
{"host", host},
{"path", path},
{"fragment", fragment},
{"query", query}
};
return j;
}
std::string StreamUri::id() const
{
return id_;
}

View file

@ -0,0 +1,56 @@
/***
This file is part of snapcast
Copyright (C) 2014-2016 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef READER_URI_H
#define READER_URI_H
#include <string>
#include <map>
#include "../json/json.hpp"
using json = nlohmann::json;
struct StreamUri
{
StreamUri(const std::string& uri);
std::string uri;
std::string scheme;
/* struct Authority
{
std::string username;
std::string password;
std::string host;
size_t port;
};
Authority authority;
*/
std::string host;
std::string path;
std::map<std::string, std::string> query;
std::string fragment;
std::string id() const;
json toJson() const;
private:
std::string id_;
};
#endif