improved time sync

This commit is contained in:
badaix 2015-08-26 00:10:09 +02:00
parent 48ac9d2209
commit c92588344e
5 changed files with 44 additions and 17 deletions

View file

@ -36,7 +36,7 @@
using namespace std; using namespace std;
Controller::Controller() : MessageReceiver(), active_(false), sampleFormat_(NULL), decoder_(NULL), sendTimeSyncMsg_(false), asyncException_(false) Controller::Controller() : MessageReceiver(), active_(false), sampleFormat_(NULL), decoder_(NULL), asyncException_(false)
{ {
} }
@ -73,17 +73,28 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
msg::Time reply; msg::Time reply;
reply.deserialize(baseMessage, buffer); reply.deserialize(baseMessage, buffer);
double latency = (reply.received.sec - reply.sent.sec) + (reply.received.usec - reply.sent.usec) / 1000000.; double latency = (reply.received.sec - reply.sent.sec) + (reply.received.usec - reply.sent.usec) / 1000000.;
// logO << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f << "\n";
// logO << "timeMsg: " << latency << "\n"; // logO << "timeMsg: " << latency << "\n";
TimeProvider::getInstance().setDiffToServer((reply.latency - latency) * 1000 / 2); TimeProvider::getInstance().setDiffToServer((reply.latency - latency) * 1000 / 2);
// logO << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f << "\n";
} }
if (sendTimeSyncMsg_) if (baseMessage.type != message_type::kTime)
{ if (sendTimeSyncMessage(1000))
msg::Request timeReq(kTime); logD << "time sync onMessageReceived\n";
clientConnection_->send(&timeReq); }
sendTimeSyncMsg_ = false;
}
bool Controller::sendTimeSyncMessage(long after)
{
static long lastTimeSync(0);
long now = chronos::getTickCount();
if (lastTimeSync + after > now)
return false;
lastTimeSync = now;
msg::Request timeReq(kTime);
clientConnection_->send(&timeReq);
return true;
} }
@ -172,7 +183,8 @@ void Controller::worker()
throw exception_; throw exception_;
} }
sendTimeSyncMsg_ = true; if (sendTimeSyncMessage(2000))
logO << "time sync main loop\n";
// shared_ptr<msg::Time> reply = clientConnection_->sendReq<msg::Time>(&timeReq); // shared_ptr<msg::Time> reply = clientConnection_->sendReq<msg::Time>(&timeReq);
// if (reply) // if (reply)
// { // {

View file

@ -51,6 +51,7 @@ public:
private: private:
void worker(); void worker();
bool sendTimeSyncMessage(long after = 1000);
std::atomic<bool> active_; std::atomic<bool> active_;
std::thread* controllerThread_; std::thread* controllerThread_;
ClientConnection* clientConnection_; ClientConnection* clientConnection_;
@ -60,7 +61,7 @@ private:
Decoder* decoder_; Decoder* decoder_;
PcmDevice pcmDevice_; PcmDevice pcmDevice_;
size_t latency_; size_t latency_;
bool sendTimeSyncMsg_;
std::exception exception_; std::exception exception_;
bool asyncException_; bool asyncException_;
}; };

View file

@ -37,13 +37,27 @@ public:
buffer.pop_front(); buffer.pop_front();
} }
T median() const T median(unsigned int mean = 1) const
{ {
if (buffer.empty()) if (buffer.empty())
return 0; return 0;
std::deque<T> tmpBuffer(buffer.begin(), buffer.end()); std::deque<T> tmpBuffer(buffer.begin(), buffer.end());
std::sort(tmpBuffer.begin(), tmpBuffer.end()); std::sort(tmpBuffer.begin(), tmpBuffer.end());
return tmpBuffer[tmpBuffer.size() / 2]; if ((mean <= 1) || (tmpBuffer.size() < mean))
return tmpBuffer[tmpBuffer.size() / 2];
else
{
unsigned int low = tmpBuffer.size() / 2;
unsigned int high = low;
low -= mean/2;
high += mean/2;
T result((T)0);
for (unsigned int i=low; i<=high; ++i)
{
result += tmpBuffer[i];
}
return result / mean;
}
} }
double mean() const double mean() const

View file

@ -20,7 +20,7 @@
#include "common/log.h" #include "common/log.h"
TimeProvider::TimeProvider() : diffToServer_(0), lastTimeSync_(0) TimeProvider::TimeProvider() : diffToServer_(0)
{ {
diffBuffer_.setSize(200); diffBuffer_.setSize(200);
} }
@ -28,18 +28,19 @@ TimeProvider::TimeProvider() : diffToServer_(0), lastTimeSync_(0)
void TimeProvider::setDiffToServer(double ms) void TimeProvider::setDiffToServer(double ms)
{ {
static long lastTimeSync = 0;
long now = chronos::getTickCount(); long now = chronos::getTickCount();
/// clear diffBuffer if last update is older than a minute /// clear diffBuffer if last update is older than a minute
if (!diffBuffer_.empty() && (now > lastTimeSync_ + 60*1000)) if (!diffBuffer_.empty() && (now > lastTimeSync + 60*1000))
{ {
logO << "Last time sync older than a minute. Clearing time buffer\n"; logO << "Last time sync older than a minute. Clearing time buffer\n";
diffToServer_ = ms*1000; diffToServer_ = ms*1000;
diffBuffer_.clear(); diffBuffer_.clear();
} }
lastTimeSync_ = now; lastTimeSync = now;
diffBuffer_.add(ms*1000); diffBuffer_.add(ms*1000);
diffToServer_ = diffBuffer_.median(); diffToServer_ = diffBuffer_.median(3);
// logO << "setDiffToServer: " << ms << ", diff: " << diffToServer_ / 1000.f << "\n"; // logO << "setDiffToServer: " << ms << ", diff: " << diffToServer_ / 1000.f << "\n";
} }

View file

@ -82,7 +82,6 @@ private:
DoubleBuffer<chronos::usec::rep> diffBuffer_; DoubleBuffer<chronos::usec::rep> diffBuffer_;
std::atomic<chronos::usec::rep> diffToServer_; std::atomic<chronos::usec::rep> diffToServer_;
long lastTimeSync_;
}; };