#ifndef CLIENT_CONNECTION_H #define CLIENT_CONNECTION_H #include #include #include #include #include #include #include #include #include "message/message.h" #include "common/timeDefs.h" using boost::asio::ip::tcp; class ClientConnection; struct PendingRequest { PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; uint16_t id; std::shared_ptr response; std::condition_variable cv; }; class MessageReceiver { public: virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; virtual void onException(ClientConnection* connection, const std::exception& exception) = 0; }; class ClientConnection { public: ClientConnection(MessageReceiver* receiver, const std::string& ip, size_t port); virtual ~ClientConnection(); virtual void start(); virtual void stop(); virtual bool send(msg::BaseMessage* message); virtual std::shared_ptr sendRequest(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); template std::shared_ptr sendReq(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) { std::shared_ptr reply = sendRequest(message, timeout); if (!reply) return NULL; std::shared_ptr msg(new T); msg->deserialize(reply->message, reply->buffer); return msg; } virtual bool active() { return active_; } virtual bool connected() { return (socket_ != 0); // return (connected_ && socket); } protected: virtual void reader(); void socketRead(void* to, size_t bytes); void getNextMessage(); boost::asio::io_service io_service_; std::shared_ptr socket_; std::atomic active_; std::atomic connected_; MessageReceiver* messageReceiver_; mutable std::mutex mutex_; std::set> pendingRequests_; uint16_t reqId_; std::string ip_; size_t port_; std::thread* readerThread_; chronos::msec sumTimeout_; }; #endif