git-svn-id: svn://elaine/murooma/trunk@96 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-07-11 22:20:48 +00:00
parent ebc2890679
commit a27f926f4b
4 changed files with 19 additions and 116 deletions

View file

@ -29,7 +29,7 @@ struct ChunkT
typedef ChunkT<WIRE_CHUNK_SIZE> Chunk; typedef ChunkT<WIRE_CHUNK_SIZE> Chunk;
typedef ChunkT<PLAYER_CHUNK_SIZE> PlayerChunk; //typedef ChunkT<PLAYER_CHUNK_SIZE> PlayerChunk;
#endif #endif

View file

@ -19,18 +19,29 @@
#include "stream.h" #include "stream.h"
std::deque<int> timeDiffs;
int bufferMs; int bufferMs;
Stream* stream;
void player() void player()
{ {
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://192.168.0.2:123458");
const char* filter = "";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
zmq::message_t update;
while (1)
{
subscriber.recv(&update);
stream->addChunk((Chunk*)(update.data()));
}
} }
Stream* stream;
/* This routine will be called by the PortAudio engine when audio is needed. /* This routine will be called by the PortAudio engine when audio is needed.
** It may called at interrupt level on some machines so don't do anything ** It may called at interrupt level on some machines so don't do anything
@ -42,7 +53,6 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer,
PaStreamCallbackFlags statusFlags, PaStreamCallbackFlags statusFlags,
void *userData ) void *userData )
{ {
// std::cerr << "outputBufferDacTime: " << timeInfo->outputBufferDacTime*1000 << "\n";
Stream* stream = (Stream*)userData; Stream* stream = (Stream*)userData;
short* out = (short*)outputBuffer; short* out = (short*)outputBuffer;
@ -51,25 +61,9 @@ static int patestCallback( const void *inputBuffer, void *outputBuffer,
(void) inputBuffer; (void) inputBuffer;
stream->getChunk(out, timeInfo->outputBufferDacTime, framesPerBuffer); stream->getChunk(out, timeInfo->outputBufferDacTime, framesPerBuffer);
/* for (size_t n=0; n<framesPerBuffer; n++)
{
*out++ = playerChunk->payload[2*n];
*out++ = playerChunk->payload[2*n+1];
}
delete playerChunk;
*/
return paContinue; return paContinue;
} }
/*
* This routine is called by portaudio when playback is done.
*/
static void StreamFinished( void* userData )
{
// paTestData *data = (paTestData *) userData;
// printf( "Stream Completed: %s\n", data->message );
}
int initAudio() int initAudio()
@ -120,15 +114,9 @@ int initAudio()
stream ); stream );
if( err != paNoError ) goto error; if( err != paNoError ) goto error;
err = Pa_SetStreamFinishedCallback( paStream, &StreamFinished );
if( err != paNoError ) goto error;
err = Pa_StartStream( paStream ); err = Pa_StartStream( paStream );
if( err != paNoError ) goto error; if( err != paNoError ) goto error;
// printf("Play for %d seconds.\n", NUM_SECONDS );
// Pa_Sleep( NUM_SECONDS * 1000 );
// err = Pa_StopStream( paStream ); // err = Pa_StopStream( paStream );
// if( err != paNoError ) goto error; // if( err != paNoError ) goto error;
@ -148,42 +136,20 @@ error:
} }
int main (int argc, char *argv[]) int main (int argc, char *argv[])
{ {
bufferMs = 300; bufferMs = 300;
if (argc > 1) if (argc > 1)
bufferMs = atoi(argv[1]); bufferMs = atoi(argv[1]);
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
// subscriber.connect("tcp://127.0.0.1:123458");
subscriber.connect("tcp://192.168.0.2:123458");
const char* filter = "";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
/* std::thread playerThread(player);
struct sched_param params;
params.sched_priority = sched_get_priority_max(SCHED_FIFO);
int ret = pthread_setschedparam(playerThread.native_handle(), SCHED_FIFO, &params);
if (ret != 0)
std::cerr << "Unsuccessful in setting thread realtime prio" << std::endl;
*/
stream = new Stream(); stream = new Stream();
stream->setBufferLen(bufferMs); stream->setBufferLen(bufferMs);
initAudio(); initAudio();
Chunk* chunk;// = new Chunk(); std::thread playerThread(player);
while (1) playerThread.join();
{
zmq::message_t update;
subscriber.recv(&update);
// timeval now;
// gettimeofday(&now, NULL);
// memcpy(chunk, update.data(), sizeof(Chunk));
chunk = (Chunk*)(update.data());
// std::cerr << "New chunk: " << chunkTime(*chunk) << "\t" << timeToStr(now) << "\t" << getAge(*chunk) << "\n";
stream->addChunk(chunk);
}
return 0; return 0;
} }

