new messaging

git-svn-id: svn://elaine/murooma/trunk@230 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-04 19:23:29 +00:00
parent a519d0f0dd
commit 04e872e036
15 changed files with 107 additions and 104 deletions

View file

@ -3,7 +3,7 @@ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc
OBJ = snapClient.o stream.o player.o receiver.o pcmDecoder.o oggDecoder.o ../common/chunk.o ../common/log.o ../common/sampleFormat.o
OBJ = snapClient.o stream.o player.o receiver.o ../common/message.o ../common/log.o ../common/sampleFormat.o
BIN = snapclient
all: client

View file

@ -1,12 +1,12 @@
#ifndef DECODER_H
#define DECODER_H
#include "common/chunk.h"
#include "common/message.h"
class Decoder
{
public:
Decoder();
virtual bool decode(Chunk* chunk) = 0;
virtual bool decode(PcmChunk* chunk) = 0;
};

View file

@ -23,17 +23,20 @@ OggDecoder::~OggDecoder()
}
bool OggDecoder::decodePayload(Chunk* chunk)
bool OggDecoder::decodePayload(PcmChunk* chunk)
{
WireChunk* wireChunk = chunk->wireChunk;
/* grab some data at the head of the stream. We want the first page
(which is guaranteed to be small and only contain the Vorbis
stream initial header) We need the first page to get the stream
serialno. */
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
wireChunk->length = 0;
chunk->payloadSize = 0;
convsize=4096;//bytes/vi.channels;
/* The rest is just a straight decode loop until end of stream */
// while(!eos){
@ -95,11 +98,11 @@ bool OggDecoder::decodePayload(Chunk* chunk)
}
}
size_t oldSize = wireChunk->length;
size_t oldSize = chunk->payloadSize;
size_t size = 2*vi.channels * bout;
wireChunk->length += size;
wireChunk->payload = (char*)realloc(wireChunk->payload, wireChunk->length);
memcpy(wireChunk->payload + oldSize, convbuffer, size);
chunk->payloadSize += size;
chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize);
memcpy(chunk->payload + oldSize, convbuffer, size);
/* tell libvorbis how many samples we actually consumed */
vorbis_synthesis_read(&vd,bout);
}
@ -113,8 +116,13 @@ bool OggDecoder::decodePayload(Chunk* chunk)
}
bool OggDecoder::decodeHeader(Chunk* chunk)
bool OggDecoder::decodeHeader(HeaderMessage* chunk)
{
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
cout << "Decode header\n";
if(ogg_sync_pageout(&oy,&og)!=1)
{
@ -220,13 +228,8 @@ cout << "6" << endl;
}
bool OggDecoder::decode(Chunk* chunk)
bool OggDecoder::decode(BaseMessage* chunk)
{
WireChunk* wireChunk = chunk->wireChunk;
bytes = wireChunk->length;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, wireChunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
if (chunk->getType() == chunk_type::payload)
return decodePayload(chunk);
else if (chunk->getType() == chunk_type::header)

View file

@ -9,11 +9,11 @@ class OggDecoder
public:
OggDecoder();
virtual ~OggDecoder();
virtual bool decode(Chunk* chunk);
virtual bool decode(BaseMessage* chunk);
private:
bool decodePayload(Chunk* chunk);
bool decodeHeader(Chunk* chunk);
bool decodePayload(PcmChunk* chunk);
bool decodeHeader(HeaderMessage* chunk);
ogg_sync_state oy; /* sync and verify incoming physical bitstream */
ogg_stream_state os; /* take physical pages, weld into a logical

View file

@ -5,7 +5,7 @@ PcmDecoder::PcmDecoder()
}
bool PcmDecoder::decode(Chunk* chunk)
bool PcmDecoder::decode(BaseMessage* chunk)
{
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)

View file

@ -7,7 +7,7 @@ class PcmDecoder
{
public:
PcmDecoder();
virtual bool decode(Chunk* chunk);
virtual bool decode(BaseMessage* chunk);
};

View file

@ -2,8 +2,8 @@
#include <boost/lexical_cast.hpp>
#include <iostream>
#include "common/log.h"
#include "oggDecoder.h"
#include "pcmDecoder.h"
//#include "oggDecoder.h"
//#include "pcmDecoder.h"
#define PCM_DEVICE "default"
@ -47,7 +47,7 @@ void Receiver::stop()
void Receiver::worker()
{
active_ = true;
OggDecoder decoder;
// OggDecoder decoder;
while (active_)
{
try
@ -62,18 +62,43 @@ void Receiver::worker()
// std::clog << kLogNotice << "connected to " << ip << ":" << port << std::endl;
while (true)
{
WireChunk* wireChunk = new WireChunk();
socketRead(&s, wireChunk, Chunk::getHeaderSize());
wireChunk->payload = (char*)malloc(wireChunk->length);
socketRead(&s, wireChunk->payload, wireChunk->length);
Chunk* chunk = new Chunk(stream_->format, wireChunk);
BaseMessage baseMessage;
boost::asio::streambuf b;
// reserve 512 bytes in output sequence
boost::asio::streambuf::mutable_buffers_type bufs = b.prepare(baseMessage.getSize());
size_t read = s.receive(bufs);
//cout << "read: " << read << "\n";
// received data is "committed" from output sequence to input sequence
b.commit(baseMessage.getSize());
std::istream is(&b);
baseMessage.read(is);
// cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
read = 0;
bufs = b.prepare(baseMessage.size);
while (read < baseMessage.size)
{
size_t n = s.receive(bufs);
b.commit(n);
read += n;
}
// received data is "committed" from output sequence to input sequence
// std::istream is(&b);
if (baseMessage.type == message_type::payload)
{
PcmChunk* chunk = new PcmChunk(stream_->format, 0);
chunk->read(is);
//cout << "WireChunk length: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n";
stream_->addChunk(chunk);
}
//cout << "WireChunk length: " << wireChunk->length << ", Duration: " << chunk->getDuration() << ", sec: " << wireChunk->tv_sec << ", usec: " << wireChunk->tv_usec/1000 << ", type: " << wireChunk->type << "\n";
if (decoder.decode(chunk))
/* if (decoder.decode(chunk))
{
//cout << "Duration: " << chunk->getDuration() << "\n";
stream_->addChunk(chunk);
}
}
*/ }
}
catch (const std::exception& e)
{

View file

@ -10,7 +10,6 @@
#include <boost/program_options.hpp>
#include "common/sampleFormat.h"
#include "common/chunk.h"
#include "common/utils.h"
#include "common/log.h"
#include "stream.h"

View file

@ -31,11 +31,11 @@ void Stream::clearChunks()
}
void Stream::addChunk(Chunk* chunk)
void Stream::addChunk(PcmChunk* chunk)
{
while (chunks.size() * chunk->getDuration() > 10000)
chunks.pop();
chunks.push(shared_ptr<Chunk>(chunk));
chunks.push(shared_ptr<PcmChunk>(chunk));
// cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n";
}
@ -110,7 +110,7 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame
while (read < toRead)
{
read += chunk->read(buffer + read*format.frameSize, toRead - read);
read += chunk->readFrames(buffer + read*format.frameSize, toRead - read);
if (chunk->isEndOfChunk())
chunk = chunks.pop();
}
@ -175,7 +175,7 @@ int msBuffer = framesPerBuffer / format_.msRate();
if (sleep < -msBuffer/2)
{
cout << "Sleep " << sleep;
sleep = Chunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime;
sleep = PcmChunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime;
std::cerr << " after: " << sleep << ", chunks: " << chunks.size() << "\n";
// std::clog << kLogNotice << "sleep: " << sleep << std::endl;
// if (sleep > -msBuffer/2)
@ -201,7 +201,7 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"
// cout << "chunk->getAge() > chunk->getDuration(): " << chunk->getAge() - bufferMs + outputBufferDacTime << " > " << chunk->getDuration() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime << ", needed: " << msBuffer << ", sleep: " << sleep << "\n";
usleep(1000);
}
cout << "seek: " << Chunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n";
cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n";
sleep = 0;
}
else if (sleep < 0)
@ -218,7 +218,7 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"
int age = Chunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime;
int age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime;
// if (pCardBuffer->full())

