socket stuff

git-svn-id: svn://elaine/murooma/trunk@273 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-20 16:18:02 +00:00
parent feabfee936
commit 3116ec0257
9 changed files with 266 additions and 98 deletions

View file

@ -3,7 +3,7 @@ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg
OBJ = snapServer.o streamServer.o controlServer.o pcmEncoder.o oggEncoder.o serverConnection.o ../common/log.o ../common/socketConnection.o ../common/pcmChunk.o ../common/sampleFormat.o
OBJ = snapServer.o controlServer.o pcmEncoder.o oggEncoder.o serverSession.o ../common/log.o ../common/pcmChunk.o ../common/sampleFormat.o
BIN = snapserver
all: server

View file

@ -9,7 +9,28 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
}
void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
void ControlServer::send(shared_ptr<BaseMessage> message)
{
for (std::set<shared_ptr<ServerSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{
if (!(*it)->active())
{
cout << "Session inactive. Removing\n";
sessions.erase(it++);
}
else
++it;
}
for (auto s : sessions)
s->add(message);
}
void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer)
{
// cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::requestmsg)
@ -59,8 +80,8 @@ void ControlServer::acceptor()
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n";
ServerConnection* session = new ServerConnection(this, sock);
sessions.insert(shared_ptr<ServerConnection>(session));
ServerSession* session = new ServerSession(this, sock);
sessions.insert(shared_ptr<ServerSession>(session));
session->start();
}
}

View file

@ -8,7 +8,7 @@
#include <set>
#include <sstream>
#include "serverConnection.h"
#include "serverSession.h"
#include "common/timeUtils.h"
#include "common/queue.h"
#include "common/message.h"
@ -29,20 +29,43 @@ public:
void start();
void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
void send(shared_ptr<BaseMessage> message);
virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer);
void setHeader(HeaderMessage* header);
void setFormat(SampleFormat* format);
void setServerSettings(ServerSettings* settings);
private:
void acceptor();
set<shared_ptr<ServerConnection>> sessions;
set<shared_ptr<ServerSession>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
HeaderMessage* headerChunk;
SampleFormat* sampleFormat;
ServerSettings* serverSettings;
thread* acceptThread;
Queue<shared_ptr<BaseMessage>> messages;
};
class ServerException : public std::exception
{
public:
ServerException(const std::string& what) : what_(what)
{
}
virtual ~ServerException() throw()
{
}
virtual const char* what() const throw()
{
return what_.c_str();
}
private:
std::string what_;
};

View file

@ -1,43 +0,0 @@
#include "serverConnection.h"
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <mutex>
#include "common/log.h"
using namespace std;
ServerConnection::ServerConnection(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket) : SocketConnection(_receiver)
{
socket = _socket;
}
void ServerConnection::worker()
{
active_ = true;
try
{
while (active_)
{
getNextMessage();
}
}
catch (const std::exception& e)
{
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
}
active_ = false;
}

View file

@ -1,28 +0,0 @@
#ifndef SERVER_CONNECTION_H
#define SERVER_CONNECTION_H
#include "common/socketConnection.h"
using boost::asio::ip::tcp;
class ServerConnection : public SocketConnection
{
public:
ServerConnection(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket);
protected:
virtual void worker();
};
#endif

142
server/serverSession.cpp Normal file
View file

