merge socket and client connection

git-svn-id: svn://elaine/murooma/trunk@275 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-20 16:31:40 +00:00
parent 3dbbf8e3e5
commit d215580afe
10 changed files with 69 additions and 239 deletions

View file

@ -3,7 +3,7 @@ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc
OBJ = snapClient.o stream.o player.o clientConnection.o ../common/socketConnection.o timeProvider.o streamClient.o oggDecoder.o pcmDecoder.o controller.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o
OBJ = snapClient.o stream.o player.o clientConnection.o timeProvider.o oggDecoder.o pcmDecoder.o controller.o ../common/socketConnection.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o
BIN = snapclient
all: client

View file

@ -27,11 +27,11 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa
{
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
pcmChunk->deserialize(baseMessage, buffer);
cout << "chunk: " << pcmChunk->payloadSize;
//cout << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
}
else
delete pcmChunk;

View file

@ -4,8 +4,7 @@
#include <thread>
#include <atomic>
#include "common/message.h"
#include "common/socketConnection.h"
#include "streamClient.h"
#include "clientConnection.h"
#include "decoder.h"
#include "stream.h"

View file

@ -1,16 +0,0 @@
#include "streamClient.h"
StreamClient::StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : ClientConnection(_receiver, _ip, _port)
{
}
StreamClient::~StreamClient()
{
}

View file

@ -1,23 +0,0 @@
#ifndef STREAM_CLIENT_H
#define STREAM_CLIENT_H
#include "clientConnection.h"
using boost::asio::ip::tcp;
class StreamClient : public ClientConnection
{
public:
StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
virtual ~StreamClient();
};
#endif

View file

@ -9,18 +9,18 @@
using namespace std;
SocketConnection::SocketConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0)
ClientConnection::ClientConnection(MessageReceiver* _receiver) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0)
{
}
SocketConnection::~SocketConnection()
ClientConnection::~ClientConnection()
{
}
void SocketConnection::socketRead(void* _to, size_t _bytes)
void ClientConnection::socketRead(void* _to, size_t _bytes)
{
// std::unique_lock<std::mutex> mlock(mutex_);
size_t toRead = _bytes;
@ -40,13 +40,16 @@ void SocketConnection::socketRead(void* _to, size_t _bytes)
}
void SocketConnection::start()
void ClientConnection::start()
{
receiverThread = new thread(&SocketConnection::worker, this);
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
iterator = resolver.resolve(query);
receiverThread = new thread(&ClientConnection::worker, this);
}
void SocketConnection::stop()
void ClientConnection::stop()
{
active_ = false;
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
@ -55,7 +58,7 @@ void SocketConnection::stop()
}
bool SocketConnection::send(BaseMessage* message)
bool ClientConnection::send(BaseMessage* message)
{
// std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
@ -72,7 +75,7 @@ bool SocketConnection::send(BaseMessage* message)
}
shared_ptr<SerializedMessage> SocketConnection::sendRequest(BaseMessage* message, size_t timeout)
shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message, size_t timeout)
{
shared_ptr<SerializedMessage> response(NULL);
if (++reqId == 0)
@ -103,7 +106,7 @@ shared_ptr<SerializedMessage> SocketConnection::sendRequest(BaseMessage* message
}
void SocketConnection::getNextMessage()
void ClientConnection::getNextMessage()
{
//cout << "getNextMessage\n";
BaseMessage baseMessage;
@ -144,5 +147,47 @@ void SocketConnection::getNextMessage()
void ClientConnection::worker()
{
active_ = true;
while (active_)
{
connected_ = false;
try
{
{
// std::unique_lock<std::mutex> mlock(mutex_);
cout << "connecting\n";
socket.reset(new tcp::socket(io_service));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
cout << "socket: " << socket->native() << "\n";
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
socket->connect(*iterator);
connected_ = true;
cout << "connected\n";
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
}
while(active_)
{
// cout << ".";
// cout.flush();
getNextMessage();
// cout << "|";
// cout.flush();
}
}
catch (const std::exception& e)
{
connected_ = false;
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
usleep(1000*1000);
}
}
}

View file

@ -15,7 +15,7 @@
using boost::asio::ip::tcp;
class SocketConnection;
class ClientConnection;
struct PendingRequest
@ -31,15 +31,15 @@ struct PendingRequest
class MessageReceiver
{
public:
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
};
class SocketConnection
class ClientConnection
{
public:
SocketConnection(MessageReceiver* _receiver);
virtual ~SocketConnection();
ClientConnection(MessageReceiver* _receiver);
virtual ~ClientConnection();
virtual void start();
virtual void stop();
virtual bool send(BaseMessage* _message);
@ -68,7 +68,7 @@ public:
}
protected:
virtual void worker() = 0;
virtual void worker();
void socketRead(void* _to, size_t _bytes);
std::shared_ptr<tcp::socket> socket;
@ -85,6 +85,8 @@ protected:
std::mutex m;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
std::string ip;
size_t port;
};

View file

@ -25,21 +25,13 @@ void ServerSession::start()
void ServerSession::socketRead(void* _to, size_t _bytes)
{
// std::unique_lock<std::mutex> mlock(mutex_);
size_t toRead = _bytes;
size_t len = 0;
size_t read = 0;
do
{
// cout << "/";
// cout.flush();
boost::system::error_code error;
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error);
//cout << "len: " << len << ", error: " << error << endl;
toRead = _bytes - len;
// cout << "\\";
// cout.flush();
read += socket->read_some(boost::asio::buffer((char*)_to + read, _bytes - read), error);
}
while (toRead > 0);
while (read < _bytes);
}

