diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index 62b81020..d040babc 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -176,10 +176,10 @@ void ClientConnection::getNextMessage() // logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; if (baseMessage.size > buffer.size()) buffer.resize(baseMessage.size); - { - std::lock_guard socketLock(socketMutex_); - socketRead(&buffer[0], baseMessage.size); - } +// { +// std::lock_guard socketLock(socketMutex_); + socketRead(&buffer[0], baseMessage.size); +// } tv t; baseMessage.received = t; diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 3fcedd56..5d177a98 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -58,9 +58,9 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk* for (auto s : sessions_) { if (!s->pcmStream() && isDefaultStream)//->getName() == "default") - s->add(shared_message); + s->sendAsync(shared_message); else if (s->pcmStream().get() == pcmStream) - s->add(shared_message); + s->sendAsync(shared_message); } } @@ -186,7 +186,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: session_ptr session = getStreamSession(request.getParam("client").get()); if (session != nullptr) { - session->add(stream->getHeader()); + session->sendAsync(stream->getHeader()); session->setPcmStream(stream); } } @@ -238,12 +238,12 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM // logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (baseMessage.type == message_type::kTime) { - msg::Time timeMsg; - timeMsg.deserialize(baseMessage, buffer); - timeMsg.refersTo = timeMsg.id; - timeMsg.latency = timeMsg.received - timeMsg.sent; + msg::Time* timeMsg = new msg::Time(); + timeMsg->deserialize(baseMessage, buffer); + timeMsg->refersTo = timeMsg->id; + timeMsg->latency = timeMsg->received - timeMsg->sent; // logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; - connection->send(&timeMsg); + connection->sendAsync(timeMsg); // refresh connection state ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); @@ -272,13 +272,13 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM else { logD << "request kServerSettings\n"; - msg::ServerSettings serverSettings; - serverSettings.setVolume(clientInfo->config.volume.percent); - serverSettings.setMuted(clientInfo->config.volume.muted); - serverSettings.setLatency(clientInfo->config.latency); - serverSettings.setBufferMs(settings_.bufferMs); - serverSettings.refersTo = helloMsg.id; - connection->send(&serverSettings); + msg::ServerSettings* serverSettings = new msg::ServerSettings(); + serverSettings->setVolume(clientInfo->config.volume.percent); + serverSettings->setMuted(clientInfo->config.volume.muted); + serverSettings->setLatency(clientInfo->config.latency); + serverSettings->setBufferMs(settings_.bufferMs); + serverSettings->refersTo = helloMsg.id; + connection->sendAsync(serverSettings); } ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); @@ -303,7 +303,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM connection->setPcmStream(stream); auto headerChunk = stream->getHeader(); - connection->send(headerChunk.get()); + connection->sendAsync(headerChunk); json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); // logO << notification.dump(4) << "\n"; diff --git a/server/streamSession.cpp b/server/streamSession.cpp index c9c84130..acdc8436 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -118,7 +118,14 @@ void StreamSession::socketRead(void* _to, size_t _bytes) } -void StreamSession::add(const shared_ptr& message) +void StreamSession::sendAsync(const msg::BaseMessage* message) +{ + std::shared_ptr shared_message(message); + sendAsync(shared_message); +} + + +void StreamSession::sendAsync(const shared_ptr& message) { if (!message) return; @@ -178,10 +185,10 @@ void StreamSession::getNextMessage() // logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; if (baseMessage.size > buffer.size()) buffer.resize(baseMessage.size); - { - std::lock_guard socketLock(socketMutex_); - socketRead(&buffer[0], baseMessage.size); - } +// { +// std::lock_guard socketLock(socketMutex_); + socketRead(&buffer[0], baseMessage.size); +// } tv t; baseMessage.received = t; diff --git a/server/streamSession.h b/server/streamSession.h index fb31bddf..eb30cad4 100644 --- a/server/streamSession.h +++ b/server/streamSession.h @@ -66,7 +66,8 @@ public: bool send(const msg::BaseMessage* message) const; /// Sends a message to the client (asynchronous) - void add(const std::shared_ptr& message); + void sendAsync(const std::shared_ptr& message); + void sendAsync(const msg::BaseMessage* message); bool active() const;