From 224ee53a5a9da52ba5d64a1d1871ff6a995a9160 Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Fri, 4 Jul 2014 22:48:25 +0000 Subject: [PATCH] working again git-svn-id: svn://elaine/murooma/trunk@68 d8a302eb-03bc-478d-80e4-98257eca68ef --- client.cpp | 6 ++--- stream.h | 70 ++++++++++++++++++++++++++++++++++++++++++++--------- timeUtils.h | 8 ++++++ 3 files changed, 70 insertions(+), 14 deletions(-) diff --git a/client.cpp b/client.cpp index 72bfffd0..6c2ddf91 100644 --- a/client.cpp +++ b/client.cpp @@ -181,11 +181,11 @@ int main (int argc, char *argv[]) zmq::message_t update; subscriber.recv(&update); - timeval now; - gettimeofday(&now, NULL); - std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; +// timeval now; +// gettimeofday(&now, NULL); // memcpy(chunk, update.data(), sizeof(Chunk)); chunk = (Chunk*)(update.data()); +// std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; stream->addChunk(chunk); } return 0; diff --git a/stream.h b/stream.h index c51ec847..f9e93a01 100644 --- a/stream.h +++ b/stream.h @@ -15,7 +15,7 @@ class Stream { public: - Stream() : lastUpdate(0), skip(0), idx(0) + Stream() : median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) { pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); @@ -34,29 +34,35 @@ public: std::vector getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) { - Chunk* chunk = NULL; while (1) { + Chunk* chunk = NULL; if (chunks.empty()) cv.wait(*pLock); int age(0); int chunkCount(0); mutex.lock(); - do + while (!chunks.empty()) { chunk = chunks.front(); - chunkCount = chunks.size(); - int age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; - if (age > WIRE_CHUNK_MS) + age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; + if ((age > 500) || (shortMedian > 100) || (median > WIRE_CHUNK_MS)) { - std::cerr << "age > WIRE_CHUNK_MS\n"; - chunks.pop_front(); + std::cerr << "age > WIRE_CHUNK_MS (" << age << " ms)\n"; delete chunk; chunk = NULL; + chunks.pop_front(); + pBuffer->clear(); + pShortBuffer->clear(); + usleep(10); + shortMedian -= WIRE_CHUNK_MS; + median -= WIRE_CHUNK_MS; } + else + break; } - while (chunk == NULL); + chunkCount = chunks.size(); mutex.unlock(); if (chunk == NULL) @@ -65,11 +71,51 @@ public: continue; } + pBuffer->add(age); + pShortBuffer->add(age); + + time_t now = time(NULL); + + if (now != lastUpdate) + { + lastUpdate = now; + if (pBuffer->full()) + median = pBuffer->median(); + else + median = 0; + + if (pShortBuffer->full()) + shortMedian = pShortBuffer->median(); + else + shortMedian = 0; + std::cerr << "Chunk: " << age << /*" \tidx: " << chunk->idx <<*/ "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << chunkCount << "\t" << outputBufferDacTime*1000 << "\n"; + } + + + std::vector v; + for (size_t n=chunk->idx; nidx + PLAYER_CHUNK_SIZE; ++n) + { + v.push_back(chunk->payload[n]); + } +//std::cerr << "before: " << chunkTime(*chunk) << ", after: "; + addMs(*chunk, -PLAYER_CHUNK_MS); +//std::cerr << chunkTime(*chunk) << "\n"; + chunk->idx += PLAYER_CHUNK_SIZE; + if (chunk->idx >= WIRE_CHUNK_SIZE) + { +//std::cerr << "Chunk played out, deleting\n"; + chunks.pop_front(); + delete chunk; + } + return v; + } + } +/* std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << pBuffer->size() << "\t" << chunkCount << "\n"; pBuffer->add(age); pShortBuffer->add(age); time_t now = time(NULL); - + if (skip == 0) { int age = 0; @@ -134,7 +180,7 @@ public: return v; // return chunk; } - +*/ private: std::deque chunks; @@ -145,6 +191,8 @@ private: DoubleBuffer* pBuffer; DoubleBuffer* pShortBuffer; + int median; + int shortMedian; time_t lastUpdate; int bufferMs; int skip; diff --git a/timeUtils.h b/timeUtils.h index a2fa48b0..a0430aa6 100644 --- a/timeUtils.h +++ b/timeUtils.h @@ -55,6 +55,14 @@ inline long getTickCount() inline void addMs(timeval& tv, int ms) { + if (ms < 0) + { + timeval t; + t.tv_sec = -ms / 1000; + t.tv_usec = (-ms % 1000) * 1000; + timersub(&tv, &t, &tv); + return; + } tv.tv_usec += ms*1000; tv.tv_sec += (tv.tv_usec / 1000000); tv.tv_usec %= 1000000;