change stream with "SetStream (id)"

This commit is contained in:
badaix 2016-02-03 22:34:52 +01:00
parent 0092f79f88
commit 5fae6e85f7
10 changed files with 90 additions and 8 deletions

View file

@ -40,7 +40,16 @@ t = ReaderThread(telnet, t_stop)
t.start() t.start()
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"id\": 1}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"id\": 1}\r\n")
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
#doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n")
'''
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 10}, \"id\": 3}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 10}, \"id\": 3}\r\n")
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 30}, \"id\": 4}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 30}, \"id\": 4}\r\n")
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 50}, \"id\": 5}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 50}, \"id\": 5}\r\n")
@ -61,7 +70,7 @@ doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetName\", \"params\": {\
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"NonExistingMethod\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 8}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"NonExistingMethod\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 8}\r\n")
#out of range #out of range
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 101}, \"id\": 3}\r\n") doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetVolume\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\", \"volume\": 101}, \"id\": 3}\r\n")
'''
s = raw_input("") s = raw_input("")
print(s) print(s)
t_stop.set(); t_stop.set();

View file

@ -69,7 +69,7 @@ struct Volume
struct ClientInfo struct ClientInfo
{ {
ClientInfo(const std::string& _macAddress = "") : macAddress(_macAddress), volume(100), connected(false), latency(0), streamId("TODO") ClientInfo(const std::string& _macAddress = "") : macAddress(_macAddress), volume(100), connected(false), latency(0), streamId("")
{ {
lastSeen.tv_sec = 0; lastSeen.tv_sec = 0;
lastSeen.tv_usec = 0; lastSeen.tv_usec = 0;
@ -87,7 +87,7 @@ struct ClientInfo
lastSeen.tv_usec = jGet<int32_t>(j["lastSeen"], "usec", 0); lastSeen.tv_usec = jGet<int32_t>(j["lastSeen"], "usec", 0);
connected = jGet<bool>(j, "connected", true); connected = jGet<bool>(j, "connected", true);
latency = jGet<int32_t>(j, "latency", 0); latency = jGet<int32_t>(j, "latency", 0);
streamId = jGet<std::string>(j, "stream", "TODO"); streamId = jGet<std::string>(j, "stream", "");
} }
json toJson() json toJson()

View file

@ -128,7 +128,7 @@ void ControlServer::stop()
{ {
acceptor_->cancel(); acceptor_->cancel();
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ++it) for (auto s: sessions_)
(*it)->stop(); s->stop();
} }

View file

@ -33,6 +33,16 @@ ReaderUri::ReaderUri(const std::string& uri)
// would be more elegant with regex. Not yet supported on my dev machine's gcc 4.8 :( // would be more elegant with regex. Not yet supported on my dev machine's gcc 4.8 :(
size_t pos; size_t pos;
this->uri = uri; this->uri = uri;
id_ = uri;
pos = id_.find('?');
if (pos != string::npos)
id_ = id_.substr(0, pos);
pos = id_.find('#');
if (pos != string::npos)
id_ = id_.substr(0, pos);
logE << "id: '" << id_ << "'\n";
string tmp(uri); string tmp(uri);
pos = tmp.find(':'); pos = tmp.find(':');
@ -91,6 +101,12 @@ json ReaderUri::toJson() const
j["path"] = path; j["path"] = path;
j["fragment"] = fragment; j["fragment"] = fragment;
j["query"] = json(query); j["query"] = json(query);
j["id"] = id_;
return j; return j;
} }
std::string ReaderUri::id() const
{
return id_;
}

View file

@ -45,7 +45,12 @@ struct ReaderUri
std::map<std::string, std::string> query; std::map<std::string, std::string> query;
std::string fragment; std::string fragment;
std::string id() const;
json toJson() const; json toJson() const;
private:
std::string id_;
}; };

View file

