diff --git a/client/Makefile b/client/Makefile index 1047bbbb..1a442bda 100644 --- a/client/Makefile +++ b/client/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -OBJ = snapClient.o stream.o player.o receiver.o pcmDecoder.o oggDecoder.o ../common/chunk.o ../common/log.o ../common/sampleFormat.o +OBJ = snapClient.o stream.o player.o receiver.o ../common/message.o ../common/log.o ../common/sampleFormat.o BIN = snapclient all: client diff --git a/client/decoder.h b/client/decoder.h index 9f0ae236..a17ea874 100644 --- a/client/decoder.h +++ b/client/decoder.h @@ -1,12 +1,12 @@ #ifndef DECODER_H #define DECODER_H -#include "common/chunk.h" +#include "common/message.h" class Decoder { public: Decoder(); - virtual bool decode(Chunk* chunk) = 0; + virtual bool decode(PcmChunk* chunk) = 0; }; diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index f9d6b653..2587b2c0 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -23,17 +23,20 @@ OggDecoder::~OggDecoder() } -bool OggDecoder::decodePayload(Chunk* chunk) +bool OggDecoder::decodePayload(PcmChunk* chunk) { - WireChunk* wireChunk = chunk->wireChunk; /* grab some data at the head of the stream. We want the first page (which is guaranteed to be small and only contain the Vorbis stream initial header) We need the first page to get the stream serialno. */ + bytes = chunk->payloadSize; + buffer=ogg_sync_buffer(&oy, bytes); + memcpy(buffer, chunk->payload, bytes); + ogg_sync_wrote(&oy,bytes); - wireChunk->length = 0; + chunk->payloadSize = 0; convsize=4096;//bytes/vi.channels; /* The rest is just a straight decode loop until end of stream */ // while(!eos){ @@ -95,11 +98,11 @@ bool OggDecoder::decodePayload(Chunk* chunk) } } - size_t oldSize = wireChunk->length; + size_t oldSize = chunk->payloadSize; size_t size = 2*vi.channels * bout; - wireChunk->length += size; - wireChunk->payload = (char*)realloc(wireChunk->payload, wireChunk->length); - memcpy(wireChunk->payload + oldSize, convbuffer, size); + chunk->payloadSize += size; + chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize); + memcpy(chunk->payload + oldSize, convbuffer, size); /* tell libvorbis how many samples we actually consumed */ vorbis_synthesis_read(&vd,bout); } @@ -113,8 +116,13 @@ bool OggDecoder::decodePayload(Chunk* chunk) } -bool OggDecoder::decodeHeader(Chunk* chunk) +bool OggDecoder::decodeHeader(HeaderMessage* chunk) { + bytes = chunk->payloadSize; + buffer=ogg_sync_buffer(&oy, bytes); + memcpy(buffer, chunk->payload, bytes); + ogg_sync_wrote(&oy,bytes); + cout << "Decode header\n"; if(ogg_sync_pageout(&oy,&og)!=1) { @@ -220,13 +228,8 @@ cout << "6" << endl; } -bool OggDecoder::decode(Chunk* chunk) +bool OggDecoder::decode(BaseMessage* chunk) { - WireChunk* wireChunk = chunk->wireChunk; - bytes = wireChunk->length; - buffer=ogg_sync_buffer(&oy, bytes); - memcpy(buffer, wireChunk->payload, bytes); - ogg_sync_wrote(&oy,bytes); if (chunk->getType() == chunk_type::payload) return decodePayload(chunk); else if (chunk->getType() == chunk_type::header) diff --git a/client/oggDecoder.h b/client/oggDecoder.h index 6e44a3ba..9d8d87d1 100644 --- a/client/oggDecoder.h +++ b/client/oggDecoder.h @@ -9,11 +9,11 @@ class OggDecoder public: OggDecoder(); virtual ~OggDecoder(); - virtual bool decode(Chunk* chunk); + virtual bool decode(BaseMessage* chunk); private: - bool decodePayload(Chunk* chunk); - bool decodeHeader(Chunk* chunk); + bool decodePayload(PcmChunk* chunk); + bool decodeHeader(HeaderMessage* chunk); ogg_sync_state oy; /* sync and verify incoming physical bitstream */ ogg_stream_state os; /* take physical pages, weld into a logical diff --git a/client/pcmDecoder.cpp b/client/pcmDecoder.cpp index 8bfd3d1a..28c1f3b2 100644 --- a/client/pcmDecoder.cpp +++ b/client/pcmDecoder.cpp @@ -5,7 +5,7 @@ PcmDecoder::PcmDecoder() } -bool PcmDecoder::decode(Chunk* chunk) +bool PcmDecoder::decode(BaseMessage* chunk) { /* WireChunk* wireChunk = chunk->wireChunk; for (size_t n=0; nlength; ++n) diff --git a/client/pcmDecoder.h b/client/pcmDecoder.h index 5a04bae8..202a1483 100644 --- a/client/pcmDecoder.h +++ b/client/pcmDecoder.h @@ -7,7 +7,7 @@ class PcmDecoder { public: PcmDecoder(); - virtual bool decode(Chunk* chunk); + virtual bool decode(BaseMessage* chunk); }; diff --git a/client/receiver.cpp b/client/receiver.cpp index 2e178b8d..3c2ca370 100644 --- a/client/receiver.cpp +++ b/client/receiver.cpp @@ -2,8 +2,8 @@ #include #include #include "common/log.h" -#include "oggDecoder.h" -#include "pcmDecoder.h" +//#include "oggDecoder.h" +//#include "pcmDecoder.h" #define PCM_DEVICE "default" @@ -47,7 +47,7 @@ void Receiver::stop() void Receiver::worker() { active_ = true; - OggDecoder decoder; +// OggDecoder decoder; while (active_) { try @@ -62,18 +62,43 @@ void Receiver::worker() // std::clog << kLogNotice << "connected to " << ip << ":" << port << std::endl; while (true) { - WireChunk* wireChunk = new WireChunk(); - socketRead(&s, wireChunk, Chunk::getHeaderSize()); - wireChunk->payload = (char*)malloc(wireChunk->length); - socketRead(&s, wireChunk->payload, wireChunk->length); - Chunk* chunk = new Chunk(stream_->format, wireChunk); + BaseMessage baseMessage; + boost::asio::streambuf b; + // reserve 512 bytes in output sequence + boost::asio::streambuf::mutable_buffers_type bufs = b.prepare(baseMessage.getSize()); + size_t read = s.receive(bufs); +//cout << "read: " << read << "\n"; + // received data is "committed" from output sequence to input sequence + b.commit(baseMessage.getSize()); + std::istream is(&b); + baseMessage.read(is); +// cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; + + read = 0; + bufs = b.prepare(baseMessage.size); + while (read < baseMessage.size) + { + size_t n = s.receive(bufs); + b.commit(n); + read += n; + } + // received data is "committed" from output sequence to input sequence +// std::istream is(&b); + if (baseMessage.type == message_type::payload) + { + PcmChunk* chunk = new PcmChunk(stream_->format, 0); + chunk->read(is); +//cout << "WireChunk length: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n"; + stream_->addChunk(chunk); + } + //cout << "WireChunk length: " << wireChunk->length << ", Duration: " << chunk->getDuration() << ", sec: " << wireChunk->tv_sec << ", usec: " << wireChunk->tv_usec/1000 << ", type: " << wireChunk->type << "\n"; - if (decoder.decode(chunk)) +/* if (decoder.decode(chunk)) { //cout << "Duration: " << chunk->getDuration() << "\n"; stream_->addChunk(chunk); } - } +*/ } } catch (const std::exception& e) { diff --git a/client/snapClient.cpp b/client/snapClient.cpp index d60a5000..90b79385 100644 --- a/client/snapClient.cpp +++ b/client/snapClient.cpp @@ -10,7 +10,6 @@ #include #include "common/sampleFormat.h" -#include "common/chunk.h" #include "common/utils.h" #include "common/log.h" #include "stream.h" diff --git a/client/stream.cpp b/client/stream.cpp index df463385..40090217 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -31,11 +31,11 @@ void Stream::clearChunks() } -void Stream::addChunk(Chunk* chunk) +void Stream::addChunk(PcmChunk* chunk) { while (chunks.size() * chunk->getDuration() > 10000) chunks.pop(); - chunks.push(shared_ptr(chunk)); + chunks.push(shared_ptr(chunk)); // cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n"; } @@ -110,7 +110,7 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame while (read < toRead) { - read += chunk->read(buffer + read*format.frameSize, toRead - read); + read += chunk->readFrames(buffer + read*format.frameSize, toRead - read); if (chunk->isEndOfChunk()) chunk = chunks.pop(); } @@ -175,7 +175,7 @@ int msBuffer = framesPerBuffer / format_.msRate(); if (sleep < -msBuffer/2) { cout << "Sleep " << sleep; - sleep = Chunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime; + sleep = PcmChunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime; std::cerr << " after: " << sleep << ", chunks: " << chunks.size() << "\n"; // std::clog << kLogNotice << "sleep: " << sleep << std::endl; // if (sleep > -msBuffer/2) @@ -201,7 +201,7 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n" // cout << "chunk->getAge() > chunk->getDuration(): " << chunk->getAge() - bufferMs + outputBufferDacTime << " > " << chunk->getDuration() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime << ", needed: " << msBuffer << ", sleep: " << sleep << "\n"; usleep(1000); } - cout << "seek: " << Chunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n"; + cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n"; sleep = 0; } else if (sleep < 0) @@ -218,7 +218,7 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n" - int age = Chunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime; + int age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime; // if (pCardBuffer->full()) diff --git a/client/stream.h b/client/stream.h index 05c4b99b..4912c945 100644 --- a/client/stream.h +++ b/client/stream.h @@ -8,7 +8,7 @@ #include #include #include "doubleBuffer.h" -#include "common/chunk.h" +#include "common/message.h" #include "common/timeUtils.h" #include "common/queue.h" #include "common/sampleFormat.h" @@ -18,7 +18,7 @@ class Stream { public: Stream(const SampleFormat& format); - void addChunk(Chunk* chunk); + void addChunk(PcmChunk* chunk); void clearChunks(); void getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer); void setBufferLen(size_t bufferLenMs); @@ -37,12 +37,12 @@ private: long lastTick; int sleep; - Queue> chunks; + Queue> chunks; DoubleBuffer* pCardBuffer; DoubleBuffer* pMiniBuffer; DoubleBuffer* pBuffer; DoubleBuffer* pShortBuffer; - std::shared_ptr chunk; + std::shared_ptr chunk; int median; int shortMedian; diff --git a/common/chunk.cpp b/common/chunk.old.cpp similarity index 100% rename from common/chunk.cpp rename to common/chunk.old.cpp diff --git a/common/chunk.h b/common/chunk.old.h similarity index 100% rename from common/chunk.h rename to common/chunk.old.h diff --git a/common/message.cpp b/common/message.cpp index bfe7eda7..acfffdc8 100644 --- a/common/message.cpp +++ b/common/message.cpp @@ -54,7 +54,7 @@ int PcmChunk::seek(int frames) } -int PcmChunk::read(void* outputBuffer, size_t frameCount) +int PcmChunk::readFrames(void* outputBuffer, size_t frameCount) { //logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl; int result = frameCount; diff --git a/common/message.h b/common/message.h index 0780fcdb..dab37b8f 100644 --- a/common/message.h +++ b/common/message.h @@ -28,15 +28,23 @@ struct BaseMessage BaseMessage(message_type type_) { - type = type; + type = type_; }; virtual void read(istream& stream) { stream.read(reinterpret_cast(&type), sizeof(uint16_t)); - cout << type << "\n"; +// cout << type << "\n"; stream.read(reinterpret_cast(&size), sizeof(uint32_t)); - cout << size << "\n"; +// cout << size << "\n"; + } + + void read(char* stream) + { + memcpy(reinterpret_cast(&type), stream, sizeof(uint16_t)); +// cout << "type: " << type << "\n"; + memcpy(reinterpret_cast(&size), stream + sizeof(uint16_t), sizeof(uint32_t)); +// cout << "size: " << size << "\n"; } virtual void serialize(ostream& stream) @@ -107,8 +115,9 @@ protected: class WireChunk : public BaseMessage { public: - WireChunk() : BaseMessage(message_type::payload) + WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) { + payload = (char*)malloc(size); } // WireChunk(int8_t logLevel_, char* text_) : BaseMessage(message_type::payload), logLevel(logLevel_), length(sizeof(text_)), text(text_) @@ -117,6 +126,7 @@ public: virtual ~WireChunk() { + free(payload); } virtual void read(istream& stream) @@ -124,7 +134,7 @@ public: stream.read(reinterpret_cast(&tv_sec), sizeof(int32_t)); stream.read(reinterpret_cast(&tv_usec), sizeof(int32_t)); stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - payload = (char*)malloc(payloadSize); + payload = (char*)realloc(payload, payloadSize); stream.read(payload, payloadSize); } @@ -153,19 +163,20 @@ protected: class HeaderMessage : public BaseMessage { public: - HeaderMessage(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) + HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size) { payload = (char*)malloc(size); } virtual ~HeaderMessage() { + free(payload); } virtual void read(istream& stream) { stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - payload = (char*)malloc(payloadSize); + payload = (char*)realloc(payload, payloadSize); stream.read(payload, payloadSize); } @@ -292,7 +303,7 @@ public: PcmChunk(const SampleFormat& sampleFormat, size_t ms); ~PcmChunk(); - int read(void* outputBuffer, size_t frameCount); + int readFrames(void* outputBuffer, size_t frameCount); bool isEndOfChunk() const; inline time_point_ms timePoint() const diff --git a/test/test.cpp b/test/test.cpp index e330e1eb..ade0b512 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -60,45 +60,6 @@ using namespace std::chrono; bool g_terminated = false; -/* -int main(int argc, char* argv[]) -{ - TestMessage* chunk = new TestMessage(1, (char*)"Hallo"); - stringstream ss; - chunk->serialize(ss); - - BaseMessage header; - ss.seekg(0, ss.beg); - header.read(ss); -// cout << ss.str() << "\n"; -// ss.read(reinterpret_cast(&header), sizeof(MessageHeader)); - cout << "Header: " << header.type << ", " << header.size << "\n"; - delete chunk; - chunk = new TestMessage(); - chunk->read(ss); - cout << "Header: " << chunk->type << ", " << chunk->size << ", " << (int)chunk->logLevel << ", " << chunk->text << "\n"; - - - chunk->tv_sec = 21; - chunk->tv_usec = 2; - chunk->payloadSize = 5; - chunk->payload = (char*)malloc(5); -chunk->payload[0] = 99; - char* stream = chunk->serialize(); -cout << "1\n"; -for (size_t n=0; n<24; ++n) - cout << (int)stream[n] << " "; - delete chunk; -cout << "\n3\n"; - chunk = new WireChunk(); -cout << "4\n"; - chunk->deserialize(stream); -cout << "5\n"; - cout << chunk->tv_sec << ", " << chunk->tv_usec << ", " << chunk->payloadSize << ", " << chunk->payload << "\n"; - -return 0; -} -*/ class Session @@ -112,15 +73,18 @@ public: { try { + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); for (;;) { shared_ptr message(messages.pop()); -/* char* stream = chunk->serialize(); - size_t written(0); - size_t toWrite = sizeof(stream); + message->serialize(stream); + boost::asio::write(*socket_, streambuf); +/* size_t written(0); + size_t toWrite = message->getSize(); do { - written += boost::asio::write(*socket_, boost::asio::buffer(stream + written, toWrite - written));//, error); + written += boost::asio::write(*socket_, streambuf);//, error); } while (written < toWrite); */ } @@ -176,9 +140,10 @@ public: { socket_ptr sock(new tcp::socket(io_service_)); a.accept(*sock); -// cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; Session* session = new Session(sock); -cout << "Sending header: " << headerChunk->payloadSize << "\n"; + if (headerChunk) +cout << "Sending header: " << headerChunk->payloadSize << endl; session->send(headerChunk); session->start(); sessions.insert(shared_ptr(session)); @@ -297,9 +262,9 @@ int main(int argc, char* argv[]) long nextTick = getTickCount(); mkfifo(fifoName.c_str(), 0777); -size_t duration = 50; - SampleFormat format(sampleFormat); +size_t duration = 50; +//size_t chunkSize = duration*format.rate*format.frameSize / 1000; std::auto_ptr encoder; if (codec == "ogg") encoder.reset(new OggEncoder(sampleFormat)); @@ -311,8 +276,8 @@ size_t duration = 50; return 1; } -/// shared_ptr header(encoder->getHeader()); -// server->setHeader(header); + shared_ptr header(encoder->getHeader()); + server->setHeader(header); while (!g_terminated) { @@ -322,7 +287,7 @@ size_t duration = 50; shared_ptr chunk;//(new WireChunk()); while (true)//cin.good()) { - chunk.reset(new PcmChunk(format, duration));//2*WIRE_CHUNK_SIZE)); + chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); int toRead = chunk->payloadSize; int len = 0; do @@ -337,10 +302,10 @@ size_t duration = 50; chunk->tv_sec = tvChunk.tv_sec; chunk->tv_usec = tvChunk.tv_usec; - double chunkDuration = 50;//encoder->encode(chunk.get()); + double chunkDuration = encoder->encode(chunk.get()); if (chunkDuration > 0) server->send(chunk); -cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n"; +//cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n"; // addUs(tvChunk, 1000*chunk->getDuration()); addUs(tvChunk, chunkDuration * 1000); nextTick += duration;