make control server more generic

This commit is contained in:
badaix 2019-09-29 20:59:02 +02:00
parent 49bc832870
commit 8920e8c710
4 changed files with 75 additions and 136 deletions

View file

@ -29,20 +29,18 @@
#include <iostream>
using namespace std;
using json = nlohmann::json;
// TODO: make this more generic, and less copy/paste
ControlServer::ControlServer(boost::asio::io_context* io_context, size_t port, ControlMessageReceiver* controlMessageReceiver)
: acceptor_tcp_v4_(nullptr), acceptor_tcp_v6_(nullptr), acceptor_ws_v4_(nullptr), acceptor_ws_v6_(nullptr), io_context_(io_context), port_(port),
controlMessageReceiver_(controlMessageReceiver)
: io_context_(io_context), port_(port), controlMessageReceiver_(controlMessageReceiver)
{
}
ControlServer::~ControlServer()
{
// stop();
stop();
}
@ -89,42 +87,34 @@ void ControlServer::startAccept()
{
auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
if (!ec)
handleAcceptTcp(std::move(socket));
handleAccept<ControlSessionTcp>(std::move(socket));
else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
};
auto accept_handler_ws = [this](error_code ec, tcp::socket socket) {
if (!ec)
handleAcceptWs(std::move(socket));
handleAccept<ControlSessionWs>(std::move(socket));
else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
};
if (acceptor_tcp_v4_)
{
tcp::socket socket_v4(*io_context_);
acceptor_tcp_v4_->async_accept(accept_handler_tcp);
}
if (acceptor_tcp_v6_)
{
tcp::socket socket_v6(*io_context_);
acceptor_tcp_v6_->async_accept(accept_handler_tcp);
}
if (acceptor_ws_v4_)
{
tcp::socket socket_v4(*io_context_);
acceptor_ws_v4_->async_accept(accept_handler_ws);
}
if (acceptor_ws_v6_)
{
tcp::socket socket_v6(*io_context_);
acceptor_ws_v6_->async_accept(accept_handler_ws);
}
if (acceptor_tcp_.first)
acceptor_tcp_.first->async_accept(accept_handler_tcp);
if (acceptor_tcp_.second)
acceptor_tcp_.second->async_accept(accept_handler_tcp);
if (acceptor_ws_.first)
acceptor_ws_.first->async_accept(accept_handler_ws);
if (acceptor_ws_.second)
acceptor_ws_.second->async_accept(accept_handler_ws);
}
void ControlServer::handleAcceptTcp(tcp::socket socket)
template <typename SessionType>
void ControlServer::handleAccept(tcp::socket socket)
{
try
{
@ -135,7 +125,7 @@ void ControlServer::handleAcceptTcp(tcp::socket socket)
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<ControlSessionTcp> session = make_shared<ControlSessionTcp>(this, std::move(socket));
shared_ptr<SessionType> session = make_shared<SessionType>(this, std::move(socket));
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
@ -150,120 +140,64 @@ void ControlServer::handleAcceptTcp(tcp::socket socket)
startAccept();
}
void ControlServer::handleAcceptWs(tcp::socket socket)
std::pair<acceptor_ptr, acceptor_ptr> ControlServer::createAcceptors(size_t port)
{
bool is_v6_only(true);
tcp::endpoint endpoint_tcp_v6(tcp::v6(), port);
acceptor_ptr acceptor_v4;
acceptor_ptr acceptor_v6;
try
{
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<ControlSessionWs> session = make_shared<ControlSessionWs>(this, std::move(socket));
acceptor_v6 = make_unique<tcp::acceptor>(*io_context_, endpoint_tcp_v6);
boost::system::error_code ec;
acceptor_v6->set_option(boost::asio::ip::v6_only(true), ec);
boost::asio::ip::v6_only option;
acceptor_v6->get_option(option);
is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
if (!acceptor_v6 || is_v6_only)
{
tcp::endpoint endpoint_v4(tcp::v4(), port);
try
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
sessions_.emplace_back(session);
cleanup();
acceptor_v4 = make_unique<tcp::acceptor>(*io_context_, endpoint_v4);
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
catch (const std::exception& e)
{
SLOG(ERROR) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
}
startAccept();
return make_pair<acceptor_ptr, acceptor_ptr>(std::move(acceptor_v4), std::move(acceptor_v6));
}
void ControlServer::start()
{
bool is_v6_only(true);
tcp::endpoint endpoint_tcp_v6(tcp::v6(), port_);
try
{
acceptor_tcp_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_tcp_v6);
boost::system::error_code ec;
acceptor_tcp_v6_->set_option(boost::asio::ip::v6_only(false), ec);
boost::asio::ip::v6_only option;
acceptor_tcp_v6_->get_option(option);
is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
if (!acceptor_tcp_v6_ || is_v6_only)
{
tcp::endpoint endpoint_v4(tcp::v4(), port_);
try
{
acceptor_tcp_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4);
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
tcp::endpoint endpoint_ws_v6(tcp::v6(), 8080);
try
{
acceptor_ws_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_ws_v6);
boost::system::error_code ec;
acceptor_ws_v6_->set_option(boost::asio::ip::v6_only(false), ec);
boost::asio::ip::v6_only option;
acceptor_ws_v6_->get_option(option);
is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
if (!acceptor_ws_v6_ || is_v6_only)
{
tcp::endpoint endpoint_v4(tcp::v4(), port_);
try
{
acceptor_ws_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4);
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
acceptor_tcp_ = createAcceptors(port_);
acceptor_ws_ = createAcceptors(8080);
startAccept();
}
void ControlServer::stop()
{
if (acceptor_tcp_v4_)
{
acceptor_tcp_v4_->cancel();
acceptor_tcp_v4_ = nullptr;
}
if (acceptor_tcp_v6_)
{
acceptor_tcp_v6_->cancel();
acceptor_tcp_v6_ = nullptr;
}
if (acceptor_ws_v4_)
{
acceptor_ws_v4_->cancel();
acceptor_ws_v4_ = nullptr;
}
if (acceptor_ws_v6_)
{
acceptor_ws_v6_->cancel();
acceptor_ws_v6_ = nullptr;
}
auto cancel_accept = [](tcp::acceptor* acceptor) {
if (acceptor)
{
acceptor->cancel();
acceptor = nullptr;
}
};
cancel_accept(acceptor_tcp_.first.get());
cancel_accept(acceptor_tcp_.second.get());
cancel_accept(acceptor_ws_.first.get());
cancel_accept(acceptor_ws_.second.get());
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
for (auto s : sessions_)
{