diff --git a/client/Makefile b/client/Makefile index 488fe725..32c9be07 100644 --- a/client/Makefile +++ b/client/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -OBJ = snapClient.o stream.o player.o clientConnection.o timeProvider.o oggDecoder.o pcmDecoder.o controller.o ../common/socketConnection.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o +OBJ = snapClient.o stream.o player.o clientConnection.o timeProvider.o oggDecoder.o pcmDecoder.o controller.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o BIN = snapclient all: client diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index ed98b7d7..cd039c9e 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -1,27 +1,152 @@ -#include "clientConnection.h" #include #include #include #include "common/log.h" +#include "clientConnection.h" + using namespace std; -ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : SocketConnection(_receiver), ip(_ip), port(_port) +ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0), ip(_ip), port(_port) { } +ClientConnection::~ClientConnection() +{ +} + + + +void ClientConnection::socketRead(void* _to, size_t _bytes) +{ +// std::unique_lock mlock(mutex_); + size_t toRead = _bytes; + size_t len = 0; + do + { +// cout << "/"; +// cout.flush(); + boost::system::error_code error; + len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error); +//cout << "len: " << len << ", error: " << error << endl; + toRead = _bytes - len; +// cout << "\\"; +// cout.flush(); + } + while (toRead > 0); +} + + void ClientConnection::start() { tcp::resolver resolver(io_service); tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); iterator = resolver.resolve(query); - SocketConnection::start(); + receiverThread = new thread(&ClientConnection::worker, this); } +void ClientConnection::stop() +{ + active_ = false; + socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); + socket->close(); + receiverThread->join(); +} + + +bool ClientConnection::send(BaseMessage* message) +{ +// std::unique_lock mlock(mutex_); +//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; + if (!connected()) + return false; +//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + tv t; + message->sent = t; + message->serialize(stream); + boost::asio::write(*socket.get(), streambuf); + return true; +} + + +shared_ptr ClientConnection::sendRequest(BaseMessage* message, size_t timeout) +{ + shared_ptr response(NULL); + if (++reqId == 0) + ++reqId; + message->id = reqId; + shared_ptr pendingRequest(new PendingRequest(reqId)); + + { + std::unique_lock mlock(mutex_); + pendingRequests.insert(pendingRequest); + } +// std::mutex mtx; + std::unique_lock lck(m); + send(message); + if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) + { + response = pendingRequest->response; + } + else + { + cout << "timeout while waiting for response to: " << reqId << "\n"; + } + { + std::unique_lock mlock(mutex_); + pendingRequests.erase(pendingRequest); + } + return response; +} + + +void ClientConnection::getNextMessage() +{ +//cout << "getNextMessage\n"; + BaseMessage baseMessage; + size_t baseMsgSize = baseMessage.getSize(); + vector buffer(baseMsgSize); + socketRead(&buffer[0], baseMsgSize); + baseMessage.deserialize(&buffer[0]); +//cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; + if (baseMessage.size > buffer.size()) + buffer.resize(baseMessage.size); + socketRead(&buffer[0], baseMessage.size); + tv t; + baseMessage.received = t; + + { + std::unique_lock mlock(mutex_); + for (auto req: pendingRequests) + { + if (req->id == baseMessage.refersTo) + { +//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; +//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec); +//cout << "latency: " << latency << "\n"; + req->response.reset(new SerializedMessage()); + req->response->message = baseMessage; + req->response->buffer = (char*)malloc(baseMessage.size); + memcpy(req->response->buffer, &buffer[0], baseMessage.size); + std::unique_lock lck(m); + req->cv.notify_one(); + return; + } + } + } + + if (messageReceiver != NULL) + messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); +} + + + void ClientConnection::worker() { active_ = true; @@ -65,3 +190,4 @@ void ClientConnection::worker() + diff --git a/client/clientConnection.h b/client/clientConnection.h index 7c2b8849..65ff2039 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -1,30 +1,96 @@ #ifndef CLIENT_CONNECTION_H #define CLIENT_CONNECTION_H -#include "common/socketConnection.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/message.h" using boost::asio::ip::tcp; +class ClientConnection; -class ClientConnection : public SocketConnection + +struct PendingRequest +{ + PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; + + uint16_t id; + std::shared_ptr response; + std::condition_variable cv; +}; + + +class MessageReceiver +{ +public: + virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; +}; + + +class ClientConnection { public: ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port); + virtual ~ClientConnection(); virtual void start(); + virtual void stop(); + virtual bool send(BaseMessage* _message); + virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); + + template + std::shared_ptr sendReq(BaseMessage* message, size_t timeout) + { + std::shared_ptr reply = sendRequest(message, timeout); + if (!reply) + return NULL; + std::shared_ptr msg(new T); + msg->deserialize(reply->message, reply->buffer); + return msg; + } + + virtual bool active() + { + return active_; + } + + virtual bool connected() + { + return (socket != 0); +// return (connected_ && socket); + } protected: virtual void worker(); -private: + void socketRead(void* _to, size_t _bytes); + std::shared_ptr socket; + +// boost::asio::ip::tcp::endpoint endpt; + std::atomic active_; + std::atomic connected_; + MessageReceiver* messageReceiver; + void getNextMessage(); + boost::asio::io_service io_service; + tcp::resolver::iterator iterator; + std::thread* receiverThread; + mutable std::mutex mutex_; + std::mutex m; + std::set> pendingRequests; + uint16_t reqId; std::string ip; size_t port; }; - #endif diff --git a/client/controller.cpp b/client/controller.cpp index 9e26885a..d6f06d08 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -19,7 +19,7 @@ Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL) } -void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) +void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) { if (baseMessage.type == message_type::payload) { diff --git a/client/controller.h b/client/controller.h index db717ad7..49b96df2 100644 --- a/client/controller.h +++ b/client/controller.h @@ -15,7 +15,7 @@ public: Controller(); void start(const std::string& _ip, size_t _port, int _bufferMs); void stop(); - virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); + virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer); private: void worker(); diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp deleted file mode 100644 index b3d69ada..00000000 --- a/common/socketConnection.cpp +++ /dev/null @@ -1,193 +0,0 @@ -#include "socketConnection.h" -#include -#include -#include -#include "log.h" - - - -using namespace std; - - -ClientConnection::ClientConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0) -{ -} - - -ClientConnection::~ClientConnection() -{ -} - - - -void ClientConnection::socketRead(void* _to, size_t _bytes) -{ -// std::unique_lock mlock(mutex_); - size_t toRead = _bytes; - size_t len = 0; - do - { -// cout << "/"; -// cout.flush(); - boost::system::error_code error; - len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error); -//cout << "len: " << len << ", error: " << error << endl; - toRead = _bytes - len; -// cout << "\\"; -// cout.flush(); - } - while (toRead > 0); -} - - -void ClientConnection::start() -{ - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); - iterator = resolver.resolve(query); - receiverThread = new thread(&ClientConnection::worker, this); -} - - -void ClientConnection::stop() -{ - active_ = false; - socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); - socket->close(); - receiverThread->join(); -} - - -bool ClientConnection::send(BaseMessage* message) -{ -// std::unique_lock mlock(mutex_); -//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; - if (!connected()) - return false; -//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - tv t; - message->sent = t; - message->serialize(stream); - boost::asio::write(*socket.get(), streambuf); - return true; -} - - -shared_ptr ClientConnection::sendRequest(BaseMessage* message, size_t timeout) -{ - shared_ptr response(NULL); - if (++reqId == 0) - ++reqId; - message->id = reqId; - shared_ptr pendingRequest(new PendingRequest(reqId)); - - { - std::unique_lock mlock(mutex_); - pendingRequests.insert(pendingRequest); - } -// std::mutex mtx; - std::unique_lock lck(m); - send(message); - if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) - { - response = pendingRequest->response; - } - else - { - cout << "timeout while waiting for response to: " << reqId << "\n"; - } - { - std::unique_lock mlock(mutex_); - pendingRequests.erase(pendingRequest); - } - return response; -} - - -void ClientConnection::getNextMessage() -{ -//cout << "getNextMessage\n"; - BaseMessage baseMessage; - size_t baseMsgSize = baseMessage.getSize(); - vector buffer(baseMsgSize); - socketRead(&buffer[0], baseMsgSize); - baseMessage.deserialize(&buffer[0]); -//cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n"; - if (baseMessage.size > buffer.size()) - buffer.resize(baseMessage.size); - socketRead(&buffer[0], baseMessage.size); - tv t; - baseMessage.received = t; - - { - std::unique_lock mlock(mutex_); - for (auto req: pendingRequests) - { - if (req->id == baseMessage.refersTo) - { -//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; -//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec); -//cout << "latency: " << latency << "\n"; - req->response.reset(new SerializedMessage()); - req->response->message = baseMessage; - req->response->buffer = (char*)malloc(baseMessage.size); - memcpy(req->response->buffer, &buffer[0], baseMessage.size); - std::unique_lock lck(m); - req->cv.notify_one(); - return; - } - } - } - - if (messageReceiver != NULL) - messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); -} - - - -void ClientConnection::worker() -{ - active_ = true; - while (active_) - { - connected_ = false; - try - { - { -// std::unique_lock mlock(mutex_); - cout << "connecting\n"; - socket.reset(new tcp::socket(io_service)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - cout << "socket: " << socket->native() << "\n"; - setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - socket->connect(*iterator); - connected_ = true; - cout << "connected\n"; - std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; - } - while(active_) - { -// cout << "."; -// cout.flush(); - getNextMessage(); -// cout << "|"; -// cout.flush(); - } - } - catch (const std::exception& e) - { - connected_ = false; - cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - usleep(1000*1000); - } - } -} - - - - diff --git a/common/socketConnection.h b/common/socketConnection.h deleted file mode 100644 index 0b546c64..00000000 --- a/common/socketConnection.h +++ /dev/null @@ -1,98 +0,0 @@ -#ifndef SOCKET_CONNECTION_H -#define SOCKET_CONNECTION_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include "message.h" - - -using boost::asio::ip::tcp; - - -class ClientConnection; - - -struct PendingRequest -{ - PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; - - uint16_t id; - std::shared_ptr response; - std::condition_variable cv; -}; - - -class MessageReceiver -{ -public: - virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; -}; - - -class ClientConnection -{ -public: - ClientConnection(MessageReceiver* _receiver); - virtual ~ClientConnection(); - virtual void start(); - virtual void stop(); - virtual bool send(BaseMessage* _message); - virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); - - template - std::shared_ptr sendReq(BaseMessage* message, size_t timeout) - { - std::shared_ptr reply = sendRequest(message, timeout); - if (!reply) - return NULL; - std::shared_ptr msg(new T); - msg->deserialize(reply->message, reply->buffer); - return msg; - } - - virtual bool active() - { - return active_; - } - - virtual bool connected() - { - return (socket != 0); -// return (connected_ && socket); - } - -protected: - virtual void worker(); - - void socketRead(void* _to, size_t _bytes); - std::shared_ptr socket; - -// boost::asio::ip::tcp::endpoint endpt; - std::atomic active_; - std::atomic connected_; - MessageReceiver* messageReceiver; - void getNextMessage(); - boost::asio::io_service io_service; - tcp::resolver::iterator iterator; - std::thread* receiverThread; - mutable std::mutex mutex_; - std::mutex m; - std::set> pendingRequests; - uint16_t reqId; - std::string ip; - size_t port; -}; - - - -#endif - - - -