diff --git a/chunk.h b/chunk.h index 679d7a9d..ce41adf7 100644 --- a/chunk.h +++ b/chunk.h @@ -29,7 +29,7 @@ struct ChunkT typedef ChunkT Chunk; -typedef ChunkT PlayerChunk; +//typedef ChunkT PlayerChunk; #endif diff --git a/client.cpp b/client.cpp index 9b802a37..d5cabd26 100644 --- a/client.cpp +++ b/client.cpp @@ -19,18 +19,29 @@ #include "stream.h" -std::deque timeDiffs; int bufferMs; +Stream* stream; void player() { + zmq::context_t context (1); + zmq::socket_t subscriber (context, ZMQ_SUB); + subscriber.connect("tcp://192.168.0.2:123458"); + + const char* filter = ""; + subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); + zmq::message_t update; + while (1) + { + subscriber.recv(&update); + stream->addChunk((Chunk*)(update.data())); + } } -Stream* stream; /* This routine will be called by the PortAudio engine when audio is needed. ** It may called at interrupt level on some machines so don't do anything @@ -42,7 +53,6 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer, PaStreamCallbackFlags statusFlags, void *userData ) { -// std::cerr << "outputBufferDacTime: " << timeInfo->outputBufferDacTime*1000 << "\n"; Stream* stream = (Stream*)userData; short* out = (short*)outputBuffer; @@ -51,25 +61,9 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer, (void) inputBuffer; stream->getChunk(out, timeInfo->outputBufferDacTime, framesPerBuffer); - -/* for (size_t n=0; npayload[2*n]; - *out++ = playerChunk->payload[2*n+1]; - } - delete playerChunk; -*/ return paContinue; } -/* - * This routine is called by portaudio when playback is done. - */ -static void StreamFinished( void* userData ) -{ -// paTestData *data = (paTestData *) userData; -// printf( "Stream Completed: %s\n", data->message ); -} int initAudio() @@ -120,15 +114,9 @@ int initAudio() stream ); if( err != paNoError ) goto error; - err = Pa_SetStreamFinishedCallback( paStream, &StreamFinished ); - if( err != paNoError ) goto error; - err = Pa_StartStream( paStream ); if( err != paNoError ) goto error; -// printf("Play for %d seconds.\n", NUM_SECONDS ); -// Pa_Sleep( NUM_SECONDS * 1000 ); - // err = Pa_StopStream( paStream ); // if( err != paNoError ) goto error; @@ -148,42 +136,20 @@ error: } + + int main (int argc, char *argv[]) { bufferMs = 300; if (argc > 1) bufferMs = atoi(argv[1]); - zmq::context_t context (1); - zmq::socket_t subscriber (context, ZMQ_SUB); -// subscriber.connect("tcp://127.0.0.1:123458"); - subscriber.connect("tcp://192.168.0.2:123458"); - const char* filter = ""; - subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); -/* std::thread playerThread(player); - struct sched_param params; - params.sched_priority = sched_get_priority_max(SCHED_FIFO); - int ret = pthread_setschedparam(playerThread.native_handle(), SCHED_FIFO, ¶ms); - if (ret != 0) - std::cerr << "Unsuccessful in setting thread realtime prio" << std::endl; -*/ stream = new Stream(); stream->setBufferLen(bufferMs); - initAudio(); - Chunk* chunk;// = new Chunk(); - while (1) - { - zmq::message_t update; - subscriber.recv(&update); + std::thread playerThread(player); + playerThread.join(); -// timeval now; -// gettimeofday(&now, NULL); -// memcpy(chunk, update.data(), sizeof(Chunk)); - chunk = (Chunk*)(update.data()); -// std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n"; - stream->addChunk(chunk); - } return 0; } diff --git a/stream.cpp b/stream.cpp index 1e7daf79..f45f61f5 100644 --- a/stream.cpp +++ b/stream.cpp @@ -4,10 +4,8 @@ #include #include -Stream::Stream() : sleep(0), lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) +Stream::Stream() : sleep(0), median(0), shortMedian(0), lastUpdate(0) { - silentPlayerChunk = new PlayerChunk(); - playerChunk = (short*)malloc(sizeof(short)*PLAYER_CHUNK_SIZE); pBuffer = new DoubleBuffer(30000 / PLAYER_CHUNK_MS); pShortBuffer = new DoubleBuffer(5000 / PLAYER_CHUNK_MS); pLock = new std::unique_lock(mtx); @@ -55,38 +53,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) Chunk* chunk = getNextChunk(); timeval tv = getTimeval(chunk); -//std::cerr << "GetNextPlayerChunk: " << correction << "\n"; -// int age(0); -// age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs; -// std::cerr << "age: " << age << " \tidx: " << chunk->idx << "\n"; - -/* double factor = (double)PLAYER_CHUNK_MS / (double)(PLAYER_CHUNK_MS + correction); - size_t idx(0); - size_t idxCorrection(0); - for (size_t n=0; nidx + 2*floor(n*factor) - idxCorrection; -//std::cerr << factor << "\t" << n << "\t" << idx << "\n"; - if (idx >= WIRE_CHUNK_SIZE) - { - idxCorrection = 2*floor(n*factor); - idx = 0; - chunks.pop_front(); - delete chunk; - chunk = getNextChunk(); - } - playerChunk->payload[2*n] = chunk->payload[idx]; - playerChunk->payload[2*n+1] = chunk->payload[idx + 1]; - } - addMs(chunk, -PLAYER_CHUNK_MS - correction); - chunk->idx = idx; - if (idx >= WIRE_CHUNK_SIZE) - { - chunks.pop_front(); - delete chunk; - } -*/ - if (correction != 0) { int factor = ceil((float)PLAYER_CHUNK_MS / (float)abs(correction)); @@ -97,7 +63,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) { *(outputBuffer + 2*n) = chunk->payload[idx]; *(outputBuffer + 2*n+1) = chunk->payload[idx + 1]; -//std::cerr << 2*n << "\t" << idx << "\n"; if (n % factor == 0) { if (correction < 0) @@ -111,28 +76,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) samples += 2; idx += 2; } -/* if (correction > 0) - { - if (n % 2 != 0) - { - samples += 4; - idx += 4; - } - else - { - samples += 2; - idx += 2; - } - } - else if (correction < 0) - { - if (n % 3 != 0) - { - samples += 2; - idx += 2; - } - } -*/ if (idx >= WIRE_CHUNK_SIZE) { //std::cerr << "idx >= WIRE_CHUNK_SIZE: " << idx << "\t" << WIRE_CHUNK_SIZE << "\n"; diff --git a/stream.h b/stream.h index 8e743bb9..4cb98ba5 100644 --- a/stream.h +++ b/stream.h @@ -33,16 +33,10 @@ private: DoubleBuffer* pBuffer; DoubleBuffer* pShortBuffer; - PlayerChunk* lastPlayerChunk; - PlayerChunk* silentPlayerChunk; - short* playerChunk; - int median; int shortMedian; time_t lastUpdate; int bufferMs; - int skip; - size_t idx; };