diff --git a/Makefile b/Makefile index dc786001..ce7fb9b7 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ LDFLAGS = -lzmq -lpthread -lportaudio OBJ_SERVER = server.o BIN_SERVER = server -OBJ_CLIENT = client.o +OBJ_CLIENT = client.o stream.o BIN_CLIENT = client OBJ = $(OBJ_SERVER) $(OBJ_CLIENT) diff --git a/client.cpp b/client.cpp index facd9e07..850fd00b 100644 --- a/client.cpp +++ b/client.cpp @@ -16,12 +16,13 @@ #include #include #include "chunk.h" -#include "timeUtils.h" #include "stream.h" std::deque timeDiffs; int bufferMs; +std::mutex mutex; + void player() @@ -50,7 +51,9 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer, (void) statusFlags; (void) inputBuffer; + mutex.lock(); PlayerChunk* playerChunk = stream->getChunk(timeInfo->outputBufferDacTime, framesPerBuffer); + mutex.unlock(); for (size_t n=0; naddChunk(chunk); + mutex.unlock(); } return 0; } diff --git a/stream.cpp b/stream.cpp new file mode 100644 index 00000000..bd091657 --- /dev/null +++ b/stream.cpp @@ -0,0 +1,162 @@ +#include "stream.h" +#include "timeUtils.h" +#include +#include + +Stream::Stream() : lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) +{ + silentPlayerChunk = new PlayerChunk(); + pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); + pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); + pLock = new std::unique_lock(mtx); + bufferMs = 500; +} + + +void Stream::addChunk(Chunk* chunk) +{ + Chunk* c = new Chunk(*chunk); + chunks.push_back(c); + cv.notify_all(); +} + + +Chunk* Stream::getNextChunk() +{ + Chunk* chunk = NULL; + if (chunks.empty()) + cv.wait(*pLock); + + chunk = chunks.front(); + return chunk; +} + + +PlayerChunk* Stream::getNextPlayerChunk(int correction) +{ + Chunk* chunk = getNextChunk(); + if (correction > PLAYER_CHUNK_MS / 2) + correction = PLAYER_CHUNK_MS/2; + else if (correction < -PLAYER_CHUNK_MS/2) + correction = -PLAYER_CHUNK_MS/2; + +//std::cerr << "GetNextPlayerChunk: " << correction << "\n"; +// int age(0); +// age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; +// std::cerr << "age: " << age << " \tidx: " << chunk->idx << "\n"; + PlayerChunk* playerChunk = new PlayerChunk(); + playerChunk->tv_sec = chunk->tv_sec; + playerChunk->tv_usec = chunk->tv_usec; + playerChunk->idx = 0; + + size_t missing = PLAYER_CHUNK_SIZE;// + correction*PLAYER_CHUNK_MS_SIZE; +/* double factor = (double)PLAYER_CHUNK_MS / (double)(PLAYER_CHUNK_MS + correction); + size_t idx(0); + size_t idxCorrection(0); + for (size_t n=0; nidx + 2*floor(n*factor) - idxCorrection; +//std::cerr << factor << "\t" << n << "\t" << idx << "\n"; + if (idx >= WIRE_CHUNK_SIZE) + { + idxCorrection = 2*floor(n*factor); + idx = 0; + chunks.pop_front(); + delete chunk; + chunk = getNextChunk(); + } + playerChunk->payload[2*n] = chunk->payload[idx]; + playerChunk->payload[2*n+1] = chunk->payload[idx + 1]; + } + addMs(chunk, -PLAYER_CHUNK_MS - correction); + chunk->idx = idx; + if (idx >= WIRE_CHUNK_SIZE) + { + chunks.pop_front(); + delete chunk; + } +*/ + if (chunk->idx + PLAYER_CHUNK_SIZE > WIRE_CHUNK_SIZE) + { +//std::cerr << "chunk->idx + PLAYER_CHUNK_SIZE >= WIRE_CHUNK_SIZE: " << chunk->idx + PLAYER_CHUNK_SIZE << " >= " << WIRE_CHUNK_SIZE << "\n"; + memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); + missing = chunk->idx + PLAYER_CHUNK_SIZE - WIRE_CHUNK_SIZE; + chunks.pop_front(); + delete chunk; + + chunk = getNextChunk(); + } + + memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*missing); + + addMs(chunk, -PLAYER_CHUNK_MS); + chunk->idx += missing; + if (chunk->idx >= WIRE_CHUNK_SIZE) + { + chunks.pop_front(); + delete chunk; + } + return playerChunk; +} + + +PlayerChunk* Stream::getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) +{ + int correction(0); + if (pBuffer->full() && (abs(median) <= PLAYER_CHUNK_MS)) + correction = median; + + PlayerChunk* playerChunk = getNextPlayerChunk(correction); + int age = getAge(playerChunk) - bufferMs + outputBufferDacTime*1000; + pBuffer->add(age); + pShortBuffer->add(age); +// std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; + + int sleep(0); + time_t now = time(NULL); + if (now != lastUpdate) + { + lastUpdate = now; + median = pBuffer->median(); + shortMedian = pShortBuffer->median(); + if (abs(age) > 300) + sleep = age; + if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) + sleep = shortMedian; + if (pBuffer->full() && (abs(median) > PLAYER_CHUNK_MS)) + sleep = median; + std::cerr << "Chunk: " << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << outputBufferDacTime*1000 << "\n"; + } + + if (sleep != 0) + { + std::cerr << "Sleep: " << sleep << "\n"; + pBuffer->clear(); + pShortBuffer->clear(); + if (sleep < 0) + { + sleepMs(100-sleep); + } + else + { + for (size_t n=0; n<(size_t)(sleep / PLAYER_CHUNK_MS); ++n) + { + delete playerChunk; + playerChunk = getNextPlayerChunk(); + } + } + } + + +// int age = getAge(*lastPlayerChunk) + outputBufferDacTime*1000 - bufferMs; + return playerChunk; +} + + +void Stream::sleepMs(int ms) +{ + if (ms > 0) + usleep(ms * 1000); +} + + diff --git a/stream.h b/stream.h index 00f005b0..222a543e 100644 --- a/stream.h +++ b/stream.h @@ -7,414 +7,24 @@ #include #include #include "doubleBuffer.h" -#include "timeUtils.h" #include "chunk.h" -#include "timeUtils.h" class Stream { public: - Stream() : lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) - { - silentPlayerChunk = new PlayerChunk(); - pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); - pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); - pLock = new std::unique_lock(mtx); - bufferMs = 500; - } - - void addChunk(Chunk* chunk) - { - Chunk* c = new Chunk(*chunk); - mutex.lock(); - chunks.push_back(c); - mutex.unlock(); - cv.notify_all(); - } - - - Chunk* getNextChunk() - { - Chunk* chunk = NULL; - if (chunks.empty()) - cv.wait(*pLock); - - mutex.lock(); - chunk = chunks.front(); - mutex.unlock(); - return chunk; - } - - - PlayerChunk* getNextPlayerChunk(int correction = 0) - { - Chunk* chunk = getNextChunk(); - if (correction > PLAYER_CHUNK_MS / 2) - correction = PLAYER_CHUNK_MS/2; - else if (correction < -PLAYER_CHUNK_MS/2) - correction = -PLAYER_CHUNK_MS/2; - -//std::cerr << "GetNextPlayerChunk: " << correction << "\n"; -// int age(0); -// age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; -// std::cerr << "age: " << age << " \tidx: " << chunk->idx << "\n"; - PlayerChunk* playerChunk = new PlayerChunk(); - playerChunk->tv_sec = chunk->tv_sec; - playerChunk->tv_usec = chunk->tv_usec; - playerChunk->idx = 0; - - size_t missing = PLAYER_CHUNK_SIZE;// + correction*PLAYER_CHUNK_MS_SIZE; -/* double factor = (double)PLAYER_CHUNK_MS / (double)(PLAYER_CHUNK_MS + correction); - size_t idx(0); - size_t idxCorrection(0); - for (size_t n=0; nidx + 2*floor(n*factor) - idxCorrection; -//std::cerr << factor << "\t" << n << "\t" << idx << "\n"; - if (idx >= WIRE_CHUNK_SIZE) - { - idxCorrection = 2*floor(n*factor); - idx = 0; - chunks.pop_front(); - delete chunk; - chunk = getNextChunk(); - } - playerChunk->payload[2*n] = chunk->payload[idx]; - playerChunk->payload[2*n+1] = chunk->payload[idx + 1]; - } - addMs(chunk, -PLAYER_CHUNK_MS - correction); - chunk->idx = idx; - if (idx >= WIRE_CHUNK_SIZE) - { - chunks.pop_front(); - delete chunk; - } -*/ - if (chunk->idx + PLAYER_CHUNK_SIZE > WIRE_CHUNK_SIZE) - { -//std::cerr << "chunk->idx + PLAYER_CHUNK_SIZE >= WIRE_CHUNK_SIZE: " << chunk->idx + PLAYER_CHUNK_SIZE << " >= " << WIRE_CHUNK_SIZE << "\n"; - memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); - missing = chunk->idx + PLAYER_CHUNK_SIZE - WIRE_CHUNK_SIZE; - chunks.pop_front(); - delete chunk; - - chunk = getNextChunk(); - } - - memcpy(&(playerChunk->payload[0]), &chunk->payload[chunk->idx], sizeof(int16_t)*missing); - - addMs(chunk, -PLAYER_CHUNK_MS); - chunk->idx += missing; - if (chunk->idx >= WIRE_CHUNK_SIZE) - { - chunks.pop_front(); - delete chunk; - } - return playerChunk; - } - - - PlayerChunk* getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) - { - int correction(0); - if (pBuffer->full() && (abs(median) <= PLAYER_CHUNK_MS)) - correction = median; - - PlayerChunk* playerChunk = getNextPlayerChunk(correction); - int age = getAge(playerChunk) - bufferMs + outputBufferDacTime*1000; - pBuffer->add(age); - pShortBuffer->add(age); -// std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; - - int sleep(0); - time_t now = time(NULL); - if (now != lastUpdate) - { - lastUpdate = now; - median = pBuffer->median(); - shortMedian = pShortBuffer->median(); - if (abs(age) > 300) - sleep = age; - if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) - sleep = shortMedian; - if (pBuffer->full() && (abs(median) > PLAYER_CHUNK_MS)) - sleep = median; - std::cerr << "Chunk: " << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << outputBufferDacTime*1000 << "\n"; - } - - if (sleep != 0) - { - std::cerr << "Sleep: " << sleep << "\n"; - pBuffer->clear(); - pShortBuffer->clear(); - if (sleep < 0) - { - sleepMs(100-sleep); - } - else - { - for (size_t n=0; n<(size_t)(sleep / PLAYER_CHUNK_MS); ++n) - { - delete playerChunk; - playerChunk = getNextPlayerChunk(); - } - } - } - - -// int age = getAge(*lastPlayerChunk) + outputBufferDacTime*1000 - bufferMs; - return playerChunk; - } - -/* if (age < -500) - { - std::vector v; - if (skip < 0) - { - ++skip; - std::cerr << "chunk too new, sleeping\n";//age > WIRE_CHUNK_MS (" << age << " ms)\n"; - for (size_t n=0; n<(size_t)PLAYER_CHUNK_SIZE; ++n) - v.push_back(chunk->payload[n]); - return v; - } - } - - pBuffer->add(age); - pShortBuffer->add(age); - - time_t now = time(NULL); - - if (skip == 0) - { - if (now != lastUpdate) - { - lastUpdate = now; - median = pBuffer->median(); - shortMedian = pShortBuffer->median(); - if (abs(age) > 500) - skip = age / PLAYER_CHUNK_SIZE; - if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) - skip = shortMedian / PLAYER_CHUNK_SIZE; - if (pBuffer->full() && (abs(median) > WIRE_CHUNK_MS)) - skip = median / PLAYER_CHUNK_SIZE; - } - std::cerr << "Chunk: " << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << chunkCount << "\t" << outputBufferDacTime*1000 << "\n"; - } - - std::vector v; - if (skip < 0) - { - ++skip; - std::cerr << "chunk too new, sleeping\n";//age > WIRE_CHUNK_MS (" << age << " ms)\n"; - for (size_t n=0; n<(size_t)PLAYER_CHUNK_SIZE; ++n) - v.push_back(chunk->payload[n]); - return v; - } - - for (size_t n=chunk->idx; nidx + (size_t)PLAYER_CHUNK_SIZE; ++n) - { - v.push_back(chunk->payload[n]); - } -//std::cerr << "before: " << chunkTime(*chunk) << ", after: "; - addMs(*chunk, -PLAYER_CHUNK_MS); -//std::cerr << chunkTime(*chunk) << "\n"; - chunk->idx += PLAYER_CHUNK_SIZE; - if (chunk->idx >= WIRE_CHUNK_SIZE) - { -//std::cerr << "Chunk played out, deleting\n"; - chunks.pop_front(); - delete chunk; - } - return v; - } -*/ - - - - - - -/* - std::vector getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) - { - while (1) - { - Chunk* chunk = NULL; - if (chunks.empty()) - cv.wait(*pLock); - - int age(0); - int chunkCount(0); - mutex.lock(); - chunk = chunks.front(); - - while (!chunks.empty()) - { - chunk = chunks.front(); - if (skip != 0) - { - pBuffer->clear(); - pShortBuffer->clear(); - shortMedian = 0; - median = 0; - } - if (skip > 0) - { - --skip; - std::cerr << "chunk too old, skipping\n";//age > WIRE_CHUNK_MS (" << age << " ms)\n"; - delete chunk; - chunk = NULL; - chunks.pop_front(); - } - else - break; - } - chunkCount = chunks.size(); - mutex.unlock(); - - if (chunk == NULL) - { - std::cerr << "no chunks available\n"; - continue; - } - - age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; - pBuffer->add(age); - pShortBuffer->add(age); - - time_t now = time(NULL); - - if (skip == 0) - { - if (now != lastUpdate) - { - lastUpdate = now; - median = pBuffer->median(); - shortMedian = pShortBuffer->median(); - if (abs(age) > 500) - skip = age / PLAYER_CHUNK_SIZE; - if (pShortBuffer->full() && (abs(shortMedian) > WIRE_CHUNK_MS)) - skip = shortMedian / PLAYER_CHUNK_SIZE; - if (pBuffer->full() && (abs(median) > WIRE_CHUNK_MS)) - skip = median / PLAYER_CHUNK_SIZE; - } - std::cerr << "Chunk: " << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << chunkCount << "\t" << outputBufferDacTime*1000 << "\n"; - } - - std::vector v; - if (skip < 0) - { - ++skip; - std::cerr << "chunk too new, sleeping\n";//age > WIRE_CHUNK_MS (" << age << " ms)\n"; - for (size_t n=0; n<(size_t)PLAYER_CHUNK_SIZE; ++n) - v.push_back(chunk->payload[n]); - return v; - } - - for (size_t n=chunk->idx; nidx + (size_t)PLAYER_CHUNK_SIZE; ++n) - { - v.push_back(chunk->payload[n]); - } -//std::cerr << "before: " << chunkTime(*chunk) << ", after: "; - addMs(*chunk, -PLAYER_CHUNK_MS); -//std::cerr << chunkTime(*chunk) << "\n"; - chunk->idx += PLAYER_CHUNK_SIZE; - if (chunk->idx >= WIRE_CHUNK_SIZE) - { -//std::cerr << "Chunk played out, deleting\n"; - chunks.pop_front(); - delete chunk; - } - return v; - } - } -*/ - - -/* - std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << pBuffer->size() << "\t" << chunkCount << "\n"; - pBuffer->add(age); - pShortBuffer->add(age); - time_t now = time(NULL); - - if (skip == 0) - { - int age = 0; - int median = 0; - int shortMedian = 0; - - if (now != lastUpdate) - { - lastUpdate = now; - median = pBuffer->median(); - shortMedian = pShortBuffer->median(); - std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << chunkCount << "\t" << outputBufferDacTime*1000 << "\n"; - } - if ((age > 500) || (age < -500)) - skip = age / PLAYER_CHUNK_MS; - else if (pShortBuffer->full() && ((shortMedian > 100) || (shortMedian < -100))) - skip = shortMedian / PLAYER_CHUNK_MS; - else if (pBuffer->full() && ((median > 10) || (median < -10))) - skip = median / PLAYER_CHUNK_MS; - } - - if (skip != 0) - { -// std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << buffer->size() << "\t" << outputBufferDacTime*1000 << "\n"; - } - - // bool silence = (age < -500) || (shortBuffer.full() && (shortMedian < -100)) || (buffer.full() && (median < -15)); - if (skip > 0) - { - skip--; - chunks.pop_front(); - delete chunk; - std::cerr << "packe too old, dropping\n"; - pBuffer->clear(); - pShortBuffer->clear(); - usleep(100); - } - else if (skip < 0) - { - skip++; - chunk = new Chunk();//PlayerChunk(); - memset(&(chunk->payload[0]), 0, sizeof(int16_t)*PLAYER_CHUNK_SIZE); - // std::cerr << "age < bufferMs (" << age << " < " << bufferMs << "), playing silence\n"; - pBuffer->clear(); - pShortBuffer->clear(); - usleep(100); - break; - } - else - { - chunks.pop_front(); - break; - } - } - - std::vector v; - for (size_t n=0; n 0) - usleep(ms * 1000); - } + void sleepMs(int ms); std::deque chunks; std::mutex mtx; - std::mutex mutex; std::unique_lock* pLock; std::condition_variable cv; DoubleBuffer* pBuffer; @@ -432,22 +42,6 @@ private: }; -// std::cerr << chunk->tv_sec << "\t" << now.tv_sec << "\n"; -/* for (size_t n=0; ntv_sec = chunk->tv_sec; - playerChunk->tv_usec = chunk->tv_usec; - addMs(*playerChunk, n*PLAYER_CHUNK_MS); - memcpy(&(playerChunk->payload[0]), &chunk->payload[n*PLAYER_CHUNK_SIZE], sizeof(int16_t)*PLAYER_CHUNK_SIZE); - mutex.lock(); - chunks.push_back(playerChunk); - mutex.unlock(); - cv.notify_all(); - } -*/ - - #endif diff --git a/timeUtils.h b/timeUtils.h index f1fd1967..3edbb849 100644 --- a/timeUtils.h +++ b/timeUtils.h @@ -2,6 +2,7 @@ #define TIME_UTILS_H #include "chunk.h" +#include std::string timeToStr(const timeval& timestamp) {