exception

git-svn-id: svn://elaine/murooma/trunk@316 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-12-21 20:21:25 +00:00
parent fd3b671633
commit 8afca63ded
4 changed files with 45 additions and 21 deletions

View file

@ -4,13 +4,13 @@
#include "common/log.h" #include "common/log.h"
#include "clientConnection.h" #include "clientConnection.h"
#include "common/utils.h" #include "common/utils.h"
#include "common/snapException.h"
using namespace std; using namespace std;
ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(1), ip(_ip), port(_port), readerThread(NULL), timeouts(0) ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(1), ip(_ip), port(_port), readerThread(NULL), sumTimeout(chronos::msec(0))
{ {
} }
@ -110,10 +110,10 @@ bool ClientConnection::send(BaseMessage* message)
} }
shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message, size_t timeout) shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message, const chronos::msec& timeout)
{ {
shared_ptr<SerializedMessage> response(NULL); shared_ptr<SerializedMessage> response(NULL);
if (++reqId == 100) if (++reqId == 10000)
reqId = 1; reqId = 1;
message->id = reqId; message->id = reqId;
//cout << "Req: " << reqId << "\n"; //cout << "Req: " << reqId << "\n";
@ -129,18 +129,15 @@ shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{ {
response = pendingRequest->response; response = pendingRequest->response;
timeouts = 0; sumTimeout = chronos::msec(0);
//cout << "Resp: " << pendingRequest->id << "\n"; //cout << "Resp: " << pendingRequest->id << "\n";
} }
else else
{ {
++timeouts; sumTimeout += timeout;
cout << "timeout while waiting for response to: " << reqId << ", timeout " << timeouts << "\n"; cout << "timeout while waiting for response to: " << reqId << ", timeout " << sumTimeout.count() << "\n";
if (timeouts > 2*60) if (sumTimeout > chronos::sec(10))
{ throw snapException("sum timeout exceeded 10s");
std::exception e;
throw e;
}
} }
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);

View file

@ -10,6 +10,7 @@
#include <condition_variable> #include <condition_variable>
#include <set> #include <set>
#include "message/message.h" #include "message/message.h"
#include "common/timeDefs.h"
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
@ -44,10 +45,10 @@ public:
virtual void start(); virtual void start();
virtual void stop(); virtual void stop();
virtual bool send(BaseMessage* _message); virtual bool send(BaseMessage* _message);
virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, size_t timeout); virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000));
template <typename T> template <typename T>
std::shared_ptr<T> sendReq(BaseMessage* message, size_t timeout) std::shared_ptr<T> sendReq(BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000))
{ {
std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout); std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout);
if (!reply) if (!reply)
@ -88,7 +89,7 @@ protected:
std::string ip; std::string ip;
size_t port; size_t port;
std::thread* readerThread; std::thread* readerThread;
int timeouts; chronos::msec sumTimeout;
}; };

View file

@ -77,16 +77,16 @@ void Controller::worker()
clientConnection->start(); clientConnection->start();
RequestMsg requestMsg(serversettings); RequestMsg requestMsg(serversettings);
shared_ptr<ServerSettings> serverSettings(NULL); shared_ptr<ServerSettings> serverSettings(NULL);
while (!(serverSettings = clientConnection->sendReq<ServerSettings>(&requestMsg, 1000))); while (!(serverSettings = clientConnection->sendReq<ServerSettings>(&requestMsg)));
cout << "ServerSettings buffer: " << serverSettings->bufferMs << "\n"; cout << "ServerSettings buffer: " << serverSettings->bufferMs << "\n";
requestMsg.request = sampleformat; requestMsg.request = sampleformat;
while (!(sampleFormat = clientConnection->sendReq<SampleFormat>(&requestMsg, 1000))); while (!(sampleFormat = clientConnection->sendReq<SampleFormat>(&requestMsg)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
requestMsg.request = header; requestMsg.request = header;
shared_ptr<HeaderMessage> headerChunk(NULL); shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = clientConnection->sendReq<HeaderMessage>(&requestMsg, 1000))); while (!(headerChunk = clientConnection->sendReq<HeaderMessage>(&requestMsg)));
cout << "Codec: " << headerChunk->codec << "\n"; cout << "Codec: " << headerChunk->codec << "\n";
if (headerChunk->codec == "ogg") if (headerChunk->codec == "ogg")
decoder = new OggDecoder(); decoder = new OggDecoder();
@ -97,7 +97,7 @@ void Controller::worker()
RequestMsg timeReq(timemsg); RequestMsg timeReq(timemsg);
for (size_t n=0; n<50; ++n) for (size_t n=0; n<50; ++n)
{ {
shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq, 2000); shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq, chronos::msec(2000));
if (reply) if (reply)
{ {
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
@ -115,14 +115,14 @@ void Controller::worker()
CommandMsg startStream("startStream"); CommandMsg startStream("startStream");
shared_ptr<AckMsg> ackMsg(NULL); shared_ptr<AckMsg> ackMsg(NULL);
while (!(ackMsg = clientConnection->sendReq<AckMsg>(&startStream, 1000))); while (!(ackMsg = clientConnection->sendReq<AckMsg>(&startStream)));
try try
{ {
while (active_) while (active_)
{ {
usleep(500*1000); usleep(500*1000);
shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq, 1000); shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq);
if (reply) if (reply)
{ {
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;

26
common/snapException.h Normal file
View file

@ -0,0 +1,26 @@
#ifndef SNAP_EXCEPTION_H
#define SNAP_EXCEPTION_H
#include <exception>
#include <string>
struct snapException : std::exception {
snapException(const std::string& what) noexcept
{
what_ = what;
}
const char* what() const noexcept
{
return what_.c_str();
}
private:
std::string what_;
};
#endif