diff --git a/Makefile b/Makefile index 39df447a..ab57c68f 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,9 @@ VERSION = 0.01 CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" -LDFLAGS = -lrt -lzmq -lpthread -lportaudio +LDFLAGS = -lrt -lzmq -lpthread -lportaudio -lboost_system -OBJ_SERVER = server.o +OBJ_SERVER = blocking_tcp_echo_server.o BIN_SERVER = server OBJ_CLIENT = client.o stream.o chunk.o BIN_CLIENT = client diff --git a/blocking_tcp_echo_client.cpp b/blocking_tcp_echo_client.cpp index 008c2b40..6ee9dcef 100644 --- a/blocking_tcp_echo_client.cpp +++ b/blocking_tcp_echo_client.cpp @@ -83,7 +83,7 @@ int main(int argc, char* argv[]) std::cerr << "Exception: " << e.what() << "\n"; } - return 0; + return 0; } diff --git a/blocking_tcp_echo_server.cpp b/blocking_tcp_echo_server.cpp index 111794c3..1bb2a7e5 100644 --- a/blocking_tcp_echo_server.cpp +++ b/blocking_tcp_echo_server.cpp @@ -18,6 +18,12 @@ #include // localtime #include // stringstream #include +#include +#include +#include "chunk.h" +#include "timeUtils.h" +#include "queue.h" + using boost::asio::ip::tcp; @@ -41,72 +47,157 @@ std::string return_current_time_and_date() } -void session(socket_ptr sock) +class Session { - try - { - for (;;) - { -/* char data[max_length]; +public: + Session(socket_ptr sock) : socket_(sock) + { + } - boost::system::error_code error; - size_t length = sock->read_some(boost::asio::buffer(data), error); - if (error == boost::asio::error::eof) - break; // Connection closed cleanly by peer. - else if (error) - throw boost::system::system_error(error); // Some other error. -*/ - string response(return_current_time_and_date() + "\tHallo\n"); - boost::system::error_code error; - cout << response << "\n"; - boost::asio::write(*sock, boost::asio::buffer(response), boost::asio::transfer_all(), error); - if (error == boost::asio::error::eof) - break; // Connection closed cleanly by peer. - else if (error) - throw boost::system::system_error(error); // Some other error. - sleep(1); -//std::cout << return_current_time_and_date() << "\n"; - } - } - catch (std::exception& e) - { - std::cerr << "Exception in thread: " << e.what() << "\n"; - } -} + void sender() + { + try + { + for (;;) + { + shared_ptr chunk(chunks.pop()); + boost::system::error_code error; + boost::asio::write(*socket_, boost::asio::buffer(chunk.get(), sizeof(WireChunk)), boost::asio::transfer_all(), error); + if (error == boost::asio::error::eof) + break; // Connection closed cleanly by peer. + else if (error) + throw boost::system::system_error(error); // Some other error. + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + } + } -void server(boost::asio::io_service& io_service, unsigned short port) + void start() + { + senderThread = new thread(&Session::sender, this); +// readerThread.join(); + } + + void send(shared_ptr chunk) + { + chunks.push(chunk); + } + +private: + socket_ptr socket_; + thread* senderThread; + Queue> chunks; +}; + + +class Server { - tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port)); - for (;;) - { - socket_ptr sock(new tcp::socket(io_service)); - a.accept(*sock); - cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; - boost::thread t(boost::bind(session, sock)); - } -} +public: + Server(unsigned short port) : session(NULL), port_(port) + { + } + + void acceptor() + { + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + a.accept(*sock); + cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + session = new Session(sock); + session->start(); + } + } + + void send(shared_ptr chunk) + { + if (session != 0) + session->send(chunk); + } + + void start() + { + acceptThread = new thread(&Server::acceptor, this); + } + + void stop() + { + acceptThread->join(); + } + +private: + Session* session; + boost::asio::io_service io_service_; + unsigned short port_; + thread* acceptThread; +}; + int main(int argc, char* argv[]) { - try - { - if (argc != 2) - { - std::cerr << "Usage: blocking_tcp_echo_server \n"; - return 1; - } + try + { + if (argc != 2) + { + std::cerr << "Usage: blocking_tcp_echo_server \n"; + return 1; + } - boost::asio::io_service io_service; - using namespace std; // For atoi. - server(io_service, atoi(argv[1])); - } - catch (std::exception& e) - { - std::cerr << "Exception: " << e.what() << "\n"; - } + using namespace std; // For atoi. + Server* server = new Server(atoi(argv[1])); + server->start(); - return 0; + char c[2]; + timeval tvChunk; + gettimeofday(&tvChunk, NULL); + long nextTick = getTickCount(); + + while (cin.good()) + { + shared_ptr chunk(new WireChunk()); + for (size_t n=0; (npayload[n] = (int)c[0] + ((int)c[1] << 8); + } + + // if (!cin.good()) + // cin.clear(); + + chunk->tv_sec = tvChunk.tv_sec; + chunk->tv_usec = tvChunk.tv_usec; + server->send(chunk); + + addMs(tvChunk, WIRE_CHUNK_MS); + nextTick += WIRE_CHUNK_MS; + long currentTick = getTickCount(); + if (nextTick > currentTick) + { + usleep((nextTick - currentTick) * 1000); + } + else + { + cin.sync(); + gettimeofday(&tvChunk, NULL); + nextTick = getTickCount(); + } + } + return 0; + server->stop(); + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } + + return 0; } + diff --git a/client.cpp b/client.cpp index 5d2b1f19..e931a351 100644 --- a/client.cpp +++ b/client.cpp @@ -15,12 +15,14 @@ #include #include #include +#include #include "chunk.h" #include "utils.h" #include "stream.h" #include "zhelpers.hpp" +using boost::asio::ip::tcp; using namespace std; @@ -32,18 +34,43 @@ Stream* stream; void player() { - zmq::context_t context (1); - zmq::socket_t subscriber (context, ZMQ_SUB); - subscriber.connect("tcp://192.168.0.2:123458"); + try + { + boost::asio::io_service io_service; + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), "192.168.0.2", "98765"); + tcp::resolver::iterator iterator = resolver.resolve(query); - const char* filter = ""; - subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); - zmq::message_t update; - while (1) - { - subscriber.recv(&update); - stream->addChunk(new Chunk((WireChunk*)(update.data()))); - } + while (true) + { + try + { + tcp::socket s(io_service); + s.connect(*iterator); + boost::array buf; + boost::system::error_code error; + + while (true) + { + WireChunk* wireChunk = new WireChunk(); + size_t len = s.read_some(boost::asio::buffer(wireChunk, sizeof(WireChunk))); + if (error == boost::asio::error::eof) + break; + + stream->addChunk(new Chunk(wireChunk)); + } + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + usleep(100*1000); + } + } + } + catch (std::exception& e) + { + std::cerr << "Exception: " << e.what() << "\n"; + } }