git-svn-id: svn://elaine/murooma/trunk@222 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-08-30 20:48:43 +00:00
parent c6dbf53a16
commit 42f27c328d
10 changed files with 289 additions and 234 deletions

View file

@ -1,7 +1,7 @@
VERSION = 0.01 VERSION = 0.01
CC = /usr/bin/g++ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc
OBJ = snapClient.o stream.o player.o receiver.o pcmDecoder.o oggDecoder.o ../common/chunk.o ../common/log.o ../common/sampleFormat.o OBJ = snapClient.o stream.o player.o receiver.o pcmDecoder.o oggDecoder.o ../common/chunk.o ../common/log.o ../common/sampleFormat.o
BIN = snapclient BIN = snapclient

View file

@ -2,96 +2,180 @@
#include <iostream> #include <iostream>
#include <cstring> #include <cstring>
#include <cmath> #include <cmath>
#include <vorbis/vorbisenc.h>
using namespace std; using namespace std;
OggDecoder::OggDecoder() OggDecoder::OggDecoder()
{ {
init(); ogg_sync_init(&oy); /* Now we can read pages */
} }
bool OggDecoder::decode(Chunk* chunk) bool OggDecoder::decodePayload(Chunk* chunk)
{ {
WireChunk* wireChunk = chunk->wireChunk; WireChunk* wireChunk = chunk->wireChunk;
int eos=0;
int i;
/* grab some data at the head of the stream. We want the first page /* grab some data at the head of the stream. We want the first page
(which is guaranteed to be small and only contain the Vorbis (which is guaranteed to be small and only contain the Vorbis
stream initial header) We need the first page to get the stream stream initial header) We need the first page to get the stream
serialno. */ serialno. */
buffer=ogg_sync_buffer(&oy,wireChunk->length);
memcpy(buffer, wireChunk->payload, wireChunk->length);
bytes = wireChunk->length;
ogg_sync_wrote(&oy,bytes);
if (first) wireChunk->length = 0;
convsize=4096;//bytes/vi.channels;
/* The rest is just a straight decode loop until end of stream */
// while(!eos){
while(true)
{
int result=ogg_sync_pageout(&oy,&og);
if (result==0)
break; /* need more data */
if(result<0)
{ /* missing or corrupt data at this page position */
fprintf(stderr,"Corrupt or missing data in bitstream; continuing...\n");
continue;
}
ogg_stream_pagein(&os,&og); /* can safely ignore errors at
this point */
while(1)
{
result=ogg_stream_packetout(&os,&op);
if(result==0)
break; /* need more data */
if(result<0)
continue; /* missing or corrupt data at this page position */
/* no reason to complain; already complained above */
/* we have a packet. Decode it */
float **pcm;
int samples;
if(vorbis_synthesis(&vb,&op)==0) /* test for success! */
vorbis_synthesis_blockin(&vd,&vb);
/*
**pcm is a multichannel float vector. In stereo, for
example, pcm[0] is left, and pcm[1] is right. samples is
the size of each channel. Convert the float values
(-1.<=range<=1.) to whatever PCM format and write it out */
while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0)
{
int bout=(samples<convsize?samples:convsize);
//cout << "samples: " << samples << ", convsize: " << convsize << "\n";
/* convert floats to 16 bit signed ints (host order) and
interleave */
for(int i=0;i<vi.channels;i++)
{
ogg_int16_t *ptr=convbuffer+i;
float *mono=pcm[i];
for(int j=0;j<bout;j++)
{
int val=floor(mono[j]*32767.f+.5f);
/* might as well guard against clipping */
if(val>32767)
val=32767;
else if(val<-32768)
val=-32768;
*ptr=val;
ptr+=vi.channels;
}
}
size_t oldSize = wireChunk->length;
size_t size = 2*vi.channels * bout;
wireChunk->length += size;
wireChunk->payload = (char*)realloc(wireChunk->payload, wireChunk->length);
memcpy(wireChunk->payload + oldSize, convbuffer, size);
/* tell libvorbis how many samples we actually consumed */
vorbis_synthesis_read(&vd,bout);
}
}
}
// if(ogg_page_eos(&og))eos=1;
// ogg_stream_clear(&os);
// vorbis_comment_clear(&vc);
// vorbis_info_clear(&vi); /* must be called last */
return true;
}
bool OggDecoder::decodeHeader(Chunk* chunk)
{ {
cout << "1" << endl; cout << "Decode header\n";
if(ogg_sync_pageout(&oy,&og)!=1)
if(ogg_sync_pageout(&oy,&og)!=1){ {
fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n"); fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n");
exit(1); return false;
} }
ogg_stream_init(&os,ogg_page_serialno(&og)); ogg_stream_init(&os,ogg_page_serialno(&og));
cout << "2" << endl; cout << "2" << endl;
vorbis_info_init(&vi); vorbis_info_init(&vi);
vorbis_comment_init(&vc); vorbis_comment_init(&vc);
if(ogg_stream_pagein(&os,&og)<0){ if(ogg_stream_pagein(&os,&og)<0)
fprintf(stderr,"Error reading first page of Ogg bitstream data.\n"); {
exit(1); fprintf(stderr,"Error reading first page of Ogg bitstream data.\n");
return false;
}
if(ogg_stream_packetout(&os,&op)!=1)
{
fprintf(stderr,"Error reading initial header packet.\n");
return false;
} }
if(ogg_stream_packetout(&os,&op)!=1){ if(vorbis_synthesis_headerin(&vi,&vc,&op)<0)
fprintf(stderr,"Error reading initial header packet.\n"); {
exit(1); fprintf(stderr,"This Ogg bitstream does not contain Vorbis audio data.\n");
} return false;
if(vorbis_synthesis_headerin(&vi,&vc,&op)<0){
fprintf(stderr,"This Ogg bitstream does not contain Vorbis "
"audio data.\n");
exit(1);
} }
cout << "3" << endl; cout << "3" << endl;
i=0; int i(0);
while(i<2){ while(i<2)
while(i<2){ {
int result=ogg_sync_pageout(&oy,&og); while(i<2)
{
int result=ogg_sync_pageout(&oy,&og);
cout << result << endl; cout << result << endl;
if(result==0)break; /* Need more data */ if(result==0)
/* Don't complain about missing or corrupt data yet. We'll break; /* Need more data */
catch it at the packet output phase */ /* Don't complain about missing or corrupt data yet. We'll
if(result==1){ catch it at the packet output phase */
cout << "a" << endl; if(result==1)
ogg_stream_pagein(&os,&og); /* we can ignore any errors here {
as they'll also become apparent cout << "a" << endl;
at packetout */ ogg_stream_pagein(&os,&og); /* we can ignore any errors here as they'll also become apparent at packetout */
while(i<2){ while(i<2)
cout << "b" << endl; {
result=ogg_stream_packetout(&os,&op); cout << "b" << endl;
if(result==0)break; result=ogg_stream_packetout(&os,&op);
if(result<0){ if(result==0)
/* Uh oh; data at some point was corrupted or missing! break;
We can't tolerate that in a header. Die. */ if(result<0)
fprintf(stderr,"Corrupt secondary header. Exiting.\n"); {
exit(1); /* Uh oh; data at some point was corrupted or missing!
} We can't tolerate that in a header. Die. */
result=vorbis_synthesis_headerin(&vi,&vc,&op); fprintf(stderr,"Corrupt secondary header. Exiting.\n");
if(result<0){ return false;
fprintf(stderr,"Corrupt secondary header. Exiting.\n"); }
exit(1); result=vorbis_synthesis_headerin(&vi,&vc,&op);
} if(result<0)
i++; {
} fprintf(stderr,"Corrupt secondary header. Exiting.\n");
} return false;
} }
i++;
}
}
}
/* no harm in not checking before adding more */ /* no harm in not checking before adding more */
// buffer=ogg_sync_buffer(&oy,wireChunk->length); // buffer=ogg_sync_buffer(&oy,wireChunk->length);
// bytes=fread(buffer,1,4096,stdin); // bytes=fread(buffer,1,4096,stdin);
@ -102,144 +186,43 @@ cout << "b" << endl;
// ogg_sync_wrote(&oy,bytes); // ogg_sync_wrote(&oy,bytes);
} }
/* Throw the comments plus a few lines about the bitstream we're /* Throw the comments plus a few lines about the bitstream we're decoding */
decoding */ cout << "5" << endl;
{
char **ptr=vc.user_comments; char **ptr=vc.user_comments;
while(*ptr){ while(*ptr)
fprintf(stderr,"%s\n",*ptr); {
++ptr; fprintf(stderr,"%s\n",*ptr);
} ++ptr;
fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate); }
fprintf(stderr,"Encoded by: %s\n\n",vc.vendor); fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate);
} fprintf(stderr,"Encoded by: %s\n\n",vc.vendor);
cout << "6" << endl;
/* OK, got and parsed all three headers. Initialize the Vorbis /* OK, got and parsed all three headers. Initialize the Vorbis
packet->PCM decoder. */ packet->PCM decoder. */
if(vorbis_synthesis_init(&vd,&vi)==0){ /* central decode state */ if(vorbis_synthesis_init(&vd,&vi)==0) /* central decode state */
vorbis_block_init(&vd,&vb); /* local state for most of the decode vorbis_block_init(&vd,&vb); /* local state for most of the decode
so multiple block decodes can so multiple block decodes can
proceed in parallel. We could init proceed in parallel. We could init
multiple vorbis_block structures multiple vorbis_block structures
for vd here */ for vd here */
} return false;
first = false;
}
convsize=wireChunk->length/vi.channels;
/* The rest is just a straight decode loop until end of stream */
while(!eos){
while(!eos){
int result=ogg_sync_pageout(&oy,&og);
if(result==0)break; /* need more data */
if(result<0){ /* missing or corrupt data at this page position */
fprintf(stderr,"Corrupt or missing data in bitstream; "
"continuing...\n");
}else{
ogg_stream_pagein(&os,&og); /* can safely ignore errors at
this point */
while(1){
result=ogg_stream_packetout(&os,&op);
if(result==0)break; /* need more data */
if(result<0){ /* missing or corrupt data at this page position */
/* no reason to complain; already complained above */
}else{
/* we have a packet. Decode it */
float **pcm;
int samples;
if(vorbis_synthesis(&vb,&op)==0) /* test for success! */
vorbis_synthesis_blockin(&vd,&vb);
/*
**pcm is a multichannel float vector. In stereo, for
example, pcm[0] is left, and pcm[1] is right. samples is
the size of each channel. Convert the float values
(-1.<=range<=1.) to whatever PCM format and write it out */
wireChunk->length = 0;
while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0){
int j;
int clipflag=0;
int bout=(samples<convsize?samples:convsize);
/* convert floats to 16 bit signed ints (host order) and
interleave */
for(i=0;i<vi.channels;i++){
ogg_int16_t *ptr=convbuffer+i;
float *mono=pcm[i];
for(j=0;j<bout;j++){
#if 1
int val=floor(mono[j]*32767.f+.5f);
#else /* optional dither */
int val=mono[j]*32767.f+drand48()-0.5f;
#endif
/* might as well guard against clipping */
if(val>32767){
val=32767;
clipflag=1;
}
if(val<-32768){
val=-32768;
clipflag=1;
}
*ptr=val;
ptr+=vi.channels;
}
}
if(clipflag)
fprintf(stderr,"Clipping in frame %ld\n",(long)(vd.sequence));
//cout << "a" << endl;
size_t oldSize = wireChunk->length;
size_t size = 2*vi.channels * bout;
wireChunk->length += size;
wireChunk->payload = (char*)realloc(wireChunk->payload, wireChunk->length);
memcpy(wireChunk->payload + oldSize, convbuffer, size);
// fwrite(convbuffer,2*vi.channels,bout,stdout);
vorbis_synthesis_read(&vd,bout); /* tell libvorbis how
many samples we
actually consumed */
}
}
}
eos = 1;
// if(ogg_page_eos(&og))eos=1;
}
}
// if(!eos){
// buffer=ogg_sync_buffer(&oy,4096);
// bytes=fread(buffer,1,4096,stdin);
// ogg_sync_wrote(&oy,bytes);
// if(bytes==0)eos=1;
// }
}
/* ogg_page and ogg_packet structs always point to storage in
libvorbis. They're never freed or manipulated directly */
// vorbis_block_clear(&vb);
// vorbis_dsp_clear(&vd);
// }else{
// fprintf(stderr,"Error: Corrupt header during playback initialization.\n");
// }
/* clean up this logical bitstream; before exit we see if we're
followed by another [chained] */
// ogg_stream_clear(&os);
// vorbis_comment_clear(&vc);
// vorbis_info_clear(&vi); /* must be called last */
return true;
} }
void OggDecoder::init() bool OggDecoder::decode(Chunk* chunk)
{ {
ogg_sync_init(&oy); /* Now we can read pages */ WireChunk* wireChunk = chunk->wireChunk;
first = true; bytes = wireChunk->length;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, wireChunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
if (chunk->getType() == chunk_type::payload)
return decodePayload(chunk);
else if (chunk->getType() == chunk_type::header)
return decodeHeader(chunk);
return false;
} }

