From fd616956cbfa107ef8a34246ebd26d7119b54cb4 Mon Sep 17 00:00:00 2001 From: badaix Date: Thu, 28 Nov 2019 20:07:43 +0100 Subject: [PATCH] Add client mode to TcpStream --- common/str_compat.hpp | 12 +++++ server/streamreader/tcp_stream.cpp | 80 ++++++++++++++++++++++-------- server/streamreader/tcp_stream.hpp | 4 ++ 3 files changed, 75 insertions(+), 21 deletions(-) diff --git a/common/str_compat.hpp b/common/str_compat.hpp index 0d71a3d4..b9341775 100644 --- a/common/str_compat.hpp +++ b/common/str_compat.hpp @@ -66,6 +66,18 @@ static int stoi(const std::string& str) #endif } +static int stoi(const std::string& str, int def) +{ + try + { + return cpt::stoi(str); + } + catch(...) + { + return def; + } +} + static double stod(const std::string& str) { #ifdef NO_CPP11_STRING diff --git a/server/streamreader/tcp_stream.cpp b/server/streamreader/tcp_stream.cpp index ccbf7bb3..f0479744 100644 --- a/server/streamreader/tcp_stream.cpp +++ b/server/streamreader/tcp_stream.cpp @@ -25,6 +25,7 @@ #include "common/aixlog.hpp" #include "common/snap_exception.hpp" #include "common/str_compat.hpp" +#include "common/utils/string_utils.hpp" #include "encoder/encoder_factory.hpp" #include "tcp_stream.hpp" @@ -33,37 +34,74 @@ using namespace std; -TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : AsioStream(pcmListener, ioc, uri) +TcpStream::TcpStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) + : AsioStream(pcmListener, ioc, uri), reconnect_timer_(ioc) { - size_t port = 4953; - try - { - port = cpt::stoi(uri_.getQuery("port", cpt::to_string(port))); - } - catch (...) + host_ = uri_.host; + auto host_port = utils::string::split(host_, ':'); + port_ = 4953; + if (host_port.size() == 2) { + host_ = host_port[0]; + port_ = cpt::stoi(host_port[1], port_); } - LOG(INFO) << "TcpStream port: " << port << "\n"; - acceptor_ = make_unique(ioc_, tcp::endpoint(boost::asio::ip::address::from_string(uri_.host), port)); + auto mode = uri_.getQuery("mode", "server"); + if (mode == "server") + is_server_ = true; + else if (mode == "client") + is_server_ = false; + else + throw SnapException("mode must be 'client' or 'server'"); + + port_ = cpt::stoi(uri_.getQuery("port", cpt::to_string(port_)), port_); + + LOG(INFO) << "TcpStream host: " << host_ << ", port: " << port_ << ", is server: " << is_server_ << "\n"; + if (is_server_) + acceptor_ = make_unique(ioc_, tcp::endpoint(boost::asio::ip::address::from_string(host_), port_)); } void TcpStream::connect() { auto self = shared_from_this(); - acceptor_->async_accept([this, self](boost::system::error_code ec, tcp::socket socket) { - if (!ec) - { - LOG(DEBUG) << "New client connection\n"; - stream_ = make_unique(move(socket)); - on_connect(); - } - else - { - LOG(ERROR) << "Accept failed: " << ec.message() << "\n"; - } - }); + + if (is_server_) + { + acceptor_->async_accept([this, self](boost::system::error_code ec, tcp::socket socket) { + if (!ec) + { + LOG(DEBUG) << "New client connection\n"; + stream_ = make_unique(move(socket)); + on_connect(); + } + else + { + LOG(ERROR) << "Accept failed: " << ec.message() << "\n"; + } + }); + } + else + { + stream_ = make_unique(ioc_); + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(host_), port_); + stream_->async_connect(endpoint, [self, this](const boost::system::error_code& ec) { + if (!ec) + { + LOG(DEBUG) << "Connected\n"; + on_connect(); + } + else + { + LOG(DEBUG) << "Connect failed: " << ec.message() << "\n"; + reconnect_timer_.expires_from_now(boost::posix_time::milliseconds(1000)); + reconnect_timer_.async_wait([self, this](const boost::system::error_code& ec) { + if (!ec) + connect(); + }); + } + }); + } } diff --git a/server/streamreader/tcp_stream.hpp b/server/streamreader/tcp_stream.hpp index 57b72fe0..0bd6cec0 100644 --- a/server/streamreader/tcp_stream.hpp +++ b/server/streamreader/tcp_stream.hpp @@ -40,6 +40,10 @@ protected: void connect() override; void disconnect() override; std::unique_ptr acceptor_; + std::string host_; + size_t port_; + bool is_server_; + boost::asio::deadline_timer reconnect_timer_; };