diff --git a/test/message.cpp b/common/message.cpp similarity index 76% rename from test/message.cpp rename to common/message.cpp index 15116c95..bfe7eda7 100644 --- a/test/message.cpp +++ b/common/message.cpp @@ -4,46 +4,46 @@ #include "common/log.h" -Chunk::Chunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0) +PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0) { payloadSize = format.rate*format.frameSize*ms / 1000; payload = (char*)malloc(payloadSize); } -Chunk::~Chunk() +PcmChunk::~PcmChunk() { } -bool Chunk::isEndOfChunk() const +bool PcmChunk::isEndOfChunk() const { return idx >= getFrameCount(); } -double Chunk::getFrameCount() const +double PcmChunk::getFrameCount() const { return (payloadSize / format.frameSize); } -double Chunk::getDuration() const +double PcmChunk::getDuration() const { return getFrameCount() / format.msRate(); } -double Chunk::getTimeLeft() const +double PcmChunk::getTimeLeft() const { return (getFrameCount() - idx) / format.msRate(); } -int Chunk::seek(int frames) +int PcmChunk::seek(int frames) { idx += frames; if (idx > getFrameCount()) @@ -54,7 +54,7 @@ int Chunk::seek(int frames) } -int Chunk::read(void* outputBuffer, size_t frameCount) +int PcmChunk::read(void* outputBuffer, size_t frameCount) { //logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl; int result = frameCount; diff --git a/test/message.h b/common/message.h similarity index 89% rename from test/message.h rename to common/message.h index b69153f4..0780fcdb 100644 --- a/test/message.h +++ b/common/message.h @@ -150,6 +150,43 @@ protected: +class HeaderMessage : public BaseMessage +{ +public: + HeaderMessage(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) + { + payload = (char*)malloc(size); + } + + virtual ~HeaderMessage() + { + } + + virtual void read(istream& stream) + { + stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + payload = (char*)malloc(payloadSize); + stream.read(payload, payloadSize); + } + + virtual uint32_t getSize() + { + return sizeof(uint32_t) + payloadSize; + } + + uint32_t payloadSize; + char* payload; + +protected: + virtual void doserialize(ostream& stream) + { + stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + stream.write(payload, payloadSize); + } +}; + + + /* @@ -249,11 +286,11 @@ protected: }; */ -class Chunk : public WireChunk +class PcmChunk : public WireChunk { public: - Chunk(const SampleFormat& sampleFormat, size_t ms); - ~Chunk(); + PcmChunk(const SampleFormat& sampleFormat, size_t ms); + ~PcmChunk(); int read(void* outputBuffer, size_t frameCount); bool isEndOfChunk() const; diff --git a/server/encoder.h b/server/encoder.h index 7565055d..852afa1b 100644 --- a/server/encoder.h +++ b/server/encoder.h @@ -1,6 +1,6 @@ #ifndef ENCODER_H #define ENCODER_H -#include "common/chunk.h" +#include "common/message.h" #include "common/sampleFormat.h" @@ -11,8 +11,8 @@ public: { } - virtual double encode(Chunk* chunk) = 0; - virtual WireChunk* getHeader() + virtual double encode(PcmChunk* chunk) = 0; + virtual HeaderMessage* getHeader() { return NULL; } diff --git a/server/oggEncoder.cpp b/server/oggEncoder.cpp index ad089700..4021e675 100644 --- a/server/oggEncoder.cpp +++ b/server/oggEncoder.cpp @@ -12,32 +12,31 @@ OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format), headerChun } -WireChunk* OggEncoder::getHeader() +HeaderMessage* OggEncoder::getHeader() { return headerChunk; } -double OggEncoder::encode(Chunk* chunk) +double OggEncoder::encode(PcmChunk* chunk) { double res = 0; - WireChunk* wireChunk = chunk->wireChunk; if (tv_sec == 0) { - tv_sec = wireChunk->tv_sec; - tv_usec = wireChunk->tv_usec; + tv_sec = chunk->tv_sec; + tv_usec = chunk->tv_usec; } //cout << "-> pcm: " << wireChunk->length << endl; - int bytes = wireChunk->length / 4; + int bytes = chunk->payloadSize / 4; float **buffer=vorbis_analysis_buffer(&vd, bytes); /* uninterleave samples */ for(int i=0;ipayload[i*4+1]<<8)| - (0x00ff&(int)wireChunk->payload[i*4]))/32768.f; - buffer[1][i]=((wireChunk->payload[i*4+3]<<8)| - (0x00ff&(int)wireChunk->payload[i*4+2]))/32768.f; + buffer[0][i]=((chunk->payload[i*4+1]<<8)| + (0x00ff&(int)chunk->payload[i*4]))/32768.f; + buffer[1][i]=((chunk->payload[i*4+3]<<8)| + (0x00ff&(int)chunk->payload[i*4+2]))/32768.f; } /* tell the library how much we actually submitted */ @@ -68,12 +67,12 @@ double OggEncoder::encode(Chunk* chunk) res = true; size_t nextLen = pos + og.header_len + og.body_len; - if (wireChunk->length < nextLen) - wireChunk->payload = (char*)realloc(wireChunk->payload, nextLen); + if (chunk->payloadSize < nextLen) + chunk->payload = (char*)realloc(chunk->payload, nextLen); - memcpy(wireChunk->payload + pos, og.header, og.header_len); + memcpy(chunk->payload + pos, og.header, og.header_len); pos += og.header_len; - memcpy(wireChunk->payload + pos, og.body, og.body_len); + memcpy(chunk->payload + pos, og.body, og.body_len); pos += og.body_len; } } @@ -86,8 +85,8 @@ double OggEncoder::encode(Chunk* chunk) res = os.granulepos - lastGranulepos; res /= 48.; lastGranulepos = os.granulepos; - wireChunk->payload = (char*)realloc(wireChunk->payload, pos); - wireChunk->length = pos; + chunk->payload = (char*)realloc(chunk->payload, pos); + chunk->payloadSize = pos; tv_sec = 0; tv_usec = 0; } @@ -171,14 +170,14 @@ void OggEncoder::init() */ // while(!eos){ size_t pos(0); - headerChunk = Chunk::makeChunk(chunk_type::header, 0); + headerChunk = new HeaderMessage(); while (true) { int result=ogg_stream_flush(&os,&og); if (result == 0) break; - headerChunk->length += og.header_len + og.body_len; - headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->length); + headerChunk->payloadSize += og.header_len + og.body_len; + headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize); cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n"; memcpy(headerChunk->payload + pos, og.header, og.header_len); pos += og.header_len; diff --git a/server/oggEncoder.h b/server/oggEncoder.h index b8e34808..53c8f2c9 100644 --- a/server/oggEncoder.h +++ b/server/oggEncoder.h @@ -8,8 +8,8 @@ class OggEncoder : public Encoder { public: OggEncoder(const SampleFormat& format); - virtual double encode(Chunk* chunk); - virtual WireChunk* getHeader(); + virtual double encode(PcmChunk* chunk); + virtual HeaderMessage* getHeader(); private: void init(); @@ -31,7 +31,7 @@ private: ogg_packet header_code; ogg_int64_t lastGranulepos; - WireChunk* headerChunk; + HeaderMessage* headerChunk; int eos=0,ret; int i, founddata; diff --git a/server/pcmEncoder.cpp b/server/pcmEncoder.cpp index 1947e938..c0902479 100644 --- a/server/pcmEncoder.cpp +++ b/server/pcmEncoder.cpp @@ -5,7 +5,7 @@ PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format) } -double PcmEncoder::encode(Chunk* chunk) +double PcmEncoder::encode(PcmChunk* chunk) { /* WireChunk* wireChunk = chunk->wireChunk; for (size_t n=0; nlength; ++n) diff --git a/server/pcmEncoder.h b/server/pcmEncoder.h index 157b0910..55ed426b 100644 --- a/server/pcmEncoder.h +++ b/server/pcmEncoder.h @@ -7,7 +7,7 @@ class PcmEncoder : public Encoder { public: PcmEncoder(const SampleFormat& format); - virtual double encode(Chunk* chunk); + virtual double encode(PcmChunk* chunk); }; diff --git a/test/Makefile b/test/Makefile index 60e3f8ec..f1281c36 100644 --- a/test/Makefile +++ b/test/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -OBJ = test.o message.o ../common/sampleFormat.o ../server/oggEncoder.o ../server/pcmEncoder.o +OBJ = test.o ../common/message.o ../common/sampleFormat.o ../server/oggEncoder.o ../server/pcmEncoder.o BIN = test all: server diff --git a/test/test.cpp b/test/test.cpp index 5f841d0c..e330e1eb 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -44,9 +44,9 @@ #include "common/signalHandler.h" #include "common/utils.h" #include "common/sampleFormat.h" -//#include "../server/pcmEncoder.h" -//#include "../server/oggEncoder.h" -#include "message.h" +#include "../server/pcmEncoder.h" +#include "../server/oggEncoder.h" +#include "common/message.h" using boost::asio::ip::tcp; @@ -114,7 +114,7 @@ public: { for (;;) { - shared_ptr chunk(chunks.pop()); + shared_ptr message(messages.pop()); /* char* stream = chunk->serialize(); size_t written(0); size_t toWrite = sizeof(stream); @@ -139,14 +139,14 @@ public: // readerThread.join(); } - void send(shared_ptr chunk) + void send(shared_ptr message) { - if (!chunk) + if (!message) return; - while (chunks.size() * chunk->getDuration() > 10000) - chunks.pop(); - chunks.push(chunk); + while (messages.size() > 100)//* chunk->getDuration() > 10000) + messages.pop(); + messages.push(message); } bool isActive() const @@ -158,7 +158,7 @@ private: bool active_; socket_ptr socket_; thread* senderThread; - Queue> chunks; + Queue> messages; }; @@ -185,16 +185,14 @@ cout << "Sending header: " << headerChunk->payloadSize << "\n"; } } - void setHeader(shared_ptr chunk) + void setHeader(shared_ptr header) { - if (chunk) - headerChunk = shared_ptr(chunk); + if (header) + headerChunk = shared_ptr(header); } - void send(shared_ptr chunk) + void send(shared_ptr message) { -// fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout); - for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) { if (!(*it)->isActive()) @@ -207,7 +205,7 @@ cout << "Sending header: " << headerChunk->payloadSize << "\n"; } for (auto s : sessions) - s->send(chunk); + s->send(message); } void start() @@ -224,7 +222,7 @@ private: set> sessions; boost::asio::io_service io_service_; unsigned short port_; - shared_ptr headerChunk; + shared_ptr headerChunk; thread* acceptThread; }; @@ -302,7 +300,7 @@ int main(int argc, char* argv[]) size_t duration = 50; SampleFormat format(sampleFormat); -/* std::auto_ptr encoder; + std::auto_ptr encoder; if (codec == "ogg") encoder.reset(new OggEncoder(sampleFormat)); else if (codec == "pcm") @@ -312,8 +310,8 @@ size_t duration = 50; cout << "unknown codec: " << codec << "\n"; return 1; } -*/ -// shared_ptr header(new Chunk(format, encoder->getHeader())); + +/// shared_ptr header(encoder->getHeader()); // server->setHeader(header); while (!g_terminated) @@ -321,10 +319,10 @@ size_t duration = 50; int fd = open(fifoName.c_str(), O_RDONLY); try { - shared_ptr chunk;//(new WireChunk()); + shared_ptr chunk;//(new WireChunk()); while (true)//cin.good()) { - chunk.reset(new Chunk(format, duration));//2*WIRE_CHUNK_SIZE)); + chunk.reset(new PcmChunk(format, duration));//2*WIRE_CHUNK_SIZE)); int toRead = chunk->payloadSize; int len = 0; do