View file

@ -11,7 +11,8 @@ public:
virtual bool decode(Chunk* chunk); virtual bool decode(Chunk* chunk);
private: private:
void init(); bool decodePayload(Chunk* chunk);
bool decodeHeader(Chunk* chunk);
ogg_sync_state oy; /* sync and verify incoming physical bitstream */ ogg_sync_state oy; /* sync and verify incoming physical bitstream */
ogg_stream_state os; /* take physical pages, weld into a logical ogg_stream_state os; /* take physical pages, weld into a logical
@ -24,10 +25,10 @@ private:
vorbis_comment vc; /* struct that stores all the bitstream user comments */ vorbis_comment vc; /* struct that stores all the bitstream user comments */
vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */
vorbis_block vb; /* local working space for packet->PCM decode */ vorbis_block vb; /* local working space for packet->PCM decode */
ogg_int16_t convbuffer[4096]; /* take 8k out of the data segment, not the stack */ ogg_int16_t convbuffer[4096]; /* take 8k out of the data segment, not the stack */
int convsize=4096; int convsize=4096;
bool first;
char *buffer; char *buffer;
int bytes; int bytes;
}; };

View file

@ -47,7 +47,7 @@ void Receiver::stop()
void Receiver::worker() void Receiver::worker()
{ {
active_ = true; active_ = true;
PcmDecoder decoder; OggDecoder decoder;
while (active_) while (active_)
{ {
try try
@ -64,12 +64,15 @@ void Receiver::worker()
{ {
WireChunk* wireChunk = new WireChunk(); WireChunk* wireChunk = new WireChunk();
socketRead(&s, wireChunk, Chunk::getHeaderSize()); socketRead(&s, wireChunk, Chunk::getHeaderSize());
//cout << "WireChunk length: " << wireChunk->length << ", sec: " << wireChunk->tv_sec << ", usec: " << wireChunk->tv_usec << "\n";
wireChunk->payload = (char*)malloc(wireChunk->length); wireChunk->payload = (char*)malloc(wireChunk->length);
socketRead(&s, wireChunk->payload, wireChunk->length); socketRead(&s, wireChunk->payload, wireChunk->length);
Chunk* chunk = new Chunk(stream_->format, wireChunk); Chunk* chunk = new Chunk(stream_->format, wireChunk);
//cout << "WireChunk length: " << wireChunk->length << ", Duration: " << chunk->getDuration() << ", sec: " << wireChunk->tv_sec << ", usec: " << wireChunk->tv_usec/1000 << ", type: " << wireChunk->type << "\n";
if (decoder.decode(chunk)) if (decoder.decode(chunk))
{
//cout << "Duration: " << chunk->getDuration() << "\n";
stream_->addChunk(chunk); stream_->addChunk(chunk);
}
} }
} }
catch (const std::exception& e) catch (const std::exception& e)

View file

@ -11,10 +11,7 @@ Chunk::Chunk(const SampleFormat& sampleFormat, WireChunk* _wireChunk) : wireChun
Chunk::Chunk(const SampleFormat& sampleFormat, size_t ms) : format(sampleFormat), idx(0) Chunk::Chunk(const SampleFormat& sampleFormat, size_t ms) : format(sampleFormat), idx(0)
{ {
// format = sampleFormat; wireChunk = makeChunk(payload, format.rate*format.frameSize*ms / 1000);
wireChunk = new WireChunk;
wireChunk->length = format.rate*format.frameSize*ms / 1000;
wireChunk->payload = (char*)malloc(wireChunk->length);
} }

View file

@ -11,13 +11,18 @@ typedef std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono:
struct WireChunk struct WireChunk
{ {
uint16_t type;
int32_t tv_sec; int32_t tv_sec;
int32_t tv_usec; int32_t tv_usec;
uint32_t length; uint32_t length;
char* payload; char* payload;
}; };
enum chunk_type
{
header = 0,
payload = 1
};
class Chunk class Chunk
{ {
@ -28,7 +33,16 @@ public:
static inline size_t getHeaderSize() static inline size_t getHeaderSize()
{ {
return sizeof(WireChunk::tv_sec) + sizeof(WireChunk::tv_usec) + sizeof(WireChunk::length); return sizeof(WireChunk::type) + sizeof(WireChunk::tv_sec) + sizeof(WireChunk::tv_usec) + sizeof(WireChunk::length);
}
static WireChunk* makeChunk(chunk_type type, size_t size)
{
WireChunk* wireChunk = new WireChunk;
wireChunk->type = type;
wireChunk->length = size;
wireChunk->payload = (char*)malloc(wireChunk->length);
return wireChunk;
} }
int read(void* outputBuffer, size_t frameCount); int read(void* outputBuffer, size_t frameCount);
@ -45,6 +59,11 @@ public:
std::chrono::milliseconds(relativeIdxTp); std::chrono::milliseconds(relativeIdxTp);
} }
inline chunk_type getType()
{
return (chunk_type)wireChunk->type;
}
template<typename T> template<typename T>
inline T getAge() const inline T getAge() const
{ {
@ -69,6 +88,7 @@ public:
int seek(int frames); int seek(int frames);
double getDuration() const; double getDuration() const;
double getDurationUs() const;
double getTimeLeft() const; double getTimeLeft() const;
double getFrameCount() const; double getFrameCount() const;

View file

@ -62,6 +62,21 @@ static void addMs(timeval& tv, int ms)
tv.tv_usec %= 1000000; tv.tv_usec %= 1000000;
} }
static void addUs(timeval& tv, int us)
{
if (us < 0)
{
timeval t;
t.tv_sec = -us / 1000000;
t.tv_usec = (-us % 1000000);
timersub(&tv, &t, &tv);
return;
}
tv.tv_usec += us;
tv.tv_sec += (tv.tv_usec / 1000000);
tv.tv_usec %= 1000000;
}
static long getTickCount() static long getTickCount()

View file

@ -11,6 +11,17 @@ OggEncoder::OggEncoder()
} }
bool OggEncoder::getHeader(Chunk* chunk)
{
WireChunk* wireChunk = chunk->wireChunk;
wireChunk->type = chunk_type::header;
wireChunk->payload = (char*)realloc(wireChunk->payload, oggHeaderLen);
memcpy(wireChunk->payload, oggHeader, oggHeaderLen);
wireChunk->length = oggHeaderLen;
return true;
}
bool OggEncoder::encode(Chunk* chunk) bool OggEncoder::encode(Chunk* chunk)
{ {
bool res = false; bool res = false;
@ -20,14 +31,12 @@ bool OggEncoder::encode(Chunk* chunk)
tv_sec = wireChunk->tv_sec; tv_sec = wireChunk->tv_sec;
tv_usec = wireChunk->tv_usec; tv_usec = wireChunk->tv_usec;
} }
//cout << "pcm: " << wireChunk->length << endl; //cout << "-> pcm: " << wireChunk->length << endl;
int bytes = wireChunk->length / 4; int bytes = wireChunk->length / 4;
float **buffer=vorbis_analysis_buffer(&vd, bytes); float **buffer=vorbis_analysis_buffer(&vd, bytes);
int i;
/* uninterleave samples */ /* uninterleave samples */
for(i=0;i<bytes;i++) for(int i=0;i<bytes;i++)
{ {
buffer[0][i]=((wireChunk->payload[i*4+1]<<8)| buffer[0][i]=((wireChunk->payload[i*4+1]<<8)|
(0x00ff&(int)wireChunk->payload[i*4]))/32768.f; (0x00ff&(int)wireChunk->payload[i*4]))/32768.f;
@ -36,7 +45,7 @@ bool OggEncoder::encode(Chunk* chunk)
} }
/* tell the library how much we actually submitted */ /* tell the library how much we actually submitted */
vorbis_analysis_wrote(&vd,i); vorbis_analysis_wrote(&vd, bytes);
/* vorbis does some data preanalysis, then divvies up blocks for /* vorbis does some data preanalysis, then divvies up blocks for
more involved (potentially parallel) processing. Get a single more involved (potentially parallel) processing. Get a single
@ -44,7 +53,6 @@ bool OggEncoder::encode(Chunk* chunk)
size_t pos = 0; size_t pos = 0;
while(vorbis_analysis_blockout(&vd,&vb)==1) while(vorbis_analysis_blockout(&vd,&vb)==1)
{ {
/* analysis, assume we want to use bitrate management */ /* analysis, assume we want to use bitrate management */
vorbis_analysis(&vb,NULL); vorbis_analysis(&vb,NULL);
vorbis_bitrate_addblock(&vb); vorbis_bitrate_addblock(&vb);
@ -59,22 +67,19 @@ bool OggEncoder::encode(Chunk* chunk)
{ {
// int result = ogg_stream_pageout(&os,&og); // int result = ogg_stream_pageout(&os,&og);
int result = ogg_stream_flush(&os,&og); int result = ogg_stream_flush(&os,&og);
//cout << "result: " << result << "\n";
if (result == 0) if (result == 0)
{
break; break;
} res = true;
else // cout << "pcm: " << wireChunk->length << ", header len: " << og.header_len << ", body len: " << og.body_len << endl;
{
res = true; size_t nextLen = pos + og.header_len + og.body_len;
cout << "pcm: " << wireChunk->length << ", header len: " << og.header_len << ", body len: " << og.body_len << endl; if (wireChunk->length < nextLen)
// fwrite(og.header,1,og.header_len,stdout); wireChunk->payload = (char*)realloc(wireChunk->payload, nextLen);
// fwrite(og.body,1,og.body_len,stdout);
memcpy(wireChunk->payload + pos, og.header, og.header_len); memcpy(wireChunk->payload + pos, og.header, og.header_len);
pos += og.header_len; pos += og.header_len;
memcpy(wireChunk->payload + pos, og.body, og.body_len); memcpy(wireChunk->payload + pos, og.body, og.body_len);
pos += og.body_len; pos += og.body_len;
}
} }
} }
} }
@ -82,8 +87,8 @@ bool OggEncoder::encode(Chunk* chunk)
{ {
wireChunk->payload = (char*)realloc(wireChunk->payload, pos); wireChunk->payload = (char*)realloc(wireChunk->payload, pos);
wireChunk->length = pos; wireChunk->length = pos;
wireChunk->tv_sec = tv_sec; // wireChunk->tv_sec = tv_sec;
wireChunk->tv_usec = tv_usec; // wireChunk->tv_usec = tv_usec;
tv_sec = 0; tv_sec = 0;
tv_usec = 0; tv_usec = 0;
} }
@ -128,7 +133,7 @@ void OggEncoder::init()
*********************************************************************/ *********************************************************************/
ret=vorbis_encode_init_vbr(&vi,2,48000,0.4); ret=vorbis_encode_init_vbr(&vi,2,48000,0.7);
/* do not continue if setup failed; this can happen if we ask for a /* do not continue if setup failed; this can happen if we ask for a
mode that libVorbis does not support (eg, too low a bitrate, etc, mode that libVorbis does not support (eg, too low a bitrate, etc,
@ -138,7 +143,7 @@ void OggEncoder::init()
/* add a comment */ /* add a comment */
vorbis_comment_init(&vc); vorbis_comment_init(&vc);
vorbis_comment_add_tag(&vc,"ENCODER","encoder_example.c"); vorbis_comment_add_tag(&vc,"ENCODER","snapstream");
/* set up the analysis state and auxiliary encoding storage */ /* set up the analysis state and auxiliary encoding storage */
vorbis_analysis_init(&vd,&vi); vorbis_analysis_init(&vd,&vi);
@ -165,13 +170,26 @@ void OggEncoder::init()
/* This ensures the actual /* This ensures the actual
* audio data will start on a new page, as per spec * audio data will start on a new page, as per spec
*/ */
/* while(!eos){ // while(!eos){
int result=ogg_stream_flush(&os,&og); size_t pos(0);
if(result==0)break; oggHeader = (char*)malloc(0);
fwrite(og.header,1,og.header_len,stdout); oggHeaderLen = 0;
fwrite(og.body,1,og.body_len,stdout); while (true)
{
int result=ogg_stream_flush(&os,&og);
if (result == 0)
break;
oggHeaderLen += og.header_len + og.body_len;
oggHeader = (char*)realloc(oggHeader, oggHeaderLen);
cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n";
memcpy(oggHeader + pos, og.header, og.header_len);
pos += og.header_len;
memcpy(oggHeader + pos, og.body, og.body_len);
pos += og.body_len;
} }
*/ // fwrite(og.header,1,og.header_len,stdout);
// fwrite(og.body,1,og.body_len,stdout);
// }
} }

