diff --git a/server/controlServer.cpp b/server/controlServer.cpp index d7c38df4..5bcd1a9c 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -137,11 +137,12 @@ void ControlServer::handleAccept(socket_ptr socket) setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; - ServerSession* session = new ServerSession(this, socket); + shared_ptr session(new ServerSession(this, socket)); { std::unique_lock mlock(mutex_); + session->setBufferMs(settings_.bufferMs); session->start(); - sessions_.insert(shared_ptr(session)); + sessions_.insert(session); } startAccept(); } diff --git a/server/controlServer.h b/server/controlServer.h index 6f8d7407..5480166a 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -50,6 +50,13 @@ struct ControlServerSettings }; +/// Forwars PCM data to the connected clients +/** + * Reads PCM data with pipereader, implements PipeListener to get the (encoded) PCM stream. + * Accepts and holds client connections (ServerSession) + * Receives (via the MessageReceiver interface) and answers messages from the clients + * Forwards PCM data to the clients + */ class ControlServer : public MessageReceiver, PipeListener { public: diff --git a/server/pipeReader.h b/server/pipeReader.h index d2c19ad8..edf92a2d 100644 --- a/server/pipeReader.h +++ b/server/pipeReader.h @@ -38,6 +38,12 @@ public: +/// Reads and decodes PCM data from a named pipe +/** + * Reads PCM from a named pipe and passed the data to an encoder. + * Implements EncoderListener to get the encoded data. + * Data is passed to the PipeListener + */ class PipeReader : public EncoderListener { public: diff --git a/server/serverSession.cpp b/server/serverSession.cpp index 899c92f7..56ebe6f6 100644 --- a/server/serverSession.cpp +++ b/server/serverSession.cpp @@ -21,6 +21,7 @@ #include #include "serverSession.h" #include "common/log.h" +#include "message/pcmChunk.h" using namespace std; @@ -171,7 +172,23 @@ void ServerSession::writer() while (active_) { if (messages_.try_pop(message, std::chrono::milliseconds(500))) + { + if (bufferMs_ > 0) + { + const msg::PcmChunk* pcmChunk = dynamic_cast(message.get()); + if (pcmChunk != NULL) + { + chronos::time_point_hrc now = chronos::hrc::now(); + size_t age = 0; + if (now > pcmChunk->start()) + age = std::chrono::duration_cast(now - pcmChunk->start()).count(); + //logD << "PCM chunk. Age: " << age << ", buffer: " << bufferMs_ << ", age > buffer: " << (age > bufferMs_) << "\n"; + if (age > bufferMs_) + continue; + } + } send(message.get()); + } } } catch (const std::exception& e) diff --git a/server/serverSession.h b/server/serverSession.h index 898a9d00..298be75e 100644 --- a/server/serverSession.h +++ b/server/serverSession.h @@ -36,6 +36,8 @@ using boost::asio::ip::tcp; class ServerSession; + +/// Interface: callback for a received message. class MessageReceiver { public: @@ -43,6 +45,12 @@ public: }; +/// Endpoint for a connected client. +/** + * Endpoint for a connected client. + * Messages are sent to the client with the "send" method. + * Received messages from the client are passed to the MessageReceiver callback + */ class ServerSession { public: @@ -53,16 +61,20 @@ public: bool send(const msg::BaseMessage* message) const; void add(const std::shared_ptr& message); - virtual bool active() const + bool active() const { return active_; } - virtual void setStreamActive(bool active) + void setStreamActive(bool active) { streamActive_ = active; } + void setBufferMs(size_t bufferMs) + { + bufferMs_ = bufferMs; + } protected: void socketRead(void* _to, size_t _bytes); @@ -78,6 +90,7 @@ protected: std::shared_ptr socket_; MessageReceiver* messageReceiver_; Queue> messages_; + size_t bufferMs_; };