new messaging

git-svn-id: svn://elaine/murooma/trunk@229 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-03 21:12:44 +00:00
parent c6ee0960fd
commit a519d0f0dd
9 changed files with 96 additions and 62 deletions

View file

@ -4,46 +4,46 @@
#include "common/log.h" #include "common/log.h"
Chunk::Chunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0) PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0)
{ {
payloadSize = format.rate*format.frameSize*ms / 1000; payloadSize = format.rate*format.frameSize*ms / 1000;
payload = (char*)malloc(payloadSize); payload = (char*)malloc(payloadSize);
} }
Chunk::~Chunk() PcmChunk::~PcmChunk()
{ {
} }
bool Chunk::isEndOfChunk() const bool PcmChunk::isEndOfChunk() const
{ {
return idx >= getFrameCount(); return idx >= getFrameCount();
} }
double Chunk::getFrameCount() const double PcmChunk::getFrameCount() const
{ {
return (payloadSize / format.frameSize); return (payloadSize / format.frameSize);
} }
double Chunk::getDuration() const double PcmChunk::getDuration() const
{ {
return getFrameCount() / format.msRate(); return getFrameCount() / format.msRate();
} }
double Chunk::getTimeLeft() const double PcmChunk::getTimeLeft() const
{ {
return (getFrameCount() - idx) / format.msRate(); return (getFrameCount() - idx) / format.msRate();
} }
int Chunk::seek(int frames) int PcmChunk::seek(int frames)
{ {
idx += frames; idx += frames;
if (idx > getFrameCount()) if (idx > getFrameCount())
@ -54,7 +54,7 @@ int Chunk::seek(int frames)
} }
int Chunk::read(void* outputBuffer, size_t frameCount) int PcmChunk::read(void* outputBuffer, size_t frameCount)
{ {
//logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl; //logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl;
int result = frameCount; int result = frameCount;

View file

@ -150,6 +150,43 @@ protected:
class HeaderMessage : public BaseMessage
{
public:
HeaderMessage(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size)
{
payload = (char*)malloc(size);
}
virtual ~HeaderMessage()
{
}
virtual void read(istream& stream)
{
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)malloc(payloadSize);
stream.read(payload, payloadSize);
}
virtual uint32_t getSize()
{
return sizeof(uint32_t) + payloadSize;
}
uint32_t payloadSize;
char* payload;
protected:
virtual void doserialize(ostream& stream)
{
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}
};
/* /*
@ -249,11 +286,11 @@ protected:
}; };
*/ */
class Chunk : public WireChunk class PcmChunk : public WireChunk
{ {
public: public:
Chunk(const SampleFormat& sampleFormat, size_t ms); PcmChunk(const SampleFormat& sampleFormat, size_t ms);
~Chunk(); ~PcmChunk();
int read(void* outputBuffer, size_t frameCount); int read(void* outputBuffer, size_t frameCount);
bool isEndOfChunk() const; bool isEndOfChunk() const;

View file

@ -1,6 +1,6 @@
#ifndef ENCODER_H #ifndef ENCODER_H
#define ENCODER_H #define ENCODER_H
#include "common/chunk.h" #include "common/message.h"
#include "common/sampleFormat.h" #include "common/sampleFormat.h"
@ -11,8 +11,8 @@ public:
{ {
} }
virtual double encode(Chunk* chunk) = 0; virtual double encode(PcmChunk* chunk) = 0;
virtual WireChunk* getHeader() virtual HeaderMessage* getHeader()
{ {
return NULL; return NULL;
} }

View file

@ -12,32 +12,31 @@ OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format), headerChun
} }
WireChunk* OggEncoder::getHeader() HeaderMessage* OggEncoder::getHeader()
{ {
return headerChunk; return headerChunk;
} }
double OggEncoder::encode(Chunk* chunk) double OggEncoder::encode(PcmChunk* chunk)
{ {
double res = 0; double res = 0;
WireChunk* wireChunk = chunk->wireChunk;
if (tv_sec == 0) if (tv_sec == 0)
{ {
tv_sec = wireChunk->tv_sec; tv_sec = chunk->tv_sec;
tv_usec = wireChunk->tv_usec; tv_usec = chunk->tv_usec;
} }
//cout << "-> pcm: " << wireChunk->length << endl; //cout << "-> pcm: " << wireChunk->length << endl;
int bytes = wireChunk->length / 4; int bytes = chunk->payloadSize / 4;
float **buffer=vorbis_analysis_buffer(&vd, bytes); float **buffer=vorbis_analysis_buffer(&vd, bytes);
/* uninterleave samples */ /* uninterleave samples */
for(int i=0;i<bytes;i++) for(int i=0;i<bytes;i++)
{ {
buffer[0][i]=((wireChunk->payload[i*4+1]<<8)| buffer[0][i]=((chunk->payload[i*4+1]<<8)|
(0x00ff&(int)wireChunk->payload[i*4]))/32768.f; (0x00ff&(int)chunk->payload[i*4]))/32768.f;
buffer[1][i]=((wireChunk->payload[i*4+3]<<8)| buffer[1][i]=((chunk->payload[i*4+3]<<8)|
(0x00ff&(int)wireChunk->payload[i*4+2]))/32768.f; (0x00ff&(int)chunk->payload[i*4+2]))/32768.f;
} }
/* tell the library how much we actually submitted */ /* tell the library how much we actually submitted */
@ -68,12 +67,12 @@ double OggEncoder::encode(Chunk* chunk)
res = true; res = true;
size_t nextLen = pos + og.header_len + og.body_len; size_t nextLen = pos + og.header_len + og.body_len;
if (wireChunk->length < nextLen) if (chunk->payloadSize < nextLen)
wireChunk->payload = (char*)realloc(wireChunk->payload, nextLen); chunk->payload = (char*)realloc(chunk->payload, nextLen);
memcpy(wireChunk->payload + pos, og.header, og.header_len); memcpy(chunk->payload + pos, og.header, og.header_len);
pos += og.header_len; pos += og.header_len;
memcpy(wireChunk->payload + pos, og.body, og.body_len); memcpy(chunk->payload + pos, og.body, og.body_len);
pos += og.body_len; pos += og.body_len;
} }
} }
@ -86,8 +85,8 @@ double OggEncoder::encode(Chunk* chunk)
res = os.granulepos - lastGranulepos; res = os.granulepos - lastGranulepos;
res /= 48.; res /= 48.;
lastGranulepos = os.granulepos; lastGranulepos = os.granulepos;
wireChunk->payload = (char*)realloc(wireChunk->payload, pos); chunk->payload = (char*)realloc(chunk->payload, pos);
wireChunk->length = pos; chunk->payloadSize = pos;
tv_sec = 0; tv_sec = 0;
tv_usec = 0; tv_usec = 0;
} }
@ -171,14 +170,14 @@ void OggEncoder::init()
*/ */
// while(!eos){ // while(!eos){
size_t pos(0); size_t pos(0);
headerChunk = Chunk::makeChunk(chunk_type::header, 0); headerChunk = new HeaderMessage();
while (true) while (true)
{ {
int result=ogg_stream_flush(&os,&og); int result=ogg_stream_flush(&os,&og);
if (result == 0) if (result == 0)
break; break;
headerChunk->length += og.header_len + og.body_len; headerChunk->payloadSize += og.header_len + og.body_len;
headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->length); headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize);
cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n"; cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n";
memcpy(headerChunk->payload + pos, og.header, og.header_len); memcpy(headerChunk->payload + pos, og.header, og.header_len);
pos += og.header_len; pos += og.header_len;

