diff --git a/common/queue.h b/common/queue.h index 8877bd3b..17e459f3 100644 --- a/common/queue.h +++ b/common/queue.h @@ -19,7 +19,7 @@ #ifndef QUEUE_H #define QUEUE_H -#include +#include #include #include #include @@ -38,7 +38,7 @@ public: // std::lock_guard lock(mutex_); auto val = queue_.front(); - queue_.pop(); + queue_.pop_front(); return val; } @@ -81,7 +81,7 @@ public: return false; item = std::move(queue_.front()); - queue_.pop(); + queue_.pop_front(); return true; } @@ -98,14 +98,32 @@ public: cond_.wait(mlock); item = queue_.front(); - queue_.pop(); + queue_.pop_front(); + } + + void push_front(const T& item) + { + { + std::lock_guard mlock(mutex_); + queue_.push_front(item); + } + cond_.notify_one(); + } + + void push_front(T&& item) + { + { + std::lock_guard mlock(mutex_); + queue_.push_front(std::move(item)); + } + cond_.notify_one(); } void push(const T& item) { { std::lock_guard mlock(mutex_); - queue_.push(item); + queue_.push_back(item); } cond_.notify_one(); } @@ -114,7 +132,7 @@ public: { { std::lock_guard mlock(mutex_); - queue_.push(std::move(item)); + queue_.push_back(std::move(item)); } cond_.notify_one(); } @@ -135,7 +153,7 @@ public: Queue& operator=(const Queue&) = delete; // disable assignment private: - std::queue queue_; + std::deque queue_; mutable std::atomic abort_; mutable std::mutex mutex_; mutable std::condition_variable cond_; diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 5d177a98..1f188478 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -243,7 +243,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM timeMsg->refersTo = timeMsg->id; timeMsg->latency = timeMsg->received - timeMsg->sent; // logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; - connection->sendAsync(timeMsg); + connection->sendAsync(timeMsg, true); // refresh connection state ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); diff --git a/server/streamSession.cpp b/server/streamSession.cpp index acdc8436..139b8028 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -118,21 +118,26 @@ void StreamSession::socketRead(void* _to, size_t _bytes) } -void StreamSession::sendAsync(const msg::BaseMessage* message) +void StreamSession::sendAsync(const msg::BaseMessage* message, bool sendNow) { std::shared_ptr shared_message(message); - sendAsync(shared_message); + sendAsync(shared_message, sendNow); } -void StreamSession::sendAsync(const shared_ptr& message) +void StreamSession::sendAsync(const shared_ptr& message, bool sendNow) { if (!message) return; - while (messages_.size() > 100)// chunk->getDuration() > 10000) + //the writer will take care about old messages + while (messages_.size() > 10000)// chunk->getDuration() > 10000) messages_.pop(); - messages_.push(message); + + if (sendNow) + messages_.push_front(message); + else + messages_.push(message); } diff --git a/server/streamSession.h b/server/streamSession.h index eb30cad4..64428361 100644 --- a/server/streamSession.h +++ b/server/streamSession.h @@ -66,8 +66,8 @@ public: bool send(const msg::BaseMessage* message) const; /// Sends a message to the client (asynchronous) - void sendAsync(const std::shared_ptr& message); - void sendAsync(const msg::BaseMessage* message); + void sendAsync(const std::shared_ptr& message, bool sendNow = false); + void sendAsync(const msg::BaseMessage* message, bool sendNow = false); bool active() const;