first websocket draft, running on port 8080

This commit is contained in:
badaix 2019-09-28 23:42:03 +02:00
parent ec3f8d8ad5
commit 49bc832870
8 changed files with 198 additions and 34 deletions

View file

@ -21,8 +21,8 @@
#include "common/timeDefs.h" #include "common/timeDefs.h"
#include "message/message.h" #include "message/message.h"
#include <boost/asio.hpp>
#include <atomic> #include <atomic>
#include <boost/asio.hpp>
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>

View file

@ -21,17 +21,21 @@
#include "common/snapException.h" #include "common/snapException.h"
#include "common/utils.h" #include "common/utils.h"
#include "config.h" #include "config.h"
#include "control_session_tcp.hpp"
#include "control_session_ws.hpp"
#include "jsonrpcpp.hpp" #include "jsonrpcpp.hpp"
#include "message/time.h" #include "message/time.h"
#include <iostream> #include <iostream>
using namespace std; using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
// TODO: make this more generic, and less copy/paste
ControlServer::ControlServer(boost::asio::io_context* io_context, size_t port, ControlMessageReceiver* controlMessageReceiver) ControlServer::ControlServer(boost::asio::io_context* io_context, size_t port, ControlMessageReceiver* controlMessageReceiver)
: acceptor_v4_(nullptr), acceptor_v6_(nullptr), io_context_(io_context), port_(port), controlMessageReceiver_(controlMessageReceiver) : acceptor_tcp_v4_(nullptr), acceptor_tcp_v6_(nullptr), acceptor_ws_v4_(nullptr), acceptor_ws_v6_(nullptr), io_context_(io_context), port_(port),
controlMessageReceiver_(controlMessageReceiver)
{ {
} }
@ -83,27 +87,44 @@ void ControlServer::onMessageReceived(ControlSession* connection, const std::str
void ControlServer::startAccept() void ControlServer::startAccept()
{ {
auto accept_handler = [this](error_code ec, tcp::socket socket) { auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
if (!ec) if (!ec)
handleAccept(std::move(socket)); handleAcceptTcp(std::move(socket));
else else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
}; };
if (acceptor_v4_) auto accept_handler_ws = [this](error_code ec, tcp::socket socket) {
if (!ec)
handleAcceptWs(std::move(socket));
else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
};
if (acceptor_tcp_v4_)
{ {
tcp::socket socket_v4(*io_context_); tcp::socket socket_v4(*io_context_);
acceptor_v4_->async_accept(accept_handler); acceptor_tcp_v4_->async_accept(accept_handler_tcp);
} }
if (acceptor_v6_) if (acceptor_tcp_v6_)
{ {
tcp::socket socket_v6(*io_context_); tcp::socket socket_v6(*io_context_);
acceptor_v6_->async_accept(accept_handler); acceptor_tcp_v6_->async_accept(accept_handler_tcp);
}
if (acceptor_ws_v4_)
{
tcp::socket socket_v4(*io_context_);
acceptor_ws_v4_->async_accept(accept_handler_ws);
}
if (acceptor_ws_v6_)
{
tcp::socket socket_v6(*io_context_);
acceptor_ws_v6_->async_accept(accept_handler_ws);
} }
} }
void ControlServer::handleAccept(tcp::socket socket) void ControlServer::handleAcceptTcp(tcp::socket socket)
{ {
try try
{ {
@ -130,17 +151,44 @@ void ControlServer::handleAccept(tcp::socket socket)
} }
void ControlServer::handleAcceptWs(tcp::socket socket)
{
try
{
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<ControlSessionWs> session = make_shared<ControlSessionWs>(this, std::move(socket));
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
sessions_.emplace_back(session);
cleanup();
}
}
catch (const std::exception& e)
{
SLOG(ERROR) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
}
startAccept();
}
void ControlServer::start() void ControlServer::start()
{ {
bool is_v6_only(true); bool is_v6_only(true);
tcp::endpoint endpoint_v6(tcp::v6(), port_); tcp::endpoint endpoint_tcp_v6(tcp::v6(), port_);
try try
{ {
acceptor_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v6); acceptor_tcp_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_tcp_v6);
boost::system::error_code ec; boost::system::error_code ec;
acceptor_v6_->set_option(boost::asio::ip::v6_only(false), ec); acceptor_tcp_v6_->set_option(boost::asio::ip::v6_only(false), ec);
boost::asio::ip::v6_only option; boost::asio::ip::v6_only option;
acceptor_v6_->get_option(option); acceptor_tcp_v6_->get_option(option);
is_v6_only = option.value(); is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n"; LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
} }
@ -149,12 +197,12 @@ void ControlServer::start()
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
} }
if (!acceptor_v6_ || is_v6_only) if (!acceptor_tcp_v6_ || is_v6_only)
{ {
tcp::endpoint endpoint_v4(tcp::v4(), port_); tcp::endpoint endpoint_v4(tcp::v4(), port_);
try try
{ {
acceptor_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4); acceptor_tcp_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4);
} }
catch (const boost::system::system_error& e) catch (const boost::system::system_error& e)
{ {
@ -162,21 +210,59 @@ void ControlServer::start()
} }
} }
tcp::endpoint endpoint_ws_v6(tcp::v6(), 8080);
try
{
acceptor_ws_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_ws_v6);
boost::system::error_code ec;
acceptor_ws_v6_->set_option(boost::asio::ip::v6_only(false), ec);
boost::asio::ip::v6_only option;
acceptor_ws_v6_->get_option(option);
is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
if (!acceptor_ws_v6_ || is_v6_only)
{
tcp::endpoint endpoint_v4(tcp::v4(), port_);
try
{
acceptor_ws_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4);
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
startAccept(); startAccept();
} }
void ControlServer::stop() void ControlServer::stop()
{ {
if (acceptor_v4_) if (acceptor_tcp_v4_)
{ {
acceptor_v4_->cancel(); acceptor_tcp_v4_->cancel();
acceptor_v4_ = nullptr; acceptor_tcp_v4_ = nullptr;
} }
if (acceptor_v6_) if (acceptor_tcp_v6_)
{ {
acceptor_v6_->cancel(); acceptor_tcp_v6_->cancel();
acceptor_v6_ = nullptr; acceptor_tcp_v6_ = nullptr;
}
if (acceptor_ws_v4_)
{
acceptor_ws_v4_->cancel();
acceptor_ws_v4_ = nullptr;
}
if (acceptor_ws_v6_)
{
acceptor_ws_v6_->cancel();
acceptor_ws_v6_ = nullptr;
} }
std::lock_guard<std::recursive_mutex> mlock(session_mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
for (auto s : sessions_) for (auto s : sessions_)

View file

@ -29,7 +29,7 @@
#include "common/queue.h" #include "common/queue.h"
#include "common/sampleFormat.h" #include "common/sampleFormat.h"
#include "control_session_tcp.hpp" #include "control_session.hpp"
#include "message/codecHeader.h" #include "message/codecHeader.h"
#include "message/message.h" #include "message/message.h"
#include "message/serverSettings.h" #include "message/serverSettings.h"
@ -57,13 +57,16 @@ public:
private: private:
void startAccept(); void startAccept();
void handleAccept(tcp::socket socket); void handleAcceptTcp(tcp::socket socket);
void handleAcceptWs(tcp::socket socket);
void cleanup(); void cleanup();
mutable std::recursive_mutex session_mutex_; mutable std::recursive_mutex session_mutex_;
std::vector<std::weak_ptr<ControlSession>> sessions_; std::vector<std::weak_ptr<ControlSession>> sessions_;
std::shared_ptr<tcp::acceptor> acceptor_v4_; std::shared_ptr<tcp::acceptor> acceptor_tcp_v4_;
std::shared_ptr<tcp::acceptor> acceptor_v6_; std::shared_ptr<tcp::acceptor> acceptor_tcp_v6_;
std::shared_ptr<tcp::acceptor> acceptor_ws_v4_;
std::shared_ptr<tcp::acceptor> acceptor_ws_v6_;
boost::asio::io_context* io_context_; boost::asio::io_context* io_context_;
size_t port_; size_t port_;

View file

@ -21,8 +21,8 @@
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
#include <boost/asio.hpp>
#include <atomic> #include <atomic>
#include <boost/asio.hpp>
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@ -51,7 +51,7 @@ public:
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the ControlMessageReceiver callback * Received messages from the client are passed to the ControlMessageReceiver callback
*/ */
class ControlSession : public std::enable_shared_from_this<ControlSession> class ControlSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to MessageReceiver

View file

@ -27,7 +27,7 @@
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the ControlMessageReceiver callback * Received messages from the client are passed to the ControlMessageReceiver callback
*/ */
class ControlSessionTcp : public ControlSession class ControlSessionTcp : public ControlSession, public std::enable_shared_from_this<ControlSession>
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to MessageReceiver

View file

@ -35,8 +35,66 @@ ControlSessionWs::~ControlSessionWs()
stop(); stop();
} }
void ControlSessionWs::start() void ControlSessionWs::start()
{ {
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) { res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async"); }));
// Accept the websocket handshake
auto self(shared_from_this());
ws_.async_accept([this, self](beast::error_code ec) { on_accept(ec); });
}
void ControlSessionWs::on_accept(beast::error_code ec)
{
if (ec)
{
LOG(ERROR) << "ControlSessionWs::on_accept, error: " << ec.message() << "\n";
return;
}
// Read a message
do_read();
}
void ControlSessionWs::do_read()
{
// Read a message into our buffer
auto self(shared_from_this());
ws_.async_read(buffer_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read(ec, bytes_transferred); });
}
void ControlSessionWs::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
if (ec == websocket::error::closed)
return;
if (ec)
{
LOG(ERROR) << "ControlSessionWs::on_read error: " << ec.message() << "\n";
return;
}
std::string line{boost::beast::buffers_to_string(buffer_.data())};
if (!line.empty())
{
LOG(INFO) << "received: " << line << "\n";
if ((message_receiver_ != nullptr) && !line.empty())
message_receiver_->onMessageReceived(this, line);
}
buffer_.consume(bytes_transferred);
do_read();
} }
@ -47,10 +105,23 @@ void ControlSessionWs::stop()
void ControlSessionWs::sendAsync(const std::string& message) void ControlSessionWs::sendAsync(const std::string& message)
{ {
auto self(shared_from_this());
ws_.async_write(boost::asio::buffer(message), [this, self](std::error_code ec, std::size_t length) {
if (ec)
{
LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n";
}
});
} }
bool ControlSessionWs::send(const std::string& message) bool ControlSessionWs::send(const std::string& message)
{ {
return true; boost::system::error_code ec;
ws_.write(boost::asio::buffer(message), ec);
return !ec;
} }

View file

@ -35,7 +35,7 @@ namespace net = boost::asio; // from <boost/asio.hpp>
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the ControlMessageReceiver callback * Received messages from the client are passed to the ControlMessageReceiver callback
*/ */
class ControlSessionWs : public ControlSession class ControlSessionWs : public ControlSession, public std::enable_shared_from_this<ControlSession>
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to MessageReceiver
@ -51,9 +51,13 @@ public:
void sendAsync(const std::string& message) override; void sendAsync(const std::string& message) override;
protected: protected:
void on_accept(beast::error_code ec);
void do_read();
void on_read(beast::error_code ec, std::size_t bytes_transferred);
websocket::stream<beast::tcp_stream> ws_; websocket::stream<beast::tcp_stream> ws_;
// beast::flat_buffer buffer_; beast::flat_buffer buffer_;
// beast::multi_buffer b; beast::multi_buffer b;
}; };

View file

@ -22,8 +22,8 @@
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
#include "streamreader/streamManager.h" #include "streamreader/streamManager.h"
#include <boost/asio.hpp>
#include <atomic> #include <atomic>
#include <boost/asio.hpp>
#include <condition_variable> #include <condition_variable>
#include <memory> #include <memory>
#include <mutex> #include <mutex>