View file

@ -8,7 +8,7 @@
#include <vector>
#include <memory>
#include "doubleBuffer.h"
#include "common/chunk.h"
#include "common/message.h"
#include "common/timeUtils.h"
#include "common/queue.h"
#include "common/sampleFormat.h"
@ -18,7 +18,7 @@ class Stream
{
public:
Stream(const SampleFormat& format);
void addChunk(Chunk* chunk);
void addChunk(PcmChunk* chunk);
void clearChunks();
void getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer);
void setBufferLen(size_t bufferLenMs);
@ -37,12 +37,12 @@ private:
long lastTick;
int sleep;
Queue<std::shared_ptr<Chunk>> chunks;
Queue<std::shared_ptr<PcmChunk>> chunks;
DoubleBuffer<int>* pCardBuffer;
DoubleBuffer<int>* pMiniBuffer;
DoubleBuffer<int>* pBuffer;
DoubleBuffer<int>* pShortBuffer;
std::shared_ptr<Chunk> chunk;
std::shared_ptr<PcmChunk> chunk;
int median;
int shortMedian;

View file

@ -54,7 +54,7 @@ int PcmChunk::seek(int frames)
}
int PcmChunk::read(void* outputBuffer, size_t frameCount)
int PcmChunk::readFrames(void* outputBuffer, size_t frameCount)
{
//logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl;
int result = frameCount;

View file

@ -28,15 +28,23 @@ struct BaseMessage
BaseMessage(message_type type_)
{
type = type;
type = type_;
};
virtual void read(istream& stream)
{
stream.read(reinterpret_cast<char*>(&type), sizeof(uint16_t));
cout << type << "\n";
// cout << type << "\n";
stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t));
cout << size << "\n";
// cout << size << "\n";
}
void read(char* stream)
{
memcpy(reinterpret_cast<char*>(&type), stream, sizeof(uint16_t));
// cout << "type: " << type << "\n";
memcpy(reinterpret_cast<char*>(&size), stream + sizeof(uint16_t), sizeof(uint32_t));
// cout << "size: " << size << "\n";
}
virtual void serialize(ostream& stream)
@ -107,8 +115,9 @@ protected:
class WireChunk : public BaseMessage
{
public:
WireChunk() : BaseMessage(message_type::payload)
WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size)
{
payload = (char*)malloc(size);
}
// WireChunk(int8_t logLevel_, char* text_) : BaseMessage(message_type::payload), logLevel(logLevel_), length(sizeof(text_)), text(text_)
@ -117,6 +126,7 @@ public:
virtual ~WireChunk()
{
free(payload);
}
virtual void read(istream& stream)
@ -124,7 +134,7 @@ public:
stream.read(reinterpret_cast<char *>(&tv_sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&tv_usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)malloc(payloadSize);
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
@ -153,19 +163,20 @@ protected:
class HeaderMessage : public BaseMessage
{
public:
HeaderMessage(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size)
HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size)
{
payload = (char*)malloc(size);
}
virtual ~HeaderMessage()
{
free(payload);
}
virtual void read(istream& stream)
{
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)malloc(payloadSize);
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
@ -292,7 +303,7 @@ public:
PcmChunk(const SampleFormat& sampleFormat, size_t ms);
~PcmChunk();
int read(void* outputBuffer, size_t frameCount);
int readFrames(void* outputBuffer, size_t frameCount);
bool isEndOfChunk() const;
inline time_point_ms timePoint() const

View file

@ -60,45 +60,6 @@ using namespace std::chrono;
bool g_terminated = false;
/*
int main(int argc, char* argv[])
{
TestMessage* chunk = new TestMessage(1, (char*)"Hallo");
stringstream ss;
chunk->serialize(ss);
BaseMessage header;
ss.seekg(0, ss.beg);
header.read(ss);
// cout << ss.str() << "\n";
// ss.read(reinterpret_cast<char*>(&header), sizeof(MessageHeader));
cout << "Header: " << header.type << ", " << header.size << "\n";
delete chunk;
chunk = new TestMessage();
chunk->read(ss);
cout << "Header: " << chunk->type << ", " << chunk->size << ", " << (int)chunk->logLevel << ", " << chunk->text << "\n";
chunk->tv_sec = 21;
chunk->tv_usec = 2;
chunk->payloadSize = 5;
chunk->payload = (char*)malloc(5);
chunk->payload[0] = 99;
char* stream = chunk->serialize();
cout << "1\n";
for (size_t n=0; n<24; ++n)
cout << (int)stream[n] << " ";
delete chunk;
cout << "\n3\n";
chunk = new WireChunk();
cout << "4\n";
chunk->deserialize(stream);
cout << "5\n";
cout << chunk->tv_sec << ", " << chunk->tv_usec << ", " << chunk->payloadSize << ", " << chunk->payload << "\n";
return 0;
}
*/
class Session
@ -112,15 +73,18 @@ public:
{
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
for (;;)
{
shared_ptr<BaseMessage> message(messages.pop());
/* char* stream = chunk->serialize();
size_t written(0);
size_t toWrite = sizeof(stream);
message->serialize(stream);
boost::asio::write(*socket_, streambuf);
/* size_t written(0);
size_t toWrite = message->getSize();
do
{
written += boost::asio::write(*socket_, boost::asio::buffer(stream + written, toWrite - written));//, error);
written += boost::asio::write(*socket_, streambuf);//, error);
}
while (written < toWrite);
*/ }
@ -176,9 +140,10 @@ public:
{
socket_ptr sock(new tcp::socket(io_service_));
a.accept(*sock);
// cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n";
cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n";
Session* session = new Session(sock);
cout << "Sending header: " << headerChunk->payloadSize << "\n";
if (headerChunk)
cout << "Sending header: " << headerChunk->payloadSize << endl;
session->send(headerChunk);
session->start();
sessions.insert(shared_ptr<Session>(session));
@ -297,9 +262,9 @@ int main(int argc, char* argv[])
long nextTick = getTickCount();
mkfifo(fifoName.c_str(), 0777);
size_t duration = 50;
SampleFormat format(sampleFormat);
size_t duration = 50;
//size_t chunkSize = duration*format.rate*format.frameSize / 1000;
std::auto_ptr<Encoder> encoder;
if (codec == "ogg")
encoder.reset(new OggEncoder(sampleFormat));
@ -311,8 +276,8 @@ size_t duration = 50;
return 1;
}
/// shared_ptr<HeaderMessage> header(encoder->getHeader());
// server->setHeader(header);
shared_ptr<HeaderMessage> header(encoder->getHeader());
server->setHeader(header);
while (!g_terminated)
{
@ -322,7 +287,7 @@ size_t duration = 50;
shared_ptr<PcmChunk> chunk;//(new WireChunk());
while (true)//cin.good())
{
chunk.reset(new PcmChunk(format, duration));//2*WIRE_CHUNK_SIZE));
chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize;
int len = 0;
do
@ -337,10 +302,10 @@ size_t duration = 50;
chunk->tv_sec = tvChunk.tv_sec;
chunk->tv_usec = tvChunk.tv_usec;
double chunkDuration = 50;//encoder->encode(chunk.get());
double chunkDuration = encoder->encode(chunk.get());
if (chunkDuration > 0)
server->send(chunk);
cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n";
//cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n";
// addUs(tvChunk, 1000*chunk->getDuration());
addUs(tvChunk, chunkDuration * 1000);
nextTick += duration;