Drop PCM chunks that are too old

This commit is contained in:
badaix 2020-02-26 19:35:19 +01:00
parent 7376c7709c
commit 92088eb9c2
3 changed files with 43 additions and 27 deletions

View file

@ -97,11 +97,7 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, msg::PcmChunk* chunk,
// wrap it into a unique_ptr to ensure that the memory will be freed // wrap it into a unique_ptr to ensure that the memory will be freed
unique_ptr<msg::PcmChunk> chunk_ptr(chunk); unique_ptr<msg::PcmChunk> chunk_ptr(chunk);
std::ostringstream oss; shared_const_buffer buffer(*chunk_ptr);
tv t;
chunk_ptr->sent = t;
chunk_ptr->serialize(oss);
shared_const_buffer buffer(oss.str());
std::vector<std::shared_ptr<StreamSession>> sessions; std::vector<std::shared_ptr<StreamSession>> sessions;
{ {

View file

@ -147,10 +147,22 @@ void StreamSession::send_next()
void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now)
{ {
strand_.post([ this, self = shared_from_this(), const_buf, send_now ]() { strand_.post([ this, self = shared_from_this(), const_buf, send_now ]() {
// delete PCM chunks that are older than the overall buffer duration
messages_.erase(std::remove_if(messages_.begin(), messages_.end(),
[this](const shared_const_buffer& buffer) {
const auto& msg = buffer.message();
if (!msg.is_pcm_chunk)
return false;
auto age = chronos::clk::now() - msg.rec_time;
return (age > std::chrono::milliseconds(bufferMs_) + 100ms);
}),
messages_.end());
if (send_now) if (send_now)
messages_.push_front(const_buf); messages_.push_front(const_buf);
else else
messages_.push_back(const_buf); messages_.push_back(const_buf);
if (messages_.size() > 1) if (messages_.size() > 1)
{ {
LOG(DEBUG, LOG_TAG) << "outstanding async_write\n"; LOG(DEBUG, LOG_TAG) << "outstanding async_write\n";
@ -166,12 +178,8 @@ void StreamSession::sendAsync(msg::message_ptr message, bool send_now)
if (!message) if (!message)
return; return;
tv t;
// TODO: better set the timestamp in send_next for more accurate time sync // TODO: better set the timestamp in send_next for more accurate time sync
message->sent = t; sendAsync(shared_const_buffer(*message), send_now);
std::ostringstream oss;
message->serialize(oss);
sendAsync(shared_const_buffer(oss.str()), send_now);
} }

View file

@ -49,27 +49,33 @@ public:
// A reference-counted non-modifiable buffer class. // A reference-counted non-modifiable buffer class.
// TODO: add overload for messages
class shared_const_buffer class shared_const_buffer
{ {
public: struct Message
// Construct from a std::string.
explicit shared_const_buffer(const std::string& data) : data_(new std::vector<char>(data.begin(), data.end())), buffer_(boost::asio::buffer(*data_))
{ {
std::vector<char> data;
bool is_pcm_chunk;
chronos::time_point_clk rec_time;
};
public:
shared_const_buffer(msg::BaseMessage& message)
{
tv t;
message.sent = t;
const msg::PcmChunk* pcm_chunk = dynamic_cast<const msg::PcmChunk*>(&message);
message_ = std::make_shared<Message>();
message_->is_pcm_chunk = (pcm_chunk != nullptr);
if (message_->is_pcm_chunk)
message_->rec_time = pcm_chunk->start();
std::ostringstream oss;
message.serialize(oss);
std::string s = oss.str();
message_->data = std::vector<char>(s.begin(), s.end());
buffer_ = boost::asio::buffer(message_->data);
} }
// // Construct from a message.
// explicit shared_const_buffer(const msg::BaseMessage& message)
// {
// std::ostringstream oss;
// message.serialize(oss);
// data_ = std::shared_ptr<std::vector<char>>(new std::vector<char>(oss.str().begin(), oss.str().end()));
// //std::make_shared<std::vector<char>>(oss.str().begin(), oss.str().end());
// buffer_ = boost::asio::buffer(*data_);
// }
// Implement the ConstBufferSequence requirements. // Implement the ConstBufferSequence requirements.
typedef boost::asio::const_buffer value_type; typedef boost::asio::const_buffer value_type;
typedef const boost::asio::const_buffer* const_iterator; typedef const boost::asio::const_buffer* const_iterator;
@ -77,13 +83,19 @@ public:
{ {
return &buffer_; return &buffer_;
} }
const boost::asio::const_buffer* end() const const boost::asio::const_buffer* end() const
{ {
return &buffer_ + 1; return &buffer_ + 1;
} }
const Message& message() const
{
return *message_;
}
private: private:
std::shared_ptr<std::vector<char>> data_; std::shared_ptr<Message> message_;
boost::asio::const_buffer buffer_; boost::asio::const_buffer buffer_;
}; };