diff --git a/chunk.h b/chunk.h index b768ec21..a219d0b1 100644 --- a/chunk.h +++ b/chunk.h @@ -18,6 +18,7 @@ struct ChunkT { int32_t tv_sec; int32_t tv_usec; + int32_t idx; int16_t payload[T]; }; diff --git a/client.cpp b/client.cpp index 68620e65..72bfffd0 100644 --- a/client.cpp +++ b/client.cpp @@ -14,21 +14,13 @@ #include #include #include -#include -#include #include #include "chunk.h" -#include "doubleBuffer.h" #include "timeUtils.h" +#include "stream.h" -DoubleBuffer buffer(30000 / PLAYER_CHUNK_MS); -DoubleBuffer shortBuffer(500 / PLAYER_CHUNK_MS); -std::deque chunks; std::deque timeDiffs; -std::mutex mtx; -std::mutex mutex; -std::condition_variable cv; int bufferMs; @@ -44,9 +36,7 @@ void sleepMs(int ms) } -int skip(0); -time_t lastUpdate(0); - +Stream* stream; /* This routine will be called by the PortAudio engine when audio is needed. ** It may called at interrupt level on some machines so don't do anything @@ -59,89 +49,21 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer, void *userData ) { // std::cerr << "outputBufferDacTime: " << timeInfo->outputBufferDacTime*1000 << "\n"; - std::deque* chunks = (std::deque*)userData; + Stream* stream = (Stream*)userData; short* out = (short*)outputBuffer; - unsigned long i; (void) timeInfo; /* Prevent unused variable warnings. */ (void) statusFlags; (void) inputBuffer; - std::unique_lock lck(mtx); - int age = 0; - int median = 0; - int shortMedian = 0; - PlayerChunk* chunk = NULL; - while (1) + std::vector s = stream->getChunk(timeInfo->outputBufferDacTime, framesPerBuffer); + + for (size_t n=0; nempty()) - cv.wait(lck); - mutex.lock(); - chunk = chunks->front(); - int chunkCount = chunks->size(); - mutex.unlock(); - age = getAge(*chunk) + timeInfo->outputBufferDacTime*1000 - bufferMs; - buffer.add(age); - shortBuffer.add(age); - time_t now = time(NULL); - - if (skip == 0) - { - if (now != lastUpdate) - { - lastUpdate = now; - median = buffer.median(); - shortMedian = shortBuffer.median(); - std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << chunkCount << "\t" << timeInfo->outputBufferDacTime*1000 << "\n"; - } - if ((age > 500) || (age < -500)) - skip = age / PLAYER_CHUNK_MS; - else if (shortBuffer.full() && ((shortMedian > 100) || (shortMedian < -100))) - skip = shortMedian / PLAYER_CHUNK_MS; - else if (buffer.full() && ((median > 10) || (median < -10))) - skip = median / PLAYER_CHUNK_MS; - } - - if (skip != 0) - { - std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << timeInfo->outputBufferDacTime*1000 << "\n"; - } - -// bool silence = (age < -500) || (shortBuffer.full() && (shortMedian < -100)) || (buffer.full() && (median < -15)); - if (skip > 0) - { - skip--; - chunks->pop_front(); - delete chunk; - std::cerr << "packe too old, dropping\n"; - buffer.clear(); - shortBuffer.clear(); - usleep(100); - } - else if (skip < 0) - { - skip++; - chunk = new PlayerChunk(); - memset(&(chunk->payload[0]), 0, sizeof(int16_t)*PLAYER_CHUNK_SIZE); -// std::cerr << "age < bufferMs (" << age << " < " << bufferMs << "), playing silence\n"; - buffer.clear(); - shortBuffer.clear(); - usleep(100); - break; - } - else - { - chunks->pop_front(); - break; - } + *out++ = s[2*n]; + *out++ = s[2*n+1]; } - - for( i=0; ipayload[2*i]; - *out++ = chunk->payload[2*i+1]; - } - delete chunk; +// delete chunk; return paContinue; } @@ -159,7 +81,7 @@ static void StreamFinished( void* userData ) int initAudio() { PaStreamParameters outputParameters; - PaStream *stream; + PaStream *paStream; PaError err; printf("PortAudio Test: output sine wave. SR = %d, BufSize = %d\n", SAMPLE_RATE, FRAMES_PER_BUFFER); @@ -194,29 +116,29 @@ int initAudio() outputParameters.hostApiSpecificStreamInfo = NULL; err = Pa_OpenStream( - &stream, + &paStream, NULL, /* no input */ &outputParameters, SAMPLE_RATE, FRAMES_PER_BUFFER, paClipOff, /* we won't output out of range samples so don't bother clipping them */ patestCallback, - &chunks ); + stream ); if( err != paNoError ) goto error; - err = Pa_SetStreamFinishedCallback( stream, &StreamFinished ); + err = Pa_SetStreamFinishedCallback( paStream, &StreamFinished ); if( err != paNoError ) goto error; - err = Pa_StartStream( stream ); + err = Pa_StartStream( paStream ); if( err != paNoError ) goto error; // printf("Play for %d seconds.\n", NUM_SECONDS ); // Pa_Sleep( NUM_SECONDS * 1000 ); -// err = Pa_StopStream( stream ); +// err = Pa_StopStream( paStream ); // if( err != paNoError ) goto error; -// err = Pa_CloseStream( stream ); +// err = Pa_CloseStream( paStream ); // if( err != paNoError ) goto error; // Pa_Terminate(); @@ -251,30 +173,20 @@ int main (int argc, char *argv[]) if (ret != 0) std::cerr << "Unsuccessful in setting thread realtime prio" << std::endl; */ + stream = new Stream(); initAudio(); Chunk* chunk;// = new Chunk(); while (1) { zmq::message_t update; subscriber.recv(&update); + + timeval now; + gettimeofday(&now, NULL); + std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; // memcpy(chunk, update.data(), sizeof(Chunk)); chunk = (Chunk*)(update.data()); -// timeval now; -// gettimeofday(&now, NULL); -// std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; -// std::cerr << chunk->tv_sec << "\t" << now.tv_sec << "\n"; - for (size_t n=0; ntv_sec = chunk->tv_sec; - playerChunk->tv_usec = chunk->tv_usec; - addMs(*playerChunk, n*PLAYER_CHUNK_MS); - memcpy(&(playerChunk->payload[0]), &chunk->payload[n*PLAYER_CHUNK_SIZE], sizeof(int16_t)*PLAYER_CHUNK_SIZE); - mutex.lock(); - chunks.push_back(playerChunk); - mutex.unlock(); - cv.notify_all(); - } + stream->addChunk(chunk); } return 0; } diff --git a/server.cpp b/server.cpp index d6c10daf..c25557d1 100644 --- a/server.cpp +++ b/server.cpp @@ -58,6 +58,7 @@ int main () { chunk->tv_sec = ts.tv_sec; chunk->tv_usec = ts.tv_usec; + chunk->idx = 0; zmq::message_t message(sizeof(Chunk)); memcpy(message.data(), chunk, sizeof(Chunk)); // snprintf ((char *) message.data(), size, "%05d %d", zipcode, c); diff --git a/stream.h b/stream.h new file mode 100644 index 00000000..c51ec847 --- /dev/null +++ b/stream.h @@ -0,0 +1,174 @@ +#ifndef STREAM_H +#define STREAM_H + + +#include +#include +#include +#include +#include "doubleBuffer.h" +#include "timeUtils.h" +#include "chunk.h" +#include "timeUtils.h" + + +class Stream +{ +public: + Stream() : lastUpdate(0), skip(0), idx(0) + { + pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); + pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); + pLock = new std::unique_lock(mtx); + bufferMs = 500; + } + + void addChunk(Chunk* chunk) + { + Chunk* c = new Chunk(*chunk); + mutex.lock(); + chunks.push_back(c); + mutex.unlock(); + cv.notify_all(); + } + + std::vector getChunk(double outputBufferDacTime, unsigned long framesPerBuffer) + { + Chunk* chunk = NULL; + while (1) + { + if (chunks.empty()) + cv.wait(*pLock); + + int age(0); + int chunkCount(0); + mutex.lock(); + do + { + chunk = chunks.front(); + chunkCount = chunks.size(); + int age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; + if (age > WIRE_CHUNK_MS) + { + std::cerr << "age > WIRE_CHUNK_MS\n"; + chunks.pop_front(); + delete chunk; + chunk = NULL; + } + } + while (chunk == NULL); + mutex.unlock(); + + if (chunk == NULL) + { + std::cerr << "no chunks available\n"; + continue; + } + + std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << pBuffer->size() << "\t" << chunkCount << "\n"; + pBuffer->add(age); + pShortBuffer->add(age); + time_t now = time(NULL); + + if (skip == 0) + { + int age = 0; + int median = 0; + int shortMedian = 0; + + if (now != lastUpdate) + { + lastUpdate = now; + median = pBuffer->median(); + shortMedian = pShortBuffer->median(); + std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << chunkCount << "\t" << outputBufferDacTime*1000 << "\n"; + } + if ((age > 500) || (age < -500)) + skip = age / PLAYER_CHUNK_MS; + else if (pShortBuffer->full() && ((shortMedian > 100) || (shortMedian < -100))) + skip = shortMedian / PLAYER_CHUNK_MS; + else if (pBuffer->full() && ((median > 10) || (median < -10))) + skip = median / PLAYER_CHUNK_MS; + } + + if (skip != 0) + { +// std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\t" << shortMedian << "\t" << median << "\t" << buffer->size() << "\t" << outputBufferDacTime*1000 << "\n"; + } + + // bool silence = (age < -500) || (shortBuffer.full() && (shortMedian < -100)) || (buffer.full() && (median < -15)); + if (skip > 0) + { + skip--; + chunks.pop_front(); + delete chunk; + std::cerr << "packe too old, dropping\n"; + pBuffer->clear(); + pShortBuffer->clear(); + usleep(100); + } + else if (skip < 0) + { + skip++; + chunk = new Chunk();//PlayerChunk(); + memset(&(chunk->payload[0]), 0, sizeof(int16_t)*PLAYER_CHUNK_SIZE); + // std::cerr << "age < bufferMs (" << age << " < " << bufferMs << "), playing silence\n"; + pBuffer->clear(); + pShortBuffer->clear(); + usleep(100); + break; + } + else + { + chunks.pop_front(); + break; + } + } + + std::vector v; + for (size_t n=0; n chunks; + std::mutex mtx; + std::mutex mutex; + std::unique_lock* pLock; + std::condition_variable cv; + DoubleBuffer* pBuffer; + DoubleBuffer* pShortBuffer; + + time_t lastUpdate; + int bufferMs; + int skip; + size_t idx; +}; + + +// std::cerr << chunk->tv_sec << "\t" << now.tv_sec << "\n"; +/* for (size_t n=0; ntv_sec = chunk->tv_sec; + playerChunk->tv_usec = chunk->tv_usec; + addMs(*playerChunk, n*PLAYER_CHUNK_MS); + memcpy(&(playerChunk->payload[0]), &chunk->payload[n*PLAYER_CHUNK_SIZE], sizeof(int16_t)*PLAYER_CHUNK_SIZE); + mutex.lock(); + chunks.push_back(playerChunk); + mutex.unlock(); + cv.notify_all(); + } +*/ + + + +#endif + +