From 5fae6e85f7d6ee9a7b3b7a6b834345bea6cea01d Mon Sep 17 00:00:00 2001 From: badaix Date: Wed, 3 Feb 2016 22:34:52 +0100 Subject: [PATCH] change stream with "SetStream (id)" --- control/testClient.py | 13 +++++++++++-- server/config.h | 4 ++-- server/controlServer.cpp | 4 ++-- server/pcmreader/readerUri.cpp | 16 ++++++++++++++++ server/pcmreader/readerUri.h | 5 +++++ server/pcmreader/streamManager.cpp | 11 +++++++++++ server/pcmreader/streamManager.h | 1 + server/streamServer.cpp | 24 +++++++++++++++++++++++- server/streamSession.cpp | 15 ++++++++++++++- server/streamSession.h | 5 +++++ 10 files changed, 90 insertions(+), 8 deletions(-) diff --git a/control/testClient.py b/control/testClient.py index 1927c84b..03a0dd74 100755 --- a/control/testClient.py +++ b/control/testClient.py @@ -40,7 +40,16 @@ t = ReaderThread(telnet, t_stop) t.start() doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"id\": 1}\r\n") -doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n") +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") + +#doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n") +''' doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 10}, \"id\": 3}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 30}, \"id\": 4}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 50}, \"id\": 5}\r\n") @@ -61,7 +70,7 @@ doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetName\", \"params\": {\ doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"NonExistingMethod\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 8}\r\n") #out of range doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 101}, \"id\": 3}\r\n") - +''' s = raw_input("") print(s) t_stop.set(); diff --git a/server/config.h b/server/config.h index 015073e2..24ca24e5 100644 --- a/server/config.h +++ b/server/config.h @@ -69,7 +69,7 @@ struct Volume struct ClientInfo { - ClientInfo(const std::string& _macAddress = "") : macAddress(_macAddress), volume(100), connected(false), latency(0), streamId("TODO") + ClientInfo(const std::string& _macAddress = "") : macAddress(_macAddress), volume(100), connected(false), latency(0), streamId("") { lastSeen.tv_sec = 0; lastSeen.tv_usec = 0; @@ -87,7 +87,7 @@ struct ClientInfo lastSeen.tv_usec = jGet(j["lastSeen"], "usec", 0); connected = jGet(j, "connected", true); latency = jGet(j, "latency", 0); - streamId = jGet(j, "stream", "TODO"); + streamId = jGet(j, "stream", ""); } json toJson() diff --git a/server/controlServer.cpp b/server/controlServer.cpp index b07a1f88..fb76b414 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -128,7 +128,7 @@ void ControlServer::stop() { acceptor_->cancel(); std::unique_lock mlock(mutex_); - for (auto it = sessions_.begin(); it != sessions_.end(); ++it) - (*it)->stop(); + for (auto s: sessions_) + s->stop(); } diff --git a/server/pcmreader/readerUri.cpp b/server/pcmreader/readerUri.cpp index 712c6b00..2f0145d1 100644 --- a/server/pcmreader/readerUri.cpp +++ b/server/pcmreader/readerUri.cpp @@ -33,6 +33,16 @@ ReaderUri::ReaderUri(const std::string& uri) // would be more elegant with regex. Not yet supported on my dev machine's gcc 4.8 :( size_t pos; this->uri = uri; + + id_ = uri; + pos = id_.find('?'); + if (pos != string::npos) + id_ = id_.substr(0, pos); + pos = id_.find('#'); + if (pos != string::npos) + id_ = id_.substr(0, pos); + logE << "id: '" << id_ << "'\n"; + string tmp(uri); pos = tmp.find(':'); @@ -91,6 +101,12 @@ json ReaderUri::toJson() const j["path"] = path; j["fragment"] = fragment; j["query"] = json(query); + j["id"] = id_; return j; } + +std::string ReaderUri::id() const +{ + return id_; +} \ No newline at end of file diff --git a/server/pcmreader/readerUri.h b/server/pcmreader/readerUri.h index 6edf5c58..a10ecc4f 100644 --- a/server/pcmreader/readerUri.h +++ b/server/pcmreader/readerUri.h @@ -45,7 +45,12 @@ struct ReaderUri std::map query; std::string fragment; + std::string id() const; json toJson() const; + +private: + std::string id_; + }; diff --git a/server/pcmreader/streamManager.cpp b/server/pcmreader/streamManager.cpp index 62b6be3e..fc522aef 100644 --- a/server/pcmreader/streamManager.cpp +++ b/server/pcmreader/streamManager.cpp @@ -74,6 +74,17 @@ const PcmReaderPtr StreamManager::getDefaultStream() } +const PcmReaderPtr StreamManager::getStream(const std::string& id) +{ + for (auto stream: streams_) + { + if (stream->getUri().id() == id) + return stream; + } + return nullptr; +} + + void StreamManager::start() { for (auto stream: streams_) diff --git a/server/pcmreader/streamManager.h b/server/pcmreader/streamManager.h index 9ac83c67..1b6a8a6e 100644 --- a/server/pcmreader/streamManager.h +++ b/server/pcmreader/streamManager.h @@ -18,6 +18,7 @@ public: void stop(); const std::vector& getStreams(); const PcmReaderPtr getDefaultStream(); + const PcmReaderPtr getStream(const std::string& id); json toJson() const; private: diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 53e287b6..5f3adf33 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -86,7 +86,9 @@ void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* std::lock_guard mlock(sessionsMutex_); for (auto s : sessions_) { - if (isDefaultStream)//->getName() == "default") + if (!s->pcmReader() && isDefaultStream)//->getName() == "default") + s->add(shared_message); + else if (s->pcmReader().get() == pcmReader) s->add(shared_message); } } @@ -201,6 +203,25 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: clientInfo->volume.muted = request.getParam("mute", false, true); response = clientInfo->volume.muted; } + else if (request.method == "Client.SetStream") + { + //TODO: check stream id + string streamId = request.getParam("id").get(); + PcmReaderPtr stream = streamManager_->getStream(streamId); + if (stream == nullptr) + throw JsonInternalErrorException("Stream not found", request.id); + + clientInfo->streamId = streamId; + response = clientInfo->streamId; + + StreamSession* session = getStreamSession(request.getParam("client").get()); + if (session != NULL) + { + msg::Header* headerChunk = stream->getHeader(); + session->send(headerChunk); + session->setPcmReader(stream); + } + } else if (request.method == "Client.SetLatency") { clientInfo->latency = request.getParam("latency", -10000, settings_.bufferMs); @@ -363,6 +384,7 @@ void StreamServer::start() controlServer_->start(); streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs)); + //TODO: check uniqueness of the stream for (auto& streamUri: settings_.pcmStreams) logE << "Stream: " << streamManager_->addStream(streamUri)->getUri().toJson() << "\n"; // throw SnapException("bad"); diff --git a/server/streamSession.cpp b/server/streamSession.cpp index 5280e044..6a656bf4 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -27,7 +27,8 @@ using namespace std; -StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr socket) : active_(true), messageReceiver_(receiver) +StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr socket) : + active_(true), messageReceiver_(receiver), pcmReader_(nullptr) { socket_ = socket; } @@ -39,6 +40,18 @@ StreamSession::~StreamSession() } +void StreamSession::setPcmReader(PcmReaderPtr pcmReader) +{ + pcmReader_ = pcmReader; +} + + +const PcmReaderPtr StreamSession::pcmReader() const +{ + return pcmReader_; +} + + void StreamSession::start() { setActive(true); diff --git a/server/streamSession.h b/server/streamSession.h index 639d64fd..ba44d438 100644 --- a/server/streamSession.h +++ b/server/streamSession.h @@ -29,6 +29,7 @@ #include #include "message/message.h" #include "common/queue.h" +#include "pcmreader/streamManager.h" using asio::ip::tcp; @@ -79,6 +80,9 @@ public: return socket_->remote_endpoint().address().to_string(); } + void setPcmReader(PcmReaderPtr pcmReader); + const PcmReaderPtr pcmReader() const; + protected: void socketRead(void* _to, size_t _bytes); void getNextMessage(); @@ -96,6 +100,7 @@ protected: MessageReceiver* messageReceiver_; Queue> messages_; size_t bufferMs_; + PcmReaderPtr pcmReader_; };