diff --git a/SnapClient.layout b/SnapClient.layout index adbc1ab7..5093c06c 100644 --- a/SnapClient.layout +++ b/SnapClient.layout @@ -1,92 +1,12 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + @@ -101,24 +21,34 @@ - + - + - + - + - + + + + + + + + + + + @@ -126,7 +56,12 @@ - + + + + + + @@ -136,14 +71,54 @@ - + + + + + + + + + + + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -151,4 +126,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/SnapServer.layout b/SnapServer.layout index ad8fd5f5..91a3282d 100644 --- a/SnapServer.layout +++ b/SnapServer.layout @@ -1,26 +1,81 @@ - + + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -31,74 +86,19 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index 4c6ff1ba..ed98b7d7 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -15,49 +15,52 @@ ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string void ClientConnection::start() { - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); - iterator = resolver.resolve(query); - SocketConnection::start(); + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); + iterator = resolver.resolve(query); + SocketConnection::start(); } void ClientConnection::worker() { - active_ = true; - while (active_) - { - connected_ = false; - try - { - { + active_ = true; + while (active_) + { + connected_ = false; + try + { + { // std::unique_lock mlock(mutex_); - cout << "connecting\n"; - socket.reset(new tcp::socket(io_service)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - socket->connect(*iterator); - connected_ = true; - cout << "connected\n"; - std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; - } - while(active_) - { - cout << "."; - getNextMessage(); - cout << "|"; - cout.flush(); - } - } - catch (const std::exception& e) - { - connected_ = false; - cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - usleep(1000*1000); - } - } + cout << "connecting\n"; + socket.reset(new tcp::socket(io_service)); + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + cout << "socket: " << socket->native() << "\n"; + setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + socket->connect(*iterator); + connected_ = true; + cout << "connected\n"; + std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; + } + while(active_) + { +// cout << "."; +// cout.flush(); + getNextMessage(); +// cout << "|"; +// cout.flush(); + } + } + catch (const std::exception& e) + { + connected_ = false; + cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; + usleep(1000*1000); + } + } } diff --git a/client/clientConnection.h b/client/clientConnection.h index c8897858..7c2b8849 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -11,15 +11,15 @@ using boost::asio::ip::tcp; class ClientConnection : public SocketConnection { public: - ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port); - virtual void start(); + ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port); + virtual void start(); protected: - virtual void worker(); + virtual void worker(); private: - std::string ip; - size_t port; + std::string ip; + size_t port; }; diff --git a/client/controller.cpp b/client/controller.cpp index db9311a9..f0bed1ae 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -21,96 +21,96 @@ Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL) void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) { - if (baseMessage.type == message_type::payload) - { - if ((stream != NULL) && (decoder != NULL)) - { - PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); - pcmChunk->deserialize(baseMessage, buffer); + if (baseMessage.type == message_type::payload) + { + if ((stream != NULL) && (decoder != NULL)) + { + PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); + pcmChunk->deserialize(baseMessage, buffer); //cout << "chunk: " << pcmChunk->payloadSize; - if (decoder->decode(pcmChunk)) - { - stream->addChunk(pcmChunk); + if (decoder->decode(pcmChunk)) + { + stream->addChunk(pcmChunk); //cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n"; - } - else - delete pcmChunk; - } - } + } + else + delete pcmChunk; + } + } } void Controller::start(const std::string& _ip, size_t _port, int _bufferMs) { - bufferMs = _bufferMs; - ip = _ip; + bufferMs = _bufferMs; + ip = _ip; - controlConnection = new ClientConnection(this, ip, _port); - controlConnection->start(); + controlConnection = new ClientConnection(this, ip, _port); + controlConnection->start(); - controllerThread = new thread(&Controller::worker, this); + controllerThread = new thread(&Controller::worker, this); } void Controller::stop() { - active_ = false; + active_ = false; } void Controller::worker() { // Decoder* decoder; - active_ = true; - decoder = NULL; + active_ = true; + decoder = NULL; - while (active_) - { - try - { - RequestMsg requestMsg("serverSettings"); - shared_ptr serverSettings(NULL); - while (!(serverSettings = controlConnection->sendReq(&requestMsg, 1000))); - cout << "ServerSettings port: " << serverSettings->port << "\n"; - streamClient = new StreamClient(this, ip, serverSettings->port); + while (active_) + { + try + { + RequestMsg requestMsg("serverSettings"); + shared_ptr serverSettings(NULL); + while (!(serverSettings = controlConnection->sendReq(&requestMsg, 1000))); + cout << "ServerSettings port: " << serverSettings->port << "\n"; + streamClient = new StreamClient(this, ip, serverSettings->port); - requestMsg.request = "sampleFormat"; - while (!(sampleFormat = controlConnection->sendReq(&requestMsg, 1000))); - cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; + requestMsg.request = "sampleFormat"; + while (!(sampleFormat = controlConnection->sendReq(&requestMsg, 1000))); + cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - decoder = new OggDecoder(); - if (decoder != NULL) - { - requestMsg.request = "headerChunk"; - shared_ptr headerChunk(NULL); - while (!(headerChunk = controlConnection->sendReq(&requestMsg, 1000))); - decoder->setHeader(headerChunk.get()); - } + decoder = new OggDecoder(); + if (decoder != NULL) + { + requestMsg.request = "headerChunk"; + shared_ptr headerChunk(NULL); + while (!(headerChunk = controlConnection->sendReq(&requestMsg, 1000))); + decoder->setHeader(headerChunk.get()); + } - RequestMsg timeReq("time"); - for (size_t n=0; n<10; ++n) - { - shared_ptr reply = controlConnection->sendReq(&timeReq, 2000); - if (reply) - { - double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; - TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); - usleep(1000); - } - } + RequestMsg timeReq("time"); + for (size_t n=0; n<10; ++n) + { + shared_ptr reply = controlConnection->sendReq(&timeReq, 2000); + if (reply) + { + double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; + TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); + usleep(1000); + } + } - streamClient->start(); - stream = new Stream(*sampleFormat); - stream->setBufferLen(bufferMs); + streamClient->start(); + stream = new Stream(*sampleFormat); + stream->setBufferLen(bufferMs); - Player player(stream); - player.start(); + Player player(stream); + player.start(); - try - { - while (active_) - { - usleep(1000000); + try + { + while (active_) + { + usleep(1000000); shared_ptr reply = controlConnection->sendReq(&timeReq, 1000); if (reply) { @@ -119,31 +119,32 @@ void Controller::worker() TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); cout << TimeProvider::getInstance().getDiffToServer() << "\n"; } - } - } - catch (const std::exception& e) - { - cout << "Stopping player\n"; - player.stop(); - cout << "Stopping streamClient\n"; - streamClient->stop(); - delete streamClient; - streamClient = NULL; - delete stream; - stream = NULL; - cout << "done\n"; - throw e; - } - } - catch (const std::exception& e) - { - cout << "Exception in Controller::worker(): " << e.what() << "\n"; - if (decoder != NULL) - delete decoder; - decoder = NULL; - usleep(1000000); - } - } + } + } + catch (const std::exception& e) + { + cout << "Stopping player\n"; + player.stop(); + cout << "Stopping streamClient\n"; + streamClient->stop(); + delete streamClient; + streamClient = NULL; + cout << "Deleting stream\n"; + delete stream; + stream = NULL; + cout << "done\n"; + throw e; + } + } + catch (const std::exception& e) + { + cout << "Exception in Controller::worker(): " << e.what() << "\n"; + if (decoder != NULL) + delete decoder; + decoder = NULL; + usleep(1000000); + } + } } diff --git a/client/controller.h b/client/controller.h index 98048427..a3f77c07 100644 --- a/client/controller.h +++ b/client/controller.h @@ -13,22 +13,22 @@ class Controller : public MessageReceiver { public: - Controller(); - void start(const std::string& _ip, size_t _port, int _bufferMs); - void stop(); - virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); + Controller(); + void start(const std::string& _ip, size_t _port, int _bufferMs); + void stop(); + virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); private: - void worker(); - std::atomic active_; - std::thread* controllerThread; - StreamClient* streamClient; - ClientConnection* controlConnection; - Stream* stream; - int bufferMs; - std::string ip; - std::shared_ptr sampleFormat; - Decoder* decoder; + void worker(); + std::atomic active_; + std::thread* controllerThread; + StreamClient* streamClient; + ClientConnection* controlConnection; + Stream* stream; + int bufferMs; + std::string ip; + std::shared_ptr sampleFormat; + Decoder* decoder; }; diff --git a/client/decoder.h b/client/decoder.h index 3a2c5c16..7ef85143 100644 --- a/client/decoder.h +++ b/client/decoder.h @@ -6,10 +6,10 @@ class Decoder { public: - Decoder() {}; - virtual ~Decoder() {}; - virtual bool decode(PcmChunk* chunk) = 0; - virtual bool setHeader(HeaderMessage* chunk) = 0; + Decoder() {}; + virtual ~Decoder() {}; + virtual bool decode(PcmChunk* chunk) = 0; + virtual bool setHeader(HeaderMessage* chunk) = 0; }; diff --git a/client/doubleBuffer.h b/client/doubleBuffer.h index e9732d34..ef3415d9 100644 --- a/client/doubleBuffer.h +++ b/client/doubleBuffer.h @@ -8,69 +8,69 @@ template class DoubleBuffer { public: - DoubleBuffer(size_t size = 10) : bufferSize(size) - { - } + DoubleBuffer(size_t size = 10) : bufferSize(size) + { + } - inline void add(const T& element) - { - buffer.push_back(element); - if (buffer.size() > bufferSize) - buffer.pop_front(); - } + inline void add(const T& element) + { + buffer.push_back(element); + if (buffer.size() > bufferSize) + buffer.pop_front(); + } - T median() const - { - if (buffer.empty()) - return 0; - std::deque tmpBuffer(buffer.begin(), buffer.end()); - std::sort(tmpBuffer.begin(), tmpBuffer.end()); - return tmpBuffer[tmpBuffer.size() / 2]; - } + T median() const + { + if (buffer.empty()) + return 0; + std::deque tmpBuffer(buffer.begin(), buffer.end()); + std::sort(tmpBuffer.begin(), tmpBuffer.end()); + return tmpBuffer[tmpBuffer.size() / 2]; + } - double mean() const - { - if (buffer.empty()) - return 0; - double mean = 0.; - for (size_t n=0; n tmpBuffer(buffer.begin(), buffer.end()); - std::sort(tmpBuffer.begin(), tmpBuffer.end()); - return tmpBuffer[(size_t)(tmpBuffer.size() * ((float)percentil / (float)100))]; - } + T percentil(unsigned int percentil) const + { + if (buffer.empty()) + return 0; + std::deque tmpBuffer(buffer.begin(), buffer.end()); + std::sort(tmpBuffer.begin(), tmpBuffer.end()); + return tmpBuffer[(size_t)(tmpBuffer.size() * ((float)percentil / (float)100))]; + } - inline bool full() const - { - return (buffer.size() == bufferSize); - } + inline bool full() const + { + return (buffer.size() == bufferSize); + } - inline void clear() - { - buffer.clear(); - } + inline void clear() + { + buffer.clear(); + } - inline size_t size() const - { - return buffer.size(); - } + inline size_t size() const + { + return buffer.size(); + } - void setSize(size_t size) - { - bufferSize = size; - } + void setSize(size_t size) + { + bufferSize = size; + } private: - size_t bufferSize; - std::deque buffer; + size_t bufferSize; + std::deque buffer; }; diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index 1ee1721e..60e93e5f 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -10,205 +10,205 @@ using namespace std; OggDecoder::OggDecoder() : Decoder() { - ogg_sync_init(&oy); /* Now we can read pages */ - convsize = 4096; - convbuffer = (ogg_int16_t*)malloc(convsize * sizeof(ogg_int16_t)); + ogg_sync_init(&oy); /* Now we can read pages */ + convsize = 4096; + convbuffer = (ogg_int16_t*)malloc(convsize * sizeof(ogg_int16_t)); } OggDecoder::~OggDecoder() { // ogg_sync_init(&oy); /* Now we can read pages */ - delete convbuffer; + delete convbuffer; } bool OggDecoder::decode(PcmChunk* chunk) { - /* grab some data at the head of the stream. We want the first page - (which is guaranteed to be small and only contain the Vorbis - stream initial header) We need the first page to get the stream - serialno. */ - bytes = chunk->payloadSize; - buffer=ogg_sync_buffer(&oy, bytes); - memcpy(buffer, chunk->payload, bytes); - ogg_sync_wrote(&oy,bytes); + /* grab some data at the head of the stream. We want the first page + (which is guaranteed to be small and only contain the Vorbis + stream initial header) We need the first page to get the stream + serialno. */ + bytes = chunk->payloadSize; + buffer=ogg_sync_buffer(&oy, bytes); + memcpy(buffer, chunk->payload, bytes); + ogg_sync_wrote(&oy,bytes); - chunk->payloadSize = 0; - convsize=4096;//bytes/vi.channels; - /* The rest is just a straight decode loop until end of stream */ - // while(!eos){ - while(true) - { - int result=ogg_sync_pageout(&oy,&og); - if (result==0) - break; /* need more data */ - if(result<0) - { - /* missing or corrupt data at this page position */ - fprintf(stderr,"Corrupt or missing data in bitstream; continuing...\n"); - continue; - } + chunk->payloadSize = 0; + convsize=4096;//bytes/vi.channels; + /* The rest is just a straight decode loop until end of stream */ + // while(!eos){ + while(true) + { + int result=ogg_sync_pageout(&oy,&og); + if (result==0) + break; /* need more data */ + if(result<0) + { + /* missing or corrupt data at this page position */ + fprintf(stderr,"Corrupt or missing data in bitstream; continuing...\n"); + continue; + } - ogg_stream_pagein(&os,&og); /* can safely ignore errors at + ogg_stream_pagein(&os,&og); /* can safely ignore errors at this point */ - while(1) - { - result=ogg_stream_packetout(&os,&op); + while(1) + { + result=ogg_stream_packetout(&os,&op); - if(result==0) - break; /* need more data */ - if(result<0) - continue; /* missing or corrupt data at this page position */ - /* no reason to complain; already complained above */ - /* we have a packet. Decode it */ - float **pcm; - int samples; + if(result==0) + break; /* need more data */ + if(result<0) + continue; /* missing or corrupt data at this page position */ + /* no reason to complain; already complained above */ + /* we have a packet. Decode it */ + float **pcm; + int samples; - if(vorbis_synthesis(&vb,&op)==0) /* test for success! */ - vorbis_synthesis_blockin(&vd,&vb); - /* + if(vorbis_synthesis(&vb,&op)==0) /* test for success! */ + vorbis_synthesis_blockin(&vd,&vb); + /* - **pcm is a multichannel float vector. In stereo, for - example, pcm[0] is left, and pcm[1] is right. samples is - the size of each channel. Convert the float values - (-1.<=range<=1.) to whatever PCM format and write it out */ + **pcm is a multichannel float vector. In stereo, for + example, pcm[0] is left, and pcm[1] is right. samples is + the size of each channel. Convert the float values + (-1.<=range<=1.) to whatever PCM format and write it out */ - while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0) - { - int bout=(samples32767) - val=32767; - else if(val<-32768) - val=-32768; - *ptr=val; - ptr+=vi.channels; - } - } + while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0) + { + int bout=(samples32767) + val=32767; + else if(val<-32768) + val=-32768; + *ptr=val; + ptr+=vi.channels; + } + } - size_t oldSize = chunk->payloadSize; - size_t size = 2*vi.channels * bout; - chunk->payloadSize += size; - chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize); - memcpy(chunk->payload + oldSize, convbuffer, size); - /* tell libvorbis how many samples we actually consumed */ - vorbis_synthesis_read(&vd,bout); - } - } - } - // if(ogg_page_eos(&og))eos=1; - // ogg_stream_clear(&os); - // vorbis_comment_clear(&vc); - // vorbis_info_clear(&vi); /* must be called last */ - return true; + size_t oldSize = chunk->payloadSize; + size_t size = 2*vi.channels * bout; + chunk->payloadSize += size; + chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize); + memcpy(chunk->payload + oldSize, convbuffer, size); + /* tell libvorbis how many samples we actually consumed */ + vorbis_synthesis_read(&vd,bout); + } + } + } + // if(ogg_page_eos(&og))eos=1; + // ogg_stream_clear(&os); + // vorbis_comment_clear(&vc); + // vorbis_info_clear(&vi); /* must be called last */ + return true; } bool OggDecoder::setHeader(HeaderMessage* chunk) { - bytes = chunk->payloadSize; - buffer=ogg_sync_buffer(&oy, bytes); - memcpy(buffer, chunk->payload, bytes); - ogg_sync_wrote(&oy,bytes); + bytes = chunk->payloadSize; + buffer=ogg_sync_buffer(&oy, bytes); + memcpy(buffer, chunk->payload, bytes); + ogg_sync_wrote(&oy,bytes); - if(ogg_sync_pageout(&oy,&og)!=1) - { - fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n"); - return false; - } + if(ogg_sync_pageout(&oy,&og)!=1) + { + fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n"); + return false; + } - ogg_stream_init(&os,ogg_page_serialno(&og)); + ogg_stream_init(&os,ogg_page_serialno(&og)); - vorbis_info_init(&vi); - vorbis_comment_init(&vc); - if(ogg_stream_pagein(&os,&og)<0) - { - fprintf(stderr,"Error reading first page of Ogg bitstream data.\n"); - return false; - } + vorbis_info_init(&vi); + vorbis_comment_init(&vc); + if(ogg_stream_pagein(&os,&og)<0) + { + fprintf(stderr,"Error reading first page of Ogg bitstream data.\n"); + return false; + } - if(ogg_stream_packetout(&os,&op)!=1) - { - fprintf(stderr,"Error reading initial header packet.\n"); - return false; - } + if(ogg_stream_packetout(&os,&op)!=1) + { + fprintf(stderr,"Error reading initial header packet.\n"); + return false; + } - if(vorbis_synthesis_headerin(&vi,&vc,&op)<0) - { - fprintf(stderr,"This Ogg bitstream does not contain Vorbis audio data.\n"); - return false; - } + if(vorbis_synthesis_headerin(&vi,&vc,&op)<0) + { + fprintf(stderr,"This Ogg bitstream does not contain Vorbis audio data.\n"); + return false; + } - int i(0); - while(i<2) - { - while(i<2) - { - int result=ogg_sync_pageout(&oy,&og); - if(result==0) - break; /* Need more data */ - /* Don't complain about missing or corrupt data yet. We'll - catch it at the packet output phase */ - if(result==1) - { - ogg_stream_pagein(&os,&og); /* we can ignore any errors here as they'll also become apparent at packetout */ - while(i<2) - { - result=ogg_stream_packetout(&os,&op); - if(result==0) - break; - if(result<0) - { - /* Uh oh; data at some point was corrupted or missing! - We can't tolerate that in a header. Die. */ - fprintf(stderr,"Corrupt secondary header. Exiting.\n"); - return false; - } - result=vorbis_synthesis_headerin(&vi,&vc,&op); - if(result<0) - { - fprintf(stderr,"Corrupt secondary header. Exiting.\n"); - return false; - } - i++; - } - } - } - } + int i(0); + while(i<2) + { + while(i<2) + { + int result=ogg_sync_pageout(&oy,&og); + if(result==0) + break; /* Need more data */ + /* Don't complain about missing or corrupt data yet. We'll + catch it at the packet output phase */ + if(result==1) + { + ogg_stream_pagein(&os,&og); /* we can ignore any errors here as they'll also become apparent at packetout */ + while(i<2) + { + result=ogg_stream_packetout(&os,&op); + if(result==0) + break; + if(result<0) + { + /* Uh oh; data at some point was corrupted or missing! + We can't tolerate that in a header. Die. */ + fprintf(stderr,"Corrupt secondary header. Exiting.\n"); + return false; + } + result=vorbis_synthesis_headerin(&vi,&vc,&op); + if(result<0) + { + fprintf(stderr,"Corrupt secondary header. Exiting.\n"); + return false; + } + i++; + } + } + } + } - /* Throw the comments plus a few lines about the bitstream we're decoding */ - char **ptr=vc.user_comments; - while(*ptr) - { - fprintf(stderr,"%s\n",*ptr); - ++ptr; - } - fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate); - fprintf(stderr,"Encoded by: %s\n\n",vc.vendor); + /* Throw the comments plus a few lines about the bitstream we're decoding */ + char **ptr=vc.user_comments; + while(*ptr) + { + fprintf(stderr,"%s\n",*ptr); + ++ptr; + } + fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate); + fprintf(stderr,"Encoded by: %s\n\n",vc.vendor); - /* OK, got and parsed all three headers. Initialize the Vorbis - packet->PCM decoder. */ - if(vorbis_synthesis_init(&vd,&vi)==0) /* central decode state */ - vorbis_block_init(&vd,&vb); /* local state for most of the decode + /* OK, got and parsed all three headers. Initialize the Vorbis + packet->PCM decoder. */ + if(vorbis_synthesis_init(&vd,&vi)==0) /* central decode state */ + vorbis_block_init(&vd,&vb); /* local state for most of the decode so multiple block decodes can proceed in parallel. We could init multiple vorbis_block structures for vd here */ - return false; + return false; } diff --git a/client/oggDecoder.h b/client/oggDecoder.h index 32836372..73dc3d68 100644 --- a/client/oggDecoder.h +++ b/client/oggDecoder.h @@ -7,31 +7,31 @@ class OggDecoder : public Decoder { public: - OggDecoder(); - virtual ~OggDecoder(); - virtual bool decode(PcmChunk* chunk); - virtual bool setHeader(HeaderMessage* chunk); + OggDecoder(); + virtual ~OggDecoder(); + virtual bool decode(PcmChunk* chunk); + virtual bool setHeader(HeaderMessage* chunk); private: - bool decodePayload(PcmChunk* chunk); + bool decodePayload(PcmChunk* chunk); - ogg_sync_state oy; /* sync and verify incoming physical bitstream */ - ogg_stream_state os; /* take physical pages, weld into a logical + ogg_sync_state oy; /* sync and verify incoming physical bitstream */ + ogg_stream_state os; /* take physical pages, weld into a logical stream of packets */ - ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ - ogg_packet op; /* one raw packet of data for decode */ + ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ + ogg_packet op; /* one raw packet of data for decode */ - vorbis_info vi; /* struct that stores all the static vorbis bitstream + vorbis_info vi; /* struct that stores all the static vorbis bitstream settings */ - vorbis_comment vc; /* struct that stores all the bitstream user comments */ - vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ - vorbis_block vb; /* local working space for packet->PCM decode */ + vorbis_comment vc; /* struct that stores all the bitstream user comments */ + vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ + vorbis_block vb; /* local working space for packet->PCM decode */ - ogg_int16_t* convbuffer; /* take 8k out of the data segment, not the stack */ - int convsize; + ogg_int16_t* convbuffer; /* take 8k out of the data segment, not the stack */ + int convsize; - char *buffer; - int bytes; + char *buffer; + int bytes; }; diff --git a/client/pcmDecoder.cpp b/client/pcmDecoder.cpp index 955aae5d..5ff18bf2 100644 --- a/client/pcmDecoder.cpp +++ b/client/pcmDecoder.cpp @@ -7,17 +7,17 @@ PcmDecoder::PcmDecoder() : Decoder() bool PcmDecoder::decode(PcmChunk* chunk) { - /* WireChunk* wireChunk = chunk->wireChunk; - for (size_t n=0; nlength; ++n) - wireChunk->payload[n] *= 1; - */ - return true; + /* WireChunk* wireChunk = chunk->wireChunk; + for (size_t n=0; nlength; ++n) + wireChunk->payload[n] *= 1; + */ + return true; } bool PcmDecoder::setHeader(HeaderMessage* chunk) { - return true; + return true; } diff --git a/client/pcmDecoder.h b/client/pcmDecoder.h index 922cb886..87a32df9 100644 --- a/client/pcmDecoder.h +++ b/client/pcmDecoder.h @@ -6,9 +6,9 @@ class PcmDecoder : public Decoder { public: - PcmDecoder(); - virtual bool decode(PcmChunk* chunk); - virtual bool setHeader(HeaderMessage* chunk); + PcmDecoder(); + virtual bool decode(PcmChunk* chunk); + virtual bool setHeader(HeaderMessage* chunk); }; diff --git a/client/player.cpp b/client/player.cpp index 0b0711d6..650cb07d 100644 --- a/client/player.cpp +++ b/client/player.cpp @@ -15,137 +15,137 @@ Player::Player(Stream* stream) : active_(false), stream_(stream) void Player::start() { - unsigned int pcm, tmp, rate; - int channels; - snd_pcm_hw_params_t *params; - int buff_size; + unsigned int pcm, tmp, rate; + int channels; + snd_pcm_hw_params_t *params; + int buff_size; - rate = stream_->format.rate; - channels = stream_->format.channels; + rate = stream_->format.rate; + channels = stream_->format.channels; - /* Open the PCM device in playback mode */ - if ((pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, SND_PCM_STREAM_PLAYBACK, 0)) < 0) - cout << "ERROR: Can't open " << PCM_DEVICE << " PCM device. " << snd_strerror(pcm) << "\n"; + /* Open the PCM device in playback mode */ + if ((pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, SND_PCM_STREAM_PLAYBACK, 0)) < 0) + cout << "ERROR: Can't open " << PCM_DEVICE << " PCM device. " << snd_strerror(pcm) << "\n"; - /* struct snd_pcm_playback_info_t pinfo; - if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 ) - fprintf( stderr, "Error: playback info error: %s\n", snd_strerror( err ) ); - printf("buffer: '%d'\n", pinfo.buffer_size); - */ - /* Allocate parameters object and fill it with default values*/ - snd_pcm_hw_params_alloca(¶ms); + /* struct snd_pcm_playback_info_t pinfo; + if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 ) + fprintf( stderr, "Error: playback info error: %s\n", snd_strerror( err ) ); + printf("buffer: '%d'\n", pinfo.buffer_size); + */ + /* Allocate parameters object and fill it with default values*/ + snd_pcm_hw_params_alloca(¶ms); - snd_pcm_hw_params_any(pcm_handle, params); + snd_pcm_hw_params_any(pcm_handle, params); - /* Set parameters */ - if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) - cout << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n"; + /* Set parameters */ + if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) + cout << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0) - cout << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n"; + if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0) + cout << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0) - cout << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n"; + if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0) + cout << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0) - cout << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n"; + if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0) + cout << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n"; - unsigned int buffer_time; - snd_pcm_hw_params_get_buffer_time_max(params, &buffer_time, 0); - if (buffer_time > BUFFER_TIME) - buffer_time = BUFFER_TIME; + unsigned int buffer_time; + snd_pcm_hw_params_get_buffer_time_max(params, &buffer_time, 0); + if (buffer_time > BUFFER_TIME) + buffer_time = BUFFER_TIME; - unsigned int period_time = buffer_time / 4; + unsigned int period_time = buffer_time / 4; - snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0); - snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0); + snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0); + snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0); // long unsigned int periodsize = stream_->format.msRate() * 50;//2*rate/50; // if ((pcm = snd_pcm_hw_params_set_buffer_size_near(pcm_handle, params, &periodsize)) < 0) // cout << "Unable to set buffer size " << (long int)periodsize << ": " << snd_strerror(pcm) << "\n"; - /* Write parameters */ - if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0) - cout << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n"; + /* Write parameters */ + if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0) + cout << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n"; - /* Resume information */ - cout << "PCM name: " << snd_pcm_name(pcm_handle) << "\n"; - cout << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n"; - snd_pcm_hw_params_get_channels(params, &tmp); - cout << "channels: " << tmp << "\n"; + /* Resume information */ + cout << "PCM name: " << snd_pcm_name(pcm_handle) << "\n"; + cout << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n"; + snd_pcm_hw_params_get_channels(params, &tmp); + cout << "channels: " << tmp << "\n"; - if (tmp == 1) - printf("(mono)\n"); - else if (tmp == 2) - printf("(stereo)\n"); + if (tmp == 1) + printf("(mono)\n"); + else if (tmp == 2) + printf("(stereo)\n"); - snd_pcm_hw_params_get_rate(params, &tmp, 0); - cout << "rate: " << tmp << " bps\n"; + snd_pcm_hw_params_get_rate(params, &tmp, 0); + cout << "rate: " << tmp << " bps\n"; - /* Allocate buffer to hold single period */ - snd_pcm_hw_params_get_period_size(params, &frames, 0); - cout << "frames: " << frames << "\n"; + /* Allocate buffer to hold single period */ + snd_pcm_hw_params_get_period_size(params, &frames, 0); + cout << "frames: " << frames << "\n"; - buff_size = frames * channels * 2 /* 2 -> sample size */; - buff = (char *) malloc(buff_size); + buff_size = frames * channels * 2 /* 2 -> sample size */; + buff = (char *) malloc(buff_size); - snd_pcm_hw_params_get_period_time(params, &tmp, NULL); - cout << "period time: " << tmp << "\n"; + snd_pcm_hw_params_get_period_time(params, &tmp, NULL); + cout << "period time: " << tmp << "\n"; - snd_pcm_sw_params_t *swparams; - snd_pcm_sw_params_alloca(&swparams); - snd_pcm_sw_params_current(pcm_handle, swparams); + snd_pcm_sw_params_t *swparams; + snd_pcm_sw_params_alloca(&swparams); + snd_pcm_sw_params_current(pcm_handle, swparams); - snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames); - snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames); + snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames); + snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames); // snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames); - snd_pcm_sw_params(pcm_handle, swparams); + snd_pcm_sw_params(pcm_handle, swparams); - playerThread = new thread(&Player::worker, this); + playerThread = new thread(&Player::worker, this); } void Player::stop() { - active_ = false; - playerThread->join(); - delete playerThread; + active_ = false; + playerThread->join(); + delete playerThread; } void Player::worker() { - unsigned int pcm; - snd_pcm_sframes_t avail; - snd_pcm_sframes_t delay; - active_ = true; - while (active_) - { - snd_pcm_avail_delay(pcm_handle, &avail, &delay); + unsigned int pcm; + snd_pcm_sframes_t avail; + snd_pcm_sframes_t delay; + active_ = true; + while (active_) + { + snd_pcm_avail_delay(pcm_handle, &avail, &delay); - if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500)) - { - if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) - { - printf("XRUN.\n"); - snd_pcm_prepare(pcm_handle); - } - else if (pcm < 0) - { - printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm)); - } - } - } + if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500)) + { + if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) + { + printf("XRUN.\n"); + snd_pcm_prepare(pcm_handle); + } + else if (pcm < 0) + { + printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm)); + } + } + } - snd_pcm_drain(pcm_handle); - snd_pcm_close(pcm_handle); - free(buff); + snd_pcm_drain(pcm_handle); + snd_pcm_close(pcm_handle); + free(buff); } diff --git a/client/player.h b/client/player.h index 38606ead..3db37f51 100644 --- a/client/player.h +++ b/client/player.h @@ -11,18 +11,18 @@ class Player { public: - Player(Stream* stream); - void start(); - void stop(); + Player(Stream* stream); + void start(); + void stop(); private: - void worker(); - snd_pcm_t* pcm_handle; - snd_pcm_uframes_t frames; - char *buff; - std::atomic active_; - Stream* stream_; - std::thread* playerThread; + void worker(); + snd_pcm_t* pcm_handle; + snd_pcm_uframes_t frames; + char *buff; + std::atomic active_; + Stream* stream_; + std::thread* playerThread; }; diff --git a/client/snapClient.cpp b/client/snapClient.cpp index c71d2e82..ea3bbde2 100644 --- a/client/snapClient.cpp +++ b/client/snapClient.cpp @@ -22,47 +22,47 @@ namespace po = boost::program_options; int main (int argc, char *argv[]) { - int deviceIdx; - string ip; - int bufferMs; - size_t port; - bool runAsDaemon; + int deviceIdx; + string ip; + int bufferMs; + size_t port; + bool runAsDaemon; // string sampleFormat; - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "produce help message") - ("port,p", po::value(&port)->default_value(98765), "port where the server listens on") - ("ip,i", po::value(&ip)->default_value("192.168.0.2"), "server IP") - ("soundcard,s", po::value(&deviceIdx)->default_value(-1), "index of the soundcard") + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port,p", po::value(&port)->default_value(98765), "port where the server listens on") + ("ip,i", po::value(&ip)->default_value("192.168.0.2"), "server IP") + ("soundcard,s", po::value(&deviceIdx)->default_value(-1), "index of the soundcard") // ("sampleformat,f", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") - ("buffer,b", po::value(&bufferMs)->default_value(300), "buffer size [ms]") - ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") - ; + ("buffer,b", po::value(&bufferMs)->default_value(300), "buffer size [ms]") + ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") + ; - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); - if (vm.count("help")) - { - cout << desc << "\n"; - return 1; - } + if (vm.count("help")) + { + cout << desc << "\n"; + return 1; + } - std::clog.rdbuf(new Log("snapclient", LOG_DAEMON)); - if (runAsDaemon) - { - daemonize(); - std::clog << kLogNotice << "daemon started" << std::endl; - } + std::clog.rdbuf(new Log("snapclient", LOG_DAEMON)); + if (runAsDaemon) + { + daemonize(); + std::clog << kLogNotice << "daemon started" << std::endl; + } - Controller controller; - controller.start(ip, port, bufferMs); + Controller controller; + controller.start(ip, port, bufferMs); - while(true) - usleep(10000); + while(true) + usleep(10000); - return 0; + return 0; } diff --git a/client/stream.cpp b/client/stream.cpp index 9a6d5068..f7cc6b84 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -9,34 +9,34 @@ using namespace std; Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0) { - buffer.setSize(500); - shortBuffer.setSize(100); - miniBuffer.setSize(20); - cardBuffer.setSize(50); - bufferMs = 500; + buffer.setSize(500); + shortBuffer.setSize(100); + miniBuffer.setSize(20); + cardBuffer.setSize(50); + bufferMs = 500; } void Stream::setBufferLen(size_t bufferLenMs) { - bufferMs = bufferLenMs; + bufferMs = bufferLenMs; } void Stream::clearChunks() { - while (chunks.size() > 0) - chunks.pop(); + while (chunks.size() > 0) + chunks.pop(); } void Stream::addChunk(PcmChunk* chunk) { - while (chunks.size() * chunk->getDuration() > 10000) - chunks.pop(); - chunks.push(shared_ptr(chunk)); + while (chunks.size() * chunk->getDuration() > 10000) + chunks.pop(); + chunks.push(shared_ptr(chunk)); // cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n"; } @@ -44,11 +44,11 @@ void Stream::addChunk(PcmChunk* chunk) time_point_ms Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer) { - if (!chunk) - chunk = chunks.pop(); - time_point_ms tp = chunk->timePoint(); - memset(outputBuffer, 0, framesPerBuffer * format.frameSize); - return tp; + if (!chunk) + chunk = chunks.pop(); + time_point_ms tp = chunk->timePoint(); + memset(outputBuffer, 0, framesPerBuffer * format.frameSize); + return tp; } @@ -72,204 +72,204 @@ time_point_ms Stream::seekTo(const time_point_ms& to) time_point_ms Stream::seek(long ms) { - if (!chunk) - chunk = chunks.pop(); + if (!chunk) + chunk = chunks.pop(); - if (ms <= 0) - return chunk->timePoint(); + if (ms <= 0) + return chunk->timePoint(); // time_point_ms tp = chunk->timePoint(); - while (ms > chunk->getTimeLeft()) - { - chunk = chunks.pop(); - ms -= min(ms, (long)chunk->getTimeLeft()); - } - chunk->seek(ms * format.msRate()); - return chunk->timePoint(); + while (ms > chunk->getTimeLeft()) + { + chunk = chunks.pop(); + ms -= min(ms, (long)chunk->getTimeLeft()); + } + chunk->seek(ms * format.msRate()); + return chunk->timePoint(); } time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction) { - if (!chunk) - if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) - throw 0; + if (!chunk) + if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) + throw 0; - time_point_ms tp = chunk->timePoint(); - int read = 0; - int toRead = framesPerBuffer + correction*format.msRate(); - char* buffer; + time_point_ms tp = chunk->timePoint(); + int read = 0; + int toRead = framesPerBuffer + correction*format.msRate(); + char* buffer; - if (correction != 0) - { - int msBuffer = floor(framesPerBuffer / format.msRate()); - if (abs(correction) > msBuffer / 2) - correction = copysign(msBuffer / 2, correction); - buffer = (char*)malloc(toRead * format.frameSize); - } - else - buffer = (char*)outputBuffer; + if (correction != 0) + { + int msBuffer = floor(framesPerBuffer / format.msRate()); + if (abs(correction) > msBuffer / 2) + correction = copysign(msBuffer / 2, correction); + buffer = (char*)malloc(toRead * format.frameSize); + } + else + buffer = (char*)outputBuffer; - while (read < toRead) - { - read += chunk->readFrames(buffer + read*format.frameSize, toRead - read); - if (chunk->isEndOfChunk()) - if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) - throw 0; - } + while (read < toRead) + { + read += chunk->readFrames(buffer + read*format.frameSize, toRead - read); + if (chunk->isEndOfChunk()) + if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) + throw 0; + } - if (correction != 0) - { - float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_); - std::cout << "correction: " << correction << ", factor: " << factor << "\n"; - float idx = 0; - for (size_t n=0; n -msBuffer/2) // sleep = 0; - if (sleep < -msBuffer/2) - return true; - } - else if (sleep > msBuffer/2) - { - /* cout << "Sleep " << sleep; - time_point_ms ms(std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch())); - ms -= std::chrono::milliseconds((long int)(bufferMs - outputBufferDacTime)); - cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"; - sleep = Chunk::getAge(seekTo(ms)) - bufferMs + outputBufferDacTime; - cout << " after: " << sleep << "\n"; - */ - if (!chunk) - chunk = chunks.pop(); - while (sleep > chunk->getDuration()) - { - chunk = chunks.pop(); - sleep = chunk->getAge() - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); + if (sleep < -msBuffer/2) + return true; + } + else if (sleep > msBuffer/2) + { + /* cout << "Sleep " << sleep; + time_point_ms ms(std::chrono::duration_cast(std::chrono::high_resolution_clock::now().time_since_epoch())); + ms -= std::chrono::milliseconds((long int)(bufferMs - outputBufferDacTime)); + cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"; + sleep = Chunk::getAge(seekTo(ms)) - bufferMs + outputBufferDacTime; + cout << " after: " << sleep << "\n"; + */ + if (!chunk) + chunk = chunks.pop(); + while (sleep > chunk->getDuration()) + { + chunk = chunks.pop(); + sleep = chunk->getAge() - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); // cout << "chunk->getAge() > chunk->getDuration(): " << chunk->getAge() - bufferMs + outputBufferDacTime << " > " << chunk->getDuration() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime << ", needed: " << msBuffer << ", sleep: " << sleep << "\n"; - usleep(1000); - } - cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n"; - sleep = 0; - } - else if (sleep < 0) - { - ++sleep; - correction = -1; - } - else if (sleep > 0) - { - --sleep; - correction = 1; - } - } + usleep(1000); + } + cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n"; + sleep = 0; + } + else if (sleep < 0) + { + ++sleep; + correction = -1; + } + else if (sleep > 0) + { + --sleep; + correction = 1; + } + } - long age(0); - try - { - age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); - } - catch(int e) - { - return false; - } + long age(0); + try + { + age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); + } + catch(int e) + { + return false; + } - if (sleep == 0) - { - if (buffer.full() && (abs(median) > 1)) - { - cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n"; - sleep = median; - } - else if (shortBuffer.full() && (abs(shortMedian) > 5)) - { - cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n"; - sleep = shortMedian; - } - else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50)) - { - cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n"; - sleep = miniBuffer.mean(); - } - else if (abs(age) > 200) - { - cout << "age > 50: " << age << "\n"; - sleep = age; - } - } + if (sleep == 0) + { + if (buffer.full() && (abs(median) > 1)) + { + cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n"; + sleep = median; + } + else if (shortBuffer.full() && (abs(shortMedian) > 5)) + { + cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n"; + sleep = shortMedian; + } + else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50)) + { + cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n"; + sleep = miniBuffer.mean(); + } + else if (abs(age) > 200) + { + cout << "age > 50: " << age << "\n"; + sleep = age; + } + } - if (sleep != 0) - std::cerr << "Sleep: " << sleep << "\n"; + if (sleep != 0) + std::cerr << "Sleep: " << sleep << "\n"; // std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; - if (ticks > 2) - { + if (ticks > 2) + { // cout << age << "\n"; - updateBuffers(age); - } - time_t now = time(NULL); - if (now != lastUpdate) - { - lastUpdate = now; - median = buffer.median(); - shortMedian = shortBuffer.median(); - std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n"; - } - return true; + updateBuffers(age); + } + time_t now = time(NULL); + if (now != lastUpdate) + { + lastUpdate = now; + median = buffer.median(); + shortMedian = shortBuffer.median(); + std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n"; + } + return true; } diff --git a/client/stream.h b/client/stream.h index 6b425e71..7b16dcf1 100644 --- a/client/stream.h +++ b/client/stream.h @@ -18,37 +18,37 @@ class Stream { public: - Stream(const SampleFormat& format); - void addChunk(PcmChunk* chunk); - void clearChunks(); - bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout); - void setBufferLen(size_t bufferLenMs); - const SampleFormat& format; + Stream(const SampleFormat& format); + void addChunk(PcmChunk* chunk); + void clearChunks(); + bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout); + void setBufferLen(size_t bufferLenMs); + const SampleFormat& format; private: - time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0); - time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer); - time_point_ms seek(long ms); + time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0); + time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer); + time_point_ms seek(long ms); // time_point_ms seekTo(const time_point_ms& to); - void updateBuffers(int age); - void resetBuffers(); + void updateBuffers(int age); + void resetBuffers(); - SampleFormat format_; + SampleFormat format_; - long lastTick; - long sleep; + long lastTick; + long sleep; - Queue> chunks; - DoubleBuffer cardBuffer; - DoubleBuffer miniBuffer; - DoubleBuffer buffer; - DoubleBuffer shortBuffer; - std::shared_ptr chunk; + Queue> chunks; + DoubleBuffer cardBuffer; + DoubleBuffer miniBuffer; + DoubleBuffer buffer; + DoubleBuffer shortBuffer; + std::shared_ptr chunk; - int median; - int shortMedian; - time_t lastUpdate; - int bufferMs; + int median; + int shortMedian; + time_t lastUpdate; + int bufferMs; }; diff --git a/client/streamClient.h b/client/streamClient.h index d8676de1..d02707dc 100644 --- a/client/streamClient.h +++ b/client/streamClient.h @@ -10,8 +10,8 @@ using boost::asio::ip::tcp; class StreamClient : public ClientConnection { public: - StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port); - virtual ~StreamClient(); + StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port); + virtual ~StreamClient(); }; diff --git a/client/timeProvider.cpp b/client/timeProvider.cpp index 4e017aef..d894cf15 100644 --- a/client/timeProvider.cpp +++ b/client/timeProvider.cpp @@ -3,26 +3,26 @@ TimeProvider::TimeProvider() : diffToServer(0) { - diffBuffer.setSize(60); + diffBuffer.setSize(60); } void TimeProvider::setDiffToServer(double ms) { - diffBuffer.add(ms * 1000); - diffToServer = diffBuffer.median(); + diffBuffer.add(ms * 1000); + diffToServer = diffBuffer.median(); } long TimeProvider::getDiffToServer() { - return diffToServer; + return diffToServer; } long TimeProvider::getDiffToServerMs() { - return diffToServer / 1000; + return diffToServer / 1000; } diff --git a/client/timeProvider.h b/client/timeProvider.h index f6cc8489..3c6e95cf 100644 --- a/client/timeProvider.h +++ b/client/timeProvider.h @@ -6,26 +6,26 @@ class TimeProvider { public: - static TimeProvider& getInstance() - { - static TimeProvider instance; - return instance; - } + static TimeProvider& getInstance() + { + static TimeProvider instance; + return instance; + } - void setDiffToServer(double ms); - long getDiffToServer(); - long getDiffToServerMs(); + void setDiffToServer(double ms); + long getDiffToServer(); + long getDiffToServerMs(); private: - TimeProvider(); // Constructor? (the {} brackets) are needed here. - // Dont forget to declare these two. You want to make sure they - // are unaccessable otherwise you may accidently get copies of - // your singleton appearing. - TimeProvider(TimeProvider const&); // Don't Implement - void operator=(TimeProvider const&); // Don't implement + TimeProvider(); // Constructor? (the {} brackets) are needed here. + // Dont forget to declare these two. You want to make sure they + // are unaccessable otherwise you may accidently get copies of + // your singleton appearing. + TimeProvider(TimeProvider const&); // Don't Implement + void operator=(TimeProvider const&); // Don't implement - DoubleBuffer diffBuffer; - long diffToServer; + DoubleBuffer diffBuffer; + long diffToServer; }; diff --git a/common/headerMessage.h b/common/headerMessage.h index b44ec14a..d0b78e30 100644 --- a/common/headerMessage.h +++ b/common/headerMessage.h @@ -8,37 +8,37 @@ class HeaderMessage : public BaseMessage { public: - HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size) - { - payload = (char*)malloc(size); - } + HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size) + { + payload = (char*)malloc(size); + } - virtual ~HeaderMessage() - { - free(payload); - } + virtual ~HeaderMessage() + { + free(payload); + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - payload = (char*)realloc(payload, payloadSize); - stream.read(payload, payloadSize); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + payload = (char*)realloc(payload, payloadSize); + stream.read(payload, payloadSize); + } - virtual uint32_t getSize() - { - return sizeof(uint32_t) + payloadSize; - } + virtual uint32_t getSize() + { + return sizeof(uint32_t) + payloadSize; + } - uint32_t payloadSize; - char* payload; + uint32_t payloadSize; + char* payload; protected: - virtual void doserialize(std::ostream& stream) - { - stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - stream.write(payload, payloadSize); - } + virtual void doserialize(std::ostream& stream) + { + stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + stream.write(payload, payloadSize); + } }; diff --git a/common/log.cpp b/common/log.cpp index 968ce64d..5d17078d 100644 --- a/common/log.cpp +++ b/common/log.cpp @@ -2,47 +2,47 @@ Log::Log(std::string ident, int facility) { - facility_ = facility; - priority_ = LOG_DEBUG; - strncpy(ident_, ident.c_str(), sizeof(ident_)); - ident_[sizeof(ident_)-1] = '\0'; + facility_ = facility; + priority_ = LOG_DEBUG; + strncpy(ident_, ident.c_str(), sizeof(ident_)); + ident_[sizeof(ident_)-1] = '\0'; - openlog(ident_, LOG_PID, facility_); + openlog(ident_, LOG_PID, facility_); } int Log::sync() { - if (buffer_.length()) - { - if (priority_ == dbg) - std::cout << buffer_.c_str(); - else - syslog(priority_, "%s", buffer_.c_str()); - buffer_.erase(); - priority_ = LOG_DEBUG; // default to debug for each message - } - return 0; + if (buffer_.length()) + { + if (priority_ == dbg) + std::cout << buffer_.c_str(); + else + syslog(priority_, "%s", buffer_.c_str()); + buffer_.erase(); + priority_ = LOG_DEBUG; // default to debug for each message + } + return 0; } int Log::overflow(int c) { - if (c != EOF) - { - buffer_ += static_cast(c); - } - else - { - sync(); - } - return c; + if (c != EOF) + { + buffer_ += static_cast(c); + } + else + { + sync(); + } + return c; } std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority) { - static_cast(os.rdbuf())->priority_ = (int)log_priority; - if (log_priority == dbg) - os.flush(); - return os; + static_cast(os.rdbuf())->priority_ = (int)log_priority; + if (log_priority == dbg) + os.flush(); + return os; } diff --git a/common/log.h b/common/log.h index d1e5c544..48501bcb 100644 --- a/common/log.h +++ b/common/log.h @@ -9,15 +9,15 @@ enum LogPriority { - kLogEmerg = LOG_EMERG, // system is unusable - kLogAlert = LOG_ALERT, // action must be taken immediately - kLogCrit = LOG_CRIT, // critical conditions - kLogErr = LOG_ERR, // error conditions - kLogWarning = LOG_WARNING, // warning conditions - kLogNotice = LOG_NOTICE, // normal, but significant, condition - kLogInfo = LOG_INFO, // informational message - kLogDebug = LOG_DEBUG, // debug-level message - dbg + kLogEmerg = LOG_EMERG, // system is unusable + kLogAlert = LOG_ALERT, // action must be taken immediately + kLogCrit = LOG_CRIT, // critical conditions + kLogErr = LOG_ERR, // error conditions + kLogWarning = LOG_WARNING, // warning conditions + kLogNotice = LOG_NOTICE, // normal, but significant, condition + kLogInfo = LOG_INFO, // informational message + kLogDebug = LOG_DEBUG, // debug-level message + dbg }; std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority); @@ -25,18 +25,18 @@ std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority); class Log : public std::basic_streambuf > { public: - explicit Log(std::string ident, int facility); + explicit Log(std::string ident, int facility); protected: - int sync(); - int overflow(int c); + int sync(); + int overflow(int c); private: - friend std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority); - std::string buffer_; - int facility_; - int priority_; - char ident_[50]; + friend std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority); + std::string buffer_; + int facility_; + int priority_; + char ident_[50]; }; diff --git a/common/message.h b/common/message.h index 427170ef..109685ab 100644 --- a/common/message.h +++ b/common/message.h @@ -12,171 +12,171 @@ template > class vectorwrapbuf : public std::basic_streambuf { public: - vectorwrapbuf(std::vector &vec) - { - this->setg(vec.data(), vec.data(), vec.data() + vec.size()); - } + vectorwrapbuf(std::vector &vec) + { + this->setg(vec.data(), vec.data(), vec.data() + vec.size()); + } }; struct membuf : public std::basic_streambuf { - membuf(char* begin, char* end) - { - this->setg(begin, begin, end); - } + membuf(char* begin, char* end) + { + this->setg(begin, begin, end); + } }; enum message_type { - base = 0, - header = 1, - payload = 2, - sampleformat = 3, - serversettings = 4, - timemsg = 5, - requestmsg = 6 + base = 0, + header = 1, + payload = 2, + sampleformat = 3, + serversettings = 4, + timemsg = 5, + requestmsg = 6 }; struct tv { - tv() - { - timeval t; - gettimeofday(&t, NULL); - sec = t.tv_sec; - usec = t.tv_usec; - } - tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {}; - tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {}; + tv() + { + timeval t; + gettimeofday(&t, NULL); + sec = t.tv_sec; + usec = t.tv_usec; + } + tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {}; + tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {}; - int32_t sec; - int32_t usec; - /* - 5.3 - 6.2 = -0.9 - -1 - 0.1 + int32_t sec; + int32_t usec; + /* + 5.3 - 6.2 = -0.9 + -1 + 0.1 - 5.3 - 6.4 = -1.1 - -1 - -0.1 - */ + 5.3 - 6.4 = -1.1 + -1 + -0.1 + */ //(timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec) - tv operator-(const tv& other) - { - tv result(*this); - result.sec -= other.sec; - result.usec -= other.usec; - if (result.usec > 0) - { - result.sec += 1; - result.usec = 1000000 - result.usec; - } - else if (result.usec < 0) - { - result.usec *= -1; - } - /* else if (result.usec >= 1000000) - { - result.usec -= 1000000; - result.sec += 1; - } - */ return result; - } + tv operator-(const tv& other) + { + tv result(*this); + result.sec -= other.sec; + result.usec -= other.usec; + if (result.usec > 0) + { + result.sec += 1; + result.usec = 1000000 - result.usec; + } + else if (result.usec < 0) + { + result.usec *= -1; + } + /* else if (result.usec >= 1000000) + { + result.usec -= 1000000; + result.sec += 1; + } + */ return result; + } }; struct BaseMessage { - BaseMessage() : type(base), id(0), refersTo(0) - { - } + BaseMessage() : type(base), id(0), refersTo(0) + { + } - BaseMessage(message_type type_) : type(type_), id(0), refersTo(0) - { - } + BaseMessage(message_type type_) : type(type_), id(0), refersTo(0) + { + } - virtual ~BaseMessage() - { - } + virtual ~BaseMessage() + { + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(&type), sizeof(uint16_t)); - stream.read(reinterpret_cast(&id), sizeof(uint16_t)); - stream.read(reinterpret_cast(&refersTo), sizeof(uint16_t)); - stream.read(reinterpret_cast(&sent.sec), sizeof(int32_t)); - stream.read(reinterpret_cast(&sent.usec), sizeof(int32_t)); - stream.read(reinterpret_cast(&received.sec), sizeof(int32_t)); - stream.read(reinterpret_cast(&received.usec), sizeof(int32_t)); - stream.read(reinterpret_cast(&size), sizeof(uint32_t)); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(&type), sizeof(uint16_t)); + stream.read(reinterpret_cast(&id), sizeof(uint16_t)); + stream.read(reinterpret_cast(&refersTo), sizeof(uint16_t)); + stream.read(reinterpret_cast(&sent.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(&sent.usec), sizeof(int32_t)); + stream.read(reinterpret_cast(&received.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(&received.usec), sizeof(int32_t)); + stream.read(reinterpret_cast(&size), sizeof(uint32_t)); + } - void deserialize(char* payload) - { - membuf databuf(payload, payload + BaseMessage::getSize()); - std::istream is(&databuf); - read(is); - } + void deserialize(char* payload) + { + membuf databuf(payload, payload + BaseMessage::getSize()); + std::istream is(&databuf); + read(is); + } - void deserialize(const BaseMessage& baseMessage, char* payload) - { - type = baseMessage.type; - id = baseMessage.id; - refersTo = baseMessage.refersTo; - sent = baseMessage.sent; - received = baseMessage.received; - size = baseMessage.size; - membuf databuf(payload, payload + size); - std::istream is(&databuf); - read(is); - } + void deserialize(const BaseMessage& baseMessage, char* payload) + { + type = baseMessage.type; + id = baseMessage.id; + refersTo = baseMessage.refersTo; + sent = baseMessage.sent; + received = baseMessage.received; + size = baseMessage.size; + membuf databuf(payload, payload + size); + std::istream is(&databuf); + read(is); + } - virtual void serialize(std::ostream& stream) - { - stream.write(reinterpret_cast(&type), sizeof(uint16_t)); - stream.write(reinterpret_cast(&id), sizeof(uint16_t)); - stream.write(reinterpret_cast(&refersTo), sizeof(uint16_t)); - stream.write(reinterpret_cast(&sent.sec), sizeof(int32_t)); - stream.write(reinterpret_cast(&sent.usec), sizeof(int32_t)); - stream.write(reinterpret_cast(&received.sec), sizeof(int32_t)); - stream.write(reinterpret_cast(&received.usec), sizeof(int32_t)); - size = getSize(); - stream.write(reinterpret_cast(&size), sizeof(uint32_t)); - doserialize(stream); - } + virtual void serialize(std::ostream& stream) + { + stream.write(reinterpret_cast(&type), sizeof(uint16_t)); + stream.write(reinterpret_cast(&id), sizeof(uint16_t)); + stream.write(reinterpret_cast(&refersTo), sizeof(uint16_t)); + stream.write(reinterpret_cast(&sent.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(&sent.usec), sizeof(int32_t)); + stream.write(reinterpret_cast(&received.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(&received.usec), sizeof(int32_t)); + size = getSize(); + stream.write(reinterpret_cast(&size), sizeof(uint32_t)); + doserialize(stream); + } - virtual uint32_t getSize() - { - return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t); - }; + virtual uint32_t getSize() + { + return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t); + }; - uint16_t type; - uint16_t id; - uint16_t refersTo; - tv sent; - tv received; - uint32_t size; + uint16_t type; + uint16_t id; + uint16_t refersTo; + tv sent; + tv received; + uint32_t size; protected: - virtual void doserialize(std::ostream& stream) - { - }; + virtual void doserialize(std::ostream& stream) + { + }; }; struct SerializedMessage { - ~SerializedMessage() - { - free(buffer); - } + ~SerializedMessage() + { + free(buffer); + } - BaseMessage message; - char* buffer; + BaseMessage message; + char* buffer; }; diff --git a/common/pcmChunk.cpp b/common/pcmChunk.cpp index ad776561..f92c2ea3 100644 --- a/common/pcmChunk.cpp +++ b/common/pcmChunk.cpp @@ -24,56 +24,56 @@ PcmChunk::~PcmChunk() bool PcmChunk::isEndOfChunk() const { - return idx >= getFrameCount(); + return idx >= getFrameCount(); } double PcmChunk::getFrameCount() const { - return (payloadSize / format.frameSize); + return (payloadSize / format.frameSize); } double PcmChunk::getDuration() const { - return getFrameCount() / format.msRate(); + return getFrameCount() / format.msRate(); } double PcmChunk::getTimeLeft() const { - return (getFrameCount() - idx) / format.msRate(); + return (getFrameCount() - idx) / format.msRate(); } int PcmChunk::seek(int frames) { - idx += frames; - if (idx > getFrameCount()) - idx = getFrameCount(); - if (idx < 0) - idx = 0; - return idx; + idx += frames; + if (idx > getFrameCount()) + idx = getFrameCount(); + if (idx < 0) + idx = 0; + return idx; } int PcmChunk::readFrames(void* outputBuffer, size_t frameCount) { //logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl; - int result = frameCount; - if (idx + frameCount > (payloadSize / format.frameSize)) - result = (payloadSize / format.frameSize) - idx; + int result = frameCount; + if (idx + frameCount > (payloadSize / format.frameSize)) + result = (payloadSize / format.frameSize) - idx; //logd << ", from: " << format.frameSize*idx << ", to: " << format.frameSize*idx + format.frameSize*result; - if (outputBuffer != NULL) - memcpy((char*)outputBuffer, (char*)(payload) + format.frameSize*idx, format.frameSize*result); + if (outputBuffer != NULL) + memcpy((char*)outputBuffer, (char*)(payload) + format.frameSize*idx, format.frameSize*result); - idx += result; + idx += result; //logd << ", new idx: " << idx << ", result: " << result << ", wireChunk->length: " << wireChunk->length << ", format.frameSize: " << format.frameSize << "\n";//std::endl; - return result; + return result; } diff --git a/common/pcmChunk.h b/common/pcmChunk.h index 8c8118fc..07c85064 100644 --- a/common/pcmChunk.h +++ b/common/pcmChunk.h @@ -13,57 +13,57 @@ typedef std::chrono::time_point - inline T getAge() const - { - return getAge(timePoint()); - } + template + inline T getAge() const + { + return getAge(timePoint()); + } - inline long getAge() const - { - return getAge().count(); - } + inline long getAge() const + { + return getAge().count(); + } - inline static long getAge(const time_point_ms& time_point) - { - return getAge(time_point).count(); - } + inline static long getAge(const time_point_ms& time_point) + { + return getAge(time_point).count(); + } - template - static inline T getAge(const std::chrono::time_point& time_point) - { - return std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - time_point); - } + template + static inline T getAge(const std::chrono::time_point& time_point) + { + return std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - time_point); + } - int seek(int frames); - double getDuration() const; - double getDurationUs() const; - double getTimeLeft() const; - double getFrameCount() const; + int seek(int frames); + double getDuration() const; + double getDurationUs() const; + double getTimeLeft() const; + double getFrameCount() const; - SampleFormat format; + SampleFormat format; private: // SampleFormat format_; - uint32_t idx; + uint32_t idx; }; diff --git a/common/queue.h b/common/queue.h index 4e3a28f0..4af2c009 100644 --- a/common/queue.h +++ b/common/queue.h @@ -11,81 +11,81 @@ class Queue { public: - T pop() - { - std::unique_lock mlock(mutex_); - while (queue_.empty()) - { - cond_.wait(mlock); - } - auto val = queue_.front(); - queue_.pop(); - return val; - } + T pop() + { + std::unique_lock mlock(mutex_); + while (queue_.empty()) + { + cond_.wait(mlock); + } + auto val = queue_.front(); + queue_.pop(); + return val; + } - T front() - { - std::unique_lock mlock(mutex_); - while (queue_.empty()) - cond_.wait(mlock); + T front() + { + std::unique_lock mlock(mutex_); + while (queue_.empty()) + cond_.wait(mlock); - return queue_.front(); - } + return queue_.front(); + } - bool try_pop(T& item, std::chrono::milliseconds timeout) - { - std::unique_lock mlock(mutex_); + bool try_pop(T& item, std::chrono::milliseconds timeout) + { + std::unique_lock mlock(mutex_); - if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); })) - return false; + if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); })) + return false; - item = std::move(queue_.front()); - queue_.pop(); + item = std::move(queue_.front()); + queue_.pop(); - return true; - } + return true; + } - void pop(T& item) - { - std::unique_lock mlock(mutex_); - while (queue_.empty()) - { - cond_.wait(mlock); - } - item = queue_.front(); - queue_.pop(); - } + void pop(T& item) + { + std::unique_lock mlock(mutex_); + while (queue_.empty()) + { + cond_.wait(mlock); + } + item = queue_.front(); + queue_.pop(); + } - void push(const T& item) - { - std::unique_lock mlock(mutex_); - queue_.push(item); - mlock.unlock(); - cond_.notify_one(); - } + void push(const T& item) + { + std::unique_lock mlock(mutex_); + queue_.push(item); + mlock.unlock(); + cond_.notify_one(); + } - void push(T&& item) - { - std::unique_lock mlock(mutex_); - queue_.push(std::move(item)); - mlock.unlock(); - cond_.notify_one(); - } + void push(T&& item) + { + std::unique_lock mlock(mutex_); + queue_.push(std::move(item)); + mlock.unlock(); + cond_.notify_one(); + } - size_t size() const - { - std::unique_lock mlock(mutex_); - return queue_.size(); - } + size_t size() const + { + std::unique_lock mlock(mutex_); + return queue_.size(); + } - Queue()=default; - Queue(const Queue&) = delete; // disable copying - Queue& operator=(const Queue&) = delete; // disable assignment + Queue()=default; + Queue(const Queue&) = delete; // disable copying + Queue& operator=(const Queue&) = delete; // disable assignment private: - std::queue queue_; - mutable std::mutex mutex_; - std::condition_variable cond_; + std::queue queue_; + mutable std::mutex mutex_; + std::condition_variable cond_; }; diff --git a/common/requestMsg.h b/common/requestMsg.h index e915d4e6..e6ccd0b4 100644 --- a/common/requestMsg.h +++ b/common/requestMsg.h @@ -8,40 +8,40 @@ class RequestMsg : public BaseMessage { public: - RequestMsg() : BaseMessage(message_type::requestmsg) - { - } + RequestMsg() : BaseMessage(message_type::requestmsg) + { + } - RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request) - { - } + RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request) + { + } - virtual ~RequestMsg() - { - } + virtual ~RequestMsg() + { + } - virtual void read(std::istream& stream) - { - int16_t size; - stream.read(reinterpret_cast(&size), sizeof(int16_t)); - request.resize(size); - stream.read(&request[0], size); - } + virtual void read(std::istream& stream) + { + int16_t size; + stream.read(reinterpret_cast(&size), sizeof(int16_t)); + request.resize(size); + stream.read(&request[0], size); + } - virtual uint32_t getSize() - { - return sizeof(int16_t) + request.size(); - } + virtual uint32_t getSize() + { + return sizeof(int16_t) + request.size(); + } - std::string request; + std::string request; protected: - virtual void doserialize(std::ostream& stream) - { - int16_t size(request.size()); - stream.write(reinterpret_cast(&size), sizeof(int16_t)); - stream.write(request.c_str(), size); - } + virtual void doserialize(std::ostream& stream) + { + int16_t size(request.size()); + stream.write(reinterpret_cast(&size), sizeof(int16_t)); + stream.write(request.c_str(), size); + } }; diff --git a/common/sampleFormat.cpp b/common/sampleFormat.cpp index 0f1cd8fb..ba9f9cd1 100644 --- a/common/sampleFormat.cpp +++ b/common/sampleFormat.cpp @@ -12,37 +12,37 @@ SampleFormat::SampleFormat() : BaseMessage(message_type::sampleformat) SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::sampleformat) { - setFormat(format); + setFormat(format); } SampleFormat::SampleFormat(uint16_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::sampleformat) { - setFormat(sampleRate, bitsPerSample, channelCount); + setFormat(sampleRate, bitsPerSample, channelCount); } void SampleFormat::setFormat(const std::string& format) { - std::vector strs; - boost::split(strs, format, boost::is_any_of(":")); - if (strs.size() == 3) - setFormat( - boost::lexical_cast(strs[0]), - boost::lexical_cast(strs[1]), - boost::lexical_cast(strs[2])); + std::vector strs; + boost::split(strs, format, boost::is_any_of(":")); + if (strs.size() == 3) + setFormat( + boost::lexical_cast(strs[0]), + boost::lexical_cast(strs[1]), + boost::lexical_cast(strs[2])); } void SampleFormat::setFormat(uint16_t rate, uint16_t bits, uint16_t channels) { - this->rate = rate; - this->bits = bits; - this->channels = channels; - sampleSize = bits / 8; - if (bits == 24) - sampleSize = 4; - frameSize = channels*sampleSize; + this->rate = rate; + this->bits = bits; + this->channels = channels; + sampleSize = bits / 8; + if (bits == 24) + sampleSize = 4; + frameSize = channels*sampleSize; } diff --git a/common/sampleFormat.h b/common/sampleFormat.h index 895ae864..c14aaebe 100644 --- a/common/sampleFormat.h +++ b/common/sampleFormat.h @@ -8,56 +8,56 @@ class SampleFormat : public BaseMessage { public: - SampleFormat(); - SampleFormat(const std::string& format); - SampleFormat(uint16_t rate, uint16_t bits, uint16_t channels); + SampleFormat(); + SampleFormat(const std::string& format); + SampleFormat(uint16_t rate, uint16_t bits, uint16_t channels); - void setFormat(const std::string& format); - void setFormat(uint16_t rate, uint16_t bits, uint16_t channels); + void setFormat(const std::string& format); + void setFormat(uint16_t rate, uint16_t bits, uint16_t channels); - uint16_t rate; - uint16_t bits; - uint16_t channels; + uint16_t rate; + uint16_t bits; + uint16_t channels; - uint16_t sampleSize; - uint16_t frameSize; + uint16_t sampleSize; + uint16_t frameSize; - float msRate() const - { - return (float)rate/1000.f; - } + float msRate() const + { + return (float)rate/1000.f; + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(&rate), sizeof(uint16_t)); - stream.read(reinterpret_cast(&bits), sizeof(uint16_t)); - stream.read(reinterpret_cast(&channels), sizeof(uint16_t)); - stream.read(reinterpret_cast(&sampleSize), sizeof(uint16_t)); - stream.read(reinterpret_cast(&frameSize), sizeof(uint16_t)); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(&rate), sizeof(uint16_t)); + stream.read(reinterpret_cast(&bits), sizeof(uint16_t)); + stream.read(reinterpret_cast(&channels), sizeof(uint16_t)); + stream.read(reinterpret_cast(&sampleSize), sizeof(uint16_t)); + stream.read(reinterpret_cast(&frameSize), sizeof(uint16_t)); + } - virtual uint32_t getSize() - { - return 5*sizeof(int16_t); - } + virtual uint32_t getSize() + { + return 5*sizeof(int16_t); + } protected: - virtual void doserialize(std::ostream& stream) - { - stream.write(reinterpret_cast(&rate), sizeof(uint16_t)); - stream.write(reinterpret_cast(&bits), sizeof(uint16_t)); - stream.write(reinterpret_cast(&channels), sizeof(uint16_t)); - stream.write(reinterpret_cast(&sampleSize), sizeof(uint16_t)); - stream.write(reinterpret_cast(&frameSize), sizeof(uint16_t)); - } + virtual void doserialize(std::ostream& stream) + { + stream.write(reinterpret_cast(&rate), sizeof(uint16_t)); + stream.write(reinterpret_cast(&bits), sizeof(uint16_t)); + stream.write(reinterpret_cast(&channels), sizeof(uint16_t)); + stream.write(reinterpret_cast(&sampleSize), sizeof(uint16_t)); + stream.write(reinterpret_cast(&frameSize), sizeof(uint16_t)); + } - /*private: - uint16_t rate_; - uint16_t bits_; - uint16_t channels_; - uint16_t bytes_; - uint16_t frameSize_; - */ + /*private: + uint16_t rate_; + uint16_t bits_; + uint16_t channels_; + uint16_t bytes_; + uint16_t frameSize_; + */ }; diff --git a/common/serverSettings.h b/common/serverSettings.h index 0b1610db..cbcf2f3c 100644 --- a/common/serverSettings.h +++ b/common/serverSettings.h @@ -8,31 +8,31 @@ class ServerSettings : public BaseMessage { public: - ServerSettings(size_t _port = 0) : BaseMessage(message_type::serversettings), port(_port) - { - } + ServerSettings(size_t _port = 0) : BaseMessage(message_type::serversettings), port(_port) + { + } - virtual ~ServerSettings() - { - } + virtual ~ServerSettings() + { + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(&port), sizeof(int32_t)); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(&port), sizeof(int32_t)); + } - virtual uint32_t getSize() - { - return sizeof(int32_t); - } + virtual uint32_t getSize() + { + return sizeof(int32_t); + } - int32_t port; + int32_t port; protected: - virtual void doserialize(std::ostream& stream) - { - stream.write(reinterpret_cast(&port), sizeof(int32_t)); - } + virtual void doserialize(std::ostream& stream) + { + stream.write(reinterpret_cast(&port), sizeof(int32_t)); + } }; diff --git a/common/signalHandler.h b/common/signalHandler.h index 6bd7a260..4f2cd5c0 100644 --- a/common/signalHandler.h +++ b/common/signalHandler.h @@ -9,19 +9,19 @@ extern bool g_terminated; void signal_handler(int sig) { - switch(sig) - { - case SIGHUP: - syslog(LOG_WARNING, "Received SIGHUP signal."); - break; - case SIGTERM: - syslog(LOG_WARNING, "Received SIGTERM signal."); - g_terminated = true; - break; - default: - syslog(LOG_WARNING, "Unhandled signal "); - break; - } + switch(sig) + { + case SIGHUP: + syslog(LOG_WARNING, "Received SIGHUP signal."); + break; + case SIGTERM: + syslog(LOG_WARNING, "Received SIGTERM signal."); + g_terminated = true; + break; + default: + syslog(LOG_WARNING, "Unhandled signal "); + break; + } } #endif diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp index 13943399..ab23c4ef 100644 --- a/common/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -23,27 +23,35 @@ SocketConnection::~SocketConnection() void SocketConnection::socketRead(void* _to, size_t _bytes) { // std::unique_lock mlock(mutex_); - size_t toRead = _bytes; - size_t len = 0; - do - { - len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead)); - toRead = _bytes - len; - } - while (toRead > 0); + size_t toRead = _bytes; + size_t len = 0; + do + { +// cout << "/"; +// cout.flush(); + boost::system::error_code error; + len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error); +//cout << "len: " << len << ", error: " << error << endl; + toRead = _bytes - len; +// cout << "\\"; +// cout.flush(); + } + while (toRead > 0); } void SocketConnection::start() { - receiverThread = new thread(&SocketConnection::worker, this); + receiverThread = new thread(&SocketConnection::worker, this); } void SocketConnection::stop() { - active_ = false; - receiverThread->join(); + active_ = false; + socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); + socket->close(); + receiverThread->join(); } @@ -51,87 +59,87 @@ bool SocketConnection::send(BaseMessage* message) { // std::unique_lock mlock(mutex_); //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; - if (!connected()) - return false; + if (!connected()) + return false; //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - tv t; - message->sent = t; - message->serialize(stream); - boost::asio::write(*socket.get(), streambuf); - return true; + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + tv t; + message->sent = t; + message->serialize(stream); + boost::asio::write(*socket.get(), streambuf); + return true; } shared_ptr SocketConnection::sendRequest(BaseMessage* message, size_t timeout) { - shared_ptr response(NULL); - if (++reqId == 0) - ++reqId; - message->id = reqId; - shared_ptr pendingRequest(new PendingRequest(reqId)); + shared_ptr response(NULL); + if (++reqId == 0) + ++reqId; + message->id = reqId; + shared_ptr pendingRequest(new PendingRequest(reqId)); - { - std::unique_lock mlock(mutex_); - pendingRequests.insert(pendingRequest); - } + { + std::unique_lock mlock(mutex_); + pendingRequests.insert(pendingRequest); + } // std::mutex mtx; - std::unique_lock lck(m); - send(message); - if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) - { - response = pendingRequest->response; - } - else - { - cout << "timeout while waiting for response to: " << reqId << "\n"; - } - { - std::unique_lock mlock(mutex_); - pendingRequests.erase(pendingRequest); - } - return response; + std::unique_lock lck(m); + send(message); + if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) + { + response = pendingRequest->response; + } + else + { + cout << "timeout while waiting for response to: " << reqId << "\n"; + } + { + std::unique_lock mlock(mutex_); + pendingRequests.erase(pendingRequest); + } + return response; } void SocketConnection::getNextMessage() { //cout << "getNextMessage\n"; - BaseMessage baseMessage; - size_t baseMsgSize = baseMessage.getSize(); - vector buffer(baseMsgSize); - socketRead(&buffer[0], baseMsgSize); - baseMessage.deserialize(&buffer[0]); + BaseMessage baseMessage; + size_t baseMsgSize = baseMessage.getSize(); + vector buffer(baseMsgSize); + socketRead(&buffer[0], baseMsgSize); + baseMessage.deserialize(&buffer[0]); //cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; - if (baseMessage.size > buffer.size()) - buffer.resize(baseMessage.size); - socketRead(&buffer[0], baseMessage.size); - tv t; - baseMessage.received = t; + if (baseMessage.size > buffer.size()) + buffer.resize(baseMessage.size); + socketRead(&buffer[0], baseMessage.size); + tv t; + baseMessage.received = t; - { - std::unique_lock mlock(mutex_); - for (auto req: pendingRequests) - { - if (req->id == baseMessage.refersTo) - { + { + std::unique_lock mlock(mutex_); + for (auto req: pendingRequests) + { + if (req->id == baseMessage.refersTo) + { //cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; //long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec); //cout << "latency: " << latency << "\n"; - req->response.reset(new SerializedMessage()); - req->response->message = baseMessage; - req->response->buffer = (char*)malloc(baseMessage.size); - memcpy(req->response->buffer, &buffer[0], baseMessage.size); - std::unique_lock lck(m); - req->cv.notify_one(); - return; - } - } - } + req->response.reset(new SerializedMessage()); + req->response->message = baseMessage; + req->response->buffer = (char*)malloc(baseMessage.size); + memcpy(req->response->buffer, &buffer[0], baseMessage.size); + std::unique_lock lck(m); + req->cv.notify_one(); + return; + } + } + } - if (messageReceiver != NULL) - messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); + if (messageReceiver != NULL) + messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); } diff --git a/common/socketConnection.h b/common/socketConnection.h index 06ff5a9c..2ee71d90 100644 --- a/common/socketConnection.h +++ b/common/socketConnection.h @@ -20,71 +20,71 @@ class SocketConnection; struct PendingRequest { - PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; + PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; - uint16_t id; - std::shared_ptr response; - std::condition_variable cv; + uint16_t id; + std::shared_ptr response; + std::condition_variable cv; }; class MessageReceiver { public: - virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; + virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; }; class SocketConnection { public: - SocketConnection(MessageReceiver* _receiver); - virtual ~SocketConnection(); - virtual void start(); - virtual void stop(); - virtual bool send(BaseMessage* _message); - virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); + SocketConnection(MessageReceiver* _receiver); + virtual ~SocketConnection(); + virtual void start(); + virtual void stop(); + virtual bool send(BaseMessage* _message); + virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); - template - std::shared_ptr sendReq(BaseMessage* message, size_t timeout) - { - std::shared_ptr reply = sendRequest(message, timeout); - if (!reply) - return NULL; - std::shared_ptr msg(new T); - msg->deserialize(reply->message, reply->buffer); - return msg; - } + template + std::shared_ptr sendReq(BaseMessage* message, size_t timeout) + { + std::shared_ptr reply = sendRequest(message, timeout); + if (!reply) + return NULL; + std::shared_ptr msg(new T); + msg->deserialize(reply->message, reply->buffer); + return msg; + } - virtual bool active() - { - return active_; - } + virtual bool active() + { + return active_; + } - virtual bool connected() - { - return (socket != 0); + virtual bool connected() + { + return (socket != 0); // return (connected_ && socket); - } + } protected: - virtual void worker() = 0; + virtual void worker() = 0; - void socketRead(void* _to, size_t _bytes); - std::shared_ptr socket; + void socketRead(void* _to, size_t _bytes); + std::shared_ptr socket; // boost::asio::ip::tcp::endpoint endpt; - std::atomic active_; - std::atomic connected_; - MessageReceiver* messageReceiver; - void getNextMessage(); - boost::asio::io_service io_service; - tcp::resolver::iterator iterator; - std::thread* receiverThread; - mutable std::mutex mutex_; - std::mutex m; - std::set> pendingRequests; - uint16_t reqId; + std::atomic active_; + std::atomic connected_; + MessageReceiver* messageReceiver; + void getNextMessage(); + boost::asio::io_service io_service; + tcp::resolver::iterator iterator; + std::thread* receiverThread; + mutable std::mutex mutex_; + std::mutex m; + std::set> pendingRequests; + uint16_t reqId; }; diff --git a/common/timeMsg.h b/common/timeMsg.h index 45aed0a7..27e9b740 100644 --- a/common/timeMsg.h +++ b/common/timeMsg.h @@ -7,31 +7,31 @@ class TimeMsg : public BaseMessage { public: - TimeMsg() : BaseMessage(message_type::timemsg) - { - } + TimeMsg() : BaseMessage(message_type::timemsg) + { + } - virtual ~TimeMsg() - { - } + virtual ~TimeMsg() + { + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(&latency), sizeof(double)); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(&latency), sizeof(double)); + } - virtual uint32_t getSize() - { - return sizeof(double); - } + virtual uint32_t getSize() + { + return sizeof(double); + } - double latency; + double latency; protected: - virtual void doserialize(std::ostream& stream) - { - stream.write(reinterpret_cast(&latency), sizeof(double)); - } + virtual void doserialize(std::ostream& stream) + { + stream.write(reinterpret_cast(&latency), sizeof(double)); + } }; diff --git a/common/timeUtils.h b/common/timeUtils.h index 78fe7a89..a86967f1 100644 --- a/common/timeUtils.h +++ b/common/timeUtils.h @@ -48,32 +48,32 @@ inline static long getAge(const time_point_ms& time_point) static void addMs(timeval& tv, int ms) { - if (ms < 0) - { - timeval t; - t.tv_sec = -ms / 1000; - t.tv_usec = (-ms % 1000) * 1000; - timersub(&tv, &t, &tv); - return; - } - tv.tv_usec += ms*1000; - tv.tv_sec += (tv.tv_usec / 1000000); - tv.tv_usec %= 1000000; + if (ms < 0) + { + timeval t; + t.tv_sec = -ms / 1000; + t.tv_usec = (-ms % 1000) * 1000; + timersub(&tv, &t, &tv); + return; + } + tv.tv_usec += ms*1000; + tv.tv_sec += (tv.tv_usec / 1000000); + tv.tv_usec %= 1000000; } static void addUs(timeval& tv, int us) { - if (us < 0) - { - timeval t; - t.tv_sec = -us / 1000000; - t.tv_usec = (-us % 1000000); - timersub(&tv, &t, &tv); - return; - } - tv.tv_usec += us; - tv.tv_sec += (tv.tv_usec / 1000000); - tv.tv_usec %= 1000000; + if (us < 0) + { + timeval t; + t.tv_sec = -us / 1000000; + t.tv_usec = (-us % 1000000); + timersub(&tv, &t, &tv); + return; + } + tv.tv_usec += us; + tv.tv_sec += (tv.tv_usec / 1000000); + tv.tv_usec %= 1000000; } @@ -86,17 +86,17 @@ static void addUs(timeval& tv, int us) static long getTickCount() { - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - return now.tv_sec*1000 + now.tv_nsec / 1000000; + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + return now.tv_sec*1000 + now.tv_nsec / 1000000; } static long getuTickCount() { - struct timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); - return now.tv_sec*1000000 + now.tv_nsec / 1000; + struct timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + return now.tv_sec*1000000 + now.tv_nsec / 1000; } diff --git a/common/utils.h b/common/utils.h index 56d463d4..3e67c052 100644 --- a/common/utils.h +++ b/common/utils.h @@ -15,81 +15,81 @@ // trim from start static inline std::string <rim(std::string &s) { - s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun(std::isspace)))); - return s; + s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun(std::isspace)))); + return s; } // trim from end static inline std::string &rtrim(std::string &s) { - s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun(std::isspace))).base(), s.end()); - return s; + s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun(std::isspace))).base(), s.end()); + return s; } // trim from both ends static inline std::string &trim(std::string &s) { - return ltrim(rtrim(s)); + return ltrim(rtrim(s)); } static std::string getMacAddress() { - std::ifstream t("/sys/class/net/eth0/address"); - std::string str((std::istreambuf_iterator(t)), - std::istreambuf_iterator()); - return trim(str); + std::ifstream t("/sys/class/net/eth0/address"); + std::string str((std::istreambuf_iterator(t)), + std::istreambuf_iterator()); + return trim(str); } std::vector split(const std::string& str) { - std::istringstream iss(str); - std::vector splitStr; - std::copy(std::istream_iterator(iss), std::istream_iterator(), std::back_inserter >(splitStr)); - return splitStr; + std::istringstream iss(str); + std::vector splitStr; + std::copy(std::istream_iterator(iss), std::istream_iterator(), std::back_inserter >(splitStr)); + return splitStr; } static void daemonize() { - /* Our process ID and Session ID */ - pid_t pid, sid; + /* Our process ID and Session ID */ + pid_t pid, sid; - /* Fork off the parent process */ - pid = fork(); - if (pid < 0) - exit(EXIT_FAILURE); + /* Fork off the parent process */ + pid = fork(); + if (pid < 0) + exit(EXIT_FAILURE); - /* If we got a good PID, then - we can exit the parent process. */ - if (pid > 0) - exit(EXIT_SUCCESS); + /* If we got a good PID, then + we can exit the parent process. */ + if (pid > 0) + exit(EXIT_SUCCESS); - /* Change the file mode mask */ - umask(0); + /* Change the file mode mask */ + umask(0); - /* Open any logs here */ + /* Open any logs here */ - /* Create a new SID for the child process */ - sid = setsid(); - if (sid < 0) - { - /* Log the failure */ - exit(EXIT_FAILURE); - } + /* Create a new SID for the child process */ + sid = setsid(); + if (sid < 0) + { + /* Log the failure */ + exit(EXIT_FAILURE); + } - /* Change the current working directory */ - if ((chdir("/")) < 0) - { - /* Log the failure */ - exit(EXIT_FAILURE); - } + /* Change the current working directory */ + if ((chdir("/")) < 0) + { + /* Log the failure */ + exit(EXIT_FAILURE); + } - /* Close out the standard file descriptors */ - close(STDIN_FILENO); - close(STDOUT_FILENO); - close(STDERR_FILENO); + /* Close out the standard file descriptors */ + close(STDIN_FILENO); + close(STDOUT_FILENO); + close(STDERR_FILENO); } diff --git a/common/wireChunk.h b/common/wireChunk.h index ff8ea732..3dbc3bfe 100644 --- a/common/wireChunk.h +++ b/common/wireChunk.h @@ -13,42 +13,42 @@ class WireChunk : public BaseMessage { public: - WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) - { - payload = (char*)malloc(size); - } + WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size) + { + payload = (char*)malloc(size); + } - virtual ~WireChunk() - { - free(payload); - } + virtual ~WireChunk() + { + free(payload); + } - virtual void read(std::istream& stream) - { - stream.read(reinterpret_cast(×tamp.sec), sizeof(int32_t)); - stream.read(reinterpret_cast(×tamp.usec), sizeof(int32_t)); - stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - payload = (char*)realloc(payload, payloadSize); - stream.read(payload, payloadSize); - } + virtual void read(std::istream& stream) + { + stream.read(reinterpret_cast(×tamp.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(×tamp.usec), sizeof(int32_t)); + stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + payload = (char*)realloc(payload, payloadSize); + stream.read(payload, payloadSize); + } - virtual uint32_t getSize() - { - return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize; - } + virtual uint32_t getSize() + { + return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize; + } - tv timestamp; - uint32_t payloadSize; - char* payload; + tv timestamp; + uint32_t payloadSize; + char* payload; protected: - virtual void doserialize(std::ostream& stream) - { - stream.write(reinterpret_cast(×tamp.sec), sizeof(int32_t)); - stream.write(reinterpret_cast(×tamp.usec), sizeof(int32_t)); - stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); - stream.write(payload, payloadSize); - } + virtual void doserialize(std::ostream& stream) + { + stream.write(reinterpret_cast(×tamp.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(×tamp.usec), sizeof(int32_t)); + stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + stream.write(payload, payloadSize); + } }; diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 476f360a..09007903 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -12,63 +12,63 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) { // cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; - if (baseMessage.type == message_type::requestmsg) - { - RequestMsg requestMsg; - requestMsg.deserialize(baseMessage, buffer); - cout << "request: " << requestMsg.request << "\n"; - if (requestMsg.request == "time") - { + if (baseMessage.type == message_type::requestmsg) + { + RequestMsg requestMsg; + requestMsg.deserialize(baseMessage, buffer); + cout << "request: " << requestMsg.request << "\n"; + if (requestMsg.request == "time") + { // timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec); - TimeMsg timeMsg; - timeMsg.refersTo = requestMsg.id; - timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.; + TimeMsg timeMsg; + timeMsg.refersTo = requestMsg.id; + timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.; // tv diff = timeMsg.received - timeMsg.sent; // cout << "Latency: " << diff.sec << "." << diff.usec << "\n"; - connection->send(&timeMsg); - } - else if (requestMsg.request == "serverSettings") - { - serverSettings->refersTo = requestMsg.id; - connection->send(serverSettings); - } - else if (requestMsg.request == "sampleFormat") - { - sampleFormat->refersTo = requestMsg.id; - connection->send(sampleFormat); - } - else if (requestMsg.request == "headerChunk") - { - headerChunk->refersTo = requestMsg.id; - connection->send(headerChunk); - } - } + connection->send(&timeMsg); + } + else if (requestMsg.request == "serverSettings") + { + serverSettings->refersTo = requestMsg.id; + connection->send(serverSettings); + } + else if (requestMsg.request == "sampleFormat") + { + sampleFormat->refersTo = requestMsg.id; + connection->send(sampleFormat); + } + else if (requestMsg.request == "headerChunk") + { + headerChunk->refersTo = requestMsg.id; + connection->send(headerChunk); + } + } } void ControlServer::acceptor() { - tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); - for (;;) - { - socket_ptr sock(new tcp::socket(io_service_)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - a.accept(*sock); - cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n"; - ServerConnection* session = new ServerConnection(this, sock); - sessions.insert(shared_ptr(session)); - session->start(); - } + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + a.accept(*sock); + setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n"; + ServerConnection* session = new ServerConnection(this, sock); + sessions.insert(shared_ptr(session)); + session->start(); + } } void ControlServer::start() { - acceptThread = new thread(&ControlServer::acceptor, this); + acceptThread = new thread(&ControlServer::acceptor, this); } @@ -80,23 +80,23 @@ void ControlServer::stop() void ControlServer::setHeader(HeaderMessage* header) { - if (header) - headerChunk = header; + if (header) + headerChunk = header; } void ControlServer::setFormat(SampleFormat* format) { - if (format) - sampleFormat = format; + if (format) + sampleFormat = format; } void ControlServer::setServerSettings(ServerSettings* settings) { - if (settings) - serverSettings = settings; + if (settings) + serverSettings = settings; } diff --git a/server/controlServer.h b/server/controlServer.h index c5c5e4ec..36f487ea 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -25,24 +25,24 @@ using namespace std; class ControlServer : public MessageReceiver { public: - ControlServer(unsigned short port); + ControlServer(unsigned short port); - void start(); - void stop(); - virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); - void setHeader(HeaderMessage* header); - void setFormat(SampleFormat* format); - void setServerSettings(ServerSettings* settings); + void start(); + void stop(); + virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); + void setHeader(HeaderMessage* header); + void setFormat(SampleFormat* format); + void setServerSettings(ServerSettings* settings); private: - void acceptor(); - set> sessions; - boost::asio::io_service io_service_; - unsigned short port_; - HeaderMessage* headerChunk; - SampleFormat* sampleFormat; - ServerSettings* serverSettings; - thread* acceptThread; + void acceptor(); + set> sessions; + boost::asio::io_service io_service_; + unsigned short port_; + HeaderMessage* headerChunk; + SampleFormat* sampleFormat; + ServerSettings* serverSettings; + thread* acceptThread; }; diff --git a/server/encoder.h b/server/encoder.h index f6c42562..90de75a4 100644 --- a/server/encoder.h +++ b/server/encoder.h @@ -8,18 +8,18 @@ class Encoder { public: - Encoder(const SampleFormat& format) : sampleFormat(format) - { - } + Encoder(const SampleFormat& format) : sampleFormat(format) + { + } - virtual double encode(PcmChunk* chunk) = 0; - virtual HeaderMessage* getHeader() - { - return NULL; - } + virtual double encode(PcmChunk* chunk) = 0; + virtual HeaderMessage* getHeader() + { + return NULL; + } protected: - SampleFormat sampleFormat; + SampleFormat sampleFormat; }; diff --git a/server/oggEncoder.cpp b/server/oggEncoder.cpp index c5c9bbab..501cab95 100644 --- a/server/oggEncoder.cpp +++ b/server/oggEncoder.cpp @@ -7,183 +7,183 @@ using namespace std; OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format), headerChunk(NULL) { - init(); - lastGranulepos = -1; + init(); + lastGranulepos = -1; } HeaderMessage* OggEncoder::getHeader() { - return headerChunk; + return headerChunk; } double OggEncoder::encode(PcmChunk* chunk) { - double res = 0; - if (tv_sec == 0) - { - tv_sec = chunk->timestamp.sec; - tv_usec = chunk->timestamp.usec; - } + double res = 0; + if (tv_sec == 0) + { + tv_sec = chunk->timestamp.sec; + tv_usec = chunk->timestamp.usec; + } //cout << "-> pcm: " << wireChunk->length << endl; - int bytes = chunk->payloadSize / 4; - float **buffer=vorbis_analysis_buffer(&vd, bytes); + int bytes = chunk->payloadSize / 4; + float **buffer=vorbis_analysis_buffer(&vd, bytes); - /* uninterleave samples */ - for(int i=0; ipayload[i*4+1]<<8)| - (0x00ff&(int)chunk->payload[i*4]))/32768.f; - buffer[1][i]=((chunk->payload[i*4+3]<<8)| - (0x00ff&(int)chunk->payload[i*4+2]))/32768.f; - } + /* uninterleave samples */ + for(int i=0; ipayload[i*4+1]<<8)| + (0x00ff&(int)chunk->payload[i*4]))/32768.f; + buffer[1][i]=((chunk->payload[i*4+3]<<8)| + (0x00ff&(int)chunk->payload[i*4+2]))/32768.f; + } - /* tell the library how much we actually submitted */ - vorbis_analysis_wrote(&vd, bytes); + /* tell the library how much we actually submitted */ + vorbis_analysis_wrote(&vd, bytes); - /* vorbis does some data preanalysis, then divvies up blocks for - more involved (potentially parallel) processing. Get a single - block for encoding now */ - size_t pos = 0; - while(vorbis_analysis_blockout(&vd,&vb)==1) - { - /* analysis, assume we want to use bitrate management */ - vorbis_analysis(&vb,NULL); - vorbis_bitrate_addblock(&vb); + /* vorbis does some data preanalysis, then divvies up blocks for + more involved (potentially parallel) processing. Get a single + block for encoding now */ + size_t pos = 0; + while(vorbis_analysis_blockout(&vd,&vb)==1) + { + /* analysis, assume we want to use bitrate management */ + vorbis_analysis(&vb,NULL); + vorbis_bitrate_addblock(&vb); - while(vorbis_bitrate_flushpacket(&vd,&op)) - { - /* weld the packet into the bitstream */ - ogg_stream_packetin(&os,&op); + while(vorbis_bitrate_flushpacket(&vd,&op)) + { + /* weld the packet into the bitstream */ + ogg_stream_packetin(&os,&op); - /* write out pages (if any) */ - while(true) - { + /* write out pages (if any) */ + while(true) + { // int result = ogg_stream_pageout(&os,&og); - int result = ogg_stream_flush(&os,&og); - if (result == 0) - break; - res = true; + int result = ogg_stream_flush(&os,&og); + if (result == 0) + break; + res = true; - size_t nextLen = pos + og.header_len + og.body_len; - if (chunk->payloadSize < nextLen) - chunk->payload = (char*)realloc(chunk->payload, nextLen); + size_t nextLen = pos + og.header_len + og.body_len; + if (chunk->payloadSize < nextLen) + chunk->payload = (char*)realloc(chunk->payload, nextLen); - memcpy(chunk->payload + pos, og.header, og.header_len); - pos += og.header_len; - memcpy(chunk->payload + pos, og.body, og.body_len); - pos += og.body_len; - } - } - } - if (res) - { - if (lastGranulepos == -1) - res = os.granulepos; - else - res = os.granulepos - lastGranulepos; - res /= 48.; - lastGranulepos = os.granulepos; - chunk->payload = (char*)realloc(chunk->payload, pos); - chunk->payloadSize = pos; - tv_sec = 0; - tv_usec = 0; - } - return res; + memcpy(chunk->payload + pos, og.header, og.header_len); + pos += og.header_len; + memcpy(chunk->payload + pos, og.body, og.body_len); + pos += og.body_len; + } + } + } + if (res) + { + if (lastGranulepos == -1) + res = os.granulepos; + else + res = os.granulepos - lastGranulepos; + res /= 48.; + lastGranulepos = os.granulepos; + chunk->payload = (char*)realloc(chunk->payload, pos); + chunk->payloadSize = pos; + tv_sec = 0; + tv_usec = 0; + } + return res; } void OggEncoder::init() { - /********** Encode setup ************/ - tv_sec = 0; - tv_usec = 0; + /********** Encode setup ************/ + tv_sec = 0; + tv_usec = 0; - vorbis_info_init(&vi); + vorbis_info_init(&vi); - /* choose an encoding mode. A few possibilities commented out, one - actually used: */ + /* choose an encoding mode. A few possibilities commented out, one + actually used: */ - /********************************************************************* - Encoding using a VBR quality mode. The usable range is -.1 - (lowest quality, smallest file) to 1. (highest quality, largest file). - Example quality mode .4: 44kHz stereo coupled, roughly 128kbps VBR + /********************************************************************* + Encoding using a VBR quality mode. The usable range is -.1 + (lowest quality, smallest file) to 1. (highest quality, largest file). + Example quality mode .4: 44kHz stereo coupled, roughly 128kbps VBR - ret = vorbis_encode_init_vbr(&vi,2,44100,.4); + ret = vorbis_encode_init_vbr(&vi,2,44100,.4); - --------------------------------------------------------------------- + --------------------------------------------------------------------- - Encoding using an average bitrate mode (ABR). - example: 44kHz stereo coupled, average 128kbps VBR + Encoding using an average bitrate mode (ABR). + example: 44kHz stereo coupled, average 128kbps VBR - ret = vorbis_encode_init(&vi,2,44100,-1,128000,-1); + ret = vorbis_encode_init(&vi,2,44100,-1,128000,-1); - --------------------------------------------------------------------- + --------------------------------------------------------------------- - Encode using a quality mode, but select that quality mode by asking for - an approximate bitrate. This is not ABR, it is true VBR, but selected - using the bitrate interface, and then turning bitrate management off: + Encode using a quality mode, but select that quality mode by asking for + an approximate bitrate. This is not ABR, it is true VBR, but selected + using the bitrate interface, and then turning bitrate management off: - ret = ( vorbis_encode_setup_managed(&vi,2,44100,-1,128000,-1) || - vorbis_encode_ctl(&vi,OV_ECTL_RATEMANAGE2_SET,NULL) || - vorbis_encode_setup_init(&vi)); + ret = ( vorbis_encode_setup_managed(&vi,2,44100,-1,128000,-1) || + vorbis_encode_ctl(&vi,OV_ECTL_RATEMANAGE2_SET,NULL) || + vorbis_encode_setup_init(&vi)); - *********************************************************************/ + *********************************************************************/ - ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.7); + ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.7); - /* do not continue if setup failed; this can happen if we ask for a - mode that libVorbis does not support (eg, too low a bitrate, etc, - will return 'OV_EIMPL') */ + /* do not continue if setup failed; this can happen if we ask for a + mode that libVorbis does not support (eg, too low a bitrate, etc, + will return 'OV_EIMPL') */ - if(ret)exit(1); + if(ret)exit(1); - /* add a comment */ - vorbis_comment_init(&vc); - vorbis_comment_add_tag(&vc,"ENCODER","snapstream"); + /* add a comment */ + vorbis_comment_init(&vc); + vorbis_comment_add_tag(&vc,"ENCODER","snapstream"); - /* set up the analysis state and auxiliary encoding storage */ - vorbis_analysis_init(&vd,&vi); - vorbis_block_init(&vd,&vb); + /* set up the analysis state and auxiliary encoding storage */ + vorbis_analysis_init(&vd,&vi); + vorbis_block_init(&vd,&vb); - /* set up our packet->stream encoder */ - /* pick a random serial number; that way we can more likely build - chained streams just by concatenation */ - srand(time(NULL)); - ogg_stream_init(&os,rand()); + /* set up our packet->stream encoder */ + /* pick a random serial number; that way we can more likely build + chained streams just by concatenation */ + srand(time(NULL)); + ogg_stream_init(&os,rand()); - /* Vorbis streams begin with three headers; the initial header (with - most of the codec setup parameters) which is mandated by the Ogg - bitstream spec. The second header holds any comment fields. The - third header holds the bitstream codebook. We merely need to - make the headers, then pass them to libvorbis one at a time; - libvorbis handles the additional Ogg bitstream constraints */ + /* Vorbis streams begin with three headers; the initial header (with + most of the codec setup parameters) which is mandated by the Ogg + bitstream spec. The second header holds any comment fields. The + third header holds the bitstream codebook. We merely need to + make the headers, then pass them to libvorbis one at a time; + libvorbis handles the additional Ogg bitstream constraints */ - vorbis_analysis_headerout(&vd,&vc,&header,&header_comm,&header_code); - ogg_stream_packetin(&os,&header); - ogg_stream_packetin(&os,&header_comm); - ogg_stream_packetin(&os,&header_code); + vorbis_analysis_headerout(&vd,&vc,&header,&header_comm,&header_code); + ogg_stream_packetin(&os,&header); + ogg_stream_packetin(&os,&header_comm); + ogg_stream_packetin(&os,&header_code); - /* This ensures the actual - * audio data will start on a new page, as per spec - */ + /* This ensures the actual + * audio data will start on a new page, as per spec + */ // while(!eos){ - size_t pos(0); - headerChunk = new HeaderMessage(); - while (true) - { - int result=ogg_stream_flush(&os,&og); - if (result == 0) - break; - headerChunk->payloadSize += og.header_len + og.body_len; - headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize); - cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n"; - memcpy(headerChunk->payload + pos, og.header, og.header_len); - pos += og.header_len; - memcpy(headerChunk->payload + pos, og.body, og.body_len); - pos += og.body_len; - } + size_t pos(0); + headerChunk = new HeaderMessage(); + while (true) + { + int result=ogg_stream_flush(&os,&og); + if (result == 0) + break; + headerChunk->payloadSize += og.header_len + og.body_len; + headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize); + cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n"; + memcpy(headerChunk->payload + pos, og.header, og.header_len); + pos += og.header_len; + memcpy(headerChunk->payload + pos, og.body, og.body_len); + pos += og.body_len; + } // fwrite(og.header,1,og.header_len,stdout); diff --git a/server/oggEncoder.h b/server/oggEncoder.h index 09cda2ce..53c8f2c9 100644 --- a/server/oggEncoder.h +++ b/server/oggEncoder.h @@ -7,37 +7,37 @@ class OggEncoder : public Encoder { public: - OggEncoder(const SampleFormat& format); - virtual double encode(PcmChunk* chunk); - virtual HeaderMessage* getHeader(); + OggEncoder(const SampleFormat& format); + virtual double encode(PcmChunk* chunk); + virtual HeaderMessage* getHeader(); private: - void init(); + void init(); - ogg_stream_state os; /* take physical pages, weld into a logical + ogg_stream_state os; /* take physical pages, weld into a logical stream of packets */ - ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ - ogg_packet op; /* one raw packet of data for decode */ + ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ + ogg_packet op; /* one raw packet of data for decode */ - vorbis_info vi; /* struct that stores all the static vorbis bitstream + vorbis_info vi; /* struct that stores all the static vorbis bitstream settings */ - vorbis_comment vc; /* struct that stores all the user comments */ + vorbis_comment vc; /* struct that stores all the user comments */ - vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ - vorbis_block vb; /* local working space for packet->PCM decode */ + vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ + vorbis_block vb; /* local working space for packet->PCM decode */ - ogg_packet header; - ogg_packet header_comm; - ogg_packet header_code; + ogg_packet header; + ogg_packet header_comm; + ogg_packet header_code; - ogg_int64_t lastGranulepos; - HeaderMessage* headerChunk; + ogg_int64_t lastGranulepos; + HeaderMessage* headerChunk; - int eos=0,ret; - int i, founddata; + int eos=0,ret; + int i, founddata; - int32_t tv_sec; - int32_t tv_usec; + int32_t tv_sec; + int32_t tv_usec; }; diff --git a/server/pcmEncoder.cpp b/server/pcmEncoder.cpp index a9921850..167eb00f 100644 --- a/server/pcmEncoder.cpp +++ b/server/pcmEncoder.cpp @@ -7,11 +7,11 @@ PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format) double PcmEncoder::encode(PcmChunk* chunk) { - /* WireChunk* wireChunk = chunk->wireChunk; - for (size_t n=0; nlength; ++n) - wireChunk->payload[n] *= 1; - */ - return chunk->getDuration(); + /* WireChunk* wireChunk = chunk->wireChunk; + for (size_t n=0; nlength; ++n) + wireChunk->payload[n] *= 1; + */ + return chunk->getDuration(); } diff --git a/server/pcmEncoder.h b/server/pcmEncoder.h index a03c1faf..55ed426b 100644 --- a/server/pcmEncoder.h +++ b/server/pcmEncoder.h @@ -6,8 +6,8 @@ class PcmEncoder : public Encoder { public: - PcmEncoder(const SampleFormat& format); - virtual double encode(PcmChunk* chunk); + PcmEncoder(const SampleFormat& format); + virtual double encode(PcmChunk* chunk); }; diff --git a/server/serverConnection.cpp b/server/serverConnection.cpp index f05366ed..94e775b9 100644 --- a/server/serverConnection.cpp +++ b/server/serverConnection.cpp @@ -11,25 +11,25 @@ using namespace std; ServerConnection::ServerConnection(MessageReceiver* _receiver, std::shared_ptr _socket) : SocketConnection(_receiver) { - socket = _socket; + socket = _socket; } void ServerConnection::worker() { - active_ = true; - try - { - while (active_) - { - getNextMessage(); - } - } - catch (const std::exception& e) - { - cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - } - active_ = false; + active_ = true; + try + { + while (active_) + { + getNextMessage(); + } + } + catch (const std::exception& e) + { + cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; + } + active_ = false; } diff --git a/server/serverConnection.h b/server/serverConnection.h index 7ff1b577..b1b181b4 100644 --- a/server/serverConnection.h +++ b/server/serverConnection.h @@ -12,10 +12,10 @@ using boost::asio::ip::tcp; class ServerConnection : public SocketConnection { public: - ServerConnection(MessageReceiver* _receiver, std::shared_ptr _socket); + ServerConnection(MessageReceiver* _receiver, std::shared_ptr _socket); protected: - virtual void worker(); + virtual void worker(); }; diff --git a/server/snapServer.cpp b/server/snapServer.cpp index 7956b8c2..186228b9 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -22,133 +22,133 @@ using namespace std; int main(int argc, char* argv[]) { - try - { - string sampleFormat; + try + { + string sampleFormat; - size_t port; - string fifoName; - string codec; - bool runAsDaemon; + size_t port; + string fifoName; + string codec; + bool runAsDaemon; - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "produce help message") - ("port,p", po::value(&port)->default_value(98765), "port to listen on") - ("sampleformat,s", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") - ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") - ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") - ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") - ; + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port,p", po::value(&port)->default_value(98765), "port to listen on") + ("sampleformat,s", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") + ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") + ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") + ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") + ; - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); - if (vm.count("help")) - { - cout << desc << "\n"; - return 1; - } + if (vm.count("help")) + { + cout << desc << "\n"; + return 1; + } - if (runAsDaemon) - { - daemonize(); - syslog (LOG_NOTICE, "First daemon started."); - } + if (runAsDaemon) + { + daemonize(); + syslog (LOG_NOTICE, "First daemon started."); + } - openlog ("firstdaemon", LOG_PID, LOG_DAEMON); + openlog ("firstdaemon", LOG_PID, LOG_DAEMON); - using namespace std; // For atoi. + using namespace std; // For atoi. - timeval tvChunk; - gettimeofday(&tvChunk, NULL); - long nextTick = getTickCount(); + timeval tvChunk; + gettimeofday(&tvChunk, NULL); + long nextTick = getTickCount(); - mkfifo(fifoName.c_str(), 0777); - SampleFormat format(sampleFormat); - size_t duration = 50; + mkfifo(fifoName.c_str(), 0777); + SampleFormat format(sampleFormat); + size_t duration = 50; //size_t chunkSize = duration*format.rate*format.frameSize / 1000; - std::auto_ptr encoder; - if (codec == "ogg") - encoder.reset(new OggEncoder(sampleFormat)); - else if (codec == "pcm") - encoder.reset(new PcmEncoder(sampleFormat)); - else - { - cout << "unknown codec: " << codec << "\n"; - return 1; - } + std::auto_ptr encoder; + if (codec == "ogg") + encoder.reset(new OggEncoder(sampleFormat)); + else if (codec == "pcm") + encoder.reset(new PcmEncoder(sampleFormat)); + else + { + cout << "unknown codec: " << codec << "\n"; + return 1; + } - std::auto_ptr serverSettings(new ServerSettings(port + 1)); - ControlServer* controlServer = new ControlServer(port); - controlServer->setServerSettings(serverSettings.get()); - controlServer->setFormat(&format); - controlServer->setHeader(encoder->getHeader()); - controlServer->start(); + std::auto_ptr serverSettings(new ServerSettings(port + 1)); + ControlServer* controlServer = new ControlServer(port); + controlServer->setServerSettings(serverSettings.get()); + controlServer->setFormat(&format); + controlServer->setHeader(encoder->getHeader()); + controlServer->start(); - StreamServer* server = new StreamServer(port + 1); - server->start(); + StreamServer* server = new StreamServer(port + 1); + server->start(); - while (!g_terminated) - { - int fd = open(fifoName.c_str(), O_RDONLY); - try - { - shared_ptr chunk;//(new WireChunk()); - while (true)//cin.good()) - { - chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); - int toRead = chunk->payloadSize; - int len = 0; - do - { - int count = read(fd, chunk->payload + len, toRead - len); - if (count <= 0) - throw ServerException("count = " + boost::lexical_cast(count)); + while (!g_terminated) + { + int fd = open(fifoName.c_str(), O_RDONLY); + try + { + shared_ptr chunk;//(new WireChunk()); + while (true)//cin.good()) + { + chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); + int toRead = chunk->payloadSize; + int len = 0; + do + { + int count = read(fd, chunk->payload + len, toRead - len); + if (count <= 0) + throw ServerException("count = " + boost::lexical_cast(count)); - len += count; - } - while (len < toRead); + len += count; + } + while (len < toRead); - chunk->timestamp.sec = tvChunk.tv_sec; - chunk->timestamp.usec = tvChunk.tv_usec; - double chunkDuration = encoder->encode(chunk.get()); - if (chunkDuration > 0) - server->send(chunk); + chunk->timestamp.sec = tvChunk.tv_sec; + chunk->timestamp.usec = tvChunk.tv_usec; + double chunkDuration = encoder->encode(chunk.get()); + if (chunkDuration > 0) + server->send(chunk); //cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n"; // addUs(tvChunk, 1000*chunk->getDuration()); - addUs(tvChunk, chunkDuration * 1000); - nextTick += duration; - long currentTick = getTickCount(); - if (nextTick > currentTick) - { - usleep((nextTick - currentTick) * 1000); - } - else - { - gettimeofday(&tvChunk, NULL); - nextTick = getTickCount(); - } - } - } - catch(const std::exception& e) - { - std::cerr << "Exception: " << e.what() << std::endl; - } - close(fd); - } + addUs(tvChunk, chunkDuration * 1000); + nextTick += duration; + long currentTick = getTickCount(); + if (nextTick > currentTick) + { + usleep((nextTick - currentTick) * 1000); + } + else + { + gettimeofday(&tvChunk, NULL); + nextTick = getTickCount(); + } + } + } + catch(const std::exception& e) + { + std::cerr << "Exception: " << e.what() << std::endl; + } + close(fd); + } - server->stop(); - } - catch (const std::exception& e) - { - std::cerr << "Exception: " << e.what() << std::endl; - } + server->stop(); + } + catch (const std::exception& e) + { + std::cerr << "Exception: " << e.what() << std::endl; + } - syslog (LOG_NOTICE, "First daemon terminated."); - closelog(); + syslog (LOG_NOTICE, "First daemon terminated."); + closelog(); } diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 96dd27b9..2ca74cbd 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -9,34 +9,34 @@ StreamSession::StreamSession(std::shared_ptr _socket) : ServerConne void StreamSession::worker() { - active_ = true; - try - { - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - for (;;) - { - shared_ptr message(messages.pop()); - ServerConnection::send(message.get()); - } - } - catch (std::exception& e) - { - std::cerr << "Exception in thread: " << e.what() << "\n"; - active_ = false; - } - active_ = false; + active_ = true; + try + { + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + for (;;) + { + shared_ptr message(messages.pop()); + ServerConnection::send(message.get()); + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + active_ = false; + } + active_ = false; } void StreamSession::send(shared_ptr message) { - if (!message) - return; + if (!message) + return; - while (messages.size() > 100)// chunk->getDuration() > 10000) - messages.pop(); - messages.push(message); + while (messages.size() > 100)// chunk->getDuration() > 10000) + messages.pop(); + messages.push(message); } @@ -50,45 +50,45 @@ StreamServer::StreamServer(unsigned short port) : port_(port), headerChunk(NULL) void StreamServer::acceptor() { - tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); - for (;;) - { - socket_ptr sock(new tcp::socket(io_service_)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - a.accept(*sock); - cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n"; - StreamSession* session = new StreamSession(sock); - sessions.insert(shared_ptr(session)); - session->start(); - } + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + a.accept(*sock); + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + StreamSession* session = new StreamSession(sock); + sessions.insert(shared_ptr(session)); + session->start(); + } } void StreamServer::send(shared_ptr message) { - for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) - { - if (!(*it)->active()) - { - cout << "Session inactive. Removing\n"; - sessions.erase(it++); - } - else - ++it; - } + for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) + { + if (!(*it)->active()) + { + cout << "Session inactive. Removing\n"; + sessions.erase(it++); + } + else + ++it; + } - for (auto s : sessions) - s->send(message); + for (auto s : sessions) + s->send(message); } void StreamServer::start() { - acceptThread = new thread(&StreamServer::acceptor, this); + acceptThread = new thread(&StreamServer::acceptor, this); } diff --git a/server/streamServer.h b/server/streamServer.h index 1c148958..4c378aa8 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -25,13 +25,13 @@ using namespace std; class StreamSession : public ServerConnection { public: - StreamSession(socket_ptr sock); - void send(shared_ptr message); + StreamSession(socket_ptr sock); + void send(shared_ptr message); protected: - virtual void worker(); - thread* senderThread; - Queue> messages; + virtual void worker(); + thread* senderThread; + Queue> messages; }; @@ -39,41 +39,41 @@ protected: class StreamServer { public: - StreamServer(unsigned short port); - void send(shared_ptr message); + StreamServer(unsigned short port); + void send(shared_ptr message); - void start(); - void stop(); + void start(); + void stop(); private: - void acceptor(); - set> sessions; - boost::asio::io_service io_service_; - unsigned short port_; - shared_ptr headerChunk; - shared_ptr sampleFormat; - thread* acceptThread; + void acceptor(); + set> sessions; + boost::asio::io_service io_service_; + unsigned short port_; + shared_ptr headerChunk; + shared_ptr sampleFormat; + thread* acceptThread; }; class ServerException : public std::exception { public: - ServerException(const std::string& what) : what_(what) - { - } + ServerException(const std::string& what) : what_(what) + { + } - virtual ~ServerException() throw() - { - } + virtual ~ServerException() throw() + { + } - virtual const char* what() const throw() - { - return what_.c_str(); - } + virtual const char* what() const throw() + { + return what_.c_str(); + } private: - std::string what_; + std::string what_; };