assign the group's stream to clients

This commit is contained in:
badaix 2016-12-13 21:59:15 +01:00
parent 7c93582e07
commit 3818394cf2

View file

@ -105,6 +105,7 @@ void StreamServer::onDisconnect(StreamSession* streamSession)
} }
void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message) void StreamServer::onMessageReceived(ControlSession* controlSession, const std::string& message)
{ {
JsonRequest request; JsonRequest request;
@ -175,7 +176,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
for (auto client: group->clients) for (auto client: group->clients)
{ {
session_ptr session = getStreamSession(client->id); session_ptr session = getStreamSession(client->id);
if (session != nullptr) if (session && (session->pcmStream() != stream))
{ {
session->sendAsync(stream->getHeader()); session->sendAsync(stream->getHeader());
session->setPcmStream(stream); session->setPcmStream(stream);
@ -203,6 +204,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
/// Add clients to group /// Add clients to group
PcmStreamPtr stream = streamManager_->getStream(group->streamId);
for (const auto& clientId: clients) for (const auto& clientId: clients)
{ {
ClientInfoPtr client = Config::instance().getClientInfo(clientId); ClientInfoPtr client = Config::instance().getClientInfo(clientId);
@ -219,6 +221,14 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
group->addClient(client); group->addClient(client);
/// assign new stream
session_ptr session = getStreamSession(client->id);
if (session && stream && (session->pcmStream() != stream))
{
session->sendAsync(stream->getHeader());
session->setPcmStream(stream);
}
} }
if (group->empty()) if (group->empty())
@ -268,6 +278,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
// logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; // logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
@ -347,6 +358,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
} }
session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const
{ {
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);