View file

@ -9,6 +9,7 @@ class OggEncoder
public: public:
OggEncoder(); OggEncoder();
virtual bool encode(Chunk* chunk); virtual bool encode(Chunk* chunk);
virtual bool getHeader(Chunk* chunk);
private: private:
void init(); void init();
@ -31,6 +32,8 @@ private:
int eos=0,ret; int eos=0,ret;
int i, founddata; int i, founddata;
char* oggHeader;
int oggHeaderLen;
int32_t tv_sec; int32_t tv_sec;
int32_t tv_usec; int32_t tv_usec;

View file

@ -34,7 +34,6 @@
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
namespace po = boost::program_options; namespace po = boost::program_options;
const int max_length = 1024;
typedef boost::shared_ptr<tcp::socket> socket_ptr; typedef boost::shared_ptr<tcp::socket> socket_ptr;
using namespace std; using namespace std;
@ -105,6 +104,9 @@ public:
void send(shared_ptr<Chunk> chunk) void send(shared_ptr<Chunk> chunk)
{ {
if (!chunk)
return;
while (chunks.size() * chunk->getDuration() > 10000) while (chunks.size() * chunk->getDuration() > 10000)
chunks.pop(); chunks.pop();
chunks.push(chunk); chunks.push(chunk);
@ -126,7 +128,7 @@ private:
class Server class Server
{ {
public: public:
Server(unsigned short port) : port_(port) Server(unsigned short port) : port_(port), headerChunk(NULL)
{ {
} }
@ -139,11 +141,18 @@ public:
a.accept(*sock); a.accept(*sock);
// cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; // cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n";
Session* session = new Session(sock); Session* session = new Session(sock);
cout << "Sending header: " << headerChunk->wireChunk->length << "\n";
session->send(headerChunk);
session->start(); session->start();
sessions.insert(shared_ptr<Session>(session)); sessions.insert(shared_ptr<Session>(session));
} }
} }
void setHeader(shared_ptr<Chunk> chunk)
{
headerChunk = chunk;
}
void send(shared_ptr<Chunk> chunk) void send(shared_ptr<Chunk> chunk)
{ {
// fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout); // fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout);
@ -177,6 +186,7 @@ private:
set<shared_ptr<Session>> sessions; set<shared_ptr<Session>> sessions;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
unsigned short port_; unsigned short port_;
shared_ptr<Chunk> headerChunk;
thread* acceptThread; thread* acceptThread;
}; };
@ -264,9 +274,13 @@ int main(int argc, char* argv[])
mkfifo(fifoName.c_str(), 0777); mkfifo(fifoName.c_str(), 0777);
size_t duration = 50; size_t duration = 50;
PcmEncoder encoder;
SampleFormat format(sampleFormat); SampleFormat format(sampleFormat);
OggEncoder encoder;
shared_ptr<Chunk> header(new Chunk(format, Chunk::makeChunk(chunk_type::header, 0)));
encoder.getHeader(header.get());
server->setHeader(header);
while (!g_terminated) while (!g_terminated)
{ {
int fd = open(fifoName.c_str(), O_RDONLY); int fd = open(fifoName.c_str(), O_RDONLY);
@ -295,7 +309,8 @@ size_t duration = 50;
wireChunk->tv_usec = tvChunk.tv_usec; wireChunk->tv_usec = tvChunk.tv_usec;
if (encoder.encode(chunk.get())) if (encoder.encode(chunk.get()))
server->send(chunk); server->send(chunk);
//cout << wireChunk->tv_sec << ", " << wireChunk->tv_usec / 1000 << "\n";
// addUs(tvChunk, 1000*chunk->getDuration());
addMs(tvChunk, duration); addMs(tvChunk, duration);
nextTick += duration; nextTick += duration;
long currentTick = getTickCount(); long currentTick = getTickCount();