on connect: assign correct stream to client

This commit is contained in:
badaix 2016-02-08 13:30:37 +01:00
parent 07d76f365f
commit a4e501c960
5 changed files with 34 additions and 63 deletions

View file

@ -40,13 +40,23 @@ t = ReaderThread(telnet, t_stop)
t.start()
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"id\": 1}\r\n")
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n")
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n")
time.sleep(5)
doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n")
time.sleep(5)
#doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n")
'''

View file

@ -68,7 +68,9 @@ void SampleFormat::setFormat(const std::string& format)
void SampleFormat::setFormat(uint32_t rate, uint16_t bits, uint16_t channels)
{
//needs something like: 24_3 = 3 bytes, 24 = 4 bytes
//needs something like:
// 24_4 = 3 bytes, padded to 4
// 32 = 4 bytes
this->rate = rate;
this->bits = bits;
this->channels = channels;

View file

@ -41,7 +41,7 @@ ReaderUri::ReaderUri(const std::string& uri)
pos = id_.find('#');
if (pos != string::npos)
id_ = id_.substr(0, pos);
logE << "id: '" << id_ << "'\n";
logD << "id: '" << id_ << "'\n";
string tmp(uri);
@ -50,7 +50,7 @@ ReaderUri::ReaderUri(const std::string& uri)
throw invalid_argument("missing ':'");
scheme = tmp.substr(0, pos);
tmp = tmp.substr(pos + 1);
logE << "scheme: '" << scheme << "' tmp: '" << tmp << "'\n";
logD << "scheme: '" << scheme << "' tmp: '" << tmp << "'\n";
if (tmp.find("//") != 0)
throw invalid_argument("missing host separator: '//'");
@ -62,7 +62,7 @@ ReaderUri::ReaderUri(const std::string& uri)
host = tmp.substr(0, pos);
tmp = tmp.substr(pos);
path = tmp;
logE << "host: '" << host << "' tmp: '" << tmp << "' path: '" << path << "'\n";
logD << "host: '" << host << "' tmp: '" << tmp << "' path: '" << path << "'\n";
pos = tmp.find('?');
if (pos == string::npos)
@ -71,7 +71,7 @@ ReaderUri::ReaderUri(const std::string& uri)
path = tmp.substr(0, pos);
tmp = tmp.substr(pos + 1);
string queryStr = tmp;
logE << "path: '" << path << "' tmp: '" << tmp << "' query: '" << queryStr << "'\n";
logD << "path: '" << path << "' tmp: '" << tmp << "' query: '" << queryStr << "'\n";
pos = tmp.find('#');
if (pos != string::npos)
@ -79,7 +79,7 @@ ReaderUri::ReaderUri(const std::string& uri)
queryStr = tmp.substr(0, pos);
tmp = tmp.substr(pos + 1);
fragment = tmp;
logE << "query: '" << queryStr << "' fragment: '" << fragment << "' tmp: '" << tmp << "'\n";
logD << "query: '" << queryStr << "' fragment: '" << fragment << "' tmp: '" << tmp << "'\n";
}
vector<string> keyValueList = split(queryStr, '&');

View file

@ -52,12 +52,12 @@ PcmReader* StreamManager::addStream(const std::string& uri)
if (readerUri.scheme == "pipe")
{
streams_.push_back(make_shared<PipeReader>(pcmListener_, readerUri));//, sampleFormat, codec, pcmReadMs);
streams_.push_back(make_shared<PipeReader>(pcmListener_, readerUri));
return streams_.back().get();
}
else if (readerUri.scheme == "file")
{
streams_.push_back(make_shared<FileReader>(pcmListener_, readerUri));//, sampleFormat, codec, pcmReadMs);
streams_.push_back(make_shared<FileReader>(pcmListener_, readerUri));
return streams_.back().get();
}

View file

@ -39,43 +39,6 @@ StreamServer::~StreamServer()
{
}
/*
void StreamServer::send(const msg::BaseMessage* message)
{
std::lock_guard<std::mutex> mlock(mutex_);
logE << "send: " << sessions_.size() << "\n";
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
logE << "send: " << (*it)->macAddress << ", " << !(*it)->active() << "\n";
if (!(*it)->active())
{
logS(kLogErr) << "Session inactive. Removing\n";
logE << "Session inactive. Removing\n";
// don't block: remove ServerSession in a thread
onDisconnect(it->get());
auto func = [](shared_ptr<StreamSession> s)->void{s->stop();};
std::thread t(func, *it);
t.detach();
sessions_.erase(it++);
}
else
++it;
}
// for (auto it = sessions_.begin(); it != sessions_.end(); )
// {
// if (!(*it)->active())
// onDisconnect(it->get());
// }
std::shared_ptr<const msg::BaseMessage> shared_message(message);
for (auto s : sessions_)
s->add(shared_message);
}
*/
void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration)
{
@ -205,7 +168,6 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
}
else if (request.method == "Client.SetStream")
{
//TODO: check stream id
string streamId = request.getParam("id").get<string>();
PcmReaderPtr stream = streamManager_->getStream(streamId);
if (stream == nullptr)
@ -279,17 +241,6 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
logD << "Latency: " << timeMsg.latency << ", refers to: " << timeMsg.refersTo << "\n";
connection->send(&timeMsg);
}
else if (requestMsg.request == kServerSettings)
{
}
else if (requestMsg.request == kHeader)
{
// std::lock_guard<std::mutex> mlock(mutex_);
//TODO: use the correct stream
auto headerChunk = streamManager_->getDefaultStream()->getHeader();
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk.get());
}
}
else if (baseMessage.type == message_type::kHello)
{
@ -317,19 +268,27 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
connection->send(&serverSettings);
}
//TODO: use the correct stream
auto headerChunk = streamManager_->getDefaultStream()->getHeader();
connection->send(headerChunk.get());
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
client->ipAddress = connection->getIP();
client->hostName = helloMsg.getHostName();
client->version = helloMsg.getVersion();
client->connected = true;
gettimeofday(&client->lastSeen, NULL);
// Assign and update stream
PcmReaderPtr stream = streamManager_->getStream(client->streamId);
if (stream == nullptr)
{
stream = streamManager_->getDefaultStream();
client->streamId = stream->getUri().id();
}
Config::instance().save();
//TODO: wording pcmReader vs stream
connection->setPcmReader(stream);
auto headerChunk = stream->getHeader();
connection->send(headerChunk.get());
json notification = JsonNotification::getJson("Client.OnConnect", client->toJson());
logO << notification.dump(4) << "\n";
controlServer_->send(notification.dump());