wireChunk

git-svn-id: svn://elaine/murooma/trunk@134 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-08-04 06:43:10 +00:00
parent 042000c8cc
commit b8cc8e8a67
7 changed files with 75 additions and 38 deletions

View file

@ -5,7 +5,7 @@ LDFLAGS = -lrt -lzmq -lpthread -lportaudio
OBJ_SERVER = server.o OBJ_SERVER = server.o
BIN_SERVER = server BIN_SERVER = server
OBJ_CLIENT = client.o stream.o OBJ_CLIENT = client.o stream.o chunk.o
BIN_CLIENT = client BIN_CLIENT = client
OBJ = $(OBJ_SERVER) $(OBJ_CLIENT) OBJ = $(OBJ_SERVER) $(OBJ_CLIENT)

53
chunk.h
View file

@ -1,6 +1,8 @@
#ifndef CHUNK_H #ifndef CHUNK_H
#define CHUNK_H #define CHUNK_H
#include <chrono>
#define SAMPLE_RATE (48000) #define SAMPLE_RATE (48000)
//#define SAMPLE_BIT (16) //#define SAMPLE_BIT (16)
#define CHANNELS (2) #define CHANNELS (2)
@ -16,15 +18,62 @@
#define FRAMES_PER_BUFFER ((SAMPLE_RATE*PLAYER_CHUNK_MS)/1000) #define FRAMES_PER_BUFFER ((SAMPLE_RATE*PLAYER_CHUNK_MS)/1000)
struct Chunk typedef std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::milliseconds> time_point_ms;
struct WireChunk
{ {
int32_t tv_sec; int32_t tv_sec;
int32_t tv_usec; int32_t tv_usec;
int32_t idx;
int16_t payload[WIRE_CHUNK_SIZE]; int16_t payload[WIRE_CHUNK_SIZE];
}; };
class Chunk
{
public:
Chunk(WireChunk* _wireChunk);
~Chunk();
int read(short* _outputBuffer, int _count);
bool isEndOfChunk();
inline time_point_ms timePoint()
{
time_point_ms tp;
return tp + std::chrono::seconds(wireChunk->tv_sec) + std::chrono::milliseconds(wireChunk->tv_usec / 1000) + std::chrono::milliseconds(idx / WIRE_CHUNK_MS_SIZE);
}
template<typename T>
inline T getAge()
{
return getAge<T>(timePoint());
}
inline long getAge()
{
return getAge<std::chrono::milliseconds>().count();
}
inline static long getAge(const time_point_ms& time_point)
{
return getAge<std::chrono::milliseconds>(time_point).count();
}
template<typename T, typename U>
static inline T getAge(const std::chrono::time_point<U>& time_point)
{
return std::chrono::duration_cast<T>(std::chrono::high_resolution_clock::now() - time_point);
}
private:
int32_t idx;
WireChunk* wireChunk;
};
#endif #endif

View file

@ -42,7 +42,7 @@ void player()
while (1) while (1)
{ {
subscriber.recv(&update); subscriber.recv(&update);
stream->addChunk((Chunk*)(update.data())); stream->addChunk(new Chunk((WireChunk*)(update.data())));
} }
} }

View file

@ -29,7 +29,7 @@ int main () {
publisher.bind("tcp://0.0.0.0:123458"); publisher.bind("tcp://0.0.0.0:123458");
char c[2]; char c[2];
Chunk* chunk = new Chunk(); WireChunk* chunk = new WireChunk();
timeval tvChunk; timeval tvChunk;
gettimeofday(&tvChunk, NULL); gettimeofday(&tvChunk, NULL);
long nextTick = getTickCount(); long nextTick = getTickCount();
@ -50,9 +50,8 @@ int main () {
chunk->tv_sec = tvChunk.tv_sec; chunk->tv_sec = tvChunk.tv_sec;
chunk->tv_usec = tvChunk.tv_usec; chunk->tv_usec = tvChunk.tv_usec;
chunk->idx = 0; zmq::message_t message(sizeof(WireChunk));
zmq::message_t message(sizeof(Chunk)); memcpy(message.data(), chunk, sizeof(WireChunk));
memcpy(message.data(), chunk, sizeof(Chunk));
publisher.send(message); publisher.send(message);
addMs(tvChunk, WIRE_CHUNK_MS); addMs(tvChunk, WIRE_CHUNK_MS);
@ -64,8 +63,8 @@ int main () {
} }
else else
{ {
cin.sync(); cin.sync();
gettimeofday(&tvChunk, NULL); gettimeofday(&tvChunk, NULL);
nextTick = getTickCount(); nextTick = getTickCount();
} }
} }

View file

