requestMsg

git-svn-id: svn://elaine/murooma/trunk@267 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-15 21:25:52 +00:00
parent 444ec9b4ee
commit 19e77f0f20
7 changed files with 170 additions and 60 deletions

View file

@ -8,6 +8,7 @@
#include "player.h"
#include "common/serverSettings.h"
#include "common/timeMsg.h"
#include "common/requestMsg.h"
using namespace std;
@ -36,7 +37,7 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa
delete pcmChunk;
}
}
else if (baseMessage.type == message_type::header)
/* else if (baseMessage.type == message_type::header)
{
if (decoder != NULL)
{
@ -58,6 +59,7 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
}
*/
}
@ -84,14 +86,27 @@ void Controller::worker()
// Decoder* decoder;
active_ = true;
while ((sampleFormat == NULL) && (streamClient == NULL))
{
usleep(10000);
}
streamClient->start();
RequestMsg requestMsg("serverSettings");
shared_ptr<ServerSettings> serverSettings(NULL);
while (!(serverSettings = controlConnection->sendReq<ServerSettings>(&requestMsg, 2000)));
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
stream = new Stream(SampleFormat(*sampleFormat));
requestMsg.request = "sampleFormat";
while (!(sampleFormat = controlConnection->sendReq<SampleFormat>(&requestMsg, 2000)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
if (decoder != NULL)
{
requestMsg.request = "headerChunk";
shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = controlConnection->sendReq<HeaderMessage>(&requestMsg, 2000)));
decoder->setHeader(headerChunk.get());
}
streamClient->start();
stream = new Stream(*sampleFormat);
stream->setBufferLen(bufferMs);
Player player(stream);
@ -101,23 +116,17 @@ void Controller::worker()
while (active_)
{
usleep(1000000);//1000000);
TimeMsg timeMsg;
try
{
shared_ptr<PendingRequest> reply = controlConnection->sendRequest(&timeMsg, 2000);
RequestMsg requestMsg("time");
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&requestMsg, 2000);
if (reply)
{
if (reply->response->type == message_type::timemsg)
{
//cout << "Reply: " << reply->response->type << ", size: " << reply->response->size << ", sent: " << reply->response->sent.sec << "," << reply->response->sent.usec << ", recv: " << reply->response->received.sec << "," << reply->response->received.usec << "\n";
TimeMsg timeMsg;
timeMsg.deserialize(*reply->response, reply->buffer);
double latency = (timeMsg.received.sec - timeMsg.sent.sec) + (timeMsg.received.usec - timeMsg.sent.usec) / 1000000.;
cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl;
timeBuffer.add((timeMsg.latency - latency) / 2);
cout << timeBuffer.median() << "\n";
}
//cout << "Reply: " << reply->message.type << ", size: " << reply->message.size << ", sent: " << reply->message.sent.sec << "," << reply->message.sent.usec << ", recv: " << reply->message.received.sec << "," << reply->message.received.usec << "\n";
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
// cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl;
timeBuffer.add((reply->latency - latency) * 10000 / 2);
cout << timeBuffer.median() << "\n";
}
}
catch (const std::exception& e)

View file

@ -24,11 +24,11 @@ private:
std::thread* controllerThread;
StreamClient* streamClient;
ClientConnection* controlConnection;
SampleFormat* sampleFormat;
Decoder* decoder;
Stream* stream;
int bufferMs;
std::string ip;
std::shared_ptr<SampleFormat> sampleFormat;
};

View file

@ -35,7 +35,8 @@ enum message_type
payload = 2,
sampleformat = 3,
serversettings = 4,
timemsg = 5
timemsg = 5,
requestmsg = 6
};
@ -88,7 +89,6 @@ struct tv
};
struct BaseMessage
{
BaseMessage() : type(base), id(0), refersTo(0)
@ -168,6 +168,18 @@ protected:
};
struct SerializedMessage
{
~SerializedMessage()
{
free(buffer);
}
BaseMessage message;
char* buffer;
};
#endif

52
common/requestMsg.h Normal file
View file

@ -0,0 +1,52 @@
#ifndef REQUEST_MSG_H
#define REQUEST_MSG_H
#include "message.h"
#include <string>
class RequestMsg : public BaseMessage
{
public:
RequestMsg() : BaseMessage(message_type::requestmsg)
{
}
RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request)
{
}
virtual ~RequestMsg()
{
}
virtual void read(std::istream& stream)
{
int16_t size;
stream.read(reinterpret_cast<char *>(&size), sizeof(int16_t));
request.resize(size);
stream.read(&request[0], size);
}
virtual uint32_t getSize()
{
return sizeof(int16_t) + request.size();
}
std::string request;
protected:
virtual void doserialize(std::ostream& stream)
{
int16_t size(request.size());
stream.write(reinterpret_cast<char *>(&size), sizeof(int16_t));
stream.write(request.c_str(), size);
}
};
#endif

