refactoring

git-svn-id: svn://elaine/murooma/trunk@328 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-12-29 12:34:33 +00:00
parent 611b2a6c9f
commit c40bfda64c
35 changed files with 192 additions and 178 deletions

View file

@ -93,7 +93,7 @@ void ClientConnection::stop()
} }
bool ClientConnection::send(BaseMessage* message) bool ClientConnection::send(msg::BaseMessage* message)
{ {
// std::unique_lock<std::mutex> mlock(mutex_); // std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; //cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
@ -110,9 +110,9 @@ bool ClientConnection::send(BaseMessage* message)
} }
shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message, const chronos::msec& timeout) shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(msg::BaseMessage* message, const chronos::msec& timeout)
{ {
shared_ptr<SerializedMessage> response(NULL); shared_ptr<msg::SerializedMessage> response(NULL);
if (++reqId == 10000) if (++reqId == 10000)
reqId = 1; reqId = 1;
message->id = reqId; message->id = reqId;
@ -148,7 +148,7 @@ shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message
void ClientConnection::getNextMessage() void ClientConnection::getNextMessage()
{ {
BaseMessage baseMessage; msg::BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize); vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize); socketRead(&buffer[0], baseMsgSize);
@ -167,7 +167,7 @@ void ClientConnection::getNextMessage()
{ {
if (req->id == baseMessage.refersTo) if (req->id == baseMessage.refersTo)
{ {
req->response.reset(new SerializedMessage()); req->response.reset(new msg::SerializedMessage());
req->response->message = baseMessage; req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size); req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size); memcpy(req->response->buffer, &buffer[0], baseMessage.size);

View file

@ -24,7 +24,7 @@ struct PendingRequest
PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {};
uint16_t id; uint16_t id;
std::shared_ptr<SerializedMessage> response; std::shared_ptr<msg::SerializedMessage> response;
std::condition_variable cv; std::condition_variable cv;
}; };
@ -32,7 +32,7 @@ struct PendingRequest
class MessageReceiver class MessageReceiver
{ {
public: public:
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0;
virtual void onException(ClientConnection* connection, const std::exception& exception) = 0; virtual void onException(ClientConnection* connection, const std::exception& exception) = 0;
}; };
@ -44,13 +44,13 @@ public:
virtual ~ClientConnection(); virtual ~ClientConnection();
virtual void start(); virtual void start();
virtual void stop(); virtual void stop();
virtual bool send(BaseMessage* _message); virtual bool send(msg::BaseMessage* _message);
virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); virtual std::shared_ptr<msg::SerializedMessage> sendRequest(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000));
template <typename T> template <typename T>
std::shared_ptr<T> sendReq(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) std::shared_ptr<T> sendReq(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000))
{ {
std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout); std::shared_ptr<msg::SerializedMessage> reply = sendRequest(message, timeout);
if (!reply) if (!reply)
return NULL; return NULL;
std::shared_ptr<T> msg(new T); std::shared_ptr<T> msg(new T);

View file

@ -8,10 +8,10 @@
#include "alsaPlayer.h" #include "alsaPlayer.h"
#include "timeProvider.h" #include "timeProvider.h"
#include "message/serverSettings.h" #include "message/serverSettings.h"
#include "message/timeMsg.h" #include "message/time.h"
#include "message/requestMsg.h" #include "message/request.h"
#include "message/ackMsg.h" #include "message/ack.h"
#include "message/commandMsg.h" #include "message/command.h"
using namespace std; using namespace std;
@ -27,13 +27,13 @@ void Controller::onException(ClientConnection* connection, const std::exception&
} }
void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) void Controller::onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
if (baseMessage.type == message_type::payload) if (baseMessage.type == message_type::kPayload)
{ {
if ((stream != NULL) && (decoder != NULL)) if ((stream != NULL) && (decoder != NULL))
{ {
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat, 0);
pcmChunk->deserialize(baseMessage, buffer); pcmChunk->deserialize(baseMessage, buffer);
//cout << "chunk: " << pcmChunk->payloadSize; //cout << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk)) if (decoder->decode(pcmChunk))
@ -81,18 +81,18 @@ void Controller::worker()
try try
{ {
clientConnection->start(); clientConnection->start();
RequestMsg requestMsg(serversettings); msg::Request requestMsg(kServerSettings);
shared_ptr<ServerSettings> serverSettings(NULL); shared_ptr<msg::ServerSettings> serverSettings(NULL);
while (active_ && !(serverSettings = clientConnection->sendReq<ServerSettings>(&requestMsg))); while (active_ && !(serverSettings = clientConnection->sendReq<msg::ServerSettings>(&requestMsg)));
cout << "ServerSettings buffer: " << serverSettings->bufferMs << "\n"; cout << "ServerSettings buffer: " << serverSettings->bufferMs << "\n";
requestMsg.request = sampleformat; requestMsg.request = kSampleFormat;
while (active_ && !(sampleFormat = clientConnection->sendReq<SampleFormat>(&requestMsg))); while (active_ && !(sampleFormat = clientConnection->sendReq<msg::SampleFormat>(&requestMsg)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
requestMsg.request = header; requestMsg.request = kHeader;
shared_ptr<HeaderMessage> headerChunk(NULL); shared_ptr<msg::Header> headerChunk(NULL);
while (active_ && !(headerChunk = clientConnection->sendReq<HeaderMessage>(&requestMsg))); while (active_ && !(headerChunk = clientConnection->sendReq<msg::Header>(&requestMsg)));
cout << "Codec: " << headerChunk->codec << "\n"; cout << "Codec: " << headerChunk->codec << "\n";
if (headerChunk->codec == "ogg") if (headerChunk->codec == "ogg")
decoder = new OggDecoder(); decoder = new OggDecoder();
@ -100,10 +100,10 @@ void Controller::worker()
decoder = new PcmDecoder(); decoder = new PcmDecoder();
decoder->setHeader(headerChunk.get()); decoder->setHeader(headerChunk.get());
RequestMsg timeReq(timemsg); msg::Request timeReq(kTime);
for (size_t n=0; n<50 && active_; ++n) for (size_t n=0; n<50 && active_; ++n)
{ {
shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq, chronos::msec(2000)); shared_ptr<msg::Time> reply = clientConnection->sendReq<msg::Time>(&timeReq, chronos::msec(2000));
if (reply) if (reply)
{ {
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
@ -119,14 +119,14 @@ void Controller::worker()
Player player(pcmDevice_, stream); Player player(pcmDevice_, stream);
player.start(); player.start();
CommandMsg startStream("startStream"); msg::Command startStream("startStream");
shared_ptr<AckMsg> ackMsg(NULL); shared_ptr<msg::Ack> ackMsg(NULL);
while (active_ && !(ackMsg = clientConnection->sendReq<AckMsg>(&startStream))); while (active_ && !(ackMsg = clientConnection->sendReq<msg::Ack>(&startStream)));
while (active_) while (active_)
{ {
usleep(500*1000); usleep(500*1000);
shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq); shared_ptr<msg::Time> reply = clientConnection->sendReq<msg::Time>(&timeReq);
if (reply) if (reply)
{ {
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;

View file

@ -16,7 +16,7 @@ public:
Controller(); Controller();
void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency); void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency);
void stop(); void stop();
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer);
virtual void onException(ClientConnection* connection, const std::exception& exception); virtual void onException(ClientConnection* connection, const std::exception& exception);
private: private:
@ -26,7 +26,7 @@ private:
ClientConnection* clientConnection; ClientConnection* clientConnection;
Stream* stream; Stream* stream;
std::string ip; std::string ip;
std::shared_ptr<SampleFormat> sampleFormat; std::shared_ptr<msg::SampleFormat> sampleFormat;
Decoder* decoder; Decoder* decoder;
PcmDevice pcmDevice_; PcmDevice pcmDevice_;
size_t latency_; size_t latency_;

View file

@ -1,15 +1,15 @@
#ifndef DECODER_H #ifndef DECODER_H
#define DECODER_H #define DECODER_H
#include "message/pcmChunk.h" #include "message/pcmChunk.h"
#include "message/headerMessage.h" #include "message/header.h"
class Decoder class Decoder
{ {
public: public:
Decoder() {}; Decoder() {};
virtual ~Decoder() {}; virtual ~Decoder() {};
virtual bool decode(PcmChunk* chunk) = 0; virtual bool decode(msg::PcmChunk* chunk) = 0;
virtual bool setHeader(HeaderMessage* chunk) = 0; virtual bool setHeader(msg::Header* chunk) = 0;
}; };

View file

@ -28,7 +28,7 @@ OggDecoder::~OggDecoder()
} }
bool OggDecoder::decode(PcmChunk* chunk) bool OggDecoder::decode(msg::PcmChunk* chunk)
{ {
/* grab some data at the head of the stream. We want the first page /* grab some data at the head of the stream. We want the first page
@ -122,7 +122,7 @@ bool OggDecoder::decode(PcmChunk* chunk)
} }
bool OggDecoder::setHeader(HeaderMessage* chunk) bool OggDecoder::setHeader(msg::Header* chunk)
{ {
bytes = chunk->payloadSize; bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes); buffer=ogg_sync_buffer(&oy, bytes);

View file

@ -9,11 +9,11 @@ class OggDecoder : public Decoder
public: public:
OggDecoder(); OggDecoder();
virtual ~OggDecoder(); virtual ~OggDecoder();
virtual bool decode(PcmChunk* chunk); virtual bool decode(msg::PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk); virtual bool setHeader(msg::Header* chunk);
private: private:
bool decodePayload(PcmChunk* chunk); bool decodePayload(msg::PcmChunk* chunk);
ogg_sync_state oy; /* sync and verify incoming physical bitstream */ ogg_sync_state oy; /* sync and verify incoming physical bitstream */
ogg_stream_state os; /* take physical pages, weld into a logical ogg_stream_state os; /* take physical pages, weld into a logical

View file

@ -5,17 +5,13 @@ PcmDecoder::PcmDecoder() : Decoder()
} }
bool PcmDecoder::decode(PcmChunk* chunk) bool PcmDecoder::decode(msg::PcmChunk* chunk)
{ {
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)
wireChunk->payload[n] *= 1;
*/
return true; return true;
} }
bool PcmDecoder::setHeader(HeaderMessage* chunk) bool PcmDecoder::setHeader(msg::Header* chunk)
{ {
return true; return true;
} }

View file

@ -7,8 +7,8 @@ class PcmDecoder : public Decoder
{ {
public: public:
PcmDecoder(); PcmDecoder();
virtual bool decode(PcmChunk* chunk); virtual bool decode(msg::PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk); virtual bool setHeader(msg::Header* chunk);
}; };

View file

@ -8,7 +8,7 @@ using namespace std;
using namespace chronos; using namespace chronos;
Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0) Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0)
{ {
buffer.setSize(500); buffer.setSize(500);
shortBuffer.setSize(100); shortBuffer.setSize(100);
@ -16,8 +16,6 @@ Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(samp
// cardBuffer.setSize(50); // cardBuffer.setSize(50);
bufferMs = msec(500); bufferMs = msec(500);
playedSamples = 0;
playedSamplesTime = time_point_hrc::min();
/* /*
48000 x 48000 x
------- = ----- ------- = -----
@ -53,11 +51,11 @@ void Stream::clearChunks()
} }
void Stream::addChunk(PcmChunk* chunk) void Stream::addChunk(msg::PcmChunk* chunk)
{ {
while (chunks.size() * chunk->duration<chronos::msec>().count() > 10000) while (chunks.size() * chunk->duration<chronos::msec>().count() > 10000)
chunks.pop(); chunks.pop();
chunks.push(shared_ptr<PcmChunk>(chunk)); chunks.push(shared_ptr<msg::PcmChunk>(chunk));
// cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n"; // cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n";
} }
@ -174,17 +172,6 @@ void Stream::resetBuffers()
bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer) bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer)
{ {
/*if (playedSamplesTime == time_point_hrc::min())
playedSamplesTime = chronos::hrc::now() + outputBufferDacTime;
else
{
playedSamples += framesPerBuffer;
chronos::msec since = std::chrono::duration_cast<msec>(chronos::hrc::now() + outputBufferDacTime - playedSamplesTime);
if (since.count() > 0)
cout << (double)playedSamples / (double)since.count() << "\n";
}
*/
if (outputBufferDacTime > bufferMs) if (outputBufferDacTime > bufferMs)
return false; return false;

View file

@ -19,12 +19,12 @@
class Stream class Stream
{ {
public: public:
Stream(const SampleFormat& format); Stream(const msg::SampleFormat& format);
void addChunk(PcmChunk* chunk); void addChunk(msg::PcmChunk* chunk);
void clearChunks(); void clearChunks();
bool getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer); bool getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer);
void setBufferLen(size_t bufferLenMs); void setBufferLen(size_t bufferLenMs);
const SampleFormat& format; const msg::SampleFormat& format;
private: private:
chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer); chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer);
@ -36,20 +36,17 @@ private:
void resetBuffers(); void resetBuffers();
void setRealSampleRate(double sampleRate); void setRealSampleRate(double sampleRate);
SampleFormat format_; msg::SampleFormat format_;
long lastTick; long lastTick;
chronos::usec sleep; chronos::usec sleep;
unsigned long playedSamples; Queue<std::shared_ptr<msg::PcmChunk>> chunks;
chronos::time_point_hrc playedSamplesTime;
Queue<std::shared_ptr<PcmChunk>> chunks;
// DoubleBuffer<chronos::usec::rep> cardBuffer; // DoubleBuffer<chronos::usec::rep> cardBuffer;
DoubleBuffer<chronos::usec::rep> miniBuffer; DoubleBuffer<chronos::usec::rep> miniBuffer;
DoubleBuffer<chronos::usec::rep> buffer; DoubleBuffer<chronos::usec::rep> buffer;
DoubleBuffer<chronos::usec::rep> shortBuffer; DoubleBuffer<chronos::usec::rep> shortBuffer;
std::shared_ptr<PcmChunk> chunk; std::shared_ptr<msg::PcmChunk> chunk;
int median; int median;
int shortMedian; int shortMedian;

View file

@ -3,19 +3,21 @@
#include "message.h" #include "message.h"
namespace msg
{
class AckMsg : public BaseMessage class Ack : public BaseMessage
{ {
public: public:
AckMsg() : BaseMessage(message_type::ackMsg), message("") Ack() : BaseMessage(message_type::kAck), message("")
{ {
} }
AckMsg(const std::string& _message) : BaseMessage(message_type::requestmsg), message(_message) Ack(const std::string& _message) : BaseMessage(message_type::kAck), message(_message)
{ {
} }
virtual ~AckMsg() virtual ~Ack()
{ {
} }
@ -43,6 +45,7 @@ protected:
} }
}; };
}

View file

@ -5,18 +5,21 @@
#include <string> #include <string>
class CommandMsg : public BaseMessage namespace msg
{
class Command : public BaseMessage
{ {
public: public:
CommandMsg() : BaseMessage(message_type::commandmsg), command("") Command() : BaseMessage(message_type::kCommand), command("")
{ {
} }
CommandMsg(const std::string& _command) : BaseMessage(message_type::commandmsg), command(_command) Command(const std::string& _command) : BaseMessage(message_type::kCommand), command(_command)
{ {
} }
virtual ~CommandMsg() virtual ~Command()
{ {
} }
@ -44,7 +47,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -3,17 +3,18 @@
#include "message.h" #include "message.h"
namespace msg
{
class Header : public BaseMessage
class HeaderMessage : public BaseMessage
{ {
public: public:
HeaderMessage(const std::string& codecName = "", size_t size = 0) : BaseMessage(message_type::header), payloadSize(size), codec(codecName) Header(const std::string& codecName = "", size_t size = 0) : BaseMessage(message_type::kHeader), payloadSize(size), codec(codecName)
{ {
payload = (char*)malloc(size); payload = (char*)malloc(size);
} }
virtual ~HeaderMessage() virtual ~Header()
{ {
free(payload); free(payload);
} }
@ -50,6 +51,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -30,15 +30,15 @@ struct membuf : public std::basic_streambuf<char>
enum message_type enum message_type
{ {
base = 0, kBase = 0,
header = 1, kHeader = 1,
payload = 2, kPayload = 2,
sampleformat = 3, kSampleFormat = 3,
serversettings = 4, kServerSettings = 4,
timemsg = 5, kTime = 5,
requestmsg = 6, kRequest = 6,
ackMsg = 7, kAck = 7,
commandmsg = 8 kCommand = 8
}; };
@ -90,10 +90,12 @@ struct tv
} }
}; };
namespace msg
{
struct BaseMessage struct BaseMessage
{ {
BaseMessage() : type(base), id(0), refersTo(0) BaseMessage() : type(kBase), id(0), refersTo(0)
{ {
} }
@ -181,6 +183,7 @@ struct SerializedMessage
char* buffer; char* buffer;
}; };
}
#endif #endif

View file

@ -6,6 +6,8 @@
using namespace std; using namespace std;
namespace msg
{
PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate*sampleFormat.frameSize*ms / 1000), format(sampleFormat), idx(0) PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate*sampleFormat.frameSize*ms / 1000), format(sampleFormat), idx(0)
{ {
@ -51,4 +53,6 @@ int PcmChunk::readFrames(void* outputBuffer, size_t frameCount)
} }
}

View file

@ -8,6 +8,8 @@
#include "common/timeDefs.h" #include "common/timeDefs.h"
namespace msg
{
class PcmChunk : public WireChunk class PcmChunk : public WireChunk
{ {
@ -62,7 +64,7 @@ private:
uint32_t idx; uint32_t idx;
}; };
}
#endif #endif

View file

@ -4,19 +4,21 @@
#include "message.h" #include "message.h"
#include <string> #include <string>
namespace msg
{
class RequestMsg : public BaseMessage class Request : public BaseMessage
{ {
public: public:
RequestMsg() : BaseMessage(message_type::requestmsg), request(base) Request() : BaseMessage(message_type::kRequest), request(kBase)
{ {
} }
RequestMsg(message_type _request) : BaseMessage(message_type::requestmsg), request(_request) Request(message_type _request) : BaseMessage(message_type::kRequest), request(_request)
{ {
} }
virtual ~RequestMsg() virtual ~Request()
{ {
} }
@ -39,7 +41,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -5,18 +5,21 @@
#include <iostream> #include <iostream>
SampleFormat::SampleFormat() : BaseMessage(message_type::sampleformat) namespace msg
{
SampleFormat::SampleFormat() : BaseMessage(message_type::kSampleFormat)
{ {
} }
SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::sampleformat) SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::kSampleFormat)
{ {
setFormat(format); setFormat(format);
} }
SampleFormat::SampleFormat(uint32_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::sampleformat) SampleFormat::SampleFormat(uint32_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::kSampleFormat)
{ {
setFormat(sampleRate, bitsPerSample, channelCount); setFormat(sampleRate, bitsPerSample, channelCount);
} }
@ -45,5 +48,7 @@ void SampleFormat::setFormat(uint32_t rate, uint16_t bits, uint16_t channels)
frameSize = channels*sampleSize; frameSize = channels*sampleSize;
} }
}

View file

@ -4,6 +4,8 @@
#include <string> #include <string>
#include "message.h" #include "message.h"
namespace msg
{
class SampleFormat : public BaseMessage class SampleFormat : public BaseMessage
{ {
@ -63,6 +65,7 @@ protected:
}; };
}
#endif #endif

View file

@ -4,11 +4,13 @@
#include "message/message.h" #include "message/message.h"
namespace msg
{
class ServerSettings : public BaseMessage class ServerSettings : public BaseMessage
{ {
public: public:
ServerSettings() : BaseMessage(message_type::serversettings) ServerSettings() : BaseMessage(message_type::kServerSettings)
{ {
} }
@ -35,7 +37,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -3,15 +3,17 @@
#include "message.h" #include "message.h"
namespace msg
{
class TimeMsg : public BaseMessage class Time : public BaseMessage
{ {
public: public:
TimeMsg() : BaseMessage(message_type::timemsg) Time() : BaseMessage(message_type::kTime)
{ {
} }
virtual ~TimeMsg() virtual ~Time()
{ {
} }
@ -34,7 +36,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -10,10 +10,13 @@
#include "message.h" #include "message.h"
namespace msg
{
class WireChunk : public BaseMessage class WireChunk : public BaseMessage
{ {
public: public:
WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) WireChunk(size_t size = 0) : BaseMessage(message_type::kPayload), payloadSize(size)
{ {
payload = (char*)malloc(size); payload = (char*)malloc(size);
} }
@ -51,7 +54,7 @@ protected:
} }
}; };
}
#endif #endif

View file

@ -1,8 +1,8 @@
#include "controlServer.h" #include "controlServer.h"
#include "message/timeMsg.h" #include "message/time.h"
#include "message/ackMsg.h" #include "message/ack.h"
#include "message/requestMsg.h" #include "message/request.h"
#include "message/commandMsg.h" #include "message/command.h"
#include <iostream> #include <iostream>
@ -13,7 +13,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
void ControlServer::send(shared_ptr<BaseMessage> message) void ControlServer::send(shared_ptr<msg::BaseMessage> message)
{ {
std::unique_lock<std::mutex> mlock(mutex); std::unique_lock<std::mutex> mlock(mutex);
for (std::set<shared_ptr<ServerSession>>::iterator it = sessions.begin(); it != sessions.end(); ) for (std::set<shared_ptr<ServerSession>>::iterator it = sessions.begin(); it != sessions.end(); )
@ -33,47 +33,47 @@ void ControlServer::send(shared_ptr<BaseMessage> message)
} }
void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer) void ControlServer::onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
// cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; // cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::requestmsg) if (baseMessage.type == message_type::kRequest)
{ {
RequestMsg requestMsg; msg::Request requestMsg;
requestMsg.deserialize(baseMessage, buffer); requestMsg.deserialize(baseMessage, buffer);
// cout << "request: " << requestMsg.request << "\n"; // cout << "request: " << requestMsg.request << "\n";
if (requestMsg.request == timemsg) if (requestMsg.request == kTime)
{ {
// timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec); // timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec);
TimeMsg timeMsg; msg::Time timeMsg;
timeMsg.refersTo = requestMsg.id; timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.; timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
// tv diff = timeMsg.received - timeMsg.sent; // tv diff = timeMsg.received - timeMsg.sent;
// cout << "Latency: " << diff.sec << "." << diff.usec << "\n"; // cout << "Latency: " << diff.sec << "." << diff.usec << "\n";
connection->send(&timeMsg); connection->send(&timeMsg);
} }
else if (requestMsg.request == serversettings) else if (requestMsg.request == kServerSettings)
{ {
serverSettings->refersTo = requestMsg.id; serverSettings->refersTo = requestMsg.id;
connection->send(serverSettings); connection->send(serverSettings);
} }
else if (requestMsg.request == sampleformat) else if (requestMsg.request == kSampleFormat)
{ {
sampleFormat->refersTo = requestMsg.id; sampleFormat->refersTo = requestMsg.id;
connection->send(sampleFormat); connection->send(sampleFormat);
} }
else if (requestMsg.request == header) else if (requestMsg.request == kHeader)
{ {
headerChunk->refersTo = requestMsg.id; headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk); connection->send(headerChunk);
} }
} }
else if (baseMessage.type == message_type::commandmsg) else if (baseMessage.type == message_type::kCommand)
{ {
CommandMsg commandMsg; msg::Command commandMsg;
commandMsg.deserialize(baseMessage, buffer); commandMsg.deserialize(baseMessage, buffer);
if (commandMsg.command == "startStream") if (commandMsg.command == "startStream")
{ {
AckMsg ackMsg; msg::Ack ackMsg;
ackMsg.refersTo = commandMsg.id; ackMsg.refersTo = commandMsg.id;
connection->send(&ackMsg); connection->send(&ackMsg);
connection->setStreamActive(true); connection->setStreamActive(true);
@ -117,14 +117,14 @@ void ControlServer::stop()
} }
void ControlServer::setHeader(HeaderMessage* header) void ControlServer::setHeader(msg::Header* header)
{ {
if (header) if (header)
headerChunk = header; headerChunk = header;
} }
void ControlServer::setFormat(SampleFormat* format) void ControlServer::setFormat(msg::SampleFormat* format)
{ {
if (format) if (format)
sampleFormat = format; sampleFormat = format;
@ -132,7 +132,7 @@ void ControlServer::setFormat(SampleFormat* format)
void ControlServer::setServerSettings(ServerSettings* settings) void ControlServer::setServerSettings(msg::ServerSettings* settings)
{ {
if (settings) if (settings)
serverSettings = settings; serverSettings = settings;

View file

@ -12,7 +12,7 @@
#include "serverSession.h" #include "serverSession.h"
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
#include "message/headerMessage.h" #include "message/header.h"
#include "message/sampleFormat.h" #include "message/sampleFormat.h"
#include "message/serverSettings.h" #include "message/serverSettings.h"
@ -29,11 +29,11 @@ public:
void start(); void start();
void stop(); void stop();
void send(shared_ptr<BaseMessage> message); void send(shared_ptr<msg::BaseMessage> message);
virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer);
void setHeader(HeaderMessage* header); void setHeader(msg::Header* header);
void setFormat(SampleFormat* format); void setFormat(msg::SampleFormat* format);
void setServerSettings(ServerSettings* settings); void setServerSettings(msg::ServerSettings* settings);
private: private:
void acceptor(); void acceptor();
@ -41,11 +41,11 @@ private:
set<shared_ptr<ServerSession>> sessions; set<shared_ptr<ServerSession>> sessions;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
unsigned short port_; unsigned short port_;
HeaderMessage* headerChunk; msg::Header* headerChunk;
SampleFormat* sampleFormat; msg::SampleFormat* sampleFormat;
ServerSettings* serverSettings; msg::ServerSettings* serverSettings;
thread* acceptThread; thread* acceptThread;
Queue<shared_ptr<BaseMessage>> messages; Queue<shared_ptr<msg::BaseMessage>> messages;
}; };

View file

@ -1,14 +1,14 @@
#ifndef ENCODER_H #ifndef ENCODER_H
#define ENCODER_H #define ENCODER_H
#include "message/pcmChunk.h" #include "message/pcmChunk.h"
#include "message/headerMessage.h" #include "message/header.h"
#include "message/sampleFormat.h" #include "message/sampleFormat.h"
class Encoder class Encoder
{ {
public: public:
Encoder(const SampleFormat& format) : sampleFormat(format), headerChunk(NULL) Encoder(const msg::SampleFormat& format) : sampleFormat(format), headerChunk(NULL)
{ {
} }
@ -18,16 +18,16 @@ public:
delete headerChunk; delete headerChunk;
} }
virtual double encode(PcmChunk* chunk) = 0; virtual double encode(msg::PcmChunk* chunk) = 0;
virtual HeaderMessage* getHeader() virtual msg::Header* getHeader()
{ {
return headerChunk; return headerChunk;
} }
protected: protected:
SampleFormat sampleFormat; msg::SampleFormat sampleFormat;
HeaderMessage* headerChunk; msg::Header* headerChunk;
}; };

View file

@ -4,13 +4,13 @@
using namespace std; using namespace std;
FlacEncoder::FlacEncoder(const SampleFormat& format) : Encoder(format) FlacEncoder::FlacEncoder(const msg::SampleFormat& format) : Encoder(format)
{ {
headerChunk = new HeaderMessage("flac"); headerChunk = new HeaderMessage("flac");
} }
double FlacEncoder::encode(PcmChunk* chunk) double FlacEncoder::encode(msg::PcmChunk* chunk)
{ {
return chunk->duration<chronos::msec>().count(); return chunk->duration<chronos::msec>().count();
} }

View file

@ -12,8 +12,8 @@
class FlacEncoder : public Encoder class FlacEncoder : public Encoder
{ {
public: public:
FlacEncoder(const SampleFormat& format); FlacEncoder(const msg::SampleFormat& format);
virtual double encode(PcmChunk* chunk); virtual double encode(msg::PcmChunk* chunk);
protected: protected:
void initEncoder(); void initEncoder();

View file

@ -5,7 +5,7 @@
using namespace std; using namespace std;
OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format) OggEncoder::OggEncoder(const msg::SampleFormat& format) : Encoder(format)
{ {
init(); init();
lastGranulepos = -1; lastGranulepos = -1;
@ -13,7 +13,7 @@ OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format)
double OggEncoder::encode(PcmChunk* chunk) double OggEncoder::encode(msg::PcmChunk* chunk)
{ {
double res = 0; double res = 0;
if (tv_sec == 0) if (tv_sec == 0)
@ -126,7 +126,7 @@ void OggEncoder::init()
*********************************************************************/ *********************************************************************/
ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.9); ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 1.0);
/* do not continue if setup failed; this can happen if we ask for a /* do not continue if setup failed; this can happen if we ask for a
mode that libVorbis does not support (eg, too low a bitrate, etc, mode that libVorbis does not support (eg, too low a bitrate, etc,
@ -165,7 +165,7 @@ void OggEncoder::init()
*/ */
// while(!eos){ // while(!eos){
size_t pos(0); size_t pos(0);
headerChunk = new HeaderMessage("ogg"); headerChunk = new msg::Header("ogg");
while (true) while (true)
{ {
int result=ogg_stream_flush(&os,&og); int result=ogg_stream_flush(&os,&og);

View file

@ -7,8 +7,8 @@
class OggEncoder : public Encoder class OggEncoder : public Encoder
{ {
public: public:
OggEncoder(const SampleFormat& format); OggEncoder(const msg::SampleFormat& format);
virtual double encode(PcmChunk* chunk); virtual double encode(msg::PcmChunk* chunk);
private: private:
void init(); void init();

View file

@ -1,12 +1,12 @@
#include "pcmEncoder.h" #include "pcmEncoder.h"
PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format) PcmEncoder::PcmEncoder(const msg::SampleFormat& format) : Encoder(format)
{ {
headerChunk = new HeaderMessage("pcm"); headerChunk = new msg::Header("pcm");
} }
double PcmEncoder::encode(PcmChunk* chunk) double PcmEncoder::encode(msg::PcmChunk* chunk)
{ {
return chunk->duration<chronos::msec>().count(); return chunk->duration<chronos::msec>().count();
} }

View file

@ -6,8 +6,8 @@
class PcmEncoder : public Encoder class PcmEncoder : public Encoder
{ {
public: public:
PcmEncoder(const SampleFormat& format); PcmEncoder(const msg::SampleFormat& format);
virtual double encode(PcmChunk* chunk); virtual double encode(msg::PcmChunk* chunk);
}; };

View file

@ -78,7 +78,7 @@ void ServerSession::socketRead(void* _to, size_t _bytes)
} }
void ServerSession::add(shared_ptr<BaseMessage> message) void ServerSession::add(shared_ptr<msg::BaseMessage> message)
{ {
if (!message || !streamActive) if (!message || !streamActive)
return; return;
@ -89,7 +89,7 @@ void ServerSession::add(shared_ptr<BaseMessage> message)
} }
bool ServerSession::send(BaseMessage* message) bool ServerSession::send(msg::BaseMessage* message)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
if (!socket) if (!socket)
@ -107,7 +107,7 @@ bool ServerSession::send(BaseMessage* message)
void ServerSession::getNextMessage() void ServerSession::getNextMessage()
{ {
//cout << "getNextMessage\n"; //cout << "getNextMessage\n";
BaseMessage baseMessage; msg::BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize); vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize); socketRead(&buffer[0], baseMsgSize);
@ -151,7 +151,7 @@ void ServerSession::writer()
{ {
boost::asio::streambuf streambuf; boost::asio::streambuf streambuf;
std::ostream stream(&streambuf); std::ostream stream(&streambuf);
shared_ptr<BaseMessage> message; shared_ptr<msg::BaseMessage> message;
while (active_) while (active_)
{ {
if (messages.try_pop(message, std::chrono::milliseconds(500))) if (messages.try_pop(message, std::chrono::milliseconds(500)))

View file

@ -21,7 +21,7 @@ class ServerSession;
class MessageReceiver class MessageReceiver
{ {
public: public:
virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer) = 0; virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0;
}; };
@ -32,8 +32,8 @@ public:
~ServerSession(); ~ServerSession();
void start(); void start();
void stop(); void stop();
bool send(BaseMessage* message); bool send(msg::BaseMessage* message);
void add(std::shared_ptr<BaseMessage> message); void add(std::shared_ptr<msg::BaseMessage> message);
virtual bool active() virtual bool active()
{ {
@ -59,7 +59,7 @@ protected:
std::thread* writerThread; std::thread* writerThread;
std::shared_ptr<tcp::socket> socket; std::shared_ptr<tcp::socket> socket;
MessageReceiver* messageReceiver; MessageReceiver* messageReceiver;
Queue<std::shared_ptr<BaseMessage>> messages; Queue<std::shared_ptr<msg::BaseMessage>> messages;
}; };

View file

@ -40,7 +40,7 @@ int main(int argc, char* argv[])
("codec,c", po::value<string>(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") ("codec,c", po::value<string>(&codec)->default_value("ogg"), "transport codec [ogg|pcm]")
("fifo,f", po::value<string>(&fifoName)->default_value("/tmp/snapfifo"), "name of the input fifo file") ("fifo,f", po::value<string>(&fifoName)->default_value("/tmp/snapfifo"), "name of the input fifo file")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
("buffer,b", po::value<int32_t>(&bufferMs)->default_value(500), "buffer [ms]") ("buffer,b", po::value<int32_t>(&bufferMs)->default_value(1000), "buffer [ms]")
; ;
po::variables_map vm; po::variables_map vm;
@ -69,7 +69,7 @@ int main(int argc, char* argv[])
long nextTick = chronos::getTickCount(); long nextTick = chronos::getTickCount();
mkfifo(fifoName.c_str(), 0777); mkfifo(fifoName.c_str(), 0777);
SampleFormat format(sampleFormat); msg::SampleFormat format(sampleFormat);
size_t duration = 50; size_t duration = 50;
//size_t chunkSize = duration*format.rate*format.frameSize / 1000; //size_t chunkSize = duration*format.rate*format.frameSize / 1000;
std::auto_ptr<Encoder> encoder; std::auto_ptr<Encoder> encoder;
@ -88,7 +88,7 @@ int main(int argc, char* argv[])
return 1; return 1;
} }
ServerSettings serverSettings; msg::ServerSettings serverSettings;
serverSettings.bufferMs = bufferMs; serverSettings.bufferMs = bufferMs;
ControlServer* controlServer = new ControlServer(port); ControlServer* controlServer = new ControlServer(port);
@ -106,10 +106,10 @@ int main(int argc, char* argv[])
int fd = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK); int fd = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK);
try try
{ {
shared_ptr<PcmChunk> chunk;//(new WireChunk()); shared_ptr<msg::PcmChunk> chunk;//(new WireChunk());
while (!g_terminated)//cin.good()) while (!g_terminated)//cin.good())
{ {
chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); chunk.reset(new msg::PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize; int toRead = chunk->payloadSize;
int len = 0; int len = 0;
do do