sync send

git-svn-id: svn://elaine/murooma/trunk@256 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-13 18:45:54 +00:00
parent 979f6460d3
commit 3369363453
8 changed files with 111 additions and 25 deletions

View file

@ -18,7 +18,7 @@ Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL)
void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
{
//cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
//cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::payload)
{
if ((stream != NULL) && (decoder != NULL))

View file

@ -5,13 +5,15 @@
#include <iostream>
#include <streambuf>
#include <vector>
#include <sys/time.h>
template<typename CharT, typename TraitsT = std::char_traits<CharT> >
class vectorwrapbuf : public std::basic_streambuf<CharT, TraitsT> {
class vectorwrapbuf : public std::basic_streambuf<CharT, TraitsT>
{
public:
vectorwrapbuf(std::vector<CharT> &vec) {
vectorwrapbuf(std::vector<CharT> &vec)
{
this->setg(vec.data(), vec.data(), vec.data() + vec.size());
}
};
@ -19,7 +21,8 @@ public:
struct membuf : public std::basic_streambuf<char>
{
membuf(char* begin, char* end) {
membuf(char* begin, char* end)
{
this->setg(begin, begin, end);
}
};
@ -36,13 +39,31 @@ enum message_type
struct tv
{
tv()
{
timeval t;
gettimeofday(&t, NULL);
sec = t.tv_sec;
usec = t.tv_usec;
}
tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {};
tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {};
int32_t sec;
int32_t usec;
};
struct BaseMessage
{
BaseMessage() : type(base)
BaseMessage() : type(base), id(0), refersTo(0)
{
}
BaseMessage(message_type type_) : type(type_)
BaseMessage(message_type type_) : type(type_), id(0), refersTo(0)
{
}
@ -53,6 +74,12 @@ struct BaseMessage
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t));
}
@ -66,6 +93,10 @@ struct BaseMessage
void deserialize(const BaseMessage& baseMessage, char* payload)
{
type = baseMessage.type;
id = baseMessage.id;
refersTo = baseMessage.refersTo;
sent = baseMessage.sent;
received = baseMessage.received;
size = baseMessage.size;
membuf databuf(payload, payload + size);
std::istream is(&databuf);
@ -75,6 +106,12 @@ struct BaseMessage
virtual void serialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
size = getSize();
stream.write(reinterpret_cast<char*>(&size), sizeof(uint32_t));
doserialize(stream);
@ -82,11 +119,16 @@ struct BaseMessage
virtual uint32_t getSize()
{
return sizeof(uint16_t) + sizeof(uint32_t);
return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t);
};
uint16_t type;
uint16_t id;
uint16_t refersTo;
tv sent;
tv received;
uint32_t size;
protected:
virtual void doserialize(std::ostream& stream)
{

View file

@ -26,8 +26,8 @@ public:
std::chrono::milliseconds::rep relativeIdxTp = ((double)idx / ((double)format.rate/1000.));
return
tp +
std::chrono::seconds(tv_sec) +
std::chrono::milliseconds(tv_usec / 1000) +
std::chrono::seconds(timestamp.sec) +
std::chrono::milliseconds(timestamp.usec / 1000) +
std::chrono::milliseconds(relativeIdxTp);
}

View file

@ -10,7 +10,7 @@
using namespace std;
SocketConnection::SocketConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver)
SocketConnection::SocketConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0)
{
}
@ -47,17 +47,37 @@ void SocketConnection::stop()
}
void SocketConnection::send(BaseMessage* message)
bool SocketConnection::send(BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
if (!connected())
return;
return false;
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
return true;
}
BaseMessage* SocketConnection::sendRequest(BaseMessage* message, size_t timeout)
{
BaseMessage* response(NULL);
if (++reqId == 0)
++reqId;
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId));
pendingRequests.insert(pendingRequest);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
response = pendingRequest->response;
pendingRequests.erase(pendingRequest);
return response;
}
@ -73,6 +93,17 @@ void SocketConnection::getNextMessage()
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
socketRead(&buffer[0], baseMessage.size);
tv t;
baseMessage.received = t;
for (auto req: pendingRequests)
{
if (req->id == baseMessage.refersTo)
{
req->cv.notify_one();
return;
}
}
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);

View file

@ -6,6 +6,8 @@
#include <atomic>
#include <mutex>
#include <boost/asio.hpp>
#include <condition_variable>
#include <set>
#include "message.h"
@ -15,6 +17,16 @@ using boost::asio::ip::tcp;
class SocketConnection;
struct PendingRequest
{
PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {};
uint16_t id;
BaseMessage* response;
std::condition_variable cv;
};
class MessageReceiver
{
public:
@ -29,7 +41,8 @@ public:
virtual ~SocketConnection();
virtual void start();
virtual void stop();
virtual void send(BaseMessage* _message);
virtual bool send(BaseMessage* _message);
virtual BaseMessage* sendRequest(BaseMessage* message, size_t timeout);
virtual bool active()
{
@ -57,6 +70,8 @@ protected:
tcp::resolver::iterator iterator;
std::thread* receiverThread;
mutable std::mutex mutex_;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
};

View file

@ -10,7 +10,6 @@
#include "message.h"
class WireChunk : public BaseMessage
{
public:
@ -26,8 +25,8 @@ public:
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&tv_sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&tv_usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
@ -38,16 +37,15 @@ public:
return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize;
}
int32_t tv_sec;
int32_t tv_usec;
tv timestamp;
uint32_t payloadSize;
char* payload;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&tv_sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&tv_usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}

View file

@ -23,8 +23,8 @@ double OggEncoder::encode(PcmChunk* chunk)
double res = 0;
if (tv_sec == 0)
{
tv_sec = chunk->tv_sec;
tv_usec = chunk->tv_usec;
tv_sec = chunk->timestamp.sec;
tv_usec = chunk->timestamp.usec;
}
//cout << "-> pcm: " << wireChunk->length << endl;
int bytes = chunk->payloadSize / 4;

View file

@ -112,8 +112,8 @@ size_t duration = 50;
}
while (len < toRead);
chunk->tv_sec = tvChunk.tv_sec;
chunk->tv_usec = tvChunk.tv_usec;
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
double chunkDuration = encoder->encode(chunk.get());
if (chunkDuration > 0)
server->send(chunk);