diff --git a/control/testClient.py b/control/testClient.py index 03a0dd74..243056b1 100755 --- a/control/testClient.py +++ b/control/testClient.py @@ -40,13 +40,23 @@ t = ReaderThread(telnet, t_stop) t.start() doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"id\": 1}\r\n") -doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n") +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"a0:b4:a5:3a:f1:db\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") +time.sleep(5) +doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"file:///home/johannes/Musik/AnetteLouisiane.wav\"}, \"id\": 3}\r\n") time.sleep(5) doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") time.sleep(5) doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo1\"}, \"id\": 3}\r\n") time.sleep(5) doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Client.SetStream\", \"params\": {\"client\": \"00:21:6a:7d:74:fc\", \"id\": \"pipe:///tmp/snapfifo\"}, \"id\": 3}\r\n") +time.sleep(5) + #doRequest("{\"jsonrpc\": \"2.0\", \"method\": \"Server.GetStatus\", \"params\": {\"client\": \"80:1f:02:ed:fd:e0\"}, \"id\": 2}\r\n") ''' diff --git a/message/sampleFormat.cpp b/message/sampleFormat.cpp index 47a2061e..cb4f0a93 100644 --- a/message/sampleFormat.cpp +++ b/message/sampleFormat.cpp @@ -68,7 +68,9 @@ void SampleFormat::setFormat(const std::string& format) void SampleFormat::setFormat(uint32_t rate, uint16_t bits, uint16_t channels) { - //needs something like: 24_3 = 3 bytes, 24 = 4 bytes + //needs something like: + // 24_4 = 3 bytes, padded to 4 + // 32 = 4 bytes this->rate = rate; this->bits = bits; this->channels = channels; diff --git a/server/pcmreader/readerUri.cpp b/server/pcmreader/readerUri.cpp index 4e219c44..f3ead2ed 100644 --- a/server/pcmreader/readerUri.cpp +++ b/server/pcmreader/readerUri.cpp @@ -41,7 +41,7 @@ ReaderUri::ReaderUri(const std::string& uri) pos = id_.find('#'); if (pos != string::npos) id_ = id_.substr(0, pos); - logE << "id: '" << id_ << "'\n"; + logD << "id: '" << id_ << "'\n"; string tmp(uri); @@ -50,7 +50,7 @@ ReaderUri::ReaderUri(const std::string& uri) throw invalid_argument("missing ':'"); scheme = tmp.substr(0, pos); tmp = tmp.substr(pos + 1); - logE << "scheme: '" << scheme << "' tmp: '" << tmp << "'\n"; + logD << "scheme: '" << scheme << "' tmp: '" << tmp << "'\n"; if (tmp.find("//") != 0) throw invalid_argument("missing host separator: '//'"); @@ -62,7 +62,7 @@ ReaderUri::ReaderUri(const std::string& uri) host = tmp.substr(0, pos); tmp = tmp.substr(pos); path = tmp; - logE << "host: '" << host << "' tmp: '" << tmp << "' path: '" << path << "'\n"; + logD << "host: '" << host << "' tmp: '" << tmp << "' path: '" << path << "'\n"; pos = tmp.find('?'); if (pos == string::npos) @@ -71,7 +71,7 @@ ReaderUri::ReaderUri(const std::string& uri) path = tmp.substr(0, pos); tmp = tmp.substr(pos + 1); string queryStr = tmp; - logE << "path: '" << path << "' tmp: '" << tmp << "' query: '" << queryStr << "'\n"; + logD << "path: '" << path << "' tmp: '" << tmp << "' query: '" << queryStr << "'\n"; pos = tmp.find('#'); if (pos != string::npos) @@ -79,7 +79,7 @@ ReaderUri::ReaderUri(const std::string& uri) queryStr = tmp.substr(0, pos); tmp = tmp.substr(pos + 1); fragment = tmp; - logE << "query: '" << queryStr << "' fragment: '" << fragment << "' tmp: '" << tmp << "'\n"; + logD << "query: '" << queryStr << "' fragment: '" << fragment << "' tmp: '" << tmp << "'\n"; } vector keyValueList = split(queryStr, '&'); diff --git a/server/pcmreader/streamManager.cpp b/server/pcmreader/streamManager.cpp index 19985d51..9ebae1d2 100644 --- a/server/pcmreader/streamManager.cpp +++ b/server/pcmreader/streamManager.cpp @@ -52,12 +52,12 @@ PcmReader* StreamManager::addStream(const std::string& uri) if (readerUri.scheme == "pipe") { - streams_.push_back(make_shared(pcmListener_, readerUri));//, sampleFormat, codec, pcmReadMs); + streams_.push_back(make_shared(pcmListener_, readerUri)); return streams_.back().get(); } else if (readerUri.scheme == "file") { - streams_.push_back(make_shared(pcmListener_, readerUri));//, sampleFormat, codec, pcmReadMs); + streams_.push_back(make_shared(pcmListener_, readerUri)); return streams_.back().get(); } diff --git a/server/streamServer.cpp b/server/streamServer.cpp index a5d244a9..b0ae62f7 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -39,43 +39,6 @@ StreamServer::~StreamServer() { } -/* -void StreamServer::send(const msg::BaseMessage* message) -{ - std::lock_guard mlock(mutex_); - - logE << "send: " << sessions_.size() << "\n"; - - for (auto it = sessions_.begin(); it != sessions_.end(); ) - { - logE << "send: " << (*it)->macAddress << ", " << !(*it)->active() << "\n"; - if (!(*it)->active()) - { - logS(kLogErr) << "Session inactive. Removing\n"; - logE << "Session inactive. Removing\n"; - // don't block: remove ServerSession in a thread - onDisconnect(it->get()); - auto func = [](shared_ptr s)->void{s->stop();}; - std::thread t(func, *it); - t.detach(); - sessions_.erase(it++); - } - else - ++it; - } - - -// for (auto it = sessions_.begin(); it != sessions_.end(); ) -// { -// if (!(*it)->active()) -// onDisconnect(it->get()); -// } - - std::shared_ptr shared_message(message); - for (auto s : sessions_) - s->add(shared_message); -} -*/ void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) { @@ -205,7 +168,6 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: } else if (request.method == "Client.SetStream") { - //TODO: check stream id string streamId = request.getParam("id").get(); PcmReaderPtr stream = streamManager_->getStream(streamId); if (stream == nullptr) @@ -279,17 +241,6 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM logD << "Latency: " << timeMsg.latency << ", refers to: " << timeMsg.refersTo << "\n"; connection->send(&timeMsg); } - else if (requestMsg.request == kServerSettings) - { - } - else if (requestMsg.request == kHeader) - { -// std::lock_guard mlock(mutex_); -//TODO: use the correct stream - auto headerChunk = streamManager_->getDefaultStream()->getHeader(); - headerChunk->refersTo = requestMsg.id; - connection->send(headerChunk.get()); - } } else if (baseMessage.type == message_type::kHello) { @@ -317,19 +268,27 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM connection->send(&serverSettings); } -//TODO: use the correct stream - auto headerChunk = streamManager_->getDefaultStream()->getHeader(); - connection->send(headerChunk.get()); - - ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); client->ipAddress = connection->getIP(); client->hostName = helloMsg.getHostName(); client->version = helloMsg.getVersion(); client->connected = true; gettimeofday(&client->lastSeen, NULL); + + // Assign and update stream + PcmReaderPtr stream = streamManager_->getStream(client->streamId); + if (stream == nullptr) + { + stream = streamManager_->getDefaultStream(); + client->streamId = stream->getUri().id(); + } Config::instance().save(); + //TODO: wording pcmReader vs stream + connection->setPcmReader(stream); + auto headerChunk = stream->getHeader(); + connection->send(headerChunk.get()); + json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); logO << notification.dump(4) << "\n"; controlServer_->send(notification.dump());