Don't send old PCM chunks to the client

This commit is contained in:
badaix 2015-08-12 00:22:56 +02:00
parent 30a88602dc
commit 0cc2c39357
5 changed files with 48 additions and 4 deletions

View file

@ -137,11 +137,12 @@ void ControlServer::handleAccept(socket_ptr socket)
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
ServerSession* session = new ServerSession(this, socket); shared_ptr<ServerSession> session(new ServerSession(this, socket));
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
session->setBufferMs(settings_.bufferMs);
session->start(); session->start();
sessions_.insert(shared_ptr<ServerSession>(session)); sessions_.insert(session);
} }
startAccept(); startAccept();
} }

View file

@ -50,6 +50,13 @@ struct ControlServerSettings
}; };
/// Forwars PCM data to the connected clients
/**
* Reads PCM data with pipereader, implements PipeListener to get the (encoded) PCM stream.
* Accepts and holds client connections (ServerSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients
*/
class ControlServer : public MessageReceiver, PipeListener class ControlServer : public MessageReceiver, PipeListener
{ {
public: public:

View file

@ -38,6 +38,12 @@ public:
/// Reads and decodes PCM data from a named pipe
/**
* Reads PCM from a named pipe and passed the data to an encoder.
* Implements EncoderListener to get the encoded data.
* Data is passed to the PipeListener
*/
class PipeReader : public EncoderListener class PipeReader : public EncoderListener
{ {
public: public:

View file

@ -21,6 +21,7 @@
#include <mutex> #include <mutex>
#include "serverSession.h" #include "serverSession.h"
#include "common/log.h" #include "common/log.h"
#include "message/pcmChunk.h"
using namespace std; using namespace std;
@ -171,9 +172,25 @@ void ServerSession::writer()
while (active_) while (active_)
{ {
if (messages_.try_pop(message, std::chrono::milliseconds(500))) if (messages_.try_pop(message, std::chrono::milliseconds(500)))
{
if (bufferMs_ > 0)
{
const msg::PcmChunk* pcmChunk = dynamic_cast<const msg::PcmChunk*>(message.get());
if (pcmChunk != NULL)
{
chronos::time_point_hrc now = chronos::hrc::now();
size_t age = 0;
if (now > pcmChunk->start())
age = std::chrono::duration_cast<chronos::msec>(now - pcmChunk->start()).count();
//logD << "PCM chunk. Age: " << age << ", buffer: " << bufferMs_ << ", age > buffer: " << (age > bufferMs_) << "\n";
if (age > bufferMs_)
continue;
}
}
send(message.get()); send(message.get());
} }
} }
}
catch (const std::exception& e) catch (const std::exception& e)
{ {
logS(kLogErr) << "Exception in ServerSession::writer(): " << e.what() << endl; logS(kLogErr) << "Exception in ServerSession::writer(): " << e.what() << endl;

View file

@ -36,6 +36,8 @@ using boost::asio::ip::tcp;
class ServerSession; class ServerSession;
/// Interface: callback for a received message.
class MessageReceiver class MessageReceiver
{ {
public: public:
@ -43,6 +45,12 @@ public:
}; };
/// Endpoint for a connected client.
/**
* Endpoint for a connected client.
* Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the MessageReceiver callback
*/
class ServerSession class ServerSession
{ {
public: public:
@ -53,16 +61,20 @@ public:
bool send(const msg::BaseMessage* message) const; bool send(const msg::BaseMessage* message) const;
void add(const std::shared_ptr<const msg::BaseMessage>& message); void add(const std::shared_ptr<const msg::BaseMessage>& message);
virtual bool active() const bool active() const
{ {
return active_; return active_;
} }
virtual void setStreamActive(bool active) void setStreamActive(bool active)
{ {
streamActive_ = active; streamActive_ = active;
} }
void setBufferMs(size_t bufferMs)
{
bufferMs_ = bufferMs;
}
protected: protected:
void socketRead(void* _to, size_t _bytes); void socketRead(void* _to, size_t _bytes);
@ -78,6 +90,7 @@ protected:
std::shared_ptr<tcp::socket> socket_; std::shared_ptr<tcp::socket> socket_;
MessageReceiver* messageReceiver_; MessageReceiver* messageReceiver_;
Queue<std::shared_ptr<const msg::BaseMessage>> messages_; Queue<std::shared_ptr<const msg::BaseMessage>> messages_;
size_t bufferMs_;
}; };