From 94629f9909dc3167d232e90139f276f8253e4c99 Mon Sep 17 00:00:00 2001 From: badaix Date: Sun, 6 Mar 2016 22:30:23 +0100 Subject: [PATCH] report stream state --- server/config.cpp | 2 +- server/pcmreader/fileReader.cpp | 2 ++ server/pcmreader/pcmReader.cpp | 37 +++++++++++++++++++++++++++++- server/pcmreader/pcmReader.h | 16 +++++++++++++ server/pcmreader/pipeReader.cpp | 4 ++++ server/pcmreader/readerUri.cpp | 16 ++++++------- server/pcmreader/streamManager.cpp | 2 +- server/streamServer.cpp | 11 ++++++++- server/streamServer.h | 1 + 9 files changed, 79 insertions(+), 12 deletions(-) diff --git a/server/config.cpp b/server/config.cpp index 377b1736..20c530ec 100644 --- a/server/config.cpp +++ b/server/config.cpp @@ -55,7 +55,7 @@ Config::Config() client->fromJson(*it); client->connected = false; clients.push_back(client); - std::cout << "Client:\n" << std::setw(4) << client->toJson() << '\n'; +// logO << "Client:\n" << std::setw(4) << client->toJson() << '\n'; } } } diff --git a/server/pcmreader/fileReader.cpp b/server/pcmreader/fileReader.cpp index ea5cf9ac..b84e3092 100644 --- a/server/pcmreader/fileReader.cpp +++ b/server/pcmreader/fileReader.cpp @@ -58,6 +58,8 @@ void FileReader::worker() size_t length = ifs.tellg(); ifs.seekg (0, ifs.beg); + setState(kPlaying); + while (active_) { gettimeofday(&tvChunk, NULL); diff --git a/server/pcmreader/pcmReader.cpp b/server/pcmreader/pcmReader.cpp index c1c56073..93c5179f 100644 --- a/server/pcmreader/pcmReader.cpp +++ b/server/pcmreader/pcmReader.cpp @@ -32,7 +32,7 @@ using namespace std; -PcmReader::PcmReader(PcmListener* pcmListener, const ReaderUri& uri) : pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20) +PcmReader::PcmReader(PcmListener* pcmListener, const ReaderUri& uri) : pcmListener_(pcmListener), uri_(uri), pcmReadMs_(20), state_(kIdle) { EncoderFactory encoderFactory; if (uri_.query.find("codec") == uri_.query.end()) @@ -104,6 +104,22 @@ void PcmReader::stop() } +ReaderState PcmReader::getState() const +{ + return state_; +} + + +void PcmReader::setState(const ReaderState& newState) +{ + if (newState != state_) + { + state_ = newState; + pcmListener_->onStateChanged(this, newState); + } +} + + void PcmReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, double duration) { // logO << "onChunkEncoded: " << duration << " us\n"; @@ -116,3 +132,22 @@ void PcmReader::onChunkEncoded(const Encoder* encoder, msg::PcmChunk* chunk, dou pcmListener_->onChunkRead(this, chunk, duration); } + +json PcmReader::toJson() const +{ + string state("idle"); + if (state_ == kIdle) + state = "idle"; + else if (state_ == kPlaying) + state = "playing"; + else if (state_ == kDisabled) + state = "disabled"; + + json j = { + {"uri", uri_.toJson()}, + {"id", uri_.id()}, + {"status", state} + }; + return j; +} + diff --git a/server/pcmreader/pcmReader.h b/server/pcmreader/pcmReader.h index c4d3ab77..11dff2a2 100644 --- a/server/pcmreader/pcmReader.h +++ b/server/pcmreader/pcmReader.h @@ -25,12 +25,20 @@ #include #include "readerUri.h" #include "../encoder/encoder.h" +#include "../json/json.hpp" #include "message/sampleFormat.h" #include "message/header.h" class PcmReader; +enum ReaderState +{ + kIdle = 0, + kPlaying = 1, + kDisabled = 2 +}; + /// Callback interface for users of PcmReader /** @@ -39,6 +47,7 @@ class PcmReader; class PcmListener { public: + virtual void onStateChanged(const PcmReader* pcmReader, const ReaderState& state) = 0; virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) = 0; virtual void onResync(const PcmReader* pcmReader, double ms) = 0; }; @@ -68,8 +77,14 @@ public: virtual const std::string& getName() const; virtual const SampleFormat& getSampleFormat() const; + virtual ReaderState getState() const; + virtual json toJson() const; + + protected: virtual void worker() = 0; + void setState(const ReaderState& newState); + timeval tvEncodedChunk_; std::atomic active_; std::thread readerThread_; @@ -79,6 +94,7 @@ protected: size_t pcmReadMs_; std::unique_ptr encoder_; std::string name_; + ReaderState state_; }; diff --git a/server/pcmreader/pipeReader.cpp b/server/pcmreader/pipeReader.cpp index 3be8cbf2..efff57d1 100644 --- a/server/pcmreader/pipeReader.cpp +++ b/server/pcmreader/pipeReader.cpp @@ -76,7 +76,10 @@ void PipeReader::worker() { int count = read(fd_, chunk->payload + len, toRead - len); if (count < 0) + { + setState(kIdle); usleep(100*1000); + } else if (count == 0) throw SnapException("end of file"); else @@ -92,6 +95,7 @@ void PipeReader::worker() if (nextTick >= currentTick) { // logO << "sleep: " << nextTick - currentTick << "\n"; + setState(kPlaying); usleep((nextTick - currentTick) * 1000); } else diff --git a/server/pcmreader/readerUri.cpp b/server/pcmreader/readerUri.cpp index 3ab496e6..7355571b 100644 --- a/server/pcmreader/readerUri.cpp +++ b/server/pcmreader/readerUri.cpp @@ -108,14 +108,14 @@ ReaderUri::ReaderUri(const std::string& readerUri) json ReaderUri::toJson() const { - json j; - j["uri"] = uri; - j["scheme"] = scheme; - j["host"] = host; - j["path"] = path; - j["fragment"] = fragment; - j["query"] = json(query); - j["id"] = id_; + json j = { + {"raw", uri}, + {"scheme", scheme}, + {"host", host}, + {"path", path}, + {"fragment", fragment}, + {"query", query} + }; return j; } diff --git a/server/pcmreader/streamManager.cpp b/server/pcmreader/streamManager.cpp index d486e092..aa517d6d 100644 --- a/server/pcmreader/streamManager.cpp +++ b/server/pcmreader/streamManager.cpp @@ -114,7 +114,7 @@ json StreamManager::toJson() const { json result = json::array(); for (auto stream: streams_) - result.push_back(stream->getUri().toJson()); + result.push_back(stream->toJson()); return result; } diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 163172d2..ae6b4792 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -40,6 +40,15 @@ StreamServer::~StreamServer() } +void StreamServer::onStateChanged(const PcmReader* pcmReader, const ReaderState& state) +{ + logO << "onStateChanged (" << pcmReader->getName() << "): " << state << "\n"; +// logO << pcmReader->toJson().dump(4); + json notification = JsonNotification::getJson("Stream.OnUpdate", pcmReader->toJson()); + controlServer_->send(notification.dump(), NULL); +} + + void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) { // logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n"; @@ -301,7 +310,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM connection->send(headerChunk.get()); json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); - logO << notification.dump(4) << "\n"; +// logO << notification.dump(4) << "\n"; controlServer_->send(notification.dump()); } } diff --git a/server/streamServer.h b/server/streamServer.h index 26acc276..ae8d587d 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -89,6 +89,7 @@ public: virtual void onMessageReceived(ControlSession* connection, const std::string& message); /// Implementation of PcmListener + virtual void onStateChanged(const PcmReader* pcmReader, const ReaderState& state); virtual void onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration); virtual void onResync(const PcmReader* pcmReader, double ms);