@ -0,0 +1,142 @@
#include "serverSession.h"
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <mutex>
#include "common/log.h"
using namespace std;
ServerSession::ServerSession(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket) : messageReceiver(_receiver)
{
socket = _socket;
}
void ServerSession::start()
{
active_ = true;
readerThread = new thread(&ServerSession::reader, this);
writerThread = new thread(&ServerSession::writer, this);
}
void ServerSession::socketRead(void* _to, size_t _bytes)
{
// std::unique_lock<std::mutex> mlock(mutex_);
size_t toRead = _bytes;
size_t len = 0;
do
{
// cout << "/";
// cout.flush();
boost::system::error_code error;
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error);
//cout << "len: " << len << ", error: " << error << endl;
toRead = _bytes - len;
// cout << "\\";
// cout.flush();
}
while (toRead > 0);
}
void ServerSession::add(shared_ptr<BaseMessage> message)
{
if (!message)
return;
while (messages.size() > 100)// chunk->getDuration() > 10000)
messages.pop();
messages.push(message);
}
bool ServerSession::send(BaseMessage* message)
{
// std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
if (!connected())
return false;
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
return true;
}
void ServerSession::getNextMessage()
{
cout << "getNextMessage\n";
BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize);
baseMessage.deserialize(&buffer[0]);
cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
socketRead(&buffer[0], baseMessage.size);
tv t;
baseMessage.received = t;
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);
}
void ServerSession::reader()
{
active_ = true;
try
{
while (active_)
{
getNextMessage();
}
}
catch (const std::exception& e)
{
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
}
active_ = false;
}
void ServerSession::writer()
{
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
for (;;)
{
shared_ptr<BaseMessage> message(messages.pop());
send(message.get());
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
active_ = false;
}
}

68
server/serverSession.h Normal file
View file

@ -0,0 +1,68 @@
#ifndef SERVER_CONNECTION_H
#define SERVER_CONNECTION_H
#include <string>
#include <thread>
#include <atomic>
#include <mutex>
#include <memory>
#include <boost/asio.hpp>
#include <condition_variable>
#include <set>
#include "common/message.h"
#include "common/queue.h"
using boost::asio::ip::tcp;
class ServerSession;
class MessageReceiver
{
public:
virtual void onMessageReceived(ServerSession* connection, const BaseMessage& baseMessage, char* buffer) = 0;
};
class ServerSession
{
public:
ServerSession(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket);
void start();
bool send(BaseMessage* message);
void add(std::shared_ptr<BaseMessage> message);
virtual bool connected()
{
return (socket != 0);
// return (connected_ && socket);
}
virtual bool active()
{
return active_;
}
protected:
void socketRead(void* _to, size_t _bytes);
void getNextMessage();
void reader();
void writer();
std::atomic<bool> active_;
std::thread* readerThread;
std::thread* writerThread;
std::shared_ptr<tcp::socket> socket;
MessageReceiver* messageReceiver;
Queue<std::shared_ptr<BaseMessage>> messages;
};
#endif

View file

@ -8,7 +8,6 @@
#include "../server/pcmEncoder.h"
#include "../server/oggEncoder.h"
#include "common/message.h"
#include "streamServer.h"
#include "controlServer.h"
@ -88,8 +87,8 @@ int main(int argc, char* argv[])
controlServer->setHeader(encoder->getHeader());
controlServer->start();
StreamServer* server = new StreamServer(port + 1);
server->start();
// StreamServer* server = new StreamServer(port + 1);
// server->start();
while (!g_terminated)
{
@ -116,7 +115,7 @@ int main(int argc, char* argv[])
chunk->timestamp.usec = tvChunk.tv_usec;
double chunkDuration = encoder->encode(chunk.get());
if (chunkDuration > 0)
server->send(chunk);
controlServer->send(chunk);
//cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n";
// addUs(tvChunk, 1000*chunk->getDuration());
addUs(tvChunk, chunkDuration * 1000);
@ -140,7 +139,7 @@ int main(int argc, char* argv[])
close(fd);
}
server->stop();
// server->stop();
}
catch (const std::exception& e)
{

View file

@ -22,20 +22,6 @@ using namespace std;
class StreamSession : public ServerConnection
{
public:
StreamSession(socket_ptr sock);
void send(shared_ptr<BaseMessage> message);
protected:
virtual void worker();
thread* senderThread;
Queue<shared_ptr<BaseMessage>> messages;
};
class StreamServer
{
public:
@ -47,7 +33,7 @@ public:
private:
void acceptor();
set<shared_ptr<StreamSession>> sessions;
set<shared_ptr<ServerSession>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
shared_ptr<HeaderMessage> headerChunk;