From d8a6e6369180528bfad10c22ad589d1c41779ad7 Mon Sep 17 00:00:00 2001 From: badaix Date: Wed, 11 Dec 2019 22:47:11 +0100 Subject: [PATCH] Use promise/future for sync messages --- client/client_connection.cpp | 49 ++++++++++++++++++------------------ client/client_connection.hpp | 48 ++++++++++++++++++++++++++++------- 2 files changed, 64 insertions(+), 33 deletions(-) diff --git a/client/client_connection.cpp b/client/client_connection.cpp index 53de10f2..5c4f9ec8 100644 --- a/client/client_connection.cpp +++ b/client/client_connection.cpp @@ -130,21 +130,23 @@ bool ClientConnection::send(const msg::BaseMessage* message) } -shared_ptr ClientConnection::sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout) +unique_ptr ClientConnection::sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout) { - shared_ptr response(nullptr); + unique_ptr response(nullptr); if (++reqId_ >= 10000) reqId_ = 1; message->id = reqId_; // LOG(INFO) << "Req: " << message->id << "\n"; - shared_ptr pendingRequest(new PendingRequest(reqId_)); + shared_ptr pendingRequest = make_shared(reqId_); - std::unique_lock lock(pendingRequestsMutex_); - pendingRequests_.insert(pendingRequest); - send(message); - if (pendingRequest->cv.wait_for(lock, std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) + { // scope for lock + std::unique_lock lock(pendingRequestsMutex_); + pendingRequests_.insert(pendingRequest); + send(message); + } + + if ((response = pendingRequest->waitForResponse(std::chrono::milliseconds(timeout))) != nullptr) { - response = pendingRequest->response; sumTimeout_ = chronos::msec(0); // LOG(INFO) << "Resp: " << pendingRequest->id << "\n"; } @@ -155,7 +157,11 @@ shared_ptr ClientConnection::sendRequest(const msg::Base if (sumTimeout_ > chronos::sec(10)) throw SnapException("sum timeout exceeded 10s"); } - pendingRequests_.erase(pendingRequest); + + { // scope for lock + std::unique_lock lock(pendingRequestsMutex_); + pendingRequests_.erase(pendingRequest); + } return response; } @@ -174,27 +180,22 @@ void ClientConnection::getNextMessage() // { // std::lock_guard socketLock(socketMutex_); socketRead(&buffer[0], baseMessage.size); - // } tv t; baseMessage.received = t; + // } - { + { // scope for lock std::unique_lock lock(pendingRequestsMutex_); - // LOG(DEBUG) << "got lock - getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", - // refers: " << baseMessage.refersTo << "\n"; + for (auto req : pendingRequests_) { - for (auto req : pendingRequests_) + if (req->id() == baseMessage.refersTo) { - if (req->id == baseMessage.refersTo) - { - req->response.reset(new msg::SerializedMessage()); - req->response->message = baseMessage; - req->response->buffer = (char*)malloc(baseMessage.size); - memcpy(req->response->buffer, &buffer[0], baseMessage.size); - lock.unlock(); - req->cv.notify_one(); - return; - } + auto response = make_unique(); + response->message = baseMessage; + response->buffer = (char*)malloc(baseMessage.size); + memcpy(response->buffer, &buffer[0], baseMessage.size); + req->setValue(std::move(response)); + return; } } } diff --git a/client/client_connection.hpp b/client/client_connection.hpp index 62a30534..7f15e5ad 100644 --- a/client/client_connection.hpp +++ b/client/client_connection.hpp @@ -38,13 +38,43 @@ class ClientConnection; /// Used to synchronize server requests (wait for server response) -struct PendingRequest +class PendingRequest { - PendingRequest(uint16_t reqId) : id(reqId), response(nullptr){}; +public: + PendingRequest(uint16_t reqId) : id_(reqId) + { + future_ = promise_.get_future(); + }; - uint16_t id; - std::shared_ptr response; - std::condition_variable cv; + template + std::unique_ptr waitForResponse(const std::chrono::duration& timeout) + { + try + { + if (future_.wait_for(timeout) == std::future_status::ready) + return future_.get(); + } + catch (...) + { + } + return nullptr; + } + + void setValue(std::unique_ptr value) + { + promise_.set_value(std::move(value)); + } + + uint16_t id() const + { + return id_; + } + +private: + uint16_t id_; + + std::promise> promise_; + std::future> future_; }; @@ -79,16 +109,16 @@ public: virtual bool send(const msg::BaseMessage* message); /// Send request to the server and wait for answer - virtual std::shared_ptr sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); + virtual std::unique_ptr sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); /// Send request to the server and wait for answer of type T template - std::shared_ptr sendReq(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) + std::unique_ptr sendReq(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) { - std::shared_ptr reply = sendRequest(message, timeout); + std::unique_ptr reply = sendRequest(message, timeout); if (!reply) return nullptr; - std::shared_ptr msg(new T); + std::unique_ptr msg(new T); msg->deserialize(reply->message, reply->buffer); return msg; }