View file

@ -4,10 +4,8 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
Stream::Stream() : sleep(0), lastPlayerChunk(NULL), median(0), shortMedian(0), lastUpdate(0), skip(0), idx(0) Stream::Stream() : sleep(0), median(0), shortMedian(0), lastUpdate(0)
{ {
silentPlayerChunk = new PlayerChunk();
playerChunk = (short*)malloc(sizeof(short)*PLAYER_CHUNK_SIZE);
pBuffer = new DoubleBuffer<int>(30000 / PLAYER_CHUNK_MS); pBuffer = new DoubleBuffer<int>(30000 / PLAYER_CHUNK_MS);
pShortBuffer = new DoubleBuffer<int>(5000 / PLAYER_CHUNK_MS); pShortBuffer = new DoubleBuffer<int>(5000 / PLAYER_CHUNK_MS);
pLock = new std::unique_lock<std::mutex>(mtx); pLock = new std::unique_lock<std::mutex>(mtx);
@ -55,38 +53,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction)
Chunk* chunk = getNextChunk(); Chunk* chunk = getNextChunk();
timeval tv = getTimeval(chunk); timeval tv = getTimeval(chunk);
//std::cerr << "GetNextPlayerChunk: " << correction << "\n";
// int age(0);
// age = getAge(*chunk) + outputBufferDacTime*1000 - bufferMs;
// std::cerr << "age: " << age << " \tidx: " << chunk->idx << "\n";
/* double factor = (double)PLAYER_CHUNK_MS / (double)(PLAYER_CHUNK_MS + correction);
size_t idx(0);
size_t idxCorrection(0);
for (size_t n=0; n<PLAYER_CHUNK_SIZE/2; ++n)
{
idx = chunk->idx + 2*floor(n*factor) - idxCorrection;
//std::cerr << factor << "\t" << n << "\t" << idx << "\n";
if (idx >= WIRE_CHUNK_SIZE)
{
idxCorrection = 2*floor(n*factor);
idx = 0;
chunks.pop_front();
delete chunk;
chunk = getNextChunk();
}
playerChunk->payload[2*n] = chunk->payload[idx];
playerChunk->payload[2*n+1] = chunk->payload[idx + 1];
}
addMs(chunk, -PLAYER_CHUNK_MS - correction);
chunk->idx = idx;
if (idx >= WIRE_CHUNK_SIZE)
{
chunks.pop_front();
delete chunk;
}
*/
if (correction != 0) if (correction != 0)
{ {
int factor = ceil((float)PLAYER_CHUNK_MS / (float)abs(correction)); int factor = ceil((float)PLAYER_CHUNK_MS / (float)abs(correction));
@ -97,7 +63,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction)
{ {
*(outputBuffer + 2*n) = chunk->payload[idx]; *(outputBuffer + 2*n) = chunk->payload[idx];
*(outputBuffer + 2*n+1) = chunk->payload[idx + 1]; *(outputBuffer + 2*n+1) = chunk->payload[idx + 1];
//std::cerr << 2*n << "\t" << idx << "\n";
if (n % factor == 0) if (n % factor == 0)
{ {
if (correction < 0) if (correction < 0)
@ -111,28 +76,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction)
samples += 2; samples += 2;
idx += 2; idx += 2;
} }
/* if (correction > 0)
{
if (n % 2 != 0)
{
samples += 4;
idx += 4;
}
else
{
samples += 2;
idx += 2;
}
}
else if (correction < 0)
{
if (n % 3 != 0)
{
samples += 2;
idx += 2;
}
}
*/
if (idx >= WIRE_CHUNK_SIZE) if (idx >= WIRE_CHUNK_SIZE)
{ {
//std::cerr << "idx >= WIRE_CHUNK_SIZE: " << idx << "\t" << WIRE_CHUNK_SIZE << "\n"; //std::cerr << "idx >= WIRE_CHUNK_SIZE: " << idx << "\t" << WIRE_CHUNK_SIZE << "\n";

View file

@ -33,16 +33,10 @@ private:
DoubleBuffer<int>* pBuffer; DoubleBuffer<int>* pBuffer;
DoubleBuffer<int>* pShortBuffer; DoubleBuffer<int>* pShortBuffer;
PlayerChunk* lastPlayerChunk;
PlayerChunk* silentPlayerChunk;
short* playerChunk;
int median; int median;
int shortMedian; int shortMedian;
time_t lastUpdate; time_t lastUpdate;
int bufferMs; int bufferMs;
int skip;
size_t idx;
}; };