diff --git a/Makefile b/Makefile index 710133c7..dc786001 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ VERSION = 0.01 CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -g -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -LDFLAGS = -lzmq -lpthread +LDFLAGS = -lzmq -lpthread -lportaudio OBJ_SERVER = server.o BIN_SERVER = server diff --git a/client.cpp b/client.cpp index e2d7817d..79768d8d 100644 --- a/client.cpp +++ b/client.cpp @@ -16,18 +16,22 @@ #include #include #include +#include -const size_t ms(20); + +#define MS (50) //44100 / 20 = 2205 -const size_t size(44100*4*ms/1000); +#define SAMPLE_RATE (44100) +#define SIZE (SAMPLE_RATE*4*MS/1000) int bufferMs; +#define FRAMES_PER_BUFFER (SIZE/4) struct Chunk { int32_t tv_sec; int32_t tv_usec; - char payload[size]; + char payload[SIZE]; }; std::deque chunks; @@ -74,19 +78,40 @@ int getAge(const Chunk& chunk) ts.tv_usec = chunk.tv_usec; return diff_ms(now, ts); } - + +long getTickCount() +{ + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + return now.tv_sec*1000 + now.tv_nsec / 1000000; +} void player() { std::unique_lock lck(mtx); bool playing = true; +// struct timespec last; + while (1) + { + if (chunks.empty()) + cv.wait(lck); + mutex.lock(); + std::cerr << "Chunks: " << chunks.size() << "\t" << getAge(*(chunks.front())) << "\n"; + if (getAge(*(chunks.front())) > bufferMs) + { + mutex.unlock(); + break; + } + mutex.unlock(); + } + while (1) { if (chunks.empty()) cv.wait(lck); mutex.lock(); - std::cerr << "Chunks: " << chunks.size() << "\n"; Chunk* chunk = chunks.front(); +// std::cerr << "Chunks: " << chunks.size() << "\n"; chunks.pop_front(); mutex.unlock(); @@ -94,20 +119,26 @@ void player() if (playing) { - for (size_t n=0; npayload[n];// << std::flush; std::cout << std::flush; - - int age = getAge(*chunk) - bufferMs; - std::cerr << "Age: " << age << "\n"; - if (age < 0) + long after = getTickCount(); +// std::cerr << "After: " << after << " (" << after - now << ")\n"; + if (after - now > MS / 2) + usleep(((after - now) / 2) * 1000); + +// int age = getAge(*chunk); +// std::cerr << "Age: " << age << "\n"; +/* if (age < 0) { std::cerr << "Sleeping, age: " << age / 2 << "\n"; usleep((-age / 2) * 1000 - 100); } else std::cerr << "Dropping Chunk, age: " << age << "\n"; - +*/ /* int age = getAge(*chunk) - bufferMs; if (age < 10) { @@ -135,6 +166,128 @@ void player() } + +/* This routine will be called by the PortAudio engine when audio is needed. +** It may called at interrupt level on some machines so don't do anything +** that could mess up the system like calling malloc() or free(). +*/ +static int patestCallback( const void *inputBuffer, void *outputBuffer, + unsigned long framesPerBuffer, + const PaStreamCallbackTimeInfo* timeInfo, + PaStreamCallbackFlags statusFlags, + void *userData ) +{ + std::cerr << "outputBufferDacTime: " << timeInfo->outputBufferDacTime*1000 << "\n"; + std::deque* chunks = (std::deque*)userData; + short* out = (short*)outputBuffer; + unsigned long i; + + (void) timeInfo; /* Prevent unused variable warnings. */ + (void) statusFlags; + (void) inputBuffer; + + std::unique_lock lck(mtx); + int age = 0; + Chunk* chunk = NULL; + while (1) + { + if (chunks->empty()) + cv.wait(lck); + mutex.lock(); + chunk = chunks->front(); + std::cerr << "Chunks: " << chunks->size() << "\n"; + chunks->pop_front(); + mutex.unlock(); + age = getAge(*chunk) + timeInfo->outputBufferDacTime*1000; + std::cerr << "age: " << getAge(*chunk) << "\t" << age << "\n"; + if (age > bufferMs) + delete chunk; + else + break; + } + + for( i=0; ipayload[i] << "\t" << (int)chunk->payload[i+1] << "\t" << (int)chunk->payload[i+2] << "\t" << (int)chunk->payload[i+3] << "\n"; +//std::cerr << i << "\t" << 4*i+1 << "\t" << 4*i << "\n"; + *out++ = (int)chunk->payload[4*i+1]*256 + (int)chunk->payload[4*i]; + *out++ = (int)chunk->payload[4*i+3]*256 + (int)chunk->payload[4*i+2]; + } + delete chunk; + + return paContinue; +} + +/* + * This routine is called by portaudio when playback is done. + */ +static void StreamFinished( void* userData ) +{ +// paTestData *data = (paTestData *) userData; +// printf( "Stream Completed: %s\n", data->message ); +} + + +int initAudio() +{ + PaStreamParameters outputParameters; + PaStream *stream; + PaError err; + + printf("PortAudio Test: output sine wave. SR = %d, BufSize = %d\n", SAMPLE_RATE, FRAMES_PER_BUFFER); + + err = Pa_Initialize(); + if( err != paNoError ) goto error; + + outputParameters.device = Pa_GetDefaultOutputDevice(); /* default output device */ + if (outputParameters.device == paNoDevice) { + fprintf(stderr,"Error: No default output device.\n"); + goto error; + } + outputParameters.channelCount = 2; /* stereo output */ + outputParameters.sampleFormat = paInt16; /* 32 bit floating point output */ + outputParameters.suggestedLatency = Pa_GetDeviceInfo( outputParameters.device )->defaultLowOutputLatency; + outputParameters.hostApiSpecificStreamInfo = NULL; + + err = Pa_OpenStream( + &stream, + NULL, /* no input */ + &outputParameters, + SAMPLE_RATE, + FRAMES_PER_BUFFER, + paClipOff, /* we won't output out of range samples so don't bother clipping them */ + patestCallback, + &chunks ); + if( err != paNoError ) goto error; + + err = Pa_SetStreamFinishedCallback( stream, &StreamFinished ); + if( err != paNoError ) goto error; + + err = Pa_StartStream( stream ); + if( err != paNoError ) goto error; + +// printf("Play for %d seconds.\n", NUM_SECONDS ); +// Pa_Sleep( NUM_SECONDS * 1000 ); + +// err = Pa_StopStream( stream ); +// if( err != paNoError ) goto error; + +// err = Pa_CloseStream( stream ); +// if( err != paNoError ) goto error; + +// Pa_Terminate(); +// printf("Test finished.\n"); + + return err; +error: + Pa_Terminate(); + fprintf( stderr, "An error occured while using the portaudio stream\n" ); + fprintf( stderr, "Error number: %d\n", err ); + fprintf( stderr, "Error message: %s\n", Pa_GetErrorText( err ) ); + return err; +} + + int main (int argc, char *argv[]) { bufferMs = 300; @@ -146,13 +299,14 @@ int main (int argc, char *argv[]) const char* filter = ""; subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); - std::thread playerThread(player); +/* std::thread playerThread(player); struct sched_param params; params.sched_priority = sched_get_priority_max(SCHED_FIFO); int ret = pthread_setschedparam(playerThread.native_handle(), SCHED_FIFO, ¶ms); if (ret != 0) std::cerr << "Unsuccessful in setting thread realtime prio" << std::endl; - +*/ + initAudio(); while (1) { zmq::message_t update;