View file

@ -49,7 +49,7 @@ void SocketConnection::stop()
bool SocketConnection::send(BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
// std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
if (!connected())
return false;
@ -64,26 +64,33 @@ bool SocketConnection::send(BaseMessage* message)
}
shared_ptr<PendingRequest> SocketConnection::sendRequest(BaseMessage* message, size_t timeout)
shared_ptr<SerializedMessage> SocketConnection::sendRequest(BaseMessage* message, size_t timeout)
{
shared_ptr<PendingRequest> response(NULL);
shared_ptr<SerializedMessage> response(NULL);
if (++reqId == 0)
++reqId;
message->id = reqId;
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId));
pendingRequests.insert(pendingRequest);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.insert(pendingRequest);
}
// std::mutex mtx;
std::unique_lock<std::mutex> lck(m);
send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{
response = pendingRequest;
response = pendingRequest->response;
}
else
{
cout << "timeout while waiting for response to: " << reqId << "\n";
}
pendingRequests.erase(pendingRequest);
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.erase(pendingRequest);
}
return response;
}
@ -103,18 +110,23 @@ void SocketConnection::getNextMessage()
tv t;
baseMessage.received = t;
for (auto req: pendingRequests)
{
if (req->id == baseMessage.refersTo)
std::unique_lock<std::mutex> mlock(mutex_);
for (auto req: pendingRequests)
{
if (req->id == baseMessage.refersTo)
{
//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec);
//cout << "latency: " << latency << "\n";
req->response = new BaseMessage(baseMessage);
req->buffer = (char*)malloc(baseMessage.size);
memcpy(req->buffer, &buffer[0], baseMessage.size);
req->cv.notify_one();
return;
req->response.reset(new SerializedMessage());
req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size);
std::unique_lock<std::mutex> lck(m);
req->cv.notify_one();
return;
}
}
}

View file

@ -20,18 +20,10 @@ class SocketConnection;
struct PendingRequest
{
PendingRequest(uint16_t reqId) : id(reqId), response(NULL), buffer(NULL) {};
~PendingRequest()
{
if (response != NULL);
delete response;
if (buffer != NULL)
free(buffer);
}
PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {};
uint16_t id;
BaseMessage* response;
char* buffer;
std::shared_ptr<SerializedMessage> response;
std::condition_variable cv;
};
@ -51,7 +43,18 @@ public:
virtual void start();
virtual void stop();
virtual bool send(BaseMessage* _message);
virtual std::shared_ptr<PendingRequest> sendRequest(BaseMessage* message, size_t timeout);
virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, size_t timeout);
template <typename T>
std::shared_ptr<T> sendReq(BaseMessage* message, size_t timeout)
{
std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout);
if (!reply)
return NULL;
std::shared_ptr<T> msg(new T);
msg->deserialize(reply->message, reply->buffer);
return msg;
}
virtual bool active()
{
@ -79,7 +82,8 @@ protected:
tcp::resolver::iterator iterator;
std::thread* receiverThread;
mutable std::mutex mutex_;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
std::mutex m;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
};

View file

@ -1,5 +1,6 @@
#include "controlServer.h"
#include "common/timeMsg.h"
#include "common/requestMsg.h"
#include <iostream>
@ -11,16 +12,36 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
{
// cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::timemsg)
if (baseMessage.type == message_type::requestmsg)
{
TimeMsg timeMsg;
timeMsg.deserialize(baseMessage, buffer);
RequestMsg requestMsg;
requestMsg.deserialize(baseMessage, buffer);
cout << "request: " << requestMsg.request << "\n";
if (requestMsg.request == "time")
{
// timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec);
timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) + (timeMsg.received.usec - timeMsg.sent.usec) / 1000000.;
TimeMsg timeMsg;
timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
// tv diff = timeMsg.received - timeMsg.sent;
// cout << "Latency: " << diff.sec << "." << diff.usec << "\n";
timeMsg.refersTo = timeMsg.id;
connection->send(&timeMsg);
connection->send(&timeMsg);
}
else if (requestMsg.request == "serverSettings")
{
serverSettings->refersTo = requestMsg.id;
connection->send(serverSettings);
}
else if (requestMsg.request == "sampleFormat")
{
sampleFormat->refersTo = requestMsg.id;
connection->send(sampleFormat);
}
else if (requestMsg.request == "headerChunk")
{
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
}
}
}
@ -36,9 +57,9 @@ void ControlServer::acceptor()
ServerConnection* session = new ServerConnection(this, sock);
sessions.insert(shared_ptr<ServerConnection>(session));
session->start();
session->send(serverSettings);
session->send(sampleFormat);
session->send(headerChunk);
// session->send(serverSettings);
// session->send(sampleFormat);
// session->send(headerChunk);
}
}