diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index f3a38743..23c8d70f 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -93,7 +93,7 @@ void ClientConnection::stop() } -bool ClientConnection::send(BaseMessage* message) +bool ClientConnection::send(msg::BaseMessage* message) { // std::unique_lock mlock(mutex_); //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; @@ -110,9 +110,9 @@ bool ClientConnection::send(BaseMessage* message) } -shared_ptr ClientConnection::sendRequest(BaseMessage* message, const chronos::msec& timeout) +shared_ptr ClientConnection::sendRequest(msg::BaseMessage* message, const chronos::msec& timeout) { - shared_ptr response(NULL); + shared_ptr response(NULL); if (++reqId == 10000) reqId = 1; message->id = reqId; @@ -148,7 +148,7 @@ shared_ptr ClientConnection::sendRequest(BaseMessage* message void ClientConnection::getNextMessage() { - BaseMessage baseMessage; + msg::BaseMessage baseMessage; size_t baseMsgSize = baseMessage.getSize(); vector buffer(baseMsgSize); socketRead(&buffer[0], baseMsgSize); @@ -167,7 +167,7 @@ void ClientConnection::getNextMessage() { if (req->id == baseMessage.refersTo) { - req->response.reset(new SerializedMessage()); + req->response.reset(new msg::SerializedMessage()); req->response->message = baseMessage; req->response->buffer = (char*)malloc(baseMessage.size); memcpy(req->response->buffer, &buffer[0], baseMessage.size); diff --git a/client/clientConnection.h b/client/clientConnection.h index 686dc522..cf9d127a 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -24,7 +24,7 @@ struct PendingRequest PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; uint16_t id; - std::shared_ptr response; + std::shared_ptr response; std::condition_variable cv; }; @@ -32,7 +32,7 @@ struct PendingRequest class MessageReceiver { public: - virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; + virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; virtual void onException(ClientConnection* connection, const std::exception& exception) = 0; }; @@ -44,13 +44,13 @@ public: virtual ~ClientConnection(); virtual void start(); virtual void stop(); - virtual bool send(BaseMessage* _message); - virtual std::shared_ptr sendRequest(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); + virtual bool send(msg::BaseMessage* _message); + virtual std::shared_ptr sendRequest(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); template - std::shared_ptr sendReq(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) + std::shared_ptr sendReq(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) { - std::shared_ptr reply = sendRequest(message, timeout); + std::shared_ptr reply = sendRequest(message, timeout); if (!reply) return NULL; std::shared_ptr msg(new T); diff --git a/client/controller.cpp b/client/controller.cpp index e1f4517d..339a4960 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -8,10 +8,10 @@ #include "alsaPlayer.h" #include "timeProvider.h" #include "message/serverSettings.h" -#include "message/timeMsg.h" -#include "message/requestMsg.h" -#include "message/ackMsg.h" -#include "message/commandMsg.h" +#include "message/time.h" +#include "message/request.h" +#include "message/ack.h" +#include "message/command.h" using namespace std; @@ -27,13 +27,13 @@ void Controller::onException(ClientConnection* connection, const std::exception& } -void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) +void Controller::onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) { - if (baseMessage.type == message_type::payload) + if (baseMessage.type == message_type::kPayload) { if ((stream != NULL) && (decoder != NULL)) { - PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); + msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat, 0); pcmChunk->deserialize(baseMessage, buffer); //cout << "chunk: " << pcmChunk->payloadSize; if (decoder->decode(pcmChunk)) @@ -81,18 +81,18 @@ void Controller::worker() try { clientConnection->start(); - RequestMsg requestMsg(serversettings); - shared_ptr serverSettings(NULL); - while (active_ && !(serverSettings = clientConnection->sendReq(&requestMsg))); + msg::Request requestMsg(kServerSettings); + shared_ptr serverSettings(NULL); + while (active_ && !(serverSettings = clientConnection->sendReq(&requestMsg))); cout << "ServerSettings buffer: " << serverSettings->bufferMs << "\n"; - requestMsg.request = sampleformat; - while (active_ && !(sampleFormat = clientConnection->sendReq(&requestMsg))); + requestMsg.request = kSampleFormat; + while (active_ && !(sampleFormat = clientConnection->sendReq(&requestMsg))); cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - requestMsg.request = header; - shared_ptr headerChunk(NULL); - while (active_ && !(headerChunk = clientConnection->sendReq(&requestMsg))); + requestMsg.request = kHeader; + shared_ptr headerChunk(NULL); + while (active_ && !(headerChunk = clientConnection->sendReq(&requestMsg))); cout << "Codec: " << headerChunk->codec << "\n"; if (headerChunk->codec == "ogg") decoder = new OggDecoder(); @@ -100,10 +100,10 @@ void Controller::worker() decoder = new PcmDecoder(); decoder->setHeader(headerChunk.get()); - RequestMsg timeReq(timemsg); + msg::Request timeReq(kTime); for (size_t n=0; n<50 && active_; ++n) { - shared_ptr reply = clientConnection->sendReq(&timeReq, chronos::msec(2000)); + shared_ptr reply = clientConnection->sendReq(&timeReq, chronos::msec(2000)); if (reply) { double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; @@ -119,14 +119,14 @@ void Controller::worker() Player player(pcmDevice_, stream); player.start(); - CommandMsg startStream("startStream"); - shared_ptr ackMsg(NULL); - while (active_ && !(ackMsg = clientConnection->sendReq(&startStream))); + msg::Command startStream("startStream"); + shared_ptr ackMsg(NULL); + while (active_ && !(ackMsg = clientConnection->sendReq(&startStream))); while (active_) { usleep(500*1000); - shared_ptr reply = clientConnection->sendReq(&timeReq); + shared_ptr reply = clientConnection->sendReq(&timeReq); if (reply) { double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; diff --git a/client/controller.h b/client/controller.h index 8468d1ff..89254620 100644 --- a/client/controller.h +++ b/client/controller.h @@ -16,7 +16,7 @@ public: Controller(); void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency); void stop(); - virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer); + virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer); virtual void onException(ClientConnection* connection, const std::exception& exception); private: @@ -26,7 +26,7 @@ private: ClientConnection* clientConnection; Stream* stream; std::string ip; - std::shared_ptr sampleFormat; + std::shared_ptr sampleFormat; Decoder* decoder; PcmDevice pcmDevice_; size_t latency_; diff --git a/client/decoder.h b/client/decoder.h index d4d3d600..ca83281a 100644 --- a/client/decoder.h +++ b/client/decoder.h @@ -1,15 +1,15 @@ #ifndef DECODER_H #define DECODER_H #include "message/pcmChunk.h" -#include "message/headerMessage.h" +#include "message/header.h" class Decoder { public: Decoder() {}; virtual ~Decoder() {}; - virtual bool decode(PcmChunk* chunk) = 0; - virtual bool setHeader(HeaderMessage* chunk) = 0; + virtual bool decode(msg::PcmChunk* chunk) = 0; + virtual bool setHeader(msg::Header* chunk) = 0; }; diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index a2f66b9b..b204eab2 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -28,7 +28,7 @@ OggDecoder::~OggDecoder() } -bool OggDecoder::decode(PcmChunk* chunk) +bool OggDecoder::decode(msg::PcmChunk* chunk) { /* grab some data at the head of the stream. We want the first page @@ -122,7 +122,7 @@ bool OggDecoder::decode(PcmChunk* chunk) } -bool OggDecoder::setHeader(HeaderMessage* chunk) +bool OggDecoder::setHeader(msg::Header* chunk) { bytes = chunk->payloadSize; buffer=ogg_sync_buffer(&oy, bytes); diff --git a/client/oggDecoder.h b/client/oggDecoder.h index 73dc3d68..8994b332 100644 --- a/client/oggDecoder.h +++ b/client/oggDecoder.h @@ -9,11 +9,11 @@ class OggDecoder : public Decoder public: OggDecoder(); virtual ~OggDecoder(); - virtual bool decode(PcmChunk* chunk); - virtual bool setHeader(HeaderMessage* chunk); + virtual bool decode(msg::PcmChunk* chunk); + virtual bool setHeader(msg::Header* chunk); private: - bool decodePayload(PcmChunk* chunk); + bool decodePayload(msg::PcmChunk* chunk); ogg_sync_state oy; /* sync and verify incoming physical bitstream */ ogg_stream_state os; /* take physical pages, weld into a logical diff --git a/client/pcmDecoder.cpp b/client/pcmDecoder.cpp index 5ff18bf2..1bf38b49 100644 --- a/client/pcmDecoder.cpp +++ b/client/pcmDecoder.cpp @@ -5,17 +5,13 @@ PcmDecoder::PcmDecoder() : Decoder() } -bool PcmDecoder::decode(PcmChunk* chunk) +bool PcmDecoder::decode(msg::PcmChunk* chunk) { - /* WireChunk* wireChunk = chunk->wireChunk; - for (size_t n=0; nlength; ++n) - wireChunk->payload[n] *= 1; - */ return true; } -bool PcmDecoder::setHeader(HeaderMessage* chunk) +bool PcmDecoder::setHeader(msg::Header* chunk) { return true; } diff --git a/client/pcmDecoder.h b/client/pcmDecoder.h index 87a32df9..799373f2 100644 --- a/client/pcmDecoder.h +++ b/client/pcmDecoder.h @@ -7,8 +7,8 @@ class PcmDecoder : public Decoder { public: PcmDecoder(); - virtual bool decode(PcmChunk* chunk); - virtual bool setHeader(HeaderMessage* chunk); + virtual bool decode(msg::PcmChunk* chunk); + virtual bool setHeader(msg::Header* chunk); }; diff --git a/client/stream.cpp b/client/stream.cpp index 1ee22efd..8c65daff 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -8,7 +8,7 @@ using namespace std; using namespace chronos; -Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0) +Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0) { buffer.setSize(500); shortBuffer.setSize(100); @@ -16,8 +16,6 @@ Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(samp // cardBuffer.setSize(50); bufferMs = msec(500); - playedSamples = 0; - playedSamplesTime = time_point_hrc::min(); /* 48000 x ------- = ----- @@ -53,11 +51,11 @@ void Stream::clearChunks() } -void Stream::addChunk(PcmChunk* chunk) +void Stream::addChunk(msg::PcmChunk* chunk) { while (chunks.size() * chunk->duration().count() > 10000) chunks.pop(); - chunks.push(shared_ptr(chunk)); + chunks.push(shared_ptr(chunk)); // cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n"; } @@ -174,17 +172,6 @@ void Stream::resetBuffers() bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer) { -/*if (playedSamplesTime == time_point_hrc::min()) - playedSamplesTime = chronos::hrc::now() + outputBufferDacTime; -else -{ - playedSamples += framesPerBuffer; - chronos::msec since = std::chrono::duration_cast(chronos::hrc::now() + outputBufferDacTime - playedSamplesTime); - if (since.count() > 0) - cout << (double)playedSamples / (double)since.count() << "\n"; -} -*/ - if (outputBufferDacTime > bufferMs) return false; diff --git a/client/stream.h b/client/stream.h index 28b3f2c4..33dc240c 100644 --- a/client/stream.h +++ b/client/stream.h @@ -19,12 +19,12 @@ class Stream { public: - Stream(const SampleFormat& format); - void addChunk(PcmChunk* chunk); + Stream(const msg::SampleFormat& format); + void addChunk(msg::PcmChunk* chunk); void clearChunks(); bool getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer); void setBufferLen(size_t bufferLenMs); - const SampleFormat& format; + const msg::SampleFormat& format; private: chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer); @@ -36,20 +36,17 @@ private: void resetBuffers(); void setRealSampleRate(double sampleRate); - SampleFormat format_; + msg::SampleFormat format_; long lastTick; chronos::usec sleep; -unsigned long playedSamples; -chronos::time_point_hrc playedSamplesTime; - - Queue> chunks; + Queue> chunks; // DoubleBuffer cardBuffer; DoubleBuffer miniBuffer; DoubleBuffer buffer; DoubleBuffer shortBuffer; - std::shared_ptr chunk; + std::shared_ptr chunk; int median; int shortMedian; diff --git a/message/ackMsg.h b/message/ack.h similarity index 74% rename from message/ackMsg.h rename to message/ack.h index 584df6ef..f26abfff 100644 --- a/message/ackMsg.h +++ b/message/ack.h @@ -3,19 +3,21 @@ #include "message.h" +namespace msg +{ -class AckMsg : public BaseMessage +class Ack : public BaseMessage { public: - AckMsg() : BaseMessage(message_type::ackMsg), message("") + Ack() : BaseMessage(message_type::kAck), message("") { } - AckMsg(const std::string& _message) : BaseMessage(message_type::requestmsg), message(_message) + Ack(const std::string& _message) : BaseMessage(message_type::kAck), message(_message) { } - virtual ~AckMsg() + virtual ~Ack() { } @@ -43,6 +45,7 @@ protected: } }; +} diff --git a/message/commandMsg.h b/message/command.h similarity index 73% rename from message/commandMsg.h rename to message/command.h index 976316c3..04fba587 100644 --- a/message/commandMsg.h +++ b/message/command.h @@ -5,18 +5,21 @@ #include -class CommandMsg : public BaseMessage +namespace msg +{ + +class Command : public BaseMessage { public: - CommandMsg() : BaseMessage(message_type::commandmsg), command("") + Command() : BaseMessage(message_type::kCommand), command("") { } - CommandMsg(const std::string& _command) : BaseMessage(message_type::commandmsg), command(_command) + Command(const std::string& _command) : BaseMessage(message_type::kCommand), command(_command) { } - virtual ~CommandMsg() + virtual ~Command() { } @@ -44,7 +47,7 @@ protected: } }; - +} #endif diff --git a/message/headerMessage.h b/message/header.h similarity index 82% rename from message/headerMessage.h rename to message/header.h index 73a70280..7a3c731c 100644 --- a/message/headerMessage.h +++ b/message/header.h @@ -3,17 +3,18 @@ #include "message.h" +namespace msg +{ - -class HeaderMessage : public BaseMessage +class Header : public BaseMessage { public: - HeaderMessage(const std::string& codecName = "", size_t size = 0) : BaseMessage(message_type::header), payloadSize(size), codec(codecName) + Header(const std::string& codecName = "", size_t size = 0) : BaseMessage(message_type::kHeader), payloadSize(size), codec(codecName) { payload = (char*)malloc(size); } - virtual ~HeaderMessage() + virtual ~Header() { free(payload); } @@ -50,6 +51,7 @@ protected: } }; +} #endif diff --git a/message/message.h b/message/message.h index 5cc72715..01937879 100644 --- a/message/message.h +++ b/message/message.h @@ -30,15 +30,15 @@ struct membuf : public std::basic_streambuf enum message_type { - base = 0, - header = 1, - payload = 2, - sampleformat = 3, - serversettings = 4, - timemsg = 5, - requestmsg = 6, - ackMsg = 7, - commandmsg = 8 + kBase = 0, + kHeader = 1, + kPayload = 2, + kSampleFormat = 3, + kServerSettings = 4, + kTime = 5, + kRequest = 6, + kAck = 7, + kCommand = 8 }; @@ -90,10 +90,12 @@ struct tv } }; +namespace msg +{ struct BaseMessage { - BaseMessage() : type(base), id(0), refersTo(0) + BaseMessage() : type(kBase), id(0), refersTo(0) { } @@ -181,6 +183,7 @@ struct SerializedMessage char* buffer; }; +} #endif diff --git a/message/pcmChunk.cpp b/message/pcmChunk.cpp index e4c953b5..34c5ced4 100644 --- a/message/pcmChunk.cpp +++ b/message/pcmChunk.cpp @@ -6,6 +6,8 @@ using namespace std; +namespace msg +{ PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate*sampleFormat.frameSize*ms / 1000), format(sampleFormat), idx(0) { @@ -51,4 +53,6 @@ int PcmChunk::readFrames(void* outputBuffer, size_t frameCount) } +} + diff --git a/message/pcmChunk.h b/message/pcmChunk.h index d70c0580..79143b6b 100644 --- a/message/pcmChunk.h +++ b/message/pcmChunk.h @@ -8,6 +8,8 @@ #include "common/timeDefs.h" +namespace msg +{ class PcmChunk : public WireChunk { @@ -62,7 +64,7 @@ private: uint32_t idx; }; - +} #endif diff --git a/message/requestMsg.h b/message/request.h similarity index 67% rename from message/requestMsg.h rename to message/request.h index 71fa5e62..13b1638e 100644 --- a/message/requestMsg.h +++ b/message/request.h @@ -4,19 +4,21 @@ #include "message.h" #include +namespace msg +{ -class RequestMsg : public BaseMessage +class Request : public BaseMessage { public: - RequestMsg() : BaseMessage(message_type::requestmsg), request(base) + Request() : BaseMessage(message_type::kRequest), request(kBase) { } - RequestMsg(message_type _request) : BaseMessage(message_type::requestmsg), request(_request) + Request(message_type _request) : BaseMessage(message_type::kRequest), request(_request) { } - virtual ~RequestMsg() + virtual ~Request() { } @@ -39,7 +41,7 @@ protected: } }; - +} #endif diff --git a/message/sampleFormat.cpp b/message/sampleFormat.cpp index 34ef9990..5ac79446 100644 --- a/message/sampleFormat.cpp +++ b/message/sampleFormat.cpp @@ -5,18 +5,21 @@ #include -SampleFormat::SampleFormat() : BaseMessage(message_type::sampleformat) +namespace msg +{ + +SampleFormat::SampleFormat() : BaseMessage(message_type::kSampleFormat) { } -SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::sampleformat) +SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::kSampleFormat) { setFormat(format); } -SampleFormat::SampleFormat(uint32_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::sampleformat) +SampleFormat::SampleFormat(uint32_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::kSampleFormat) { setFormat(sampleRate, bitsPerSample, channelCount); } @@ -45,5 +48,7 @@ void SampleFormat::setFormat(uint32_t rate, uint16_t bits, uint16_t channels) frameSize = channels*sampleSize; } +} + diff --git a/message/sampleFormat.h b/message/sampleFormat.h index a00e8746..c25aa8a7 100644 --- a/message/sampleFormat.h +++ b/message/sampleFormat.h @@ -4,6 +4,8 @@ #include #include "message.h" +namespace msg +{ class SampleFormat : public BaseMessage { @@ -63,6 +65,7 @@ protected: }; +} #endif diff --git a/message/serverSettings.h b/message/serverSettings.h index 811a1216..a1b4c8c4 100644 --- a/message/serverSettings.h +++ b/message/serverSettings.h @@ -4,11 +4,13 @@ #include "message/message.h" +namespace msg +{ class ServerSettings : public BaseMessage { public: - ServerSettings() : BaseMessage(message_type::serversettings) + ServerSettings() : BaseMessage(message_type::kServerSettings) { } @@ -35,7 +37,7 @@ protected: } }; - +} #endif diff --git a/message/timeMsg.h b/message/time.h similarity index 79% rename from message/timeMsg.h rename to message/time.h index 27e9b740..7d2bc25a 100644 --- a/message/timeMsg.h +++ b/message/time.h @@ -3,15 +3,17 @@ #include "message.h" +namespace msg +{ -class TimeMsg : public BaseMessage +class Time : public BaseMessage { public: - TimeMsg() : BaseMessage(message_type::timemsg) + Time() : BaseMessage(message_type::kTime) { } - virtual ~TimeMsg() + virtual ~Time() { } @@ -34,7 +36,7 @@ protected: } }; - +} #endif diff --git a/message/wireChunk.h b/message/wireChunk.h index 3dbc3bfe..12f8d840 100644 --- a/message/wireChunk.h +++ b/message/wireChunk.h @@ -10,10 +10,13 @@ #include "message.h" +namespace msg +{ + class WireChunk : public BaseMessage { public: - WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) + WireChunk(size_t size = 0) : BaseMessage(message_type::kPayload), payloadSize(size) { payload = (char*)malloc(size); } @@ -51,7 +54,7 @@ protected: } }; - +} #endif diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 7eb58747..1523f20c 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -1,8 +1,8 @@ #include "controlServer.h" -#include "message/timeMsg.h" -#include "message/ackMsg.h" -#include "message/requestMsg.h" -#include "message/commandMsg.h" +#include "message/time.h" +#include "message/ack.h" +#include "message/request.h" +#include "message/command.h" #include @@ -13,7 +13,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL -void ControlServer::send(shared_ptr message) +void ControlServer::send(shared_ptr message) { std::unique_lock mlock(mutex); for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) @@ -33,47 +33,47 @@ void ControlServer::send(shared_ptr message) } -void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer) +void ControlServer::onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer) { // cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; - if (baseMessage.type == message_type::requestmsg) + if (baseMessage.type == message_type::kRequest) { - RequestMsg requestMsg; + msg::Request requestMsg; requestMsg.deserialize(baseMessage, buffer); // cout << "request: " << requestMsg.request << "\n"; - if (requestMsg.request == timemsg) + if (requestMsg.request == kTime) { // timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec); - TimeMsg timeMsg; + msg::Time timeMsg; timeMsg.refersTo = requestMsg.id; timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.; // tv diff = timeMsg.received - timeMsg.sent; // cout << "Latency: " << diff.sec << "." << diff.usec << "\n"; connection->send(&timeMsg); } - else if (requestMsg.request == serversettings) + else if (requestMsg.request == kServerSettings) { serverSettings->refersTo = requestMsg.id; connection->send(serverSettings); } - else if (requestMsg.request == sampleformat) + else if (requestMsg.request == kSampleFormat) { sampleFormat->refersTo = requestMsg.id; connection->send(sampleFormat); } - else if (requestMsg.request == header) + else if (requestMsg.request == kHeader) { headerChunk->refersTo = requestMsg.id; connection->send(headerChunk); } } - else if (baseMessage.type == message_type::commandmsg) + else if (baseMessage.type == message_type::kCommand) { - CommandMsg commandMsg; + msg::Command commandMsg; commandMsg.deserialize(baseMessage, buffer); if (commandMsg.command == "startStream") { - AckMsg ackMsg; + msg::Ack ackMsg; ackMsg.refersTo = commandMsg.id; connection->send(&ackMsg); connection->setStreamActive(true); @@ -117,14 +117,14 @@ void ControlServer::stop() } -void ControlServer::setHeader(HeaderMessage* header) +void ControlServer::setHeader(msg::Header* header) { if (header) headerChunk = header; } -void ControlServer::setFormat(SampleFormat* format) +void ControlServer::setFormat(msg::SampleFormat* format) { if (format) sampleFormat = format; @@ -132,7 +132,7 @@ void ControlServer::setFormat(SampleFormat* format) -void ControlServer::setServerSettings(ServerSettings* settings) +void ControlServer::setServerSettings(msg::ServerSettings* settings) { if (settings) serverSettings = settings; diff --git a/server/controlServer.h b/server/controlServer.h index 3fb507b0..1085a133 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -12,7 +12,7 @@ #include "serverSession.h" #include "common/queue.h" #include "message/message.h" -#include "message/headerMessage.h" +#include "message/header.h" #include "message/sampleFormat.h" #include "message/serverSettings.h" @@ -29,11 +29,11 @@ public: void start(); void stop(); - void send(shared_ptr message); - virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer); - void setHeader(HeaderMessage* header); - void setFormat(SampleFormat* format); - void setServerSettings(ServerSettings* settings); + void send(shared_ptr message); + virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer); + void setHeader(msg::Header* header); + void setFormat(msg::SampleFormat* format); + void setServerSettings(msg::ServerSettings* settings); private: void acceptor(); @@ -41,11 +41,11 @@ private: set> sessions; boost::asio::io_service io_service_; unsigned short port_; - HeaderMessage* headerChunk; - SampleFormat* sampleFormat; - ServerSettings* serverSettings; + msg::Header* headerChunk; + msg::SampleFormat* sampleFormat; + msg::ServerSettings* serverSettings; thread* acceptThread; - Queue> messages; + Queue> messages; }; diff --git a/server/encoder.h b/server/encoder.h index 3a6387e4..73062518 100644 --- a/server/encoder.h +++ b/server/encoder.h @@ -1,14 +1,14 @@ #ifndef ENCODER_H #define ENCODER_H #include "message/pcmChunk.h" -#include "message/headerMessage.h" +#include "message/header.h" #include "message/sampleFormat.h" class Encoder { public: - Encoder(const SampleFormat& format) : sampleFormat(format), headerChunk(NULL) + Encoder(const msg::SampleFormat& format) : sampleFormat(format), headerChunk(NULL) { } @@ -18,16 +18,16 @@ public: delete headerChunk; } - virtual double encode(PcmChunk* chunk) = 0; + virtual double encode(msg::PcmChunk* chunk) = 0; - virtual HeaderMessage* getHeader() + virtual msg::Header* getHeader() { return headerChunk; } protected: - SampleFormat sampleFormat; - HeaderMessage* headerChunk; + msg::SampleFormat sampleFormat; + msg::Header* headerChunk; }; diff --git a/server/flacEncoder.cpp b/server/flacEncoder.cpp index d717b247..a24779e8 100644 --- a/server/flacEncoder.cpp +++ b/server/flacEncoder.cpp @@ -4,13 +4,13 @@ using namespace std; -FlacEncoder::FlacEncoder(const SampleFormat& format) : Encoder(format) +FlacEncoder::FlacEncoder(const msg::SampleFormat& format) : Encoder(format) { headerChunk = new HeaderMessage("flac"); } -double FlacEncoder::encode(PcmChunk* chunk) +double FlacEncoder::encode(msg::PcmChunk* chunk) { return chunk->duration().count(); } diff --git a/server/flacEncoder.h b/server/flacEncoder.h index a5cc03b6..4a91673d 100644 --- a/server/flacEncoder.h +++ b/server/flacEncoder.h @@ -12,8 +12,8 @@ class FlacEncoder : public Encoder { public: - FlacEncoder(const SampleFormat& format); - virtual double encode(PcmChunk* chunk); + FlacEncoder(const msg::SampleFormat& format); + virtual double encode(msg::PcmChunk* chunk); protected: void initEncoder(); diff --git a/server/oggEncoder.cpp b/server/oggEncoder.cpp index 4898c905..0db01dd4 100644 --- a/server/oggEncoder.cpp +++ b/server/oggEncoder.cpp @@ -5,7 +5,7 @@ using namespace std; -OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format) +OggEncoder::OggEncoder(const msg::SampleFormat& format) : Encoder(format) { init(); lastGranulepos = -1; @@ -13,7 +13,7 @@ OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format) -double OggEncoder::encode(PcmChunk* chunk) +double OggEncoder::encode(msg::PcmChunk* chunk) { double res = 0; if (tv_sec == 0) @@ -126,7 +126,7 @@ void OggEncoder::init() *********************************************************************/ - ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.9); + ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 1.0); /* do not continue if setup failed; this can happen if we ask for a mode that libVorbis does not support (eg, too low a bitrate, etc, @@ -165,7 +165,7 @@ void OggEncoder::init() */ // while(!eos){ size_t pos(0); - headerChunk = new HeaderMessage("ogg"); + headerChunk = new msg::Header("ogg"); while (true) { int result=ogg_stream_flush(&os,&og); diff --git a/server/oggEncoder.h b/server/oggEncoder.h index 957ab38e..91e9e483 100644 --- a/server/oggEncoder.h +++ b/server/oggEncoder.h @@ -7,8 +7,8 @@ class OggEncoder : public Encoder { public: - OggEncoder(const SampleFormat& format); - virtual double encode(PcmChunk* chunk); + OggEncoder(const msg::SampleFormat& format); + virtual double encode(msg::PcmChunk* chunk); private: void init(); diff --git a/server/pcmEncoder.cpp b/server/pcmEncoder.cpp index 80b59113..7ab360e2 100644 --- a/server/pcmEncoder.cpp +++ b/server/pcmEncoder.cpp @@ -1,12 +1,12 @@ #include "pcmEncoder.h" -PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format) +PcmEncoder::PcmEncoder(const msg::SampleFormat& format) : Encoder(format) { - headerChunk = new HeaderMessage("pcm"); + headerChunk = new msg::Header("pcm"); } -double PcmEncoder::encode(PcmChunk* chunk) +double PcmEncoder::encode(msg::PcmChunk* chunk) { return chunk->duration().count(); } diff --git a/server/pcmEncoder.h b/server/pcmEncoder.h index 55ed426b..d53061e3 100644 --- a/server/pcmEncoder.h +++ b/server/pcmEncoder.h @@ -6,8 +6,8 @@ class PcmEncoder : public Encoder { public: - PcmEncoder(const SampleFormat& format); - virtual double encode(PcmChunk* chunk); + PcmEncoder(const msg::SampleFormat& format); + virtual double encode(msg::PcmChunk* chunk); }; diff --git a/server/serverSession.cpp b/server/serverSession.cpp index 84055435..de3ac691 100644 --- a/server/serverSession.cpp +++ b/server/serverSession.cpp @@ -78,7 +78,7 @@ void ServerSession::socketRead(void* _to, size_t _bytes) } -void ServerSession::add(shared_ptr message) +void ServerSession::add(shared_ptr message) { if (!message || !streamActive) return; @@ -89,7 +89,7 @@ void ServerSession::add(shared_ptr message) } -bool ServerSession::send(BaseMessage* message) +bool ServerSession::send(msg::BaseMessage* message) { std::unique_lock mlock(mutex_); if (!socket) @@ -107,7 +107,7 @@ bool ServerSession::send(BaseMessage* message) void ServerSession::getNextMessage() { //cout << "getNextMessage\n"; - BaseMessage baseMessage; + msg::BaseMessage baseMessage; size_t baseMsgSize = baseMessage.getSize(); vector buffer(baseMsgSize); socketRead(&buffer[0], baseMsgSize); @@ -151,7 +151,7 @@ void ServerSession::writer() { boost::asio::streambuf streambuf; std::ostream stream(&streambuf); - shared_ptr message; + shared_ptr message; while (active_) { if (messages.try_pop(message, std::chrono::milliseconds(500))) diff --git a/server/serverSession.h b/server/serverSession.h index edf804df..0c41b355 100644 --- a/server/serverSession.h +++ b/server/serverSession.h @@ -21,7 +21,7 @@ class ServerSession; class MessageReceiver { public: - virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer) = 0; + virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; }; @@ -32,8 +32,8 @@ public: ~ServerSession(); void start(); void stop(); - bool send(BaseMessage* message); - void add(std::shared_ptr message); + bool send(msg::BaseMessage* message); + void add(std::shared_ptr message); virtual bool active() { @@ -59,7 +59,7 @@ protected: std::thread* writerThread; std::shared_ptr socket; MessageReceiver* messageReceiver; - Queue> messages; + Queue> messages; }; diff --git a/server/snapServer.cpp b/server/snapServer.cpp index 3bff45c0..b60652f3 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -40,7 +40,7 @@ int main(int argc, char* argv[]) ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of the input fifo file") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") - ("buffer,b", po::value(&bufferMs)->default_value(500), "buffer [ms]") + ("buffer,b", po::value(&bufferMs)->default_value(1000), "buffer [ms]") ; po::variables_map vm; @@ -69,7 +69,7 @@ int main(int argc, char* argv[]) long nextTick = chronos::getTickCount(); mkfifo(fifoName.c_str(), 0777); - SampleFormat format(sampleFormat); + msg::SampleFormat format(sampleFormat); size_t duration = 50; //size_t chunkSize = duration*format.rate*format.frameSize / 1000; std::auto_ptr encoder; @@ -88,7 +88,7 @@ int main(int argc, char* argv[]) return 1; } - ServerSettings serverSettings; + msg::ServerSettings serverSettings; serverSettings.bufferMs = bufferMs; ControlServer* controlServer = new ControlServer(port); @@ -106,10 +106,10 @@ int main(int argc, char* argv[]) int fd = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK); try { - shared_ptr chunk;//(new WireChunk()); + shared_ptr chunk;//(new WireChunk()); while (!g_terminated)//cin.good()) { - chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); + chunk.reset(new msg::PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); int toRead = chunk->payloadSize; int len = 0; do