From 3369363453428bc6cfea0af9e69fd0e1fdc68b9f Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Sat, 13 Sep 2014 18:45:54 +0000 Subject: [PATCH] sync send git-svn-id: svn://elaine/murooma/trunk@256 d8a302eb-03bc-478d-80e4-98257eca68ef --- client/controller.cpp | 2 +- common/message.h | 56 ++++++++++++++++++++++++++++++++----- common/pcmChunk.h | 4 +-- common/socketConnection.cpp | 37 ++++++++++++++++++++++-- common/socketConnection.h | 17 ++++++++++- common/wireChunk.h | 12 ++++---- server/oggEncoder.cpp | 4 +-- server/snapServer.cpp | 4 +-- 8 files changed, 111 insertions(+), 25 deletions(-) diff --git a/client/controller.cpp b/client/controller.cpp index 132e71e3..101cc80d 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -18,7 +18,7 @@ Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL) void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) { -//cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; +//cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (baseMessage.type == message_type::payload) { if ((stream != NULL) && (decoder != NULL)) diff --git a/common/message.h b/common/message.h index ac6d13e3..24323fe5 100644 --- a/common/message.h +++ b/common/message.h @@ -5,13 +5,15 @@ #include #include #include - +#include template > -class vectorwrapbuf : public std::basic_streambuf { +class vectorwrapbuf : public std::basic_streambuf +{ public: - vectorwrapbuf(std::vector &vec) { + vectorwrapbuf(std::vector &vec) + { this->setg(vec.data(), vec.data(), vec.data() + vec.size()); } }; @@ -19,7 +21,8 @@ public: struct membuf : public std::basic_streambuf { - membuf(char* begin, char* end) { + membuf(char* begin, char* end) + { this->setg(begin, begin, end); } }; @@ -36,13 +39,31 @@ enum message_type +struct tv +{ + tv() + { + timeval t; + gettimeofday(&t, NULL); + sec = t.tv_sec; + usec = t.tv_usec; + } + tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {}; + tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {}; + + int32_t sec; + int32_t usec; +}; + + + struct BaseMessage { - BaseMessage() : type(base) + BaseMessage() : type(base), id(0), refersTo(0) { } - BaseMessage(message_type type_) : type(type_) + BaseMessage(message_type type_) : type(type_), id(0), refersTo(0) { } @@ -53,6 +74,12 @@ struct BaseMessage virtual void read(std::istream& stream) { stream.read(reinterpret_cast(&type), sizeof(uint16_t)); + stream.read(reinterpret_cast(&id), sizeof(uint16_t)); + stream.read(reinterpret_cast(&refersTo), sizeof(uint16_t)); + stream.read(reinterpret_cast(&sent.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(&sent.usec), sizeof(int32_t)); + stream.read(reinterpret_cast(&received.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(&received.usec), sizeof(int32_t)); stream.read(reinterpret_cast(&size), sizeof(uint32_t)); } @@ -66,6 +93,10 @@ struct BaseMessage void deserialize(const BaseMessage& baseMessage, char* payload) { type = baseMessage.type; + id = baseMessage.id; + refersTo = baseMessage.refersTo; + sent = baseMessage.sent; + received = baseMessage.received; size = baseMessage.size; membuf databuf(payload, payload + size); std::istream is(&databuf); @@ -75,6 +106,12 @@ struct BaseMessage virtual void serialize(std::ostream& stream) { stream.write(reinterpret_cast(&type), sizeof(uint16_t)); + stream.write(reinterpret_cast(&id), sizeof(uint16_t)); + stream.write(reinterpret_cast(&refersTo), sizeof(uint16_t)); + stream.write(reinterpret_cast(&sent.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(&sent.usec), sizeof(int32_t)); + stream.write(reinterpret_cast(&received.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(&received.usec), sizeof(int32_t)); size = getSize(); stream.write(reinterpret_cast(&size), sizeof(uint32_t)); doserialize(stream); @@ -82,11 +119,16 @@ struct BaseMessage virtual uint32_t getSize() { - return sizeof(uint16_t) + sizeof(uint32_t); + return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t); }; uint16_t type; + uint16_t id; + uint16_t refersTo; + tv sent; + tv received; uint32_t size; + protected: virtual void doserialize(std::ostream& stream) { diff --git a/common/pcmChunk.h b/common/pcmChunk.h index c451942c..a2a768a0 100644 --- a/common/pcmChunk.h +++ b/common/pcmChunk.h @@ -26,8 +26,8 @@ public: std::chrono::milliseconds::rep relativeIdxTp = ((double)idx / ((double)format.rate/1000.)); return tp + - std::chrono::seconds(tv_sec) + - std::chrono::milliseconds(tv_usec / 1000) + + std::chrono::seconds(timestamp.sec) + + std::chrono::milliseconds(timestamp.usec / 1000) + std::chrono::milliseconds(relativeIdxTp); } diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp index e96060f0..0fb4d55e 100644 --- a/common/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -10,7 +10,7 @@ using namespace std; -SocketConnection::SocketConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver) +SocketConnection::SocketConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0) { } @@ -47,17 +47,37 @@ void SocketConnection::stop() } -void SocketConnection::send(BaseMessage* message) +bool SocketConnection::send(BaseMessage* message) { std::unique_lock mlock(mutex_); //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; if (!connected()) - return; + return false; //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; boost::asio::streambuf streambuf; std::ostream stream(&streambuf); + tv t; + message->sent = t; message->serialize(stream); boost::asio::write(*socket.get(), streambuf); + return true; +} + + +BaseMessage* SocketConnection::sendRequest(BaseMessage* message, size_t timeout) +{ + BaseMessage* response(NULL); + if (++reqId == 0) + ++reqId; + shared_ptr pendingRequest(new PendingRequest(reqId)); + pendingRequests.insert(pendingRequest); + std::mutex mtx; + std::unique_lock lck(mtx); + send(message); + if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) + response = pendingRequest->response; + pendingRequests.erase(pendingRequest); + return response; } @@ -73,7 +93,18 @@ void SocketConnection::getNextMessage() if (baseMessage.size > buffer.size()) buffer.resize(baseMessage.size); socketRead(&buffer[0], baseMessage.size); + tv t; + baseMessage.received = t; + for (auto req: pendingRequests) + { + if (req->id == baseMessage.refersTo) + { + req->cv.notify_one(); + return; + } + } + if (messageReceiver != NULL) messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); } diff --git a/common/socketConnection.h b/common/socketConnection.h index 5f513476..548368ba 100644 --- a/common/socketConnection.h +++ b/common/socketConnection.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "message.h" @@ -15,6 +17,16 @@ using boost::asio::ip::tcp; class SocketConnection; +struct PendingRequest +{ + PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; + + uint16_t id; + BaseMessage* response; + std::condition_variable cv; +}; + + class MessageReceiver { public: @@ -29,7 +41,8 @@ public: virtual ~SocketConnection(); virtual void start(); virtual void stop(); - virtual void send(BaseMessage* _message); + virtual bool send(BaseMessage* _message); + virtual BaseMessage* sendRequest(BaseMessage* message, size_t timeout); virtual bool active() { @@ -57,6 +70,8 @@ protected: tcp::resolver::iterator iterator; std::thread* receiverThread; mutable std::mutex mutex_; + std::set> pendingRequests; + uint16_t reqId; }; diff --git a/common/wireChunk.h b/common/wireChunk.h index 77de1f5e..3dbc3bfe 100644 --- a/common/wireChunk.h +++ b/common/wireChunk.h @@ -10,7 +10,6 @@ #include "message.h" - class WireChunk : public BaseMessage { public: @@ -26,8 +25,8 @@ public: virtual void read(std::istream& stream) { - stream.read(reinterpret_cast(&tv_sec), sizeof(int32_t)); - stream.read(reinterpret_cast(&tv_usec), sizeof(int32_t)); + stream.read(reinterpret_cast(×tamp.sec), sizeof(int32_t)); + stream.read(reinterpret_cast(×tamp.usec), sizeof(int32_t)); stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); payload = (char*)realloc(payload, payloadSize); stream.read(payload, payloadSize); @@ -38,16 +37,15 @@ public: return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize; } - int32_t tv_sec; - int32_t tv_usec; + tv timestamp; uint32_t payloadSize; char* payload; protected: virtual void doserialize(std::ostream& stream) { - stream.write(reinterpret_cast(&tv_sec), sizeof(int32_t)); - stream.write(reinterpret_cast(&tv_usec), sizeof(int32_t)); + stream.write(reinterpret_cast(×tamp.sec), sizeof(int32_t)); + stream.write(reinterpret_cast(×tamp.usec), sizeof(int32_t)); stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); stream.write(payload, payloadSize); } diff --git a/server/oggEncoder.cpp b/server/oggEncoder.cpp index 3c687c80..7b1d63d8 100644 --- a/server/oggEncoder.cpp +++ b/server/oggEncoder.cpp @@ -23,8 +23,8 @@ double OggEncoder::encode(PcmChunk* chunk) double res = 0; if (tv_sec == 0) { - tv_sec = chunk->tv_sec; - tv_usec = chunk->tv_usec; + tv_sec = chunk->timestamp.sec; + tv_usec = chunk->timestamp.usec; } //cout << "-> pcm: " << wireChunk->length << endl; int bytes = chunk->payloadSize / 4; diff --git a/server/snapServer.cpp b/server/snapServer.cpp index c247b0af..6acca013 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -112,8 +112,8 @@ size_t duration = 50; } while (len < toRead); - chunk->tv_sec = tvChunk.tv_sec; - chunk->tv_usec = tvChunk.tv_usec; + chunk->timestamp.sec = tvChunk.tv_sec; + chunk->timestamp.usec = tvChunk.tv_usec; double chunkDuration = encoder->encode(chunk.get()); if (chunkDuration > 0) server->send(chunk);