diff --git a/client.cpp b/client.cpp index 850fd00b..8d18ecff 100644 --- a/client.cpp +++ b/client.cpp @@ -21,7 +21,6 @@ std::deque timeDiffs; int bufferMs; -std::mutex mutex; @@ -51,17 +50,15 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer, (void) statusFlags; (void) inputBuffer; - mutex.lock(); - PlayerChunk* playerChunk = stream->getChunk(timeInfo->outputBufferDacTime, framesPerBuffer); - mutex.unlock(); + stream->getChunk(out, timeInfo->outputBufferDacTime, framesPerBuffer); - for (size_t n=0; npayload[2*n]; *out++ = playerChunk->payload[2*n+1]; } delete playerChunk; - +*/ return paContinue; } @@ -183,9 +180,7 @@ int main (int argc, char *argv[]) // memcpy(chunk, update.data(), sizeof(Chunk)); chunk = (Chunk*)(update.data()); // std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; - mutex.lock(); stream->addChunk(chunk); - mutex.unlock(); } return 0; } diff --git a/stream.cpp b/stream.cpp index bd091657..d84ef9e8 100644 --- a/stream.cpp +++ b/stream.cpp @@ -3,9 +3,10 @@ #include #include -Stream::Stream() : lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) +Stream::Stream() : sleep(0), lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) { silentPlayerChunk = new PlayerChunk(); + playerChunk = (short*)malloc(sizeof(short)*PLAYER_CHUNK_SIZE); pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); pLock = new std::unique_lock(mtx); @@ -16,7 +17,9 @@ Stream::Stream() : lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate( void Stream::addChunk(Chunk* chunk) { Chunk* c = new Chunk(*chunk); + mutex.lock(); chunks.push_back(c); + mutex.unlock(); cv.notify_all(); } @@ -27,12 +30,20 @@ Chunk* Stream::getNextChunk() if (chunks.empty()) cv.wait(*pLock); + mutex.lock(); chunk = chunks.front(); + mutex.unlock(); return chunk; } -PlayerChunk* Stream::getNextPlayerChunk(int correction) +void Stream::getSilentPlayerChunk(short* outputBuffer) +{ + memset(outputBuffer, 0, sizeof(short)*PLAYER_CHUNK_SIZE); +} + + +timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) { Chunk* chunk = getNextChunk(); if (correction > PLAYER_CHUNK_MS / 2) @@ -44,10 +55,9 @@ PlayerChunk* Stream::getNextPlayerChunk(int correction) // int age(0); // age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; // std::cerr << "age: " << age << " \tidx: " << chunk->idx << "\n"; - PlayerChunk* playerChunk = new PlayerChunk(); - playerChunk->tv_sec = chunk->tv_sec; - playerChunk->tv_usec = chunk->tv_usec; - playerChunk->idx = 0; + timeval tv; + tv.tv_sec = chunk->tv_sec; + tv.tv_usec = chunk->tv_usec; size_t missing = PLAYER_CHUNK_SIZE;// + correction*PLAYER_CHUNK_MS_SIZE; /* double factor = (double)PLAYER_CHUNK_MS / (double)(PLAYER_CHUNK_MS + correction); @@ -76,43 +86,126 @@ PlayerChunk* Stream::getNextPlayerChunk(int correction) delete chunk; } */ + + if (correction != 0) + { + std::cerr << "Correction: " << correction << "\n"; + size_t idxCorrection(0); + size_t idx(0); + for (size_t n=0; n 0) + idx = 2*n; + else if (correction < 0) + idx = 0.5*n; + +// idx -= idxCorrection; + if (chunk->idx + idx + 1 - idxCorrection >= WIRE_CHUNK_SIZE) + { + idxCorrection = idx; + chunks.pop_front(); + delete chunk; + chunk = getNextChunk(); + } + + *(outputBuffer + 2*n) = chunk->payload[chunk->idx + idx - idxCorrection]; + *(outputBuffer + 2*n+1) = chunk->payload[chunk->idx + idx+1 - idxCorrection]; + } + if (correction > 0) + addMs(chunk, -PLAYER_CHUNK_MS*2); + else if (correction < 0) + addMs(chunk, -PLAYER_CHUNK_MS*0.5); + chunk->idx = chunk->idx + idx+2 - idxCorrection; + if (chunk->idx >= WIRE_CHUNK_SIZE) + { + // mutex.lock(); + chunks.pop_front(); + // mutex.unlock(); + delete chunk; + } + return tv; + } + + if (chunk->idx + PLAYER_CHUNK_SIZE > WIRE_CHUNK_SIZE) { //std::cerr << "chunk->idx + PLAYER_CHUNK_SIZE >= WIRE_CHUNK_SIZE: " << chunk->idx + PLAYER_CHUNK_SIZE << " >= " << WIRE_CHUNK_SIZE << "\n"; - memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); + if (outputBuffer != NULL) + memcpy(outputBuffer, &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); missing = chunk->idx + PLAYER_CHUNK_SIZE - WIRE_CHUNK_SIZE; +// mutex.lock(); chunks.pop_front(); +// mutex.unlock(); delete chunk; chunk = getNextChunk(); } - memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*missing); + if (outputBuffer != NULL) + memcpy((outputBuffer + PLAYER_CHUNK_SIZE - missing), &chunk->payload[chunk->idx], sizeof(int16_t)*missing); addMs(chunk, -PLAYER_CHUNK_MS); chunk->idx += missing; if (chunk->idx >= WIRE_CHUNK_SIZE) { +// mutex.lock(); chunks.pop_front(); +// mutex.unlock(); delete chunk; } - return playerChunk; + + return tv; } -PlayerChunk* Stream::getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) +void Stream::getChunk(short* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer) { + if (sleep != 0) + { + pBuffer->clear(); + pShortBuffer->clear(); + if (sleep < 0) + { +// std::cerr << "Sleep: " << sleep << "\n"; + sleep += PLAYER_CHUNK_MS; + if (sleep > 0) + sleep = 0; + getSilentPlayerChunk(outputBuffer); + } + else + { + for (int i=0; i<(sleep / PLAYER_CHUNK_MS) + 1; ++i) + { +// std::cerr << "Sleep: " << sleep << "\n"; + usleep(100); + getNextPlayerChunk(outputBuffer); + } + sleep = 0; + } + return; + } + int correction(0); if (pBuffer->full() && (abs(median) <= PLAYER_CHUNK_MS)) - correction = median; - - PlayerChunk* playerChunk = getNextPlayerChunk(correction); - int age = getAge(playerChunk) - bufferMs + outputBufferDacTime*1000; + { + if (median >= PLAYER_CHUNK_MS / 2) + correction = 1; + else if (median <= -PLAYER_CHUNK_MS / 2) + correction = -1; + if (correction != 0) + { + pBuffer->clear(); + pShortBuffer->clear(); + } + } + + timeval tv = getNextPlayerChunk(outputBuffer, correction); + int age = getAge(tv) - bufferMs + outputBufferDacTime*1000; pBuffer->add(age); pShortBuffer->add(age); -// std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; +// std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; - int sleep(0); time_t now = time(NULL); if (now != lastUpdate) { @@ -121,35 +214,12 @@ PlayerChunk* Stream::getChunk(double outputBufferDacTime, unsigned long framesPe shortMedian = pShortBuffer->median(); if (abs(age) > 300) sleep = age; - if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) + else if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) sleep = shortMedian; - if (pBuffer->full() && (abs(median) > PLAYER_CHUNK_MS)) + else if (pBuffer->full() && (abs(median) > PLAYER_CHUNK_MS)) sleep = median; std::cerr << "Chunk: " << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << outputBufferDacTime*1000 << "\n"; } - - if (sleep != 0) - { - std::cerr << "Sleep: " << sleep << "\n"; - pBuffer->clear(); - pShortBuffer->clear(); - if (sleep < 0) - { - sleepMs(100-sleep); - } - else - { - for (size_t n=0; n<(size_t)(sleep / PLAYER_CHUNK_MS); ++n) - { - delete playerChunk; - playerChunk = getNextPlayerChunk(); - } - } - } - - -// int age = getAge(*lastPlayerChunk) + outputBufferDacTime*1000 - bufferMs; - return playerChunk; } diff --git a/stream.h b/stream.h index 222a543e..b3186ff4 100644 --- a/stream.h +++ b/stream.h @@ -16,15 +16,17 @@ public: Stream(); void addChunk(Chunk* chunk); Chunk* getNextChunk(); - PlayerChunk* getNextPlayerChunk(int correction = 0); - PlayerChunk* getChunk(double outputBufferDacTime, unsigned long framesPerBuffer); + timeval getNextPlayerChunk(short* outputBuffer, int correction = 0); + void getSilentPlayerChunk(short* outputBuffer); + void getChunk(short* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer); private: void sleepMs(int ms); - + int sleep; std::deque chunks; std::mutex mtx; + std::mutex mutex; std::unique_lock* pLock; std::condition_variable cv; DoubleBuffer* pBuffer; @@ -32,6 +34,7 @@ private: PlayerChunk* lastPlayerChunk; PlayerChunk* silentPlayerChunk; + short* playerChunk; int median; int shortMedian; diff --git a/timeUtils.h b/timeUtils.h index 3edbb849..b5c256ac 100644 --- a/timeUtils.h +++ b/timeUtils.h @@ -46,6 +46,14 @@ long getAge(const T* chunk) } +long getAge(const timeval& tv) +{ + timeval now; + gettimeofday(&now, NULL); + return diff_ms(now, tv); +} + + inline long getTickCount() { struct timespec now;