@ -55,8 +55,8 @@ void Stream::getSilentPlayerChunk(short* outputBuffer)
time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction) time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction)
{ {
Chunk* chunk = getNextChunk(); Chunk* chunk = getNextChunk();
time_point_ms tp = timePoint(chunk); time_point_ms tp = chunk->timePoint();
/*
if (correction != 0) if (correction != 0)
{ {
float idx(chunk->idx); float idx(chunk->idx);
@ -80,7 +80,7 @@ time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction)
// std::cerr << "Diff: " << diff_ms(getTimeval(chunk), tv) << "\t" << chunk->idx / PLAYER_CHUNK_MS_SIZE << "\n"; // std::cerr << "Diff: " << diff_ms(getTimeval(chunk), tv) << "\t" << chunk->idx / PLAYER_CHUNK_MS_SIZE << "\n";
} }
else else
{ */ {
/* int idx(chunk->idx); /* int idx(chunk->idx);
for (size_t n=0; n<PLAYER_CHUNK_SIZE; n+=2) for (size_t n=0; n<PLAYER_CHUNK_SIZE; n+=2)
{ {
@ -102,27 +102,17 @@ time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction)
} }
chunk->idx = idx; chunk->idx = idx;
*/ */
size_t missing = PLAYER_CHUNK_SIZE;// + correction*PLAYER_CHUNK_MS_SIZE; int read = 0;
if (chunk->idx + PLAYER_CHUNK_SIZE > WIRE_CHUNK_SIZE) while (read < PLAYER_CHUNK_SIZE)
{ {
if (outputBuffer != NULL) read += chunk->read(outputBuffer + read, PLAYER_CHUNK_SIZE - read);
memcpy(outputBuffer, &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); if (chunk->isEndOfChunk())
missing = chunk->idx + PLAYER_CHUNK_SIZE - WIRE_CHUNK_SIZE; {
chunks.pop_front(); chunks.pop_front();
delete chunk; delete chunk;
chunk = getNextChunk(); chunk = getNextChunk();
}
} }
if (outputBuffer != NULL)
memcpy((outputBuffer + PLAYER_CHUNK_SIZE - missing), &chunk->payload[chunk->idx], sizeof(int16_t)*missing);
chunk->idx += missing;
if (chunk->idx >= WIRE_CHUNK_SIZE)
{
chunks.pop_front();
delete chunk;
}
} }
return tp; return tp;
@ -147,11 +137,11 @@ void Stream::getChunk(short* outputBuffer, double outputBufferDacTime, unsigned
else else
{ {
for (size_t i=0; i<chunks.size(); ++i) for (size_t i=0; i<chunks.size(); ++i)
std::cerr << "Chunk " << i << ": " << getAge(chunks[i]) - bufferMs << "\n"; std::cerr << "Chunk " << i << ": " << chunks[i]->getAge() - bufferMs << "\n";
while (true)// (int i=0; i<(int)(round((float)sleep / (float)PLAYER_CHUNK_MS)) + 1; ++i) while (true)// (int i=0; i<(int)(round((float)sleep / (float)PLAYER_CHUNK_MS)) + 1; ++i)
{ {
// std::cerr << "Sleep: " << sleep << "\n"; // std::cerr << "Sleep: " << sleep << "\n";
int age = getAge(getNextPlayerChunk(outputBuffer)) - bufferMs; int age = Chunk::getAge(getNextPlayerChunk(outputBuffer)) - bufferMs;
if (age < PLAYER_CHUNK_MS / 2) if (age < PLAYER_CHUNK_MS / 2)
break; break;
// std::cerr << getAge(getNextPlayerChunk(outputBuffer)) - bufferMs << "\t"; // std::cerr << getAge(getNextPlayerChunk(outputBuffer)) - bufferMs << "\t";
@ -182,7 +172,7 @@ void Stream::getChunk(short* outputBuffer, double outputBufferDacTime, unsigned
} }
} }
int age = getAge(getNextPlayerChunk(outputBuffer, correction)) - bufferMs;// + outputBufferDacTime*1000; int age = Chunk::getAge(getNextPlayerChunk(outputBuffer, correction)) - bufferMs;// + outputBufferDacTime*1000;
if (outputBufferDacTime < 1) if (outputBufferDacTime < 1)
age += outputBufferDacTime*1000; age += outputBufferDacTime*1000;
pBuffer->add(age); pBuffer->add(age);

View file

@ -23,7 +23,6 @@ public:
void setBufferLen(size_t bufferLenMs); void setBufferLen(size_t bufferLenMs);
private: private:
void updateChunkTime(Chunk* chunk);
void sleepMs(int ms); void sleepMs(int ms);
size_t currentSample; size_t currentSample;

View file

@ -5,7 +5,7 @@
#include <sys/time.h> #include <sys/time.h>
#include <chrono> #include <chrono>
/*
typedef std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::milliseconds> time_point_ms; typedef std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono::milliseconds> time_point_ms;
@ -44,7 +44,7 @@ inline static long getAge(const time_point_ms& time_point)
{ {
return getAge<std::chrono::milliseconds>(time_point).count(); return getAge<std::chrono::milliseconds>(time_point).count();
} }
*/
static void addMs(timeval& tv, int ms) static void addMs(timeval& tv, int ms)