From 4cb67c7960424f779d94b1f724b2560fcb7e1f58 Mon Sep 17 00:00:00 2001 From: badaix Date: Thu, 27 Feb 2020 20:44:13 +0100 Subject: [PATCH] Fix message queueing --- server/stream_session.cpp | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/server/stream_session.cpp b/server/stream_session.cpp index 0cdea8b2..3fb149e6 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -128,10 +128,10 @@ void StreamSession::stop() void StreamSession::send_next() { - shared_const_buffer buffer = messages_.front(); - messages_.pop_front(); + auto& buffer = messages_.front(); boost::asio::async_write(socket_, buffer, boost::asio::bind_executor(strand_, [ this, self = shared_from_this(), buffer ](boost::system::error_code ec, std::size_t length) { + messages_.pop_front(); if (ec) { LOG(ERROR, LOG_TAG) << "StreamSession write error (msg length: " << length << "): " << ec.message() << "\n"; @@ -148,15 +148,19 @@ void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) { strand_.post([ this, self = shared_from_this(), const_buf, send_now ]() { // delete PCM chunks that are older than the overall buffer duration - messages_.erase(std::remove_if(messages_.begin(), messages_.end(), - [this](const shared_const_buffer& buffer) { - const auto& msg = buffer.message(); - if (!msg.is_pcm_chunk) - return false; - auto age = chronos::clk::now() - msg.rec_time; - return (age > std::chrono::milliseconds(bufferMs_) + 100ms); - }), - messages_.end()); + if (messages_.size() > 1) + { + // don't remove the first message as it might beeing sent already + messages_.erase(std::remove_if(messages_.begin() + 1, messages_.end(), + [this](const shared_const_buffer& buffer) { + const auto& msg = buffer.message(); + if (!msg.is_pcm_chunk) + return false; + auto age = chronos::clk::now() - msg.rec_time; + return (age > std::chrono::milliseconds(bufferMs_) + 100ms); + }), + messages_.end()); + } if (send_now) messages_.push_front(const_buf);