View file

@ -1,101 +0,0 @@
#include "streamServer.h"
StreamSession::StreamSession(std::shared_ptr<tcp::socket> _socket) : ServerConnection(NULL, _socket)
{
}
void StreamSession::worker()
{
active_ = true;
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
for (;;)
{
shared_ptr<BaseMessage> message(messages.pop());
ServerConnection::send(message.get());
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
active_ = false;
}
active_ = false;
}
void StreamSession::send(shared_ptr<BaseMessage> message)
{
if (!message)
return;
while (messages.size() > 100)// chunk->getDuration() > 10000)
messages.pop();
messages.push(message);
}
StreamServer::StreamServer(unsigned short port) : port_(port), headerChunk(NULL)
{
}
void StreamServer::acceptor()
{
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
a.accept(*sock);
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n";
StreamSession* session = new StreamSession(sock);
sessions.insert(shared_ptr<StreamSession>(session));
session->start();
}
}
void StreamServer::send(shared_ptr<BaseMessage> message)
{
for (std::set<shared_ptr<StreamSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{
if (!(*it)->active())
{
cout << "Session inactive. Removing\n";
sessions.erase(it++);
}
else
++it;
}
for (auto s : sessions)
s->send(message);
}
void StreamServer::start()
{
acceptThread = new thread(&StreamServer::acceptor, this);
}
void StreamServer::stop()
{
// acceptThread->join();
}

View file

@ -1,68 +0,0 @@
#ifndef STREAM_SERVER_H
#define STREAM_SERVER_H
#include <boost/asio.hpp>
#include <vector>
#include <thread>
#include <memory>
#include <set>
#include <sstream>
#include "serverConnection.h"
#include "common/timeUtils.h"
#include "common/queue.h"
#include "common/message.h"
#include "common/headerMessage.h"
#include "common/sampleFormat.h"
using boost::asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr;
using namespace std;
class StreamServer
{
public:
StreamServer(unsigned short port);
void send(shared_ptr<BaseMessage> message);
void start();
void stop();
private:
void acceptor();
set<shared_ptr<ServerSession>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
shared_ptr<HeaderMessage> headerChunk;
shared_ptr<SampleFormat> sampleFormat;
thread* acceptThread;
};
class ServerException : public std::exception
{
public:
ServerException(const std::string& what) : what_(what)
{
}
virtual ~ServerException() throw()
{
}
virtual const char* what() const throw()
{
return what_.c_str();
}
private:
std::string what_;
};
#endif