View file

@ -8,8 +8,8 @@ class OggEncoder : public Encoder
{ {
public: public:
OggEncoder(const SampleFormat& format); OggEncoder(const SampleFormat& format);
virtual double encode(Chunk* chunk); virtual double encode(PcmChunk* chunk);
virtual WireChunk* getHeader(); virtual HeaderMessage* getHeader();
private: private:
void init(); void init();
@ -31,7 +31,7 @@ private:
ogg_packet header_code; ogg_packet header_code;
ogg_int64_t lastGranulepos; ogg_int64_t lastGranulepos;
WireChunk* headerChunk; HeaderMessage* headerChunk;
int eos=0,ret; int eos=0,ret;
int i, founddata; int i, founddata;

View file

@ -5,7 +5,7 @@ PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format)
} }
double PcmEncoder::encode(Chunk* chunk) double PcmEncoder::encode(PcmChunk* chunk)
{ {
/* WireChunk* wireChunk = chunk->wireChunk; /* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n) for (size_t n=0; n<wireChunk->length; ++n)

View file

@ -7,7 +7,7 @@ class PcmEncoder : public Encoder
{ {
public: public:
PcmEncoder(const SampleFormat& format); PcmEncoder(const SampleFormat& format);
virtual double encode(Chunk* chunk); virtual double encode(PcmChunk* chunk);
}; };

View file

@ -3,7 +3,7 @@ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg
OBJ = test.o message.o ../common/sampleFormat.o ../server/oggEncoder.o ../server/pcmEncoder.o OBJ = test.o ../common/message.o ../common/sampleFormat.o ../server/oggEncoder.o ../server/pcmEncoder.o
BIN = test BIN = test
all: server all: server

View file

@ -44,9 +44,9 @@
#include "common/signalHandler.h" #include "common/signalHandler.h"
#include "common/utils.h" #include "common/utils.h"
#include "common/sampleFormat.h" #include "common/sampleFormat.h"
//#include "../server/pcmEncoder.h" #include "../server/pcmEncoder.h"
//#include "../server/oggEncoder.h" #include "../server/oggEncoder.h"
#include "message.h" #include "common/message.h"
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
@ -114,7 +114,7 @@ public:
{ {
for (;;) for (;;)
{ {
shared_ptr<Chunk> chunk(chunks.pop()); shared_ptr<BaseMessage> message(messages.pop());
/* char* stream = chunk->serialize(); /* char* stream = chunk->serialize();
size_t written(0); size_t written(0);
size_t toWrite = sizeof(stream); size_t toWrite = sizeof(stream);
@ -139,14 +139,14 @@ public:
// readerThread.join(); // readerThread.join();
} }
void send(shared_ptr<Chunk> chunk) void send(shared_ptr<BaseMessage> message)
{ {
if (!chunk) if (!message)
return; return;
while (chunks.size() * chunk->getDuration() > 10000) while (messages.size() > 100)//* chunk->getDuration() > 10000)
chunks.pop(); messages.pop();
chunks.push(chunk); messages.push(message);
} }
bool isActive() const bool isActive() const
@ -158,7 +158,7 @@ private:
bool active_; bool active_;
socket_ptr socket_; socket_ptr socket_;
thread* senderThread; thread* senderThread;
Queue<shared_ptr<Chunk>> chunks; Queue<shared_ptr<BaseMessage>> messages;
}; };
@ -185,16 +185,14 @@ cout << "Sending header: " << headerChunk->payloadSize << "\n";
} }
} }
void setHeader(shared_ptr<Chunk> chunk) void setHeader(shared_ptr<HeaderMessage> header)
{ {
if (chunk) if (header)
headerChunk = shared_ptr<Chunk>(chunk); headerChunk = shared_ptr<HeaderMessage>(header);
} }
void send(shared_ptr<Chunk> chunk) void send(shared_ptr<BaseMessage> message)
{ {
// fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout);
for (std::set<shared_ptr<Session>>::iterator it = sessions.begin(); it != sessions.end(); ) for (std::set<shared_ptr<Session>>::iterator it = sessions.begin(); it != sessions.end(); )
{ {
if (!(*it)->isActive()) if (!(*it)->isActive())
@ -207,7 +205,7 @@ cout << "Sending header: " << headerChunk->payloadSize << "\n";
} }
for (auto s : sessions) for (auto s : sessions)
s->send(chunk); s->send(message);
} }
void start() void start()
@ -224,7 +222,7 @@ private:
set<shared_ptr<Session>> sessions; set<shared_ptr<Session>> sessions;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
unsigned short port_; unsigned short port_;
shared_ptr<Chunk> headerChunk; shared_ptr<HeaderMessage> headerChunk;
thread* acceptThread; thread* acceptThread;
}; };
@ -302,7 +300,7 @@ int main(int argc, char* argv[])
size_t duration = 50; size_t duration = 50;
SampleFormat format(sampleFormat); SampleFormat format(sampleFormat);
/* std::auto_ptr<Encoder> encoder; std::auto_ptr<Encoder> encoder;
if (codec == "ogg") if (codec == "ogg")
encoder.reset(new OggEncoder(sampleFormat)); encoder.reset(new OggEncoder(sampleFormat));
else if (codec == "pcm") else if (codec == "pcm")
@ -312,8 +310,8 @@ size_t duration = 50;
cout << "unknown codec: " << codec << "\n"; cout << "unknown codec: " << codec << "\n";
return 1; return 1;
} }
*/
// shared_ptr<Chunk> header(new Chunk(format, encoder->getHeader())); /// shared_ptr<HeaderMessage> header(encoder->getHeader());
// server->setHeader(header); // server->setHeader(header);
while (!g_terminated) while (!g_terminated)
@ -321,10 +319,10 @@ size_t duration = 50;
int fd = open(fifoName.c_str(), O_RDONLY); int fd = open(fifoName.c_str(), O_RDONLY);
try try
{ {
shared_ptr<Chunk> chunk;//(new WireChunk()); shared_ptr<PcmChunk> chunk;//(new WireChunk());
while (true)//cin.good()) while (true)//cin.good())
{ {
chunk.reset(new Chunk(format, duration));//2*WIRE_CHUNK_SIZE)); chunk.reset(new PcmChunk(format, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize; int toRead = chunk->payloadSize;
int len = 0; int len = 0;
do do