diff --git a/bin/startClient.sh b/bin/startClient.sh deleted file mode 100755 index 8b94e036..00000000 --- a/bin/startClient.sh +++ /dev/null @@ -1 +0,0 @@ -./client 290 8 1 \ No newline at end of file diff --git a/client/stream.cpp b/client/stream.cpp index 40090217..91f08fc2 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -8,10 +8,10 @@ using namespace std; Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0) { - pBuffer = new DoubleBuffer(500); - pShortBuffer = new DoubleBuffer(100); - pMiniBuffer = new DoubleBuffer(20); - pCardBuffer = new DoubleBuffer(50); + pBuffer = new DoubleBuffer(500); + pShortBuffer = new DoubleBuffer(100); + pMiniBuffer = new DoubleBuffer(20); + pCardBuffer = new DoubleBuffer(50); bufferMs = 500; } @@ -218,7 +218,7 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n" - int age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime; + long age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime; // if (pCardBuffer->full()) diff --git a/client/stream.h b/client/stream.h index 12eecd60..0a44b781 100644 --- a/client/stream.h +++ b/client/stream.h @@ -36,13 +36,13 @@ private: SampleFormat format_; long lastTick; - int sleep; + long sleep; Queue> chunks; - DoubleBuffer* pCardBuffer; - DoubleBuffer* pMiniBuffer; - DoubleBuffer* pBuffer; - DoubleBuffer* pShortBuffer; + DoubleBuffer* pCardBuffer; + DoubleBuffer* pMiniBuffer; + DoubleBuffer* pBuffer; + DoubleBuffer* pShortBuffer; std::shared_ptr chunk; int median; diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp index 41554d48..eab18706 100644 --- a/common/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -93,7 +93,7 @@ void ClientConnection::worker() try { { - std::unique_lock mlock(mutex_); +// std::unique_lock mlock(mutex_); tcp::resolver resolver(io_service); tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); iterator = resolver.resolve(query); diff --git a/common/socketConnection.h b/common/socketConnection.h index 757158ac..dd3efa49 100644 --- a/common/socketConnection.h +++ b/common/socketConnection.h @@ -30,6 +30,10 @@ public: virtual void start(); virtual void stop(); virtual void send(BaseMessage* _message); + virtual bool isActive() + { + return active_; + } protected: virtual void worker() = 0; diff --git a/decoder_example.c b/decoder_example.c deleted file mode 100644 index 68c40d13..00000000 --- a/decoder_example.c +++ /dev/null @@ -1,314 +0,0 @@ -/******************************************************************** - * * - * THIS FILE IS PART OF THE OggVorbis SOFTWARE CODEC SOURCE CODE. * - * USE, DISTRIBUTION AND REPRODUCTION OF THIS LIBRARY SOURCE IS * - * GOVERNED BY A BSD-STYLE SOURCE LICENSE INCLUDED WITH THIS SOURCE * - * IN 'COPYING'. PLEASE READ THESE TERMS BEFORE DISTRIBUTING. * - * * - * THE OggVorbis SOURCE CODE IS (C) COPYRIGHT 1994-2009 * - * by the Xiph.Org Foundation http://www.xiph.org/ * - * * - ******************************************************************** - - function: simple example decoder - last mod: $Id$ - - ********************************************************************/ - -/* Takes a vorbis bitstream from stdin and writes raw stereo PCM to - stdout. Decodes simple and chained OggVorbis files from beginning - to end. Vorbisfile.a is somewhat more complex than the code below. */ - -/* Note that this is POSIX, not ANSI code */ - -#include -#include -#include -#include - -#ifdef _WIN32 /* We need the following two to set stdin/stdout to binary */ -#include -#include -#endif - -#if defined(__MACOS__) && defined(__MWERKS__) -#include /* CodeWarrior's Mac "command-line" support */ -#endif - -ogg_int16_t convbuffer[4096]; /* take 8k out of the data segment, not the stack */ -int convsize=4096; - -extern void _VDBG_dump(void); - -int main(){ - ogg_sync_state oy; /* sync and verify incoming physical bitstream */ - ogg_stream_state os; /* take physical pages, weld into a logical - stream of packets */ - ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ - ogg_packet op; /* one raw packet of data for decode */ - - vorbis_info vi; /* struct that stores all the static vorbis bitstream - settings */ - vorbis_comment vc; /* struct that stores all the bitstream user comments */ - vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ - vorbis_block vb; /* local working space for packet->PCM decode */ - - char *buffer; - int bytes; - -#ifdef _WIN32 /* We need to set stdin/stdout to binary mode. Damn windows. */ - /* Beware the evil ifdef. We avoid these where we can, but this one we - cannot. Don't add any more, you'll probably go to hell if you do. */ - _setmode( _fileno( stdin ), _O_BINARY ); - _setmode( _fileno( stdout ), _O_BINARY ); -#endif - -#if defined(macintosh) && defined(__MWERKS__) - { - int argc; - char **argv; - argc=ccommand(&argv); /* get a "command line" from the Mac user */ - /* this also lets the user set stdin and stdout */ - } -#endif - - /********** Decode setup ************/ - - ogg_sync_init(&oy); /* Now we can read pages */ - - while(1){ /* we repeat if the bitstream is chained */ - int eos=0; - int i; - - /* grab some data at the head of the stream. We want the first page - (which is guaranteed to be small and only contain the Vorbis - stream initial header) We need the first page to get the stream - serialno. */ - - /* submit a 4k block to libvorbis' Ogg layer */ - buffer=ogg_sync_buffer(&oy,4096); - bytes=fread(buffer,1,4096,stdin); - ogg_sync_wrote(&oy,bytes); - - /* Get the first page. */ - if(ogg_sync_pageout(&oy,&og)!=1){ - /* have we simply run out of data? If so, we're done. */ - if(bytes<4096)break; - - /* error case. Must not be Vorbis data */ - fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n"); - exit(1); - } - - /* Get the serial number and set up the rest of decode. */ - /* serialno first; use it to set up a logical stream */ - ogg_stream_init(&os,ogg_page_serialno(&og)); - - /* extract the initial header from the first page and verify that the - Ogg bitstream is in fact Vorbis data */ - - /* I handle the initial header first instead of just having the code - read all three Vorbis headers at once because reading the initial - header is an easy way to identify a Vorbis bitstream and it's - useful to see that functionality seperated out. */ - - vorbis_info_init(&vi); - vorbis_comment_init(&vc); - if(ogg_stream_pagein(&os,&og)<0){ - /* error; stream version mismatch perhaps */ - fprintf(stderr,"Error reading first page of Ogg bitstream data.\n"); - exit(1); - } - - if(ogg_stream_packetout(&os,&op)!=1){ - /* no page? must not be vorbis */ - fprintf(stderr,"Error reading initial header packet.\n"); - exit(1); - } - - if(vorbis_synthesis_headerin(&vi,&vc,&op)<0){ - /* error case; not a vorbis header */ - fprintf(stderr,"This Ogg bitstream does not contain Vorbis " - "audio data.\n"); - exit(1); - } - - /* At this point, we're sure we're Vorbis. We've set up the logical - (Ogg) bitstream decoder. Get the comment and codebook headers and - set up the Vorbis decoder */ - - /* The next two packets in order are the comment and codebook headers. - They're likely large and may span multiple pages. Thus we read - and submit data until we get our two packets, watching that no - pages are missing. If a page is missing, error out; losing a - header page is the only place where missing data is fatal. */ - - i=0; - while(i<2){ - while(i<2){ - int result=ogg_sync_pageout(&oy,&og); - if(result==0)break; /* Need more data */ - /* Don't complain about missing or corrupt data yet. We'll - catch it at the packet output phase */ - if(result==1){ - ogg_stream_pagein(&os,&og); /* we can ignore any errors here - as they'll also become apparent - at packetout */ - while(i<2){ - result=ogg_stream_packetout(&os,&op); - if(result==0)break; - if(result<0){ - /* Uh oh; data at some point was corrupted or missing! - We can't tolerate that in a header. Die. */ - fprintf(stderr,"Corrupt secondary header. Exiting.\n"); - exit(1); - } - result=vorbis_synthesis_headerin(&vi,&vc,&op); - if(result<0){ - fprintf(stderr,"Corrupt secondary header. Exiting.\n"); - exit(1); - } - i++; - } - } - } - /* no harm in not checking before adding more */ - buffer=ogg_sync_buffer(&oy,4096); - bytes=fread(buffer,1,4096,stdin); - if(bytes==0 && i<2){ - fprintf(stderr,"End of file before finding all Vorbis headers!\n"); - exit(1); - } - ogg_sync_wrote(&oy,bytes); - } - - /* Throw the comments plus a few lines about the bitstream we're - decoding */ - { - char **ptr=vc.user_comments; - while(*ptr){ - fprintf(stderr,"%s\n",*ptr); - ++ptr; - } - fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate); - fprintf(stderr,"Encoded by: %s\n\n",vc.vendor); - } - - convsize=4096/vi.channels; - - /* OK, got and parsed all three headers. Initialize the Vorbis - packet->PCM decoder. */ - if(vorbis_synthesis_init(&vd,&vi)==0){ /* central decode state */ - vorbis_block_init(&vd,&vb); /* local state for most of the decode - so multiple block decodes can - proceed in parallel. We could init - multiple vorbis_block structures - for vd here */ - - /* The rest is just a straight decode loop until end of stream */ - while(!eos){ - while(!eos){ - int result=ogg_sync_pageout(&oy,&og); - if(result==0)break; /* need more data */ - if(result<0){ /* missing or corrupt data at this page position */ - fprintf(stderr,"Corrupt or missing data in bitstream; " - "continuing...\n"); - }else{ - ogg_stream_pagein(&os,&og); /* can safely ignore errors at - this point */ - while(1){ - result=ogg_stream_packetout(&os,&op); - - if(result==0)break; /* need more data */ - if(result<0){ /* missing or corrupt data at this page position */ - /* no reason to complain; already complained above */ - }else{ - /* we have a packet. Decode it */ - float **pcm; - int samples; - - if(vorbis_synthesis(&vb,&op)==0) /* test for success! */ - vorbis_synthesis_blockin(&vd,&vb); - /* - - **pcm is a multichannel float vector. In stereo, for - example, pcm[0] is left, and pcm[1] is right. samples is - the size of each channel. Convert the float values - (-1.<=range<=1.) to whatever PCM format and write it out */ - - while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0){ - int j; - int clipflag=0; - int bout=(samples32767){ - val=32767; - clipflag=1; - } - if(val<-32768){ - val=-32768; - clipflag=1; - } - *ptr=val; - ptr+=vi.channels; - } - } - - if(clipflag) - fprintf(stderr,"Clipping in frame %ld\n",(long)(vd.sequence)); - - - fwrite(convbuffer,2*vi.channels,bout,stdout); - - vorbis_synthesis_read(&vd,bout); /* tell libvorbis how - many samples we - actually consumed */ - } - } - } - if(ogg_page_eos(&og))eos=1; - } - } - if(!eos){ - buffer=ogg_sync_buffer(&oy,4096); - bytes=fread(buffer,1,4096,stdin); - ogg_sync_wrote(&oy,bytes); - if(bytes==0)eos=1; - } - } - - /* ogg_page and ogg_packet structs always point to storage in - libvorbis. They're never freed or manipulated directly */ - - vorbis_block_clear(&vb); - vorbis_dsp_clear(&vd); - }else{ - fprintf(stderr,"Error: Corrupt header during playback initialization.\n"); - } - - /* clean up this logical bitstream; before exit we see if we're - followed by another [chained] */ - - ogg_stream_clear(&os); - vorbis_comment_clear(&vc); - vorbis_info_clear(&vi); /* must be called last */ - } - - /* OK, clean up the framer */ - ogg_sync_clear(&oy); - - fprintf(stderr,"Done.\n"); - return(0); -} diff --git a/encoder_example.cpp b/encoder_example.cpp deleted file mode 100644 index d8d92bc3..00000000 --- a/encoder_example.cpp +++ /dev/null @@ -1,252 +0,0 @@ -/******************************************************************** - * * - * THIS FILE IS PART OF THE OggVorbis SOFTWARE CODEC SOURCE CODE. * - * USE, DISTRIBUTION AND REPRODUCTION OF THIS LIBRARY SOURCE IS * - * GOVERNED BY A BSD-STYLE SOURCE LICENSE INCLUDED WITH THIS SOURCE * - * IN 'COPYING'. PLEASE READ THESE TERMS BEFORE DISTRIBUTING. * - * * - * THE OggVorbis SOURCE CODE IS (C) COPYRIGHT 1994-2007 * - * by the Xiph.Org Foundation http://www.xiph.org/ * - * * - ******************************************************************** - - function: simple example encoder - last mod: $Id$ - - ********************************************************************/ - -/* takes a stereo 16bit 44.1kHz WAV file from stdin and encodes it into - a Vorbis bitstream */ - -/* Note that this is POSIX, not ANSI, code */ - -#include -#include -#include -#include -#include -#include - -#ifdef _WIN32 /* We need the following two to set stdin/stdout to binary */ -#include -#include -#endif - -#if defined(__MACOS__) && defined(__MWERKS__) -#include /* CodeWarrior's Mac "command-line" support */ -#endif - -#define READ 1024 -signed char readbuffer[READ*4+44]; /* out of the data segment, not the stack */ - -int main(){ - ogg_stream_state os; /* take physical pages, weld into a logical - stream of packets */ - ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */ - ogg_packet op; /* one raw packet of data for decode */ - - vorbis_info vi; /* struct that stores all the static vorbis bitstream - settings */ - vorbis_comment vc; /* struct that stores all the user comments */ - - vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */ - vorbis_block vb; /* local working space for packet->PCM decode */ - - int eos=0,ret; - int i, founddata; - -#if defined(macintosh) && defined(__MWERKS__) - int argc = 0; - char **argv = NULL; - argc = ccommand(&argv); /* get a "command line" from the Mac user */ - /* this also lets the user set stdin and stdout */ -#endif - - /* we cheat on the WAV header; we just bypass 44 bytes (simplest WAV - header is 44 bytes) and assume that the data is 44.1khz, stereo, 16 bit - little endian pcm samples. This is just an example, after all. */ - -#ifdef _WIN32 /* We need to set stdin/stdout to binary mode. Damn windows. */ - /* if we were reading/writing a file, it would also need to in - binary mode, eg, fopen("file.wav","wb"); */ - /* Beware the evil ifdef. We avoid these where we can, but this one we - cannot. Don't add any more, you'll probably go to hell if you do. */ - _setmode( _fileno( stdin ), _O_BINARY ); - _setmode( _fileno( stdout ), _O_BINARY ); -#endif - - - /* we cheat on the WAV header; we just bypass the header and never - verify that it matches 16bit/stereo/44.1kHz. This is just an - example, after all. */ - - readbuffer[0] = '\0'; - for (i=0, founddata=0; i<30 && ! feof(stdin) && ! ferror(stdin); i++) - { - fread(readbuffer,1,2,stdin); - - if ( ! strncmp((char*)readbuffer, "da", 2) ){ - founddata = 1; - fread(readbuffer,1,6,stdin); - break; - } - } - - /********** Encode setup ************/ - - vorbis_info_init(&vi); - - /* choose an encoding mode. A few possibilities commented out, one - actually used: */ - - /********************************************************************* - Encoding using a VBR quality mode. The usable range is -.1 - (lowest quality, smallest file) to 1. (highest quality, largest file). - Example quality mode .4: 44kHz stereo coupled, roughly 128kbps VBR - - ret = vorbis_encode_init_vbr(&vi,2,44100,.4); - - --------------------------------------------------------------------- - - Encoding using an average bitrate mode (ABR). - example: 44kHz stereo coupled, average 128kbps VBR - - ret = vorbis_encode_init(&vi,2,44100,-1,128000,-1); - - --------------------------------------------------------------------- - - Encode using a quality mode, but select that quality mode by asking for - an approximate bitrate. This is not ABR, it is true VBR, but selected - using the bitrate interface, and then turning bitrate management off: - - ret = ( vorbis_encode_setup_managed(&vi,2,44100,-1,128000,-1) || - vorbis_encode_ctl(&vi,OV_ECTL_RATEMANAGE2_SET,NULL) || - vorbis_encode_setup_init(&vi)); - - *********************************************************************/ - - ret=vorbis_encode_init_vbr(&vi,2,44100,0.1); - - /* do not continue if setup failed; this can happen if we ask for a - mode that libVorbis does not support (eg, too low a bitrate, etc, - will return 'OV_EIMPL') */ - - if(ret)exit(1); - - /* add a comment */ - vorbis_comment_init(&vc); - vorbis_comment_add_tag(&vc,"ENCODER","encoder_example.c"); - - /* set up the analysis state and auxiliary encoding storage */ - vorbis_analysis_init(&vd,&vi); - vorbis_block_init(&vd,&vb); - - /* set up our packet->stream encoder */ - /* pick a random serial number; that way we can more likely build - chained streams just by concatenation */ - srand(time(NULL)); - ogg_stream_init(&os,rand()); - - /* Vorbis streams begin with three headers; the initial header (with - most of the codec setup parameters) which is mandated by the Ogg - bitstream spec. The second header holds any comment fields. The - third header holds the bitstream codebook. We merely need to - make the headers, then pass them to libvorbis one at a time; - libvorbis handles the additional Ogg bitstream constraints */ - - { - ogg_packet header; - ogg_packet header_comm; - ogg_packet header_code; - - vorbis_analysis_headerout(&vd,&vc,&header,&header_comm,&header_code); - ogg_stream_packetin(&os,&header); /* automatically placed in its own - page */ - ogg_stream_packetin(&os,&header_comm); - ogg_stream_packetin(&os,&header_code); - - /* This ensures the actual - * audio data will start on a new page, as per spec - */ - while(!eos){ - int result=ogg_stream_flush(&os,&og); - if(result==0)break; - fwrite(og.header,1,og.header_len,stdout); - fwrite(og.body,1,og.body_len,stdout); - } - - } - - while(!eos){ - long i; - long bytes=fread(readbuffer,1,READ*4,stdin); /* stereo hardwired here */ - - if(bytes==0){ - /* end of file. this can be done implicitly in the mainline, - but it's easier to see here in non-clever fashion. - Tell the library we're at end of stream so that it can handle - the last frame and mark end of stream in the output properly */ - vorbis_analysis_wrote(&vd,0); - - }else{ - /* data to encode */ - - /* expose the buffer to submit data */ - float **buffer=vorbis_analysis_buffer(&vd,READ); - - /* uninterleave samples */ - for(i=0;i < - * - * Examples: - * $ ./play 44100 2 5 < /dev/urandom - * $ ./play 22050 1 8 < /path/to/file.wav - * - * Copyright (C) 2009 Alessandro Ghedini - * -------------------------------------------------------------- - * "THE BEER-WARE LICENSE" (Revision 42): - * Alessandro Ghedini wrote this file. As long as you retain this - * notice you can do whatever you want with this stuff. If we - * meet some day, and you think this stuff is worth it, you can - * buy me a beer in return. - * -------------------------------------------------------------- - */ - -#include -#include -#include - -using namespace std; - -#define PCM_DEVICE "default" - -int main(int argc, char **argv) { - unsigned int pcm, tmp, dir, rate; - int channels, seconds; - snd_pcm_t *pcm_handle; - snd_pcm_hw_params_t *params; - snd_pcm_uframes_t frames; - char *buff; - int buff_size, loops; - - if (argc < 4) { - printf("Usage: %s \n", - argv[0]); - return -1; - } - - rate = atoi(argv[1]); - channels = atoi(argv[2]); - seconds = atoi(argv[3]); - - /* Open the PCM device in playback mode */ - if (pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, - SND_PCM_STREAM_PLAYBACK, 0) < 0) - printf("ERROR: Can't open \"%s\" PCM device. %s\n", - PCM_DEVICE, snd_strerror(pcm)); - - /* Allocate parameters object and fill it with default values*/ - snd_pcm_hw_params_alloca(¶ms); - - snd_pcm_hw_params_any(pcm_handle, params); - - /* Set parameters */ - if (pcm = snd_pcm_hw_params_set_access(pcm_handle, params, - SND_PCM_ACCESS_RW_INTERLEAVED) < 0) - printf("ERROR: Can't set interleaved mode. %s\n", snd_strerror(pcm)); - - if (pcm = snd_pcm_hw_params_set_format(pcm_handle, params, - SND_PCM_FORMAT_S16_LE) < 0) - printf("ERROR: Can't set format. %s\n", snd_strerror(pcm)); - - if (pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels) < 0) - printf("ERROR: Can't set channels number. %s\n", snd_strerror(pcm)); - - if (pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0) < 0) - printf("ERROR: Can't set rate. %s\n", snd_strerror(pcm)); - - long unsigned int periodsize = 2*480; - if (pcm = snd_pcm_hw_params_set_buffer_size_near(pcm_handle, params, &periodsize) < 0) - printf("Unable to set buffer size %li: %s\n", (long int)periodsize, snd_strerror(pcm)); - - /* Write parameters */ - if (pcm = snd_pcm_hw_params(pcm_handle, params) < 0) - printf("ERROR: Can't set harware parameters. %s\n", snd_strerror(pcm)); - - /* Resume information */ - printf("PCM name: '%s'\n", snd_pcm_name(pcm_handle)); - - printf("PCM state: %s\n", snd_pcm_state_name(snd_pcm_state(pcm_handle))); - - snd_pcm_hw_params_get_channels(params, &tmp); - printf("channels: %i ", tmp); - - if (tmp == 1) - printf("(mono)\n"); - else if (tmp == 2) - printf("(stereo)\n"); - - snd_pcm_hw_params_get_rate(params, &tmp, 0); - printf("rate: %d bps\n", tmp); - - printf("seconds: %d\n", seconds); - - /* Allocate buffer to hold single period */ - snd_pcm_hw_params_get_period_size(params, &frames, 0); - printf("frames: %d\n", frames); - - buff_size = frames * channels * 2 /* 2 -> sample size */; - buff = (char *) malloc(buff_size); - - snd_pcm_hw_params_get_period_time(params, &tmp, NULL); - printf("period time: %d\n", tmp); - - for (loops = (seconds * 1000000) / tmp; loops > 0; loops--) { - - if (pcm = read(0, buff, buff_size) == 0) { - printf("Early end of file.\n"); - return 0; - } -//usleep(10000); - if (pcm = snd_pcm_writei(pcm_handle, buff, frames) == -EPIPE) { - printf("XRUN.\n"); - snd_pcm_prepare(pcm_handle); - } else if (pcm < 0) { - printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm)); - } -snd_pcm_sframes_t avail; -snd_pcm_sframes_t delay; -snd_pcm_avail_delay(pcm_handle, &avail, &delay); - -cout << "avail: " << avail << "\t" << delay / 48 << "\n"; -cout.flush(); - } -cout << "end\n"; -cout.flush(); - - snd_pcm_drain(pcm_handle); - snd_pcm_close(pcm_handle); - free(buff); - - return 0; -} - - diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 306bb90b..0dea39d4 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -2,13 +2,14 @@ -StreamSession::StreamSession(socket_ptr sock) : active_(false), socket_(sock) +StreamSession::StreamSession(std::shared_ptr _socket) : ServerConnection(NULL, _socket) { } -void StreamSession::sender() +void StreamSession::worker() { + active_ = true; try { boost::asio::streambuf streambuf; @@ -16,8 +17,7 @@ void StreamSession::sender() for (;;) { shared_ptr message(messages.pop()); - message->serialize(stream); - boost::asio::write(*socket_, streambuf); + ServerConnection::send(message.get()); } } catch (std::exception& e) @@ -25,29 +25,21 @@ void StreamSession::sender() std::cerr << "Exception in thread: " << e.what() << "\n"; active_ = false; } + active_ = false; } -void StreamSession::start() -{ - active_ = true; - senderThread = new thread(&StreamSession::sender, this); -// readerThread.join(); -} void StreamSession::send(shared_ptr message) { if (!message) return; - while (messages.size() > 100)//* chunk->getDuration() > 10000) + while (messages.size() > 100)// chunk->getDuration() > 10000) messages.pop(); messages.push(message); } -bool StreamSession::isActive() const -{ - return active_; -} + diff --git a/server/streamServer.h b/server/streamServer.h index 162a7b1c..e84ff295 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -12,6 +12,7 @@ #include "common/message.h" #include "common/headerMessage.h" #include "common/sampleFormat.h" +#include "common/socketConnection.h" using boost::asio::ip::tcp; @@ -20,19 +21,14 @@ using namespace std; -class StreamSession +class StreamSession : public ServerConnection { public: StreamSession(socket_ptr sock); - - void start(); void send(shared_ptr message); - bool isActive() const; -private: - void sender(); - bool active_; - socket_ptr socket_; +protected: + virtual void worker(); thread* senderThread; Queue> messages; }; diff --git a/test/Makefile b/test/Makefile deleted file mode 100644 index f1281c36..00000000 --- a/test/Makefile +++ /dev/null @@ -1,19 +0,0 @@ -VERSION = 0.01 -CC = /usr/bin/g++ -CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. -LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg - -OBJ = test.o ../common/message.o ../common/sampleFormat.o ../server/oggEncoder.o ../server/pcmEncoder.o -BIN = test - -all: server - -server: $(OBJ) - $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) - -%.o: %.cpp - $(CC) $(CFLAGS) -c $< -o $@ - -clean: - rm -rf $(BIN) $(OBJ) - diff --git a/test/test.cpp b/test/test.cpp deleted file mode 100644 index d9fda212..00000000 --- a/test/test.cpp +++ /dev/null @@ -1,308 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include // localtime -#include // stringstream -#include -#include -#include -#include -#include -#include "common/timeUtils.h" -#include "common/queue.h" -#include "common/signalHandler.h" -#include "common/utils.h" -#include "common/sampleFormat.h" -#include "../server/pcmEncoder.h" -#include "../server/oggEncoder.h" -#include "common/message.h" - - -using boost::asio::ip::tcp; -namespace po = boost::program_options; - - -typedef boost::shared_ptr socket_ptr; -using namespace std; -using namespace std::chrono; - - -bool g_terminated = false; - - - -class Session -{ -public: - Session(socket_ptr sock) : active_(false), socket_(sock) - { - } - - void sender() - { - try - { - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - for (;;) - { - shared_ptr message(messages.pop()); - message->serialize(stream); - boost::asio::write(*socket_, streambuf); - } - } - catch (std::exception& e) - { - std::cerr << "Exception in thread: " << e.what() << "\n"; - active_ = false; - } - } - - void start() - { - active_ = true; - senderThread = new thread(&Session::sender, this); -// readerThread.join(); - } - - void send(shared_ptr message) - { - if (!message) - return; - - while (messages.size() > 100)//* chunk->getDuration() > 10000) - messages.pop(); - messages.push(message); - } - - bool isActive() const - { - return active_; - } - -private: - bool active_; - socket_ptr socket_; - thread* senderThread; - Queue> messages; -}; - - -class Server -{ -public: - Server(unsigned short port) : port_(port), headerChunk(NULL) - { - } - - void acceptor() - { - tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); - for (;;) - { - socket_ptr sock(new tcp::socket(io_service_)); - a.accept(*sock); - cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; - Session* session = new Session(sock); - session->send(headerChunk); - session->start(); - sessions.insert(shared_ptr(session)); - } - } - - void setHeader(shared_ptr header) - { - if (header) - headerChunk = shared_ptr(header); - } - - void send(shared_ptr message) - { - for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) - { - if (!(*it)->isActive()) - { - cout << "Session inactive. Removing\n"; - sessions.erase(it++); - } - else - ++it; - } - - for (auto s : sessions) - s->send(message); - } - - void start() - { - acceptThread = new thread(&Server::acceptor, this); - } - - void stop() - { -// acceptThread->join(); - } - -private: - set> sessions; - boost::asio::io_service io_service_; - unsigned short port_; - shared_ptr headerChunk; - thread* acceptThread; -}; - - -class ServerException : public std::exception -{ -public: - ServerException(const std::string& what) : what_(what) - { - } - - virtual ~ServerException() throw() - { - } - - virtual const char* what() const throw() - { - return what_.c_str(); - } - -private: - std::string what_; -}; - - -int main(int argc, char* argv[]) -{ - try - { - string sampleFormat; - - size_t port; - string fifoName; - string codec; - bool runAsDaemon; - - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "produce help message") - ("port,p", po::value(&port)->default_value(98765), "port to listen on") - ("sampleformat,s", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") - ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") - ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") - ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") - ; - - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - - if (vm.count("help")) - { - cout << desc << "\n"; - return 1; - } - - - if (runAsDaemon) - { - daemonize(); - syslog (LOG_NOTICE, "First daemon started."); - } - - openlog ("firstdaemon", LOG_PID, LOG_DAEMON); - - using namespace std; // For atoi. - Server* server = new Server(port); - server->start(); - - timeval tvChunk; - gettimeofday(&tvChunk, NULL); - long nextTick = getTickCount(); - - mkfifo(fifoName.c_str(), 0777); - SampleFormat format(sampleFormat); -size_t duration = 50; -//size_t chunkSize = duration*format.rate*format.frameSize / 1000; - std::auto_ptr encoder; - if (codec == "ogg") - encoder.reset(new OggEncoder(sampleFormat)); - else if (codec == "pcm") - encoder.reset(new PcmEncoder(sampleFormat)); - else - { - cout << "unknown codec: " << codec << "\n"; - return 1; - } - - shared_ptr header(encoder->getHeader()); - server->setHeader(header); - - while (!g_terminated) - { - int fd = open(fifoName.c_str(), O_RDONLY); - try - { - shared_ptr chunk;//(new WireChunk()); - while (true)//cin.good()) - { - chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); - int toRead = chunk->payloadSize; - int len = 0; - do - { - int count = read(fd, chunk->payload + len, toRead - len); - if (count <= 0) - throw ServerException("count = " + boost::lexical_cast(count)); - - len += count; - } - while (len < toRead); - - chunk->tv_sec = tvChunk.tv_sec; - chunk->tv_usec = tvChunk.tv_usec; - double chunkDuration = encoder->encode(chunk.get()); - if (chunkDuration > 0) - server->send(chunk); -//cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n"; -// addUs(tvChunk, 1000*chunk->getDuration()); - addUs(tvChunk, chunkDuration * 1000); - nextTick += duration; - long currentTick = getTickCount(); - if (nextTick > currentTick) - { - usleep((nextTick - currentTick) * 1000); - } - else - { - gettimeofday(&tvChunk, NULL); - nextTick = getTickCount(); - } - } - } - catch(const std::exception& e) - { - std::cerr << "Exception: " << e.what() << std::endl; - } - close(fd); - } - - server->stop(); - } - catch (const std::exception& e) - { - std::cerr << "Exception: " << e.what() << std::endl; - } - - syslog (LOG_NOTICE, "First daemon terminated."); - closelog(); -} - - - - - diff --git a/xxx/blocking_tcp_echo_client.cpp b/xxx/blocking_tcp_echo_client.cpp deleted file mode 100644 index 6ee9dcef..00000000 --- a/xxx/blocking_tcp_echo_client.cpp +++ /dev/null @@ -1,89 +0,0 @@ -// -// blocking_tcp_echo_client.cpp -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2010 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include -#include -#include -#include -#include -#include // localtime -#include // stringstream -#include - - -using boost::asio::ip::tcp; -using namespace std; -using namespace std::chrono; - - -enum { max_length = 1024 }; - - -std::string return_current_time_and_date() -{ - auto now = system_clock::now(); - auto in_time_t = system_clock::to_time_t(now); - system_clock::duration ms = now.time_since_epoch(); - char buff[20]; - strftime(buff, 20, "%Y-%m-%d %H:%M:%S", localtime(&in_time_t)); - stringstream ss; - ss << buff << "." << std::setw(3) << std::setfill('0') << ((ms / milliseconds(1)) % 1000); - return ss.str(); -} - -int main(int argc, char* argv[]) -{ - if (argc != 3) - { - std::cerr << "Usage: blocking_tcp_echo_client \n"; - return 1; - } - try - { - boost::asio::io_service io_service; - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), argv[1], argv[2]); - tcp::resolver::iterator iterator = resolver.resolve(query); - - while (true) - { - try - { - tcp::socket s(io_service); - s.connect(*iterator); - boost::array buf; - boost::system::error_code error; - - while (true) - { - size_t len = s.read_some(boost::asio::buffer(buf), error); - if (error == boost::asio::error::eof) - break; - - std::cout.write(buf.data(), len); - std::cout.flush(); - } - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - usleep(100*1000); - } - } - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; -} - - diff --git a/xxx/receiver.cpp b/xxx/receiver.cpp deleted file mode 100644 index 02d3e011..00000000 --- a/xxx/receiver.cpp +++ /dev/null @@ -1,93 +0,0 @@ -// -// receiver.cpp -// ~~~~~~~~~~~~ -// -// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include -#include -#include -#include "boost/bind.hpp" - -const short multicast_port = 30001; - -class receiver -{ -public: - receiver(boost::asio::io_service& io_service, - const boost::asio::ip::address& listen_address, - const boost::asio::ip::address& multicast_address) - : socket_(io_service) - { - // Create the socket so that multiple may be bound to the same address. - boost::asio::ip::udp::endpoint listen_endpoint( - listen_address, multicast_port); - socket_.open(listen_endpoint.protocol()); - socket_.set_option(boost::asio::ip::udp::socket::reuse_address(true)); - socket_.bind(listen_endpoint); - - // Join the multicast group. - socket_.set_option( - boost::asio::ip::multicast::join_group(multicast_address)); - - socket_.async_receive_from( - boost::asio::buffer(data_, max_length), sender_endpoint_, - boost::bind(&receiver::handle_receive_from, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } - - void handle_receive_from(const boost::system::error_code& error, - size_t bytes_recvd) - { - if (!error) - { - std::cout.write(data_, bytes_recvd); - std::cout << std::endl; - - socket_.async_receive_from( - boost::asio::buffer(data_, max_length), sender_endpoint_, - boost::bind(&receiver::handle_receive_from, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } - } - -private: - boost::asio::ip::udp::socket socket_; - boost::asio::ip::udp::endpoint sender_endpoint_; - enum { max_length = 1024 }; - char data_[max_length]; -}; - -int main(int argc, char* argv[]) -{ - try - { - if (argc != 3) - { - std::cerr << "Usage: receiver \n"; - std::cerr << " For IPv4, try:\n"; - std::cerr << " receiver 0.0.0.0 239.255.0.1\n"; - std::cerr << " For IPv6, try:\n"; - std::cerr << " receiver 0::0 ff31::8000:1234\n"; - return 1; - } - - boost::asio::io_service io_service; - receiver r(io_service, - boost::asio::ip::address::from_string(argv[1]), - boost::asio::ip::address::from_string(argv[2])); - io_service.run(); - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; -} diff --git a/xxx/rtr.cpp b/xxx/rtr.cpp deleted file mode 100644 index b1da9b60..00000000 --- a/xxx/rtr.cpp +++ /dev/null @@ -1,84 +0,0 @@ -// -// Custom routing Router to Mama (ROUTER to REQ) -// -// Olivier Chamoux - -#include "zhelpers.hpp" - -#define NBR_WORKERS 10 - -static void * -worker_thread (void *arg) { - - zmq::context_t * context = (zmq::context_t *)arg; - zmq::socket_t worker (*context, ZMQ_REQ); - - // We use a string identity for ease here - s_set_id (worker); - worker.connect("ipc://routing.ipc"); - - int total = 0; - while (1) { - // Tell the router we're ready for work - s_send (worker, "ready"); - - // Get workload from router, until finished - std::string workload = s_recv (worker); - int finished = (workload.compare("END") == 0); - - if (finished) { - std::cout << "Processed: " << total << " tasks" << std::endl; - break; - } - total++; - - // Do some random work - s_sleep(within (100) + 1); - } - return (NULL); -} - -int main () { - zmq::context_t context(1); - zmq::socket_t client (context, ZMQ_ROUTER); - client.bind("ipc://routing.ipc"); - - int worker_nbr; - for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { - pthread_t worker; - pthread_create (&worker, NULL, worker_thread, &context); - } - int task_nbr; - for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) { - // LRU worker is next waiting in queue - std::string address = s_recv (client); - { - // receiving and discarding'empty' message - s_recv (client); - // receiving and discarding 'ready' message - s_recv (client); - } - - s_sendmore (client, address); - s_sendmore (client, ""); - s_send (client, "This is the workload"); - } - // Now ask mamas to shut down and report their results - for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) { - std::string address = s_recv (client); - { - // receiving and discarding'empty' message - s_recv (client); - // receiving and discarding 'ready' message - s_recv (client); - } - - s_sendmore (client, address); - s_sendmore (client, ""); - s_send (client, "END"); - } - sleep (1); // Give 0MQ/2.0.x time to flush output - return 0; -} - - diff --git a/xxx/rtrClient.cpp b/xxx/rtrClient.cpp deleted file mode 100644 index aad4c6d6..00000000 --- a/xxx/rtrClient.cpp +++ /dev/null @@ -1,79 +0,0 @@ -// -// Custom routing Router to Mama (ROUTER to REQ) -// -// Olivier Chamoux - - -#include -#include -#include -#include -#include -#include "zhelpers.hpp" -#include -#include -#include -#include "utils.h" - -using namespace std; - - - -void alive(zmq::socket_t* worker) -{ - // Tell the router we're ready for work - zmq::message_t message; - while (1) { - string s = "ping"; - zmq::message_t message(s.size()); - memcpy (message.data(), s.data(), s.size()); - cout << "Send: " << worker->send(message); -/* int res = worker->recv(&message); - if (res == 0) - { - std::string recvMsg = std::string(static_cast(message.data()), message.size()); - } - else - cout << "Error: " << res << "\n"; -*/ sleep(1); -// - } -} - - -void control(zmq::socket_t* worker) -{ - // Tell the router we're ready for work - s_send (*worker, "ready"); - while (1) { - std::string cmd = s_recv (*worker); - vector splitCmd = split(cmd); - for (size_t n=0; n - -#include -#include -#include "zhelpers.hpp" -#include "utils.h" - -zmq::socket_t* client; -zmq::socket_t* client2; - -void receiver(zmq::socket_t* client) -{ - while (true) - { -// std::string address = s_recv (*client); - // receiving and discarding'empty' message -// s_recv (*client); - // receiving and discarding 'ready' message - std::string msg = s_recv (*client); - std::cout << "msg from " << msg << "\n"; - } -} - - -void send(const std::string& address, const std::string& cmd) -{ - s_sendmore (*client, address); -// s_sendmore (*client, ""); - s_send (*client, cmd); -} - - -int main () { - zmq::context_t context(1); - client = new zmq::socket_t(context, ZMQ_PAIR);//ZMQ_ROUTER); - client->bind("tcp://0.0.0.0:123459"); - - std::thread receiveThread(receiver, client); -receiveThread.join(); - while (true) - { - std::string address; - std::string cmd; - - std::cout << "Address: "; - std::getline(std::cin, address); - std::cout << "command: "; - std::getline(std::cin, cmd); - std::cout << std::endl; - send(trim(address), trim(cmd)); - } - return 0; -} - - diff --git a/xxx/sender.cpp b/xxx/sender.cpp deleted file mode 100644 index ddc43da0..00000000 --- a/xxx/sender.cpp +++ /dev/null @@ -1,111 +0,0 @@ -// -// sender.cpp -// ~~~~~~~~~~ -// -// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#include -#include -#include -#include -#include "boost/bind.hpp" -#include "boost/date_time/posix_time/posix_time_types.hpp" - -const short multicast_port = 30001; -const int max_message_count = 100; - -class sender -{ -public: - sender(boost::asio::io_service& io_service, - const boost::asio::ip::address& multicast_address) - : endpoint_(multicast_address, multicast_port), - socket_(io_service, endpoint_.protocol()), - timer_(io_service), - message_count_(0) - { - std::ostringstream os; - os << "Message " << message_count_++; - message_ = os.str(); - - socket_.async_send_to( - boost::asio::buffer(message_), endpoint_, - boost::bind(&sender::handle_send_to, this, - boost::asio::placeholders::error)); - } - - void handle_send_to(const boost::system::error_code& error) - { - if (!error) - { - std::ostringstream os; - os << "Message " << message_count_++; - message_ = os.str(); - - socket_.async_send_to( - boost::asio::buffer(message_), endpoint_, - boost::bind(&sender::handle_send_to, this, - boost::asio::placeholders::error)); - sleep - } -/* if (!error && message_count_ < max_message_count) - { - timer_.expires_from_now(boost::posix_time::seconds(1)); - timer_.async_wait( - boost::bind(&sender::handle_timeout, this, - boost::asio::placeholders::error)); - } -*/ } - -/* void handle_timeout(const boost::system::error_code& error) - { - if (!error) - { - std::ostringstream os; - os << "Message " << message_count_++; - message_ = os.str(); - - socket_.async_send_to( - boost::asio::buffer(message_), endpoint_, - boost::bind(&sender::handle_send_to, this, - boost::asio::placeholders::error)); - } - } -*/ -private: - boost::asio::ip::udp::endpoint endpoint_; - boost::asio::ip::udp::socket socket_; - boost::asio::deadline_timer timer_; - int message_count_; - std::string message_; -}; - -int main(int argc, char* argv[]) -{ - try - { - if (argc != 2) - { - std::cerr << "Usage: sender \n"; - std::cerr << " For IPv4, try:\n"; - std::cerr << " sender 239.255.0.1\n"; - std::cerr << " For IPv6, try:\n"; - std::cerr << " sender ff31::8000:1234\n"; - return 1; - } - - boost::asio::io_service io_service; - sender s(io_service, boost::asio::ip::address::from_string(argv[1])); - io_service.run(); - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } - - return 0; -} diff --git a/xxx/server.cpp b/xxx/server.cpp deleted file mode 100644 index 028383fb..00000000 --- a/xxx/server.cpp +++ /dev/null @@ -1,75 +0,0 @@ -// -// Weather update server in C++ -// Binds PUB socket to tcp://*:5556 -// Publishes random weather updates -// -// Olivier Chamoux -// -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "chunk.h" -#include "timeUtils.h" - - -using namespace std; - - - -int main () { - // Prepare our context and publisher - zmq::context_t context (1); - zmq::socket_t publisher (context, ZMQ_PUB); - publisher.bind("tcp://0.0.0.0:123458"); - - char c[2]; - WireChunk* chunk = new WireChunk(); - timeval tvChunk; - gettimeofday(&tvChunk, NULL); - long nextTick = getTickCount(); - - cin.sync(); - - while (cin.good()) - { - for (size_t n=0; (npayload[n] = (int)c[0] + ((int)c[1] << 8); - } - -// if (!cin.good()) -// cin.clear(); - - chunk->tv_sec = tvChunk.tv_sec; - chunk->tv_usec = tvChunk.tv_usec; - zmq::message_t message(sizeof(WireChunk)); - memcpy(message.data(), chunk, sizeof(WireChunk)); - publisher.send(message); - - addMs(tvChunk, WIRE_CHUNK_MS); - nextTick += WIRE_CHUNK_MS; - long currentTick = getTickCount(); - if (nextTick > currentTick) - { - usleep((nextTick - currentTick) * 1000); - } - else - { - cin.sync(); - gettimeofday(&tvChunk, NULL); - nextTick = getTickCount(); - } - } - delete chunk; - return 0; -} - -