// // blocking_tcp_echo_server.cpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // /* #include #include #include #include #include #include #include // localtime #include // stringstream #include #include #include #include #include "common/chunk.h" #include "pcmEncoder.h" #include "oggEncoder.h" #include */ #include #include #include #include #include #include #include // localtime #include // stringstream #include #include #include #include #include #include "common/timeUtils.h" #include "common/queue.h" #include "common/signalHandler.h" #include "common/utils.h" #include "common/sampleFormat.h" //#include "../server/pcmEncoder.h" //#include "../server/oggEncoder.h" #include "message.h" using boost::asio::ip::tcp; namespace po = boost::program_options; typedef boost::shared_ptr socket_ptr; using namespace std; using namespace std::chrono; bool g_terminated = false; /* int main(int argc, char* argv[]) { TestMessage* chunk = new TestMessage(1, (char*)"Hallo"); stringstream ss; chunk->serialize(ss); BaseMessage header; ss.seekg(0, ss.beg); header.read(ss); // cout << ss.str() << "\n"; // ss.read(reinterpret_cast(&header), sizeof(MessageHeader)); cout << "Header: " << header.type << ", " << header.size << "\n"; delete chunk; chunk = new TestMessage(); chunk->read(ss); cout << "Header: " << chunk->type << ", " << chunk->size << ", " << (int)chunk->logLevel << ", " << chunk->text << "\n"; chunk->tv_sec = 21; chunk->tv_usec = 2; chunk->payloadSize = 5; chunk->payload = (char*)malloc(5); chunk->payload[0] = 99; char* stream = chunk->serialize(); cout << "1\n"; for (size_t n=0; n<24; ++n) cout << (int)stream[n] << " "; delete chunk; cout << "\n3\n"; chunk = new WireChunk(); cout << "4\n"; chunk->deserialize(stream); cout << "5\n"; cout << chunk->tv_sec << ", " << chunk->tv_usec << ", " << chunk->payloadSize << ", " << chunk->payload << "\n"; return 0; } */ class Session { public: Session(socket_ptr sock) : active_(false), socket_(sock) { } void sender() { try { for (;;) { shared_ptr chunk(chunks.pop()); /* char* stream = chunk->serialize(); size_t written(0); size_t toWrite = sizeof(stream); do { written += boost::asio::write(*socket_, boost::asio::buffer(stream + written, toWrite - written));//, error); } while (written < toWrite); */ } } catch (std::exception& e) { std::cerr << "Exception in thread: " << e.what() << "\n"; active_ = false; } } void start() { active_ = true; senderThread = new thread(&Session::sender, this); // readerThread.join(); } void send(shared_ptr chunk) { if (!chunk) return; while (chunks.size() * chunk->getDuration() > 10000) chunks.pop(); chunks.push(chunk); } bool isActive() const { return active_; } private: bool active_; socket_ptr socket_; thread* senderThread; Queue> chunks; }; class Server { public: Server(unsigned short port) : port_(port), headerChunk(NULL) { } void acceptor() { tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); for (;;) { socket_ptr sock(new tcp::socket(io_service_)); a.accept(*sock); // cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; Session* session = new Session(sock); cout << "Sending header: " << headerChunk->payloadSize << "\n"; session->send(headerChunk); session->start(); sessions.insert(shared_ptr(session)); } } void setHeader(shared_ptr chunk) { if (chunk) headerChunk = shared_ptr(chunk); } void send(shared_ptr chunk) { // fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout); for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) { if (!(*it)->isActive()) { cout << "Session inactive. Removing\n"; sessions.erase(it++); } else ++it; } for (auto s : sessions) s->send(chunk); } void start() { acceptThread = new thread(&Server::acceptor, this); } void stop() { // acceptThread->join(); } private: set> sessions; boost::asio::io_service io_service_; unsigned short port_; shared_ptr headerChunk; thread* acceptThread; }; class ServerException : public std::exception { public: ServerException(const std::string& what) : what_(what) { } virtual ~ServerException() throw() { } virtual const char* what() const throw() { return what_.c_str(); } private: std::string what_; }; int main(int argc, char* argv[]) { try { string sampleFormat; size_t port; string fifoName; string codec; bool runAsDaemon; po::options_description desc("Allowed options"); desc.add_options() ("help,h", "produce help message") ("port,p", po::value(&port)->default_value(98765), "port to listen on") ("sampleformat,s", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") ; po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); po::notify(vm); if (vm.count("help")) { cout << desc << "\n"; return 1; } if (runAsDaemon) { daemonize(); syslog (LOG_NOTICE, "First daemon started."); } openlog ("firstdaemon", LOG_PID, LOG_DAEMON); using namespace std; // For atoi. Server* server = new Server(port); server->start(); timeval tvChunk; gettimeofday(&tvChunk, NULL); long nextTick = getTickCount(); mkfifo(fifoName.c_str(), 0777); size_t duration = 50; SampleFormat format(sampleFormat); /* std::auto_ptr encoder; if (codec == "ogg") encoder.reset(new OggEncoder(sampleFormat)); else if (codec == "pcm") encoder.reset(new PcmEncoder(sampleFormat)); else { cout << "unknown codec: " << codec << "\n"; return 1; } */ // shared_ptr header(new Chunk(format, encoder->getHeader())); // server->setHeader(header); while (!g_terminated) { int fd = open(fifoName.c_str(), O_RDONLY); try { shared_ptr chunk;//(new WireChunk()); while (true)//cin.good()) { chunk.reset(new Chunk(format, duration));//2*WIRE_CHUNK_SIZE)); int toRead = chunk->payloadSize; int len = 0; do { int count = read(fd, chunk->payload + len, toRead - len); if (count <= 0) throw ServerException("count = " + boost::lexical_cast(count)); len += count; } while (len < toRead); chunk->tv_sec = tvChunk.tv_sec; chunk->tv_usec = tvChunk.tv_usec; double chunkDuration = 50;//encoder->encode(chunk.get()); if (chunkDuration > 0) server->send(chunk); cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n"; // addUs(tvChunk, 1000*chunk->getDuration()); addUs(tvChunk, chunkDuration * 1000); nextTick += duration; long currentTick = getTickCount(); if (nextTick > currentTick) { usleep((nextTick - currentTick) * 1000); } else { gettimeofday(&tvChunk, NULL); nextTick = getTickCount(); } } } catch(const std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; } close(fd); } server->stop(); } catch (const std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; } syslog (LOG_NOTICE, "First daemon terminated."); closelog(); }