@ -74,6 +74,17 @@ const PcmReaderPtr StreamManager::getDefaultStream()
} }
const PcmReaderPtr StreamManager::getStream(const std::string& id)
{
for (auto stream: streams_)
{
if (stream->getUri().id() == id)
return stream;
}
return nullptr;
}
void StreamManager::start() void StreamManager::start()
{ {
for (auto stream: streams_) for (auto stream: streams_)

View file

@ -18,6 +18,7 @@ public:
void stop(); void stop();
const std::vector<PcmReaderPtr>& getStreams(); const std::vector<PcmReaderPtr>& getStreams();
const PcmReaderPtr getDefaultStream(); const PcmReaderPtr getDefaultStream();
const PcmReaderPtr getStream(const std::string& id);
json toJson() const; json toJson() const;
private: private:

View file

@ -86,7 +86,9 @@ void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk*
std::lock_guard<std::mutex> mlock(sessionsMutex_); std::lock_guard<std::mutex> mlock(sessionsMutex_);
for (auto s : sessions_) for (auto s : sessions_)
{ {
if (isDefaultStream)//->getName() == "default") if (!s->pcmReader() && isDefaultStream)//->getName() == "default")
s->add(shared_message);
else if (s->pcmReader().get() == pcmReader)
s->add(shared_message); s->add(shared_message);
} }
} }
@ -201,6 +203,25 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
clientInfo->volume.muted = request.getParam<bool>("mute", false, true); clientInfo->volume.muted = request.getParam<bool>("mute", false, true);
response = clientInfo->volume.muted; response = clientInfo->volume.muted;
} }
else if (request.method == "Client.SetStream")
{
//TODO: check stream id
string streamId = request.getParam("id").get<string>();
PcmReaderPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr)
throw JsonInternalErrorException("Stream not found", request.id);
clientInfo->streamId = streamId;
response = clientInfo->streamId;
StreamSession* session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL)
{
msg::Header* headerChunk = stream->getHeader();
session->send(headerChunk);
session->setPcmReader(stream);
}
}
else if (request.method == "Client.SetLatency") else if (request.method == "Client.SetLatency")
{ {
clientInfo->latency = request.getParam<int>("latency", -10000, settings_.bufferMs); clientInfo->latency = request.getParam<int>("latency", -10000, settings_.bufferMs);
@ -363,6 +384,7 @@ void StreamServer::start()
controlServer_->start(); controlServer_->start();
streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs)); streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs));
//TODO: check uniqueness of the stream
for (auto& streamUri: settings_.pcmStreams) for (auto& streamUri: settings_.pcmStreams)
logE << "Stream: " << streamManager_->addStream(streamUri)->getUri().toJson() << "\n"; logE << "Stream: " << streamManager_->addStream(streamUri)->getUri().toJson() << "\n";
// throw SnapException("bad"); // throw SnapException("bad");

View file

@ -27,7 +27,8 @@ using namespace std;
StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : active_(true), messageReceiver_(receiver) StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) :
active_(true), messageReceiver_(receiver), pcmReader_(nullptr)
{ {
socket_ = socket; socket_ = socket;
} }
@ -39,6 +40,18 @@ StreamSession::~StreamSession()
} }
void StreamSession::setPcmReader(PcmReaderPtr pcmReader)
{
pcmReader_ = pcmReader;
}
const PcmReaderPtr StreamSession::pcmReader() const
{
return pcmReader_;
}
void StreamSession::start() void StreamSession::start()
{ {
setActive(true); setActive(true);

View file

@ -29,6 +29,7 @@
#include <set> #include <set>
#include "message/message.h" #include "message/message.h"
#include "common/queue.h" #include "common/queue.h"
#include "pcmreader/streamManager.h"
using asio::ip::tcp; using asio::ip::tcp;
@ -79,6 +80,9 @@ public:
return socket_->remote_endpoint().address().to_string(); return socket_->remote_endpoint().address().to_string();
} }
void setPcmReader(PcmReaderPtr pcmReader);
const PcmReaderPtr pcmReader() const;
protected: protected:
void socketRead(void* _to, size_t _bytes); void socketRead(void* _to, size_t _bytes);
void getNextMessage(); void getNextMessage();
@ -96,6 +100,7 @@ protected:
MessageReceiver* messageReceiver_; MessageReceiver* messageReceiver_;
Queue<std::shared_ptr<const msg::BaseMessage>> messages_; Queue<std::shared_ptr<const msg::BaseMessage>> messages_;
size_t bufferMs_; size_t bufferMs_;
PcmReaderPtr pcmReader_;
}; };