bind stream server to configurable interface

This commit is contained in:
badaix 2019-10-12 11:26:47 +02:00
parent 9ced0aa438
commit 484cd5e672
6 changed files with 66 additions and 56 deletions

View file

@ -140,15 +140,35 @@ void ControlServer::start()
if (tcp_settings_.enabled) if (tcp_settings_.enabled)
{ {
for (const auto& address : tcp_settings_.bind_to_address) for (const auto& address : tcp_settings_.bind_to_address)
{
try
{
LOG(INFO) << "Creating TCP acceptor for address: " << address << ", port: " << tcp_settings_.port << "\n";
acceptor_tcp_.emplace_back( acceptor_tcp_.emplace_back(
make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port))); make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), tcp_settings_.port)));
} }
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
}
if (http_settings_.enabled) if (http_settings_.enabled)
{ {
for (const auto& address : http_settings_.bind_to_address) for (const auto& address : http_settings_.bind_to_address)
{
try
{
LOG(INFO) << "Creating HTTP acceptor for address: " << address << ", port: " << http_settings_.port << "\n";
acceptor_http_.emplace_back( acceptor_http_.emplace_back(
make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port))); make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), http_settings_.port)));
} }
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating HTTP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
}
}
startAccept(); startAccept();
} }
@ -162,6 +182,9 @@ void ControlServer::stop()
for (auto& acceptor : acceptor_http_) for (auto& acceptor : acceptor_http_)
acceptor->cancel(); acceptor->cancel();
acceptor_tcp_.clear();
acceptor_http_.clear();
std::lock_guard<std::recursive_mutex> mlock(session_mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
cleanup(); cleanup();
for (auto s : sessions_) for (auto s : sessions_)

View file

@ -48,6 +48,7 @@ struct ServerSettings
std::string sampleFormat{"48000:16:2"}; std::string sampleFormat{"48000:16:2"};
size_t streamReadMs{20}; size_t streamReadMs{20};
bool sendAudioToMutedClients{false}; bool sendAudioToMutedClients{false};
std::vector<std::string> bind_to_address{{"0.0.0.0"}};
}; };
struct LoggingSettings struct LoggingSettings

View file

@ -21,7 +21,11 @@
# enable HTTP Json RPC (HTTP POST and websockets) # enable HTTP Json RPC (HTTP POST and websockets)
#enabled = true #enabled = true
# address to listen on # address to listen on, can be specified multiple times
# use "0.0.0.0" to bind to any IPv4 address or :: to bind to any IPv6 address
# or "127.0.0.1" or "::1" to bind to localhost IPv4 or IPv6, respectively
# use the address of a specific network interface to just listen to and accept
# connections from that interface
#bind_to_address = 0.0.0.0 #bind_to_address = 0.0.0.0
# which port the server should listen to # which port the server should listen to
@ -39,7 +43,11 @@ doc_root = /home/johannes/Develop/snapcast/control
# enable TCP Json RPC # enable TCP Json RPC
#enabled = true #enabled = true
# address to listen on # address to listen on, can be specified multiple times
# use "0.0.0.0" to bind to any IPv4 address or :: to bind to any IPv6 address
# or "127.0.0.1" or "::1" to bind to localhost IPv4 or IPv6, respectively
# use the address of a specific network interface to just listen to and accept
# connections from that interface
#bind_to_address = 0.0.0.0 #bind_to_address = 0.0.0.0
# which port the server should listen to # which port the server should listen to
@ -51,8 +59,11 @@ doc_root = /home/johannes/Develop/snapcast/control
# Stream settings ############################################################# # Stream settings #############################################################
# #
[stream] [stream]
# address to listen on # address to listen on, can be specified multiple times
# TODO: not implemented yet # use "0.0.0.0" to bind to any IPv4 address or :: to bind to any IPv6 address
# or "127.0.0.1" or "::1" to bind to localhost IPv4 or IPv6, respectively
# use the address of a specific network interface to just listen to and accept
# connections from that interface
#bind_to_address = 0.0.0.0 #bind_to_address = 0.0.0.0
# which port the server should listen to # which port the server should listen to

View file

@ -91,6 +91,8 @@ int main(int argc, char* argv[])
conf.add<Value<int>>("b", "stream.buffer", "Buffer [ms]", settings.stream.bufferMs, &settings.stream.bufferMs); conf.add<Value<int>>("b", "stream.buffer", "Buffer [ms]", settings.stream.bufferMs, &settings.stream.bufferMs);
conf.add<Value<bool>>("", "stream.send_to_muted", "Send audio to muted clients", settings.stream.sendAudioToMutedClients, conf.add<Value<bool>>("", "stream.send_to_muted", "Send audio to muted clients", settings.stream.sendAudioToMutedClients,
&settings.stream.sendAudioToMutedClients); &settings.stream.sendAudioToMutedClients);
auto stream_bind_to_address = conf.add<Value<string>>("", "stream.bind_to_address", "address for the server to listen on",
settings.stream.bind_to_address.front(), &settings.stream.bind_to_address[0]);
// HTTP RPC settings // HTTP RPC settings
conf.add<Value<bool>>("", "http.enabled", "enable HTTP Json RPC (HTTP POST and websockets)", settings.http.enabled, &settings.http.enabled); conf.add<Value<bool>>("", "http.enabled", "enable HTTP Json RPC (HTTP POST and websockets)", settings.http.enabled, &settings.http.enabled);
@ -123,6 +125,12 @@ int main(int argc, char* argv[])
for (size_t n = 0; n < http_bind_to_address->count(); ++n) for (size_t n = 0; n < http_bind_to_address->count(); ++n)
settings.http.bind_to_address.push_back(http_bind_to_address->value(n)); settings.http.bind_to_address.push_back(http_bind_to_address->value(n));
} }
if (stream_bind_to_address->is_set())
{
settings.stream.bind_to_address.clear();
for (size_t n = 0; n < stream_bind_to_address->count(); ++n)
settings.stream.bind_to_address.push_back(stream_bind_to_address->value(n));
}
} }
catch (const std::invalid_argument& e) catch (const std::invalid_argument& e)
{ {

View file

@ -29,8 +29,7 @@ using namespace std;
using json = nlohmann::json; using json = nlohmann::json;
StreamServer::StreamServer(boost::asio::io_context* io_context, const ServerSettings& serverSettings) StreamServer::StreamServer(boost::asio::io_context* io_context, const ServerSettings& serverSettings) : io_context_(io_context), settings_(serverSettings)
: io_context_(io_context), acceptor_v4_(nullptr), acceptor_v6_(nullptr), settings_(serverSettings)
{ {
} }
@ -771,14 +770,8 @@ void StreamServer::startAccept()
LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n"; LOG(ERROR) << "Error while accepting socket connection: " << ec.message() << "\n";
}; };
if (acceptor_v4_) for (auto& acceptor : acceptor_)
{ acceptor->async_accept(accept_handler);
acceptor_v4_->async_accept(accept_handler);
}
if (acceptor_v6_)
{
acceptor_v6_->async_accept(accept_handler);
}
} }
@ -829,29 +822,13 @@ void StreamServer::start()
} }
streamManager_->start(); streamManager_->start();
bool is_v6_only(true); for (const auto& address : settings_.stream.bind_to_address)
tcp::endpoint endpoint_v6(tcp::v6(), settings_.stream.port); {
try try
{ {
acceptor_v6_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v6); LOG(INFO) << "Creating stream acceptor for address: " << address << ", port: " << settings_.stream.port << "\n";
boost::system::error_code ec; acceptor_.emplace_back(
acceptor_v6_->set_option(boost::asio::ip::v6_only(false), ec); make_unique<tcp::acceptor>(*io_context_, tcp::endpoint(boost::asio::ip::address::from_string(address), settings_.stream.port)));
boost::asio::ip::v6_only option;
acceptor_v6_->get_option(option);
is_v6_only = option.value();
LOG(DEBUG) << "IPv6 only: " << is_v6_only << "\n";
}
catch (const boost::system::system_error& e)
{
LOG(ERROR) << "error creating TCP acceptor: " << e.what() << ", code: " << e.code() << "\n";
}
if (!acceptor_v6_ || is_v6_only)
{
tcp::endpoint endpoint_v4(tcp::v4(), settings_.stream.port);
try
{
acceptor_v4_ = make_shared<tcp::acceptor>(*io_context_, endpoint_v4);
} }
catch (const boost::system::system_error& e) catch (const boost::system::system_error& e)
{ {
@ -880,13 +857,10 @@ void StreamServer::stop()
{ {
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session : sessions_) // it = sessions_.begin(); it != sessions_.end(); ++it) for (auto session : sessions_)
{ {
if (session) if (session)
{
session->stop(); session->stop();
session = nullptr;
}
} }
sessions_.clear(); sessions_.clear();
} }
@ -897,14 +871,7 @@ void StreamServer::stop()
controlServer_ = nullptr; controlServer_ = nullptr;
} }
if (acceptor_v4_) for (auto& acceptor : acceptor_)
{ acceptor->cancel();
acceptor_v4_->cancel(); acceptor_.clear();
acceptor_v4_ = nullptr;
}
if (acceptor_v6_)
{
acceptor_v6_->cancel();
acceptor_v6_ = nullptr;
}
} }

View file

@ -40,8 +40,9 @@
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr; using acceptor_ptr = std::unique_ptr<tcp::acceptor>;
typedef std::shared_ptr<StreamSession> session_ptr; using socket_ptr = std::shared_ptr<tcp::socket>;
using session_ptr = std::shared_ptr<StreamSession>;
/// Forwars PCM data to the connected clients /// Forwars PCM data to the connected clients
@ -85,8 +86,7 @@ private:
mutable std::recursive_mutex sessionsMutex_; mutable std::recursive_mutex sessionsMutex_;
std::set<session_ptr> sessions_; std::set<session_ptr> sessions_;
boost::asio::io_context* io_context_; boost::asio::io_context* io_context_;
std::shared_ptr<tcp::acceptor> acceptor_v4_; std::vector<acceptor_ptr> acceptor_;
std::shared_ptr<tcp::acceptor> acceptor_v6_;
ServerSettings settings_; ServerSettings settings_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_; Queue<std::shared_ptr<msg::BaseMessage>> messages_;