From 49bc8328703dfeae168d16bf891ede385f573abc Mon Sep 17 00:00:00 2001 From: badaix Date: Sat, 28 Sep 2019 23:42:03 +0200 Subject: [PATCH] first websocket draft, running on port 8080 --- client/clientConnection.h | 2 +- server/controlServer.cpp | 128 +++++++++++++++++++++++++++------ server/controlServer.h | 11 +-- server/control_session.hpp | 4 +- server/control_session_tcp.hpp | 2 +- server/control_session_ws.cpp | 73 ++++++++++++++++++- server/control_session_ws.hpp | 10 ++- server/streamSession.h | 2 +- 8 files changed, 198 insertions(+), 34 deletions(-) diff --git a/client/clientConnection.h b/client/clientConnection.h index 6d81ccf6..e2ef8cb3 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -21,8 +21,8 @@ #include "common/timeDefs.h" #include "message/message.h" -#include #include +#include #include #include #include diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 5137ed51..90847927 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -21,17 +21,21 @@ #include "common/snapException.h" #include "common/utils.h" #include "config.h" +#include "control_session_tcp.hpp" +#include "control_session_ws.hpp" #include "jsonrpcpp.hpp" #include "message/time.h" + #include using namespace std; using json = nlohmann::json; - +// TODO: make this more generic, and less copy/paste ControlServer::ControlServer(boost::asio::io_context* io_context, size_t port, ControlMessageReceiver* controlMessageReceiver) - : acceptor_v4_(nullptr), acceptor_v6_(nullptr), io_context_(io_context), port_(port), controlMessageReceiver_(controlMessageReceiver) + : acceptor_tcp_v4_(nullptr), acceptor_tcp_v6_(nullptr), acceptor_ws_v4_(nullptr), acceptor_ws_v6_(nullptr), io_context_(io_context), port_(port), + controlMessageReceiver_(controlMessageReceiver) { } @@ -83,27 +87,44 @@ void ControlServer::onMessageReceived(ControlSession* connection, const std::str void ControlServer::startAccept() { - auto accept_handler = [this](error_code ec, tcp::socket socket) { + auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) { if (!ec) - handleAccept(std::move(socket)); + handleAcceptTcp(std::move(socket)); else LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; }; - if (acceptor_v4_) + auto accept_handler_ws = [this](error_code ec, tcp::socket socket) { + if (!ec) + handleAcceptWs(std::move(socket)); + else + LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; + }; + + if (acceptor_tcp_v4_) { tcp::socket socket_v4(*io_context_); - acceptor_v4_->async_accept(accept_handler); + acceptor_tcp_v4_->async_accept(accept_handler_tcp); } - if (acceptor_v6_) + if (acceptor_tcp_v6_) { tcp::socket socket_v6(*io_context_); - acceptor_v6_->async_accept(accept_handler); + acceptor_tcp_v6_->async_accept(accept_handler_tcp); + } + if (acceptor_ws_v4_) + { + tcp::socket socket_v4(*io_context_); + acceptor_ws_v4_->async_accept(accept_handler_ws); + } + if (acceptor_ws_v6_) + { + tcp::socket socket_v6(*io_context_); + acceptor_ws_v6_->async_accept(accept_handler_ws); } } -void ControlServer::handleAccept(tcp::socket socket) +void ControlServer::handleAcceptTcp(tcp::socket socket) { try { @@ -130,17 +151,44 @@ void ControlServer::handleAccept(tcp::socket socket) } +void ControlServer::handleAcceptWs(tcp::socket socket) +{ + try + { + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + // socket->set_option(boost::asio::ip::tcp::no_delay(false)); + SLOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl; + shared_ptr session = make_shared(this, std::move(socket)); + { + std::lock_guard mlock(session_mutex_); + session->start(); + sessions_.emplace_back(session); + cleanup(); + } + } + catch (const std::exception& e) + { + SLOG(ERROR) << "Exception in ControlServer::handleAccept: " << e.what() << endl; + } + startAccept(); +} + + void ControlServer::start() { bool is_v6_only(true); - tcp::endpoint endpoint_v6(tcp::v6(), port_); + tcp::endpoint endpoint_tcp_v6(tcp::v6(), port_); try { - acceptor_v6_ = make_shared(*io_context_, endpoint_v6); + acceptor_tcp_v6_ = make_shared(*io_context_, endpoint_tcp_v6); boost::system::error_code ec; - acceptor_v6_->set_option(boost::asio::ip::v6_only(false), ec); + acceptor_tcp_v6_->set_option(boost::asio::ip::v6_only(false), ec); boost::asio::ip::v6_only option; - acceptor_v6_->get_option(option); + acceptor_tcp_v6_->get_option(option); is_v6_only = option.value(); LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n"; } @@ -149,12 +197,12 @@ void ControlServer::start() LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; } - if (!acceptor_v6_ || is_v6_only) + if (!acceptor_tcp_v6_ || is_v6_only) { tcp::endpoint endpoint_v4(tcp::v4(), port_); try { - acceptor_v4_ = make_shared(*io_context_, endpoint_v4); + acceptor_tcp_v4_ = make_shared(*io_context_, endpoint_v4); } catch (const boost::system::system_error& e) { @@ -162,21 +210,59 @@ void ControlServer::start() } } + tcp::endpoint endpoint_ws_v6(tcp::v6(), 8080); + try + { + acceptor_ws_v6_ = make_shared(*io_context_, endpoint_ws_v6); + boost::system::error_code ec; + acceptor_ws_v6_->set_option(boost::asio::ip::v6_only(false), ec); + boost::asio::ip::v6_only option; + acceptor_ws_v6_->get_option(option); + is_v6_only = option.value(); + LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n"; + } + catch (const boost::system::system_error& e) + { + LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; + } + + if (!acceptor_ws_v6_ || is_v6_only) + { + tcp::endpoint endpoint_v4(tcp::v4(), port_); + try + { + acceptor_ws_v4_ = make_shared(*io_context_, endpoint_v4); + } + catch (const boost::system::system_error& e) + { + LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n"; + } + } startAccept(); } void ControlServer::stop() { - if (acceptor_v4_) + if (acceptor_tcp_v4_) { - acceptor_v4_->cancel(); - acceptor_v4_ = nullptr; + acceptor_tcp_v4_->cancel(); + acceptor_tcp_v4_ = nullptr; } - if (acceptor_v6_) + if (acceptor_tcp_v6_) { - acceptor_v6_->cancel(); - acceptor_v6_ = nullptr; + acceptor_tcp_v6_->cancel(); + acceptor_tcp_v6_ = nullptr; + } + if (acceptor_ws_v4_) + { + acceptor_ws_v4_->cancel(); + acceptor_ws_v4_ = nullptr; + } + if (acceptor_ws_v6_) + { + acceptor_ws_v6_->cancel(); + acceptor_ws_v6_ = nullptr; } std::lock_guard mlock(session_mutex_); for (auto s : sessions_) diff --git a/server/controlServer.h b/server/controlServer.h index e048ea50..d489cd5a 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -29,7 +29,7 @@ #include "common/queue.h" #include "common/sampleFormat.h" -#include "control_session_tcp.hpp" +#include "control_session.hpp" #include "message/codecHeader.h" #include "message/message.h" #include "message/serverSettings.h" @@ -57,13 +57,16 @@ public: private: void startAccept(); - void handleAccept(tcp::socket socket); + void handleAcceptTcp(tcp::socket socket); + void handleAcceptWs(tcp::socket socket); void cleanup(); mutable std::recursive_mutex session_mutex_; std::vector> sessions_; - std::shared_ptr acceptor_v4_; - std::shared_ptr acceptor_v6_; + std::shared_ptr acceptor_tcp_v4_; + std::shared_ptr acceptor_tcp_v6_; + std::shared_ptr acceptor_ws_v4_; + std::shared_ptr acceptor_ws_v6_; boost::asio::io_context* io_context_; size_t port_; diff --git a/server/control_session.hpp b/server/control_session.hpp index 7167bc8c..733585db 100644 --- a/server/control_session.hpp +++ b/server/control_session.hpp @@ -21,8 +21,8 @@ #include "common/queue.h" #include "message/message.h" -#include #include +#include #include #include #include @@ -51,7 +51,7 @@ public: * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSession : public std::enable_shared_from_this +class ControlSession { public: /// ctor. Received message from the client are passed to MessageReceiver diff --git a/server/control_session_tcp.hpp b/server/control_session_tcp.hpp index d49349d1..efdb1862 100644 --- a/server/control_session_tcp.hpp +++ b/server/control_session_tcp.hpp @@ -27,7 +27,7 @@ * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSessionTcp : public ControlSession +class ControlSessionTcp : public ControlSession, public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to MessageReceiver diff --git a/server/control_session_ws.cpp b/server/control_session_ws.cpp index 590cea97..6f2cd938 100644 --- a/server/control_session_ws.cpp +++ b/server/control_session_ws.cpp @@ -35,8 +35,66 @@ ControlSessionWs::~ControlSessionWs() stop(); } + void ControlSessionWs::start() { + // Set suggested timeout settings for the websocket + ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server)); + + // Set a decorator to change the Server of the handshake + ws_.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) { res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async"); })); + + // Accept the websocket handshake + auto self(shared_from_this()); + ws_.async_accept([this, self](beast::error_code ec) { on_accept(ec); }); +} + + +void ControlSessionWs::on_accept(beast::error_code ec) +{ + if (ec) + { + LOG(ERROR) << "ControlSessionWs::on_accept, error: " << ec.message() << "\n"; + return; + } + + // Read a message + do_read(); +} + + +void ControlSessionWs::do_read() +{ + // Read a message into our buffer + auto self(shared_from_this()); + ws_.async_read(buffer_, [this, self](beast::error_code ec, std::size_t bytes_transferred) { on_read(ec, bytes_transferred); }); +} + + +void ControlSessionWs::on_read(beast::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == websocket::error::closed) + return; + + if (ec) + { + LOG(ERROR) << "ControlSessionWs::on_read error: " << ec.message() << "\n"; + return; + } + + std::string line{boost::beast::buffers_to_string(buffer_.data())}; + if (!line.empty()) + { + LOG(INFO) << "received: " << line << "\n"; + if ((message_receiver_ != nullptr) && !line.empty()) + message_receiver_->onMessageReceived(this, line); + } + buffer_.consume(bytes_transferred); + do_read(); } @@ -47,10 +105,23 @@ void ControlSessionWs::stop() void ControlSessionWs::sendAsync(const std::string& message) { + auto self(shared_from_this()); + ws_.async_write(boost::asio::buffer(message), [this, self](std::error_code ec, std::size_t length) { + if (ec) + { + LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n"; + } + else + { + LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n"; + } + }); } bool ControlSessionWs::send(const std::string& message) { - return true; + boost::system::error_code ec; + ws_.write(boost::asio::buffer(message), ec); + return !ec; } diff --git a/server/control_session_ws.hpp b/server/control_session_ws.hpp index 453c786f..7089a3f1 100644 --- a/server/control_session_ws.hpp +++ b/server/control_session_ws.hpp @@ -35,7 +35,7 @@ namespace net = boost::asio; // from * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the ControlMessageReceiver callback */ -class ControlSessionWs : public ControlSession +class ControlSessionWs : public ControlSession, public std::enable_shared_from_this { public: /// ctor. Received message from the client are passed to MessageReceiver @@ -51,9 +51,13 @@ public: void sendAsync(const std::string& message) override; protected: + void on_accept(beast::error_code ec); + void do_read(); + void on_read(beast::error_code ec, std::size_t bytes_transferred); + websocket::stream ws_; - // beast::flat_buffer buffer_; - // beast::multi_buffer b; + beast::flat_buffer buffer_; + beast::multi_buffer b; }; diff --git a/server/streamSession.h b/server/streamSession.h index 2baddf5c..479476c1 100644 --- a/server/streamSession.h +++ b/server/streamSession.h @@ -22,8 +22,8 @@ #include "common/queue.h" #include "message/message.h" #include "streamreader/streamManager.h" -#include #include +#include #include #include #include