Streaming clients can connect via Websockets

This commit is contained in:
badaix 2020-07-02 22:26:53 +02:00
parent 5723791f8a
commit 01ce9a60c0
19 changed files with 789 additions and 356 deletions

View file

@ -31,6 +31,8 @@
using namespace std;
using json = nlohmann::json;
static constexpr auto LOG_TAG = "ControlServer";
ControlServer::ControlServer(boost::asio::io_context& io_context, const ServerSettings::Tcp& tcp_settings, const ServerSettings::Http& http_settings,
ControlMessageReceiver* controlMessageReceiver)
@ -51,7 +53,7 @@ void ControlServer::cleanup()
auto count = distance(new_end, sessions_.end());
if (count > 0)
{
LOG(ERROR) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n";
LOG(ERROR, LOG_TAG) << "Removing " << count << " inactive session(s), active sessions: " << sessions_.size() - count << "\n";
sessions_.erase(new_end, sessions_.end());
}
}
@ -72,29 +74,45 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu
}
std::string ControlServer::onMessageReceived(ControlSession* connection, const std::string& message)
std::string ControlServer::onMessageReceived(ControlSession* session, const std::string& message)
{
// LOG(DEBUG) << "received: \"" << message << "\"\n";
// LOG(DEBUG, LOG_TAG) << "received: \"" << message << "\"\n";
if (controlMessageReceiver_ != nullptr)
return controlMessageReceiver_->onMessageReceived(connection, message);
return controlMessageReceiver_->onMessageReceived(session, message);
return "";
}
void ControlServer::onNewSession(const shared_ptr<ControlSession>& session)
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
sessions_.emplace_back(session);
cleanup();
}
void ControlServer::onNewSession(const std::shared_ptr<StreamSession>& session)
{
if (controlMessageReceiver_ != nullptr)
controlMessageReceiver_->onNewSession(session);
}
void ControlServer::startAccept()
{
auto accept_handler_tcp = [this](error_code ec, tcp::socket socket) {
if (!ec)
handleAccept<ControlSessionTcp>(std::move(socket));
else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
};
auto accept_handler_http = [this](error_code ec, tcp::socket socket) {
if (!ec)
handleAccept<ControlSessionHttp>(std::move(socket), http_settings_);
else
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
LOG(ERROR, LOG_TAG) << "Error while accepting socket connection: " << ec.message() << "\n";
};
for (auto& acceptor : acceptor_tcp_)
@ -116,18 +134,13 @@ void ControlServer::handleAccept(tcp::socket socket, Args&&... args)
setsockopt(socket.native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket.native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
// socket->set_option(boost::asio::ip::tcp::no_delay(false));
LOG(NOTICE) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
LOG(NOTICE, LOG_TAG) << "ControlServer::NewConnection: " << socket.remote_endpoint().address().to_string() << endl;
shared_ptr<SessionType> session = make_shared<SessionType>(this, io_context_, std::move(socket), std::forward<Args>(args)...);
{
std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start();
sessions_.emplace_back(session);
cleanup();
}
onNewSession(session);
}
catch (const std::exception& e)
{
LOG(ERROR) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
LOG(ERROR, LOG_TAG) << "Exception in ControlServer::handleAccept: " << e.what() << endl;
}
startAccept();
}
@ -142,13 +155,13 @@ void ControlServer::start()
{
try
{
LOG(INFO) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n";
LOG(INFO, LOG_TAG) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n";
acceptor_tcp_.emplace_back(
make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port)));
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
LOG(ERROR, LOG_TAG) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
}
@ -158,13 +171,13 @@ void ControlServer::start()
{
try
{
LOG(INFO) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n";
LOG(INFO, LOG_TAG) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n";
acceptor_http_.emplace_back(
make_unique<tcp::acceptor>(io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port)));
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n";
LOG(ERROR, LOG_TAG) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
}