send messages async

This commit is contained in:
badaix 2016-11-13 22:30:02 +01:00
parent b01e3fe549
commit a6993f11df
4 changed files with 34 additions and 26 deletions

View file

@ -176,10 +176,10 @@ void ClientConnection::getNextMessage()
// logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; // logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size()) if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size); buffer.resize(baseMessage.size);
{ // {
std::lock_guard<std::mutex> socketLock(socketMutex_); // std::lock_guard<std::mutex> socketLock(socketMutex_);
socketRead(&buffer[0], baseMessage.size); socketRead(&buffer[0], baseMessage.size);
} // }
tv t; tv t;
baseMessage.received = t; baseMessage.received = t;

View file

@ -58,9 +58,9 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk*
for (auto s : sessions_) for (auto s : sessions_)
{ {
if (!s->pcmStream() && isDefaultStream)//->getName() == "default") if (!s->pcmStream() && isDefaultStream)//->getName() == "default")
s->add(shared_message); s->sendAsync(shared_message);
else if (s->pcmStream().get() == pcmStream) else if (s->pcmStream().get() == pcmStream)
s->add(shared_message); s->sendAsync(shared_message);
} }
} }
@ -186,7 +186,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
session_ptr session = getStreamSession(request.getParam("client").get<string>()); session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != nullptr) if (session != nullptr)
{ {
session->add(stream->getHeader()); session->sendAsync(stream->getHeader());
session->setPcmStream(stream); session->setPcmStream(stream);
} }
} }
@ -238,12 +238,12 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
// logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; // logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::kTime) if (baseMessage.type == message_type::kTime)
{ {
msg::Time timeMsg; msg::Time* timeMsg = new msg::Time();
timeMsg.deserialize(baseMessage, buffer); timeMsg->deserialize(baseMessage, buffer);
timeMsg.refersTo = timeMsg.id; timeMsg->refersTo = timeMsg->id;
timeMsg.latency = timeMsg.received - timeMsg.sent; timeMsg->latency = timeMsg->received - timeMsg->sent;
// logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; // logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
connection->send(&timeMsg); connection->sendAsync(timeMsg);
// refresh connection state // refresh connection state
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
@ -272,13 +272,13 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
else else
{ {
logD << "request kServerSettings\n"; logD << "request kServerSettings\n";
msg::ServerSettings serverSettings; msg::ServerSettings* serverSettings = new msg::ServerSettings();
serverSettings.setVolume(clientInfo->config.volume.percent); serverSettings->setVolume(clientInfo->config.volume.percent);
serverSettings.setMuted(clientInfo->config.volume.muted); serverSettings->setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency); serverSettings->setLatency(clientInfo->config.latency);
serverSettings.setBufferMs(settings_.bufferMs); serverSettings->setBufferMs(settings_.bufferMs);
serverSettings.refersTo = helloMsg.id; serverSettings->refersTo = helloMsg.id;
connection->send(&serverSettings); connection->sendAsync(serverSettings);
} }
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
@ -303,7 +303,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
connection->setPcmStream(stream); connection->setPcmStream(stream);
auto headerChunk = stream->getHeader(); auto headerChunk = stream->getHeader();
connection->send(headerChunk.get()); connection->sendAsync(headerChunk);
json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); json notification = JsonNotification::getJson("Client.OnConnect", client->toJson());
// logO << notification.dump(4) << "\n"; // logO << notification.dump(4) << "\n";

View file

@ -118,7 +118,14 @@ void StreamSession::socketRead(void* _to, size_t _bytes)
} }
void StreamSession::add(const shared_ptr<const msg::BaseMessage>& message) void StreamSession::sendAsync(const msg::BaseMessage* message)
{
std::shared_ptr<const msg::BaseMessage> shared_message(message);
sendAsync(shared_message);
}
void StreamSession::sendAsync(const shared_ptr<const msg::BaseMessage>& message)
{ {
if (!message) if (!message)
return; return;
@ -178,10 +185,10 @@ void StreamSession::getNextMessage()
// logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; // logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size()) if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size); buffer.resize(baseMessage.size);
{ // {
std::lock_guard<std::mutex> socketLock(socketMutex_); // std::lock_guard<std::mutex> socketLock(socketMutex_);
socketRead(&buffer[0], baseMessage.size); socketRead(&buffer[0], baseMessage.size);
} // }
tv t; tv t;
baseMessage.received = t; baseMessage.received = t;

View file

@ -66,7 +66,8 @@ public:
bool send(const msg::BaseMessage* message) const; bool send(const msg::BaseMessage* message) const;
/// Sends a message to the client (asynchronous) /// Sends a message to the client (asynchronous)
void add(const std::shared_ptr<const msg::BaseMessage>& message); void sendAsync(const std::shared_ptr<const msg::BaseMessage>& message);
void sendAsync(const msg::BaseMessage* message);
bool active() const; bool active() const;