diff --git a/client/controller.cpp b/client/controller.cpp index 07caa4eb..0f839c83 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -8,6 +8,7 @@ #include "player.h" #include "common/serverSettings.h" #include "common/timeMsg.h" +#include "common/requestMsg.h" using namespace std; @@ -36,7 +37,7 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa delete pcmChunk; } } - else if (baseMessage.type == message_type::header) +/* else if (baseMessage.type == message_type::header) { if (decoder != NULL) { @@ -58,6 +59,7 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa cout << "ServerSettings port: " << serverSettings->port << "\n"; streamClient = new StreamClient(this, ip, serverSettings->port); } +*/ } @@ -84,14 +86,27 @@ void Controller::worker() // Decoder* decoder; active_ = true; - while ((sampleFormat == NULL) && (streamClient == NULL)) - { - usleep(10000); - } - - streamClient->start(); + RequestMsg requestMsg("serverSettings"); + shared_ptr serverSettings(NULL); + while (!(serverSettings = controlConnection->sendReq(&requestMsg, 2000))); + cout << "ServerSettings port: " << serverSettings->port << "\n"; + streamClient = new StreamClient(this, ip, serverSettings->port); - stream = new Stream(SampleFormat(*sampleFormat)); + requestMsg.request = "sampleFormat"; + while (!(sampleFormat = controlConnection->sendReq(&requestMsg, 2000))); + cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; + + if (decoder != NULL) + { + requestMsg.request = "headerChunk"; + shared_ptr headerChunk(NULL); + while (!(headerChunk = controlConnection->sendReq(&requestMsg, 2000))); + decoder->setHeader(headerChunk.get()); + } + + + streamClient->start(); + stream = new Stream(*sampleFormat); stream->setBufferLen(bufferMs); Player player(stream); @@ -101,23 +116,17 @@ void Controller::worker() while (active_) { usleep(1000000);//1000000); - TimeMsg timeMsg; - try { - shared_ptr reply = controlConnection->sendRequest(&timeMsg, 2000); + RequestMsg requestMsg("time"); + shared_ptr reply = controlConnection->sendReq(&requestMsg, 2000); if (reply) { - if (reply->response->type == message_type::timemsg) - { - //cout << "Reply: " << reply->response->type << ", size: " << reply->response->size << ", sent: " << reply->response->sent.sec << "," << reply->response->sent.usec << ", recv: " << reply->response->received.sec << "," << reply->response->received.usec << "\n"; - TimeMsg timeMsg; - timeMsg.deserialize(*reply->response, reply->buffer); - double latency = (timeMsg.received.sec - timeMsg.sent.sec) + (timeMsg.received.usec - timeMsg.sent.usec) / 1000000.; - cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl; - timeBuffer.add((timeMsg.latency - latency) / 2); - cout << timeBuffer.median() << "\n"; - } +//cout << "Reply: " << reply->message.type << ", size: " << reply->message.size << ", sent: " << reply->message.sent.sec << "," << reply->message.sent.usec << ", recv: " << reply->message.received.sec << "," << reply->message.received.usec << "\n"; + double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; +// cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl; + timeBuffer.add((reply->latency - latency) * 10000 / 2); + cout << timeBuffer.median() << "\n"; } } catch (const std::exception& e) diff --git a/client/controller.h b/client/controller.h index 91b47034..de66ec03 100644 --- a/client/controller.h +++ b/client/controller.h @@ -24,11 +24,11 @@ private: std::thread* controllerThread; StreamClient* streamClient; ClientConnection* controlConnection; - SampleFormat* sampleFormat; Decoder* decoder; Stream* stream; int bufferMs; std::string ip; + std::shared_ptr sampleFormat; }; diff --git a/common/message.h b/common/message.h index 129ab68e..ce4d0f9a 100644 --- a/common/message.h +++ b/common/message.h @@ -35,7 +35,8 @@ enum message_type payload = 2, sampleformat = 3, serversettings = 4, - timemsg = 5 + timemsg = 5, + requestmsg = 6 }; @@ -88,7 +89,6 @@ struct tv }; - struct BaseMessage { BaseMessage() : type(base), id(0), refersTo(0) @@ -168,6 +168,18 @@ protected: }; +struct SerializedMessage +{ + ~SerializedMessage() + { + free(buffer); + } + + BaseMessage message; + char* buffer; +}; + + #endif diff --git a/common/requestMsg.h b/common/requestMsg.h new file mode 100644 index 00000000..e6ccd0b4 --- /dev/null +++ b/common/requestMsg.h @@ -0,0 +1,52 @@ +#ifndef REQUEST_MSG_H +#define REQUEST_MSG_H + +#include "message.h" +#include + + +class RequestMsg : public BaseMessage +{ +public: + RequestMsg() : BaseMessage(message_type::requestmsg) + { + } + + RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request) + { + } + + virtual ~RequestMsg() + { + } + + virtual void read(std::istream& stream) + { + int16_t size; + stream.read(reinterpret_cast(&size), sizeof(int16_t)); + request.resize(size); + stream.read(&request[0], size); + } + + virtual uint32_t getSize() + { + return sizeof(int16_t) + request.size(); + } + + std::string request; + +protected: + virtual void doserialize(std::ostream& stream) + { + int16_t size(request.size()); + stream.write(reinterpret_cast(&size), sizeof(int16_t)); + stream.write(request.c_str(), size); + } +}; + + + + +#endif + + diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp index 9a7a4baf..845b19d5 100644 --- a/common/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -49,7 +49,7 @@ void SocketConnection::stop() bool SocketConnection::send(BaseMessage* message) { - std::unique_lock mlock(mutex_); +// std::unique_lock mlock(mutex_); //cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; if (!connected()) return false; @@ -64,26 +64,33 @@ bool SocketConnection::send(BaseMessage* message) } -shared_ptr SocketConnection::sendRequest(BaseMessage* message, size_t timeout) +shared_ptr SocketConnection::sendRequest(BaseMessage* message, size_t timeout) { - shared_ptr response(NULL); + shared_ptr response(NULL); if (++reqId == 0) ++reqId; message->id = reqId; shared_ptr pendingRequest(new PendingRequest(reqId)); - pendingRequests.insert(pendingRequest); - std::mutex mtx; - std::unique_lock lck(mtx); + + { + std::unique_lock mlock(mutex_); + pendingRequests.insert(pendingRequest); + } +// std::mutex mtx; + std::unique_lock lck(m); send(message); if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) { - response = pendingRequest; + response = pendingRequest->response; } else { cout << "timeout while waiting for response to: " << reqId << "\n"; } - pendingRequests.erase(pendingRequest); + { + std::unique_lock mlock(mutex_); + pendingRequests.erase(pendingRequest); + } return response; } @@ -103,18 +110,23 @@ void SocketConnection::getNextMessage() tv t; baseMessage.received = t; - for (auto req: pendingRequests) { - if (req->id == baseMessage.refersTo) + std::unique_lock mlock(mutex_); + for (auto req: pendingRequests) { + if (req->id == baseMessage.refersTo) + { //cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; //long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec); //cout << "latency: " << latency << "\n"; - req->response = new BaseMessage(baseMessage); - req->buffer = (char*)malloc(baseMessage.size); - memcpy(req->buffer, &buffer[0], baseMessage.size); - req->cv.notify_one(); - return; + req->response.reset(new SerializedMessage()); + req->response->message = baseMessage; + req->response->buffer = (char*)malloc(baseMessage.size); + memcpy(req->response->buffer, &buffer[0], baseMessage.size); + std::unique_lock lck(m); + req->cv.notify_one(); + return; + } } } diff --git a/common/socketConnection.h b/common/socketConnection.h index 7f76edec..bc1afdc7 100644 --- a/common/socketConnection.h +++ b/common/socketConnection.h @@ -20,18 +20,10 @@ class SocketConnection; struct PendingRequest { - PendingRequest(uint16_t reqId) : id(reqId), response(NULL), buffer(NULL) {}; - ~PendingRequest() - { - if (response != NULL); - delete response; - if (buffer != NULL) - free(buffer); - } + PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {}; uint16_t id; - BaseMessage* response; - char* buffer; + std::shared_ptr response; std::condition_variable cv; }; @@ -51,7 +43,18 @@ public: virtual void start(); virtual void stop(); virtual bool send(BaseMessage* _message); - virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); + virtual std::shared_ptr sendRequest(BaseMessage* message, size_t timeout); + + template + std::shared_ptr sendReq(BaseMessage* message, size_t timeout) + { + std::shared_ptr reply = sendRequest(message, timeout); + if (!reply) + return NULL; + std::shared_ptr msg(new T); + msg->deserialize(reply->message, reply->buffer); + return msg; + } virtual bool active() { @@ -79,7 +82,8 @@ protected: tcp::resolver::iterator iterator; std::thread* receiverThread; mutable std::mutex mutex_; - std::set> pendingRequests; + std::mutex m; + std::set> pendingRequests; uint16_t reqId; }; diff --git a/server/controlServer.cpp b/server/controlServer.cpp index e9483368..c54a0dc9 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -1,5 +1,6 @@ #include "controlServer.h" #include "common/timeMsg.h" +#include "common/requestMsg.h" #include @@ -11,16 +12,36 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) { // cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; - if (baseMessage.type == message_type::timemsg) + if (baseMessage.type == message_type::requestmsg) { - TimeMsg timeMsg; - timeMsg.deserialize(baseMessage, buffer); + RequestMsg requestMsg; + requestMsg.deserialize(baseMessage, buffer); + cout << "request: " << requestMsg.request << "\n"; + if (requestMsg.request == "time") + { // timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec); - timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) + (timeMsg.received.usec - timeMsg.sent.usec) / 1000000.; + TimeMsg timeMsg; + timeMsg.refersTo = requestMsg.id; + timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.; // tv diff = timeMsg.received - timeMsg.sent; // cout << "Latency: " << diff.sec << "." << diff.usec << "\n"; - timeMsg.refersTo = timeMsg.id; - connection->send(&timeMsg); + connection->send(&timeMsg); + } + else if (requestMsg.request == "serverSettings") + { + serverSettings->refersTo = requestMsg.id; + connection->send(serverSettings); + } + else if (requestMsg.request == "sampleFormat") + { + sampleFormat->refersTo = requestMsg.id; + connection->send(sampleFormat); + } + else if (requestMsg.request == "headerChunk") + { + headerChunk->refersTo = requestMsg.id; + connection->send(headerChunk); + } } } @@ -36,9 +57,9 @@ void ControlServer::acceptor() ServerConnection* session = new ServerConnection(this, sock); sessions.insert(shared_ptr(session)); session->start(); - session->send(serverSettings); - session->send(sampleFormat); - session->send(headerChunk); +// session->send(serverSettings); +// session->send(sampleFormat); +// session->send(headerChunk); } }