stream server uses async send and receive

This commit is contained in:
badaix 2019-10-22 23:21:14 +02:00
parent 6a3f59b0e1
commit 2f06ebed04
8 changed files with 185 additions and 218 deletions

View file

@ -29,7 +29,7 @@ using namespace std;
using json = nlohmann::json;
StreamServer::StreamServer(boost::asio::io_context* io_context, const ServerSettings& serverSettings) : io_context_(io_context), settings_(serverSettings)
StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings) : io_context_(io_context), settings_(serverSettings)
{
}
@ -76,8 +76,15 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
// LOG(INFO) << "onChunkRead (" << pcmStream->getName() << "): " << duration << "ms\n";
bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get());
msg::message_ptr shared_message(chunk);
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
unique_ptr<msg::PcmChunk> chunk_ptr(chunk);
std::ostringstream oss;
tv t;
chunk_ptr->sent = t;
chunk_ptr->serialize(oss);
shared_const_buffer buffer(oss.str());
for (auto s : sessions_)
{
if (!settings_.stream.sendAudioToMutedClients)
@ -95,9 +102,9 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
}
if (!s->pcmStream() && isDefaultStream) //->getName() == "default")
s->sendAsync(shared_message);
s->sendAsync(buffer);
else if (s->pcmStream().get() == pcmStream)
s->sendAsync(shared_message);
s->sendAsync(buffer);
}
}
@ -789,7 +796,7 @@ void StreamServer::handleAccept(tcp::socket socket)
socket.set_option(tcp::no_delay(true));
SLOG(NOTICE) << "StreamServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<StreamSession> session = make_shared<StreamSession>(this, std::move(socket));
shared_ptr<StreamSession> session = make_shared<StreamSession>(io_context_, this, std::move(socket));
session->setBufferMs(settings_.stream.bufferMs);
session->start();
@ -828,7 +835,7 @@ void StreamServer::start()
{
LOG(INFO) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n";
acceptor_.emplace_back(
make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port)));
make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port)));
}
catch (const boost::system::system_error& e)
{