From 06a7909b1a48e4cacd06c5bd9d09a6e0de83df3b Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Sun, 3 Aug 2014 08:37:21 +0000 Subject: [PATCH] xxx git-svn-id: svn://elaine/murooma/trunk@129 d8a302eb-03bc-478d-80e4-98257eca68ef --- blocking_tcp_echo_server.cpp | 82 ++++++++++++++++++++++++++++++++++++ client.cpp | 4 +- rtrClient.cpp | 17 +++++--- rtrServer.cpp | 11 ++--- stream.cpp | 79 +++------------------------------- 5 files changed, 106 insertions(+), 87 deletions(-) create mode 100644 blocking_tcp_echo_server.cpp diff --git a/blocking_tcp_echo_server.cpp b/blocking_tcp_echo_server.cpp new file mode 100644 index 00000000..8ff59215 --- /dev/null +++ b/blocking_tcp_echo_server.cpp @@ -0,0 +1,82 @@ +// +// blocking_tcp_echo_server.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#include +#include +#include +#include +#include +#include + +using boost::asio::ip::tcp; + +const int max_length = 1024; + +typedef boost::shared_ptr socket_ptr; + +void session(socket_ptr sock) +{ + try + { + for (;;) + { + char data[max_length]; + + boost::system::error_code error; + size_t length = sock->read_some(boost::asio::buffer(data), error); + if (error == boost::asio::error::eof) + break; // Connection closed cleanly by peer. + else if (error) + throw boost::system::system_error(error); // Some other error. + + boost::asio::write(*sock, boost::asio::buffer(data, length)); + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + } +} + +void server(boost::asio::io_service& io_service, unsigned short port) +{ + tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service)); + a.accept(*sock); + boost::thread t(boost::bind(session, sock)); + } +} + +int main(int argc, char* argv[]) +{ + try + { + if (argc != 2) + { + std::cerr << "Usage: blocking_tcp_echo_server \n"; + return 1; + } + + boost::asio::io_service io_service; + + using namespace std; // For atoi. + server(io_service, atoi(argv[1])); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; +} + + diff --git a/client.cpp b/client.cpp index 38cdda42..c7065e5b 100644 --- a/client.cpp +++ b/client.cpp @@ -183,7 +183,7 @@ int main (int argc, char *argv[]) initAudio(); std::thread playerThread(player); -/* std::string cmd; + std::string cmd; while (true && (argc > 3)) { std::cout << "> "; @@ -200,7 +200,7 @@ int main (int argc, char *argv[]) stream->setBufferLen(atoi(cmd.c_str())); } } -*/ + std::thread controlThread(control, stream); // controlThread.join(); playerThread.join(); diff --git a/rtrClient.cpp b/rtrClient.cpp index 8894165b..aad4c6d6 100644 --- a/rtrClient.cpp +++ b/rtrClient.cpp @@ -24,16 +24,20 @@ void alive(zmq::socket_t* worker) // Tell the router we're ready for work zmq::message_t message; while (1) { - s_send (*worker, "ping"); - int res = worker->recv(&message); + string s = "ping"; + zmq::message_t message(s.size()); + memcpy (message.data(), s.data(), s.size()); + cout << "Send: " << worker->send(message); +/* int res = worker->recv(&message); if (res == 0) { std::string recvMsg = std::string(static_cast(message.data()), message.size()); } else cout << "Error: " << res << "\n"; - sleep(1); - } +*/ sleep(1); +// + } } @@ -54,7 +58,7 @@ void control(zmq::socket_t* worker) int main () { cout << getMacAddress() << "\n"; zmq::context_t context(1); - zmq::socket_t worker (context, ZMQ_REQ); + zmq::socket_t worker (context, ZMQ_PAIR); srand (time(NULL)); // We use a string identity for ease here string macAddress = getMacAddress(); @@ -65,7 +69,8 @@ int main () { // std::thread controlThread(control, &worker); std::thread aliveThread(alive, &worker); - controlThread.join(); + aliveThread.join(); +// controlThread.join(); return 0; } diff --git a/rtrServer.cpp b/rtrServer.cpp index 9fd0b471..fb93fe41 100644 --- a/rtrServer.cpp +++ b/rtrServer.cpp @@ -9,17 +9,18 @@ #include "utils.h" zmq::socket_t* client; +zmq::socket_t* client2; void receiver(zmq::socket_t* client) { while (true) { - std::string address = s_recv (*client); +// std::string address = s_recv (*client); // receiving and discarding'empty' message // s_recv (*client); // receiving and discarding 'ready' message std::string msg = s_recv (*client); - std::cout << "msg from " << address << ": " << msg << "\n"; + std::cout << "msg from " << msg << "\n"; } } @@ -33,12 +34,12 @@ void send(const std::string& address, const std::string& cmd) int main () { - zmq::context_t context(2); - client = new zmq::socket_t(context, ZMQ_ROUTER); + zmq::context_t context(1); + client = new zmq::socket_t(context, ZMQ_PAIR);//ZMQ_ROUTER); client->bind("tcp://0.0.0.0:123459"); std::thread receiveThread(receiver, client); - +receiveThread.join(); while (true) { std::string address; diff --git a/stream.cpp b/stream.cpp index dfe746da..4d7eaba9 100644 --- a/stream.cpp +++ b/stream.cpp @@ -13,12 +13,14 @@ Stream::Stream() : sleep(0), median(0), shortMedian(0), lastUpdate(0), currentSa } + void Stream::setBufferLen(size_t bufferLenMs) { bufferMs = bufferLenMs; } + void Stream::addChunk(Chunk* chunk) { Chunk* c = new Chunk(*chunk); @@ -29,6 +31,7 @@ void Stream::addChunk(Chunk* chunk) } + Chunk* Stream::getNextChunk() { Chunk* chunk = NULL; @@ -42,12 +45,14 @@ Chunk* Stream::getNextChunk() } + void Stream::getSilentPlayerChunk(short* outputBuffer) { memset(outputBuffer, 0, sizeof(short)*PLAYER_CHUNK_SIZE); } + timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) { Chunk* chunk = getNextChunk(); @@ -126,80 +131,6 @@ timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) -/* -timeval Stream::getNextPlayerChunk(short* outputBuffer, int correction) -{ - Chunk* chunk = getNextChunk(); - timeval tv = getTimeval(chunk); - - if (correction != 0) - { - int factor = ceil((float)PLAYER_CHUNK_MS / (float)abs(correction)); - std::cerr << "Correction: " << correction << ", factor: " << factor << "\n"; - size_t idx(chunk->idx); - for (size_t n=0; npayload[idx]; - *(outputBuffer + 2*n+1) = chunk->payload[idx + 1]; - if (n % factor == 0) - { - if (correction < 0) - idx += 4; - } - else - { - idx += 2; - } - if (idx >= WIRE_CHUNK_SIZE) - { -//std::cerr << "idx >= WIRE_CHUNK_SIZE: " << idx << "\t" << WIRE_CHUNK_SIZE << "\n"; - chunks.pop_front(); - delete chunk; - chunk = getNextChunk(); - idx -= WIRE_CHUNK_SIZE; - } - } -//std::cerr << "Idx: " << chunk->idx << " => " << idx+2 << "\t" << WIRE_CHUNK_SIZE << "\t" << PLAYER_CHUNK_SIZE/2 << "\n"; - chunk->idx = idx; - std::cerr << "Diff: " << diff_ms(getTimeval(chunk), tv) << "\t" << chunk->idx / PLAYER_CHUNK_MS_SIZE << "\n"; - return tv; - } - - - size_t missing = PLAYER_CHUNK_SIZE;// + correction*PLAYER_CHUNK_MS_SIZE; - if (chunk->idx + PLAYER_CHUNK_SIZE > WIRE_CHUNK_SIZE) - { -//std::cerr << "chunk->idx + PLAYER_CHUNK_SIZE >= WIRE_CHUNK_SIZE: " << chunk->idx + PLAYER_CHUNK_SIZE << " >= " << WIRE_CHUNK_SIZE << "\n"; - if (outputBuffer != NULL) - memcpy(outputBuffer, &chunk->payload[chunk->idx], sizeof(int16_t)*(WIRE_CHUNK_SIZE - chunk->idx)); - missing = chunk->idx + PLAYER_CHUNK_SIZE - WIRE_CHUNK_SIZE; -// mutex.lock(); - chunks.pop_front(); -// mutex.unlock(); - delete chunk; - - chunk = getNextChunk(); - } - - if (outputBuffer != NULL) - memcpy((outputBuffer + PLAYER_CHUNK_SIZE - missing), &chunk->payload[chunk->idx], sizeof(int16_t)*missing); - - timeval nextTv = tv; - addMs(nextTv, PLAYER_CHUNK_MS); -// setTimeval(chunk, nextTv); - chunk->idx += missing; - if (chunk->idx >= WIRE_CHUNK_SIZE) - { -// mutex.lock(); - chunks.pop_front(); -// mutex.unlock(); - delete chunk; - } - - return tv; -} -*/ - void Stream::getChunk(short* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer) { if (sleep != 0)