"sendNow"

This commit is contained in:
badaix 2016-11-13 23:01:14 +01:00
parent a6993f11df
commit bd979f1c0e
4 changed files with 38 additions and 15 deletions

View file

@ -19,7 +19,7 @@
#ifndef QUEUE_H #ifndef QUEUE_H
#define QUEUE_H #define QUEUE_H
#include <queue> #include <deque>
#include <atomic> #include <atomic>
#include <thread> #include <thread>
#include <mutex> #include <mutex>
@ -38,7 +38,7 @@ public:
// std::lock_guard<std::mutex> lock(mutex_); // std::lock_guard<std::mutex> lock(mutex_);
auto val = queue_.front(); auto val = queue_.front();
queue_.pop(); queue_.pop_front();
return val; return val;
} }
@ -81,7 +81,7 @@ public:
return false; return false;
item = std::move(queue_.front()); item = std::move(queue_.front());
queue_.pop(); queue_.pop_front();
return true; return true;
} }
@ -98,14 +98,32 @@ public:
cond_.wait(mlock); cond_.wait(mlock);
item = queue_.front(); item = queue_.front();
queue_.pop(); queue_.pop_front();
}
void push_front(const T& item)
{
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_front(item);
}
cond_.notify_one();
}
void push_front(T&& item)
{
{
std::lock_guard<std::mutex> mlock(mutex_);
queue_.push_front(std::move(item));
}
cond_.notify_one();
} }
void push(const T& item) void push(const T& item)
{ {
{ {
std::lock_guard<std::mutex> mlock(mutex_); std::lock_guard<std::mutex> mlock(mutex_);
queue_.push(item); queue_.push_back(item);
} }
cond_.notify_one(); cond_.notify_one();
} }
@ -114,7 +132,7 @@ public:
{ {
{ {
std::lock_guard<std::mutex> mlock(mutex_); std::lock_guard<std::mutex> mlock(mutex_);
queue_.push(std::move(item)); queue_.push_back(std::move(item));
} }
cond_.notify_one(); cond_.notify_one();
} }
@ -135,7 +153,7 @@ public:
Queue& operator=(const Queue&) = delete; // disable assignment Queue& operator=(const Queue&) = delete; // disable assignment
private: private:
std::queue<T> queue_; std::deque<T> queue_;
mutable std::atomic<bool> abort_; mutable std::atomic<bool> abort_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
mutable std::condition_variable cond_; mutable std::condition_variable cond_;

View file

@ -243,7 +243,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
timeMsg->refersTo = timeMsg->id; timeMsg->refersTo = timeMsg->id;
timeMsg->latency = timeMsg->received - timeMsg->sent; timeMsg->latency = timeMsg->received - timeMsg->sent;
// logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n"; // logO << "Latency sec: " << timeMsg.latency.sec << ", usec: " << timeMsg.latency.usec << ", refers to: " << timeMsg.refersTo << "\n";
connection->sendAsync(timeMsg); connection->sendAsync(timeMsg, true);
// refresh connection state // refresh connection state
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);

View file

@ -118,20 +118,25 @@ void StreamSession::socketRead(void* _to, size_t _bytes)
} }
void StreamSession::sendAsync(const msg::BaseMessage* message) void StreamSession::sendAsync(const msg::BaseMessage* message, bool sendNow)
{ {
std::shared_ptr<const msg::BaseMessage> shared_message(message); std::shared_ptr<const msg::BaseMessage> shared_message(message);
sendAsync(shared_message); sendAsync(shared_message, sendNow);
} }
void StreamSession::sendAsync(const shared_ptr<const msg::BaseMessage>& message) void StreamSession::sendAsync(const shared_ptr<const msg::BaseMessage>& message, bool sendNow)
{ {
if (!message) if (!message)
return; return;
while (messages_.size() > 100)// chunk->getDuration() > 10000) //the writer will take care about old messages
while (messages_.size() > 10000)// chunk->getDuration() > 10000)
messages_.pop(); messages_.pop();
if (sendNow)
messages_.push_front(message);
else
messages_.push(message); messages_.push(message);
} }

View file

@ -66,8 +66,8 @@ public:
bool send(const msg::BaseMessage* message) const; bool send(const msg::BaseMessage* message) const;
/// Sends a message to the client (asynchronous) /// Sends a message to the client (asynchronous)
void sendAsync(const std::shared_ptr<const msg::BaseMessage>& message); void sendAsync(const std::shared_ptr<const msg::BaseMessage>& message, bool sendNow = false);
void sendAsync(const msg::BaseMessage* message); void sendAsync(const msg::BaseMessage* message, bool sendNow = false);
bool active() const; bool active() const;