mirror of
https://github.com/badaix/snapcast.git
synced 2025-04-30 10:47:12 +02:00
Add client support for websockets
This commit is contained in:
parent
9fbf273caa
commit
6c02252d84
13 changed files with 393 additions and 118 deletions
|
@ -24,16 +24,22 @@
|
||||||
#include "common/str_compat.hpp"
|
#include "common/str_compat.hpp"
|
||||||
|
|
||||||
// 3rd party headers
|
// 3rd party headers
|
||||||
|
#include <boost/asio/buffer.hpp>
|
||||||
#include <boost/asio/read.hpp>
|
#include <boost/asio/read.hpp>
|
||||||
#include <boost/asio/streambuf.hpp>
|
#include <boost/asio/streambuf.hpp>
|
||||||
#include <boost/asio/write.hpp>
|
#include <boost/asio/write.hpp>
|
||||||
|
|
||||||
// standard headers
|
// standard headers
|
||||||
|
#include <boost/beast/core/flat_buffer.hpp>
|
||||||
|
#include <boost/system/detail/error_code.hpp>
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <optional>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
namespace http = beast::http; // from <boost/beast/http.hpp>
|
||||||
|
|
||||||
static constexpr auto LOG_TAG = "Connection";
|
static constexpr auto LOG_TAG = "Connection";
|
||||||
|
|
||||||
|
@ -93,35 +99,92 @@ bool PendingRequest::operator<(const PendingRequest& other) const
|
||||||
|
|
||||||
|
|
||||||
ClientConnection::ClientConnection(boost::asio::io_context& io_context, ClientSettings::Server server)
|
ClientConnection::ClientConnection(boost::asio::io_context& io_context, ClientSettings::Server server)
|
||||||
: strand_(boost::asio::make_strand(io_context.get_executor())), resolver_(strand_), socket_(strand_), reqId_(1), server_(std::move(server))
|
: strand_(boost::asio::make_strand(io_context.get_executor())), resolver_(strand_), reqId_(1), server_(std::move(server)),
|
||||||
|
base_msg_size_(base_message_.getSize())
|
||||||
{
|
{
|
||||||
base_msg_size_ = base_message_.getSize();
|
|
||||||
buffer_.resize(base_msg_size_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
ClientConnection::~ClientConnection()
|
void ClientConnection::sendNext()
|
||||||
|
{
|
||||||
|
auto& message = messages_.front();
|
||||||
|
boost::asio::streambuf streambuf;
|
||||||
|
std::ostream stream(&streambuf);
|
||||||
|
tv t;
|
||||||
|
message.msg->sent = t;
|
||||||
|
message.msg->serialize(stream);
|
||||||
|
ResultHandler handler = message.handler;
|
||||||
|
|
||||||
|
write(streambuf, [this, handler](boost::system::error_code ec, std::size_t length)
|
||||||
|
{
|
||||||
|
if (ec)
|
||||||
|
LOG(ERROR, LOG_TAG) << "Failed to send message, error: " << ec.message() << "\n";
|
||||||
|
else
|
||||||
|
LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to socket\n";
|
||||||
|
|
||||||
|
messages_.pop_front();
|
||||||
|
if (handler)
|
||||||
|
handler(ec);
|
||||||
|
|
||||||
|
if (!messages_.empty())
|
||||||
|
sendNext();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnection::send(const msg::message_ptr& message, const ResultHandler& handler)
|
||||||
|
{
|
||||||
|
boost::asio::post(strand_, [this, message, handler]()
|
||||||
|
{
|
||||||
|
messages_.emplace_back(message, handler);
|
||||||
|
if (messages_.size() > 1)
|
||||||
|
{
|
||||||
|
LOG(DEBUG, LOG_TAG) << "outstanding async_write\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
sendNext();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnection::sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler)
|
||||||
|
{
|
||||||
|
boost::asio::post(strand_, [this, message, timeout, handler]()
|
||||||
|
{
|
||||||
|
pendingRequests_.erase(
|
||||||
|
std::remove_if(pendingRequests_.begin(), pendingRequests_.end(), [](const std::weak_ptr<PendingRequest>& request) { return request.expired(); }),
|
||||||
|
pendingRequests_.end());
|
||||||
|
unique_ptr<msg::BaseMessage> response(nullptr);
|
||||||
|
static constexpr uint16_t max_req_id = 10000;
|
||||||
|
if (++reqId_ >= max_req_id)
|
||||||
|
reqId_ = 1;
|
||||||
|
message->id = reqId_;
|
||||||
|
auto request = make_shared<PendingRequest>(strand_, reqId_, handler);
|
||||||
|
pendingRequests_.push_back(request);
|
||||||
|
request->startTimer(timeout);
|
||||||
|
send(message, [handler](const boost::system::error_code& ec)
|
||||||
|
{
|
||||||
|
if (ec)
|
||||||
|
handler(ec, nullptr);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////////// TCP /////////////////////////////////////
|
||||||
|
|
||||||
|
ClientConnectionTcp::ClientConnectionTcp(boost::asio::io_context& io_context, ClientSettings::Server server)
|
||||||
|
: ClientConnection(io_context, std::move(server)), socket_(strand_)
|
||||||
|
{
|
||||||
|
buffer_.resize(base_msg_size_);
|
||||||
|
}
|
||||||
|
|
||||||
|
ClientConnectionTcp::~ClientConnectionTcp()
|
||||||
{
|
{
|
||||||
disconnect();
|
disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ClientConnectionTcp::connect(const ResultHandler& handler)
|
||||||
std::string ClientConnection::getMacAddress()
|
|
||||||
{
|
|
||||||
std::string mac =
|
|
||||||
#ifndef WINDOWS
|
|
||||||
::getMacAddress(socket_.native_handle());
|
|
||||||
#else
|
|
||||||
::getMacAddress(socket_.local_endpoint().address().to_string());
|
|
||||||
#endif
|
|
||||||
if (mac.empty())
|
|
||||||
mac = "00:00:00:00:00:00";
|
|
||||||
LOG(INFO, LOG_TAG) << "My MAC: \"" << mac << "\", socket: " << socket_.native_handle() << "\n";
|
|
||||||
return mac;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::connect(const ResultHandler& handler)
|
|
||||||
{
|
{
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
LOG(INFO, LOG_TAG) << "Resolving host IP for: " << server_.host << "\n";
|
LOG(INFO, LOG_TAG) << "Resolving host IP for: " << server_.host << "\n";
|
||||||
|
@ -180,8 +243,7 @@ void ClientConnection::connect(const ResultHandler& handler)
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ClientConnectionTcp::disconnect()
|
||||||
void ClientConnection::disconnect()
|
|
||||||
{
|
{
|
||||||
LOG(DEBUG, LOG_TAG) << "Disconnecting\n";
|
LOG(DEBUG, LOG_TAG) << "Disconnecting\n";
|
||||||
if (!socket_.is_open())
|
if (!socket_.is_open())
|
||||||
|
@ -201,73 +263,22 @@ void ClientConnection::disconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::sendNext()
|
std::string ClientConnectionTcp::getMacAddress()
|
||||||
{
|
{
|
||||||
auto& message = messages_.front();
|
std::string mac =
|
||||||
static boost::asio::streambuf streambuf;
|
#ifndef WINDOWS
|
||||||
std::ostream stream(&streambuf);
|
::getMacAddress(socket_.native_handle());
|
||||||
tv t;
|
#else
|
||||||
message.msg->sent = t;
|
::getMacAddress(socket_.local_endpoint().address().to_string());
|
||||||
message.msg->serialize(stream);
|
#endif
|
||||||
auto handler = message.handler;
|
if (mac.empty())
|
||||||
|
mac = "00:00:00:00:00:00";
|
||||||
boost::asio::async_write(socket_, streambuf, [this, handler](boost::system::error_code ec, std::size_t length)
|
LOG(INFO, LOG_TAG) << "My MAC: \"" << mac << "\", socket: " << socket_.native_handle() << "\n";
|
||||||
{
|
return mac;
|
||||||
if (ec)
|
|
||||||
LOG(ERROR, LOG_TAG) << "Failed to send message, error: " << ec.message() << "\n";
|
|
||||||
else
|
|
||||||
LOG(TRACE, LOG_TAG) << "Wrote " << length << " bytes to socket\n";
|
|
||||||
|
|
||||||
messages_.pop_front();
|
|
||||||
if (handler)
|
|
||||||
handler(ec);
|
|
||||||
|
|
||||||
if (!messages_.empty())
|
|
||||||
sendNext();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::send(const msg::message_ptr& message, const ResultHandler& handler)
|
void ClientConnectionTcp::getNextMessage(const MessageHandler<msg::BaseMessage>& handler)
|
||||||
{
|
|
||||||
boost::asio::post(strand_, [this, message, handler]()
|
|
||||||
{
|
|
||||||
messages_.emplace_back(message, handler);
|
|
||||||
if (messages_.size() > 1)
|
|
||||||
{
|
|
||||||
LOG(DEBUG, LOG_TAG) << "outstanding async_write\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
sendNext();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler)
|
|
||||||
{
|
|
||||||
boost::asio::post(strand_, [this, message, timeout, handler]()
|
|
||||||
{
|
|
||||||
pendingRequests_.erase(
|
|
||||||
std::remove_if(pendingRequests_.begin(), pendingRequests_.end(), [](const std::weak_ptr<PendingRequest>& request) { return request.expired(); }),
|
|
||||||
pendingRequests_.end());
|
|
||||||
unique_ptr<msg::BaseMessage> response(nullptr);
|
|
||||||
static constexpr uint16_t max_req_id = 10000;
|
|
||||||
if (++reqId_ >= max_req_id)
|
|
||||||
reqId_ = 1;
|
|
||||||
message->id = reqId_;
|
|
||||||
auto request = make_shared<PendingRequest>(strand_, reqId_, handler);
|
|
||||||
pendingRequests_.push_back(request);
|
|
||||||
request->startTimer(timeout);
|
|
||||||
send(message, [handler](const boost::system::error_code& ec)
|
|
||||||
{
|
|
||||||
if (ec)
|
|
||||||
handler(ec, nullptr);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& handler)
|
|
||||||
{
|
{
|
||||||
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, handler](boost::system::error_code ec, std::size_t length) mutable
|
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_), [this, handler](boost::system::error_code ec, std::size_t length) mutable
|
||||||
{
|
{
|
||||||
|
@ -336,3 +347,172 @@ void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& ha
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnectionTcp::write(boost::asio::streambuf& buffer, WriteHandler&& write_handler)
|
||||||
|
{
|
||||||
|
boost::asio::async_write(socket_, buffer, write_handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///////////////////////////////// Websockets //////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
ClientConnectionWs::ClientConnectionWs(boost::asio::io_context& io_context, ClientSettings::Server server)
|
||||||
|
: ClientConnection(io_context, std::move(server)), tcp_ws_(strand_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
ClientConnectionWs::~ClientConnectionWs()
|
||||||
|
{
|
||||||
|
disconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnectionWs::connect(const ResultHandler& handler)
|
||||||
|
{
|
||||||
|
boost::system::error_code ec;
|
||||||
|
LOG(INFO, LOG_TAG) << "Resolving host IP for: " << server_.host << "\n";
|
||||||
|
auto iterator = resolver_.resolve(server_.host, cpt::to_string(server_.port), boost::asio::ip::resolver_query_base::numeric_service, ec);
|
||||||
|
if (ec)
|
||||||
|
{
|
||||||
|
LOG(ERROR, LOG_TAG) << "Failed to resolve host '" << server_.host << "', error: " << ec.message() << "\n";
|
||||||
|
handler(ec);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& iter : iterator)
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Resolved IP: " << iter.endpoint().address().to_string() << "\n";
|
||||||
|
|
||||||
|
for (const auto& iter : iterator)
|
||||||
|
{
|
||||||
|
LOG(INFO, LOG_TAG) << "Connecting to " << iter.endpoint() << "\n";
|
||||||
|
if (tcp_ws_)
|
||||||
|
{
|
||||||
|
tcp_ws_->binary(true);
|
||||||
|
tcp_ws_->next_layer().connect(iter, ec);
|
||||||
|
|
||||||
|
// Set suggested timeout settings for the websocket
|
||||||
|
tcp_ws_->set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
|
||||||
|
|
||||||
|
// Set a decorator to change the User-Agent of the handshake
|
||||||
|
tcp_ws_->set_option(websocket::stream_base::decorator([](websocket::request_type& req)
|
||||||
|
{ req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-async"); }));
|
||||||
|
|
||||||
|
// Perform the websocket handshake
|
||||||
|
tcp_ws_->handshake("127.0.0.1", "/stream", ec);
|
||||||
|
handler(ec);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ec || (ec == boost::system::errc::interrupted))
|
||||||
|
{
|
||||||
|
// We were successful or interrupted, e.g. by sig int
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
LOG(ERROR, LOG_TAG) << "Failed to connect to host '" << server_.host << "', error: " << ec.message() << "\n";
|
||||||
|
else
|
||||||
|
LOG(NOTICE, LOG_TAG) << "Connected to " << tcp_ws_->next_layer().remote_endpoint().address().to_string() << "\n";
|
||||||
|
|
||||||
|
handler(ec);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnectionWs::disconnect()
|
||||||
|
{
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Disconnecting\n";
|
||||||
|
if (!tcp_ws_->is_open())
|
||||||
|
{
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Not connected\n";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
boost::system::error_code ec;
|
||||||
|
tcp_ws_->close(websocket::close_code::normal, ec);
|
||||||
|
if (ec)
|
||||||
|
LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << "\n";
|
||||||
|
boost::asio::post(strand_, [this]() { pendingRequests_.clear(); });
|
||||||
|
LOG(DEBUG, LOG_TAG) << "Disconnected\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
std::string ClientConnectionWs::getMacAddress()
|
||||||
|
{
|
||||||
|
std::string mac =
|
||||||
|
#ifndef WINDOWS
|
||||||
|
::getMacAddress(tcp_ws_->next_layer().native_handle());
|
||||||
|
#else
|
||||||
|
::getMacAddress(tcp_ws_->next_layer().local_endpoint().address().to_string());
|
||||||
|
#endif
|
||||||
|
if (mac.empty())
|
||||||
|
mac = "00:00:00:00:00:00";
|
||||||
|
LOG(INFO, LOG_TAG) << "My MAC: \"" << mac << "\", socket: " << tcp_ws_->next_layer().native_handle() << "\n";
|
||||||
|
return mac;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnectionWs::getNextMessage(const MessageHandler<msg::BaseMessage>& handler)
|
||||||
|
{
|
||||||
|
tcp_ws_->async_read(buffer_, [this, handler](beast::error_code ec, std::size_t bytes_transferred) mutable
|
||||||
|
{
|
||||||
|
tv now;
|
||||||
|
LOG(DEBUG, LOG_TAG) << "on_read_ws, ec: " << ec << ", bytes_transferred: " << bytes_transferred << "\n";
|
||||||
|
|
||||||
|
// This indicates that the session was closed
|
||||||
|
if (ec == websocket::error::closed)
|
||||||
|
{
|
||||||
|
if (handler)
|
||||||
|
handler(ec, nullptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
{
|
||||||
|
LOG(ERROR, LOG_TAG) << "ControlSessionWebsocket::on_read_ws error: " << ec.message() << "\n";
|
||||||
|
if (handler)
|
||||||
|
handler(ec, nullptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer_.consume(bytes_transferred);
|
||||||
|
|
||||||
|
auto* data = static_cast<char*>(buffer_.data().data());
|
||||||
|
base_message_.deserialize(data);
|
||||||
|
|
||||||
|
base_message_.received = now;
|
||||||
|
|
||||||
|
auto response = msg::factory::createMessage(base_message_, data + base_msg_size_);
|
||||||
|
if (!response)
|
||||||
|
LOG(WARNING, LOG_TAG) << "Failed to deserialize message of type: " << base_message_.type << "\n";
|
||||||
|
else
|
||||||
|
LOG(DEBUG, LOG_TAG) << "getNextMessage: " << response->type << ", size: " << response->size << ", id: " << response->id
|
||||||
|
<< ", refers: " << response->refersTo << "\n";
|
||||||
|
|
||||||
|
for (auto iter = pendingRequests_.begin(); iter != pendingRequests_.end(); ++iter)
|
||||||
|
{
|
||||||
|
auto request = *iter;
|
||||||
|
if (auto req = request.lock())
|
||||||
|
{
|
||||||
|
if (req->id() == base_message_.refersTo)
|
||||||
|
{
|
||||||
|
req->setValue(std::move(response));
|
||||||
|
pendingRequests_.erase(iter);
|
||||||
|
getNextMessage(handler);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (handler)
|
||||||
|
handler(ec, std::move(response));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ClientConnectionWs::write(boost::asio::streambuf& buffer, WriteHandler&& write_handler)
|
||||||
|
{
|
||||||
|
tcp_ws_->async_write(boost::asio::buffer(buffer.data()), write_handler);
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,10 @@
|
||||||
#include <boost/asio/ip/tcp.hpp>
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
#include <boost/asio/steady_timer.hpp>
|
#include <boost/asio/steady_timer.hpp>
|
||||||
#include <boost/asio/strand.hpp>
|
#include <boost/asio/strand.hpp>
|
||||||
|
#include <boost/asio/streambuf.hpp>
|
||||||
|
#include <boost/beast/core.hpp>
|
||||||
|
#include <boost/beast/ssl.hpp>
|
||||||
|
#include <boost/beast/websocket.hpp>
|
||||||
|
|
||||||
// standard headers
|
// standard headers
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
@ -36,7 +40,13 @@
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
|
||||||
using boost::asio::ip::tcp;
|
// using boost::asio::ip::tcp;
|
||||||
|
namespace beast = boost::beast; // from <boost/beast.hpp>
|
||||||
|
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
|
||||||
|
using tcp_socket = boost::asio::ip::tcp::socket;
|
||||||
|
using ssl_socket = boost::asio::ssl::stream<tcp_socket>;
|
||||||
|
using tcp_websocket = websocket::stream<tcp_socket>;
|
||||||
|
using ssl_websocket = websocket::stream<ssl_socket>;
|
||||||
|
|
||||||
|
|
||||||
class ClientConnection;
|
class ClientConnection;
|
||||||
|
@ -87,17 +97,19 @@ class ClientConnection
|
||||||
public:
|
public:
|
||||||
/// Result callback with boost::error_code
|
/// Result callback with boost::error_code
|
||||||
using ResultHandler = std::function<void(const boost::system::error_code&)>;
|
using ResultHandler = std::function<void(const boost::system::error_code&)>;
|
||||||
|
/// Result callback of a write operation
|
||||||
|
using WriteHandler = std::function<void(boost::system::error_code ec, std::size_t length)>;
|
||||||
|
|
||||||
/// c'tor
|
/// c'tor
|
||||||
ClientConnection(boost::asio::io_context& io_context, ClientSettings::Server server);
|
ClientConnection(boost::asio::io_context& io_context, ClientSettings::Server server);
|
||||||
/// d'tor
|
/// d'tor
|
||||||
virtual ~ClientConnection();
|
virtual ~ClientConnection() = default;
|
||||||
|
|
||||||
/// async connect
|
/// async connect
|
||||||
/// @param handler async result handler
|
/// @param handler async result handler
|
||||||
void connect(const ResultHandler& handler);
|
virtual void connect(const ResultHandler& handler) = 0;
|
||||||
/// disconnect the socket
|
/// disconnect the socket
|
||||||
void disconnect();
|
virtual void disconnect() = 0;
|
||||||
|
|
||||||
/// async send a message
|
/// async send a message
|
||||||
/// @param message the message
|
/// @param message the message
|
||||||
|
@ -126,35 +138,35 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @return MAC address of the client
|
/// @return MAC address of the client
|
||||||
std::string getMacAddress();
|
virtual std::string getMacAddress() = 0;
|
||||||
|
|
||||||
/// async get the next message
|
/// async get the next message
|
||||||
/// @param handler the next received message or error
|
/// @param handler the next received message or error
|
||||||
void getNextMessage(const MessageHandler<msg::BaseMessage>& handler);
|
virtual void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
virtual void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) = 0;
|
||||||
|
|
||||||
/// Send next pending message from messages_
|
/// Send next pending message from messages_
|
||||||
void sendNext();
|
void sendNext();
|
||||||
|
|
||||||
/// Base message holding the received message
|
/// Base message holding the received message
|
||||||
msg::BaseMessage base_message_;
|
msg::BaseMessage base_message_;
|
||||||
/// Receive buffer
|
|
||||||
std::vector<char> buffer_;
|
|
||||||
/// Size of a base message (= message header)
|
|
||||||
size_t base_msg_size_;
|
|
||||||
|
|
||||||
/// Strand to serialize send/receive
|
/// Strand to serialize send/receive
|
||||||
boost::asio::strand<boost::asio::any_io_executor> strand_;
|
boost::asio::strand<boost::asio::any_io_executor> strand_;
|
||||||
|
|
||||||
/// TCP resolver
|
/// TCP resolver
|
||||||
tcp::resolver resolver_;
|
boost::asio::ip::tcp::resolver resolver_;
|
||||||
/// TCP socket
|
|
||||||
tcp::socket socket_;
|
|
||||||
/// List of pending requests, waiting for a response (Message::refersTo)
|
/// List of pending requests, waiting for a response (Message::refersTo)
|
||||||
std::vector<std::weak_ptr<PendingRequest>> pendingRequests_;
|
std::vector<std::weak_ptr<PendingRequest>> pendingRequests_;
|
||||||
/// unique request id to match a response
|
/// unique request id to match a response
|
||||||
uint16_t reqId_;
|
uint16_t reqId_;
|
||||||
/// Server settings (host and port)
|
/// Server settings (host and port)
|
||||||
ClientSettings::Server server_;
|
ClientSettings::Server server_;
|
||||||
|
/// Size of a base message (= message header)
|
||||||
|
const size_t base_msg_size_;
|
||||||
|
|
||||||
/// A pending request
|
/// A pending request
|
||||||
struct PendingMessage
|
struct PendingMessage
|
||||||
|
@ -172,3 +184,53 @@ protected:
|
||||||
/// Pending messages to be sent
|
/// Pending messages to be sent
|
||||||
std::deque<PendingMessage> messages_;
|
std::deque<PendingMessage> messages_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/// Plain TCP connection
|
||||||
|
class ClientConnectionTcp : public ClientConnection
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// c'tor
|
||||||
|
ClientConnectionTcp(boost::asio::io_context& io_context, ClientSettings::Server server);
|
||||||
|
/// d'tor
|
||||||
|
virtual ~ClientConnectionTcp();
|
||||||
|
|
||||||
|
void connect(const ResultHandler& handler) override;
|
||||||
|
void disconnect() override;
|
||||||
|
std::string getMacAddress() override;
|
||||||
|
void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) override;
|
||||||
|
|
||||||
|
/// TCP socket
|
||||||
|
tcp_socket socket_;
|
||||||
|
/// Receive buffer
|
||||||
|
std::vector<char> buffer_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/// Websocket connection
|
||||||
|
class ClientConnectionWs : public ClientConnection
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/// c'tor
|
||||||
|
ClientConnectionWs(boost::asio::io_context& io_context, ClientSettings::Server server);
|
||||||
|
/// d'tor
|
||||||
|
virtual ~ClientConnectionWs();
|
||||||
|
|
||||||
|
void connect(const ResultHandler& handler) override;
|
||||||
|
void disconnect() override;
|
||||||
|
std::string getMacAddress() override;
|
||||||
|
void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) override;
|
||||||
|
|
||||||
|
/// SSL web socket
|
||||||
|
// std::optional<ssl_websocket> ssl_ws_;
|
||||||
|
/// TCP web socket
|
||||||
|
std::optional<tcp_websocket> tcp_ws_;
|
||||||
|
/// Receive buffer
|
||||||
|
boost::beast::flat_buffer buffer_;
|
||||||
|
};
|
||||||
|
|
|
@ -47,19 +47,20 @@ struct ClientSettings
|
||||||
};
|
};
|
||||||
|
|
||||||
Mode mode{Mode::software};
|
Mode mode{Mode::software};
|
||||||
std::string parameter{""};
|
std::string parameter;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Server
|
struct Server
|
||||||
{
|
{
|
||||||
std::string host{""};
|
std::string host;
|
||||||
|
std::string protocol;
|
||||||
size_t port{1704};
|
size_t port{1704};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Player
|
struct Player
|
||||||
{
|
{
|
||||||
std::string player_name{""};
|
std::string player_name;
|
||||||
std::string parameter{""};
|
std::string parameter;
|
||||||
int latency{0};
|
int latency{0};
|
||||||
player::PcmDevice pcm_device;
|
player::PcmDevice pcm_device;
|
||||||
SampleFormat sample_format;
|
SampleFormat sample_format;
|
||||||
|
@ -69,7 +70,7 @@ struct ClientSettings
|
||||||
|
|
||||||
struct Logging
|
struct Logging
|
||||||
{
|
{
|
||||||
std::string sink{""};
|
std::string sink;
|
||||||
std::string filter{"*:info"};
|
std::string filter{"*:info"};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -354,14 +354,17 @@ void Controller::start()
|
||||||
settings_.server.host = host;
|
settings_.server.host = host;
|
||||||
settings_.server.port = port;
|
settings_.server.port = port;
|
||||||
LOG(INFO, LOG_TAG) << "Found server " << settings_.server.host << ":" << settings_.server.port << "\n";
|
LOG(INFO, LOG_TAG) << "Found server " << settings_.server.host << ":" << settings_.server.port << "\n";
|
||||||
clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
|
clientConnection_ = make_unique<ClientConnectionTcp>(io_context_, settings_.server);
|
||||||
worker();
|
worker();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
|
if (settings_.server.protocol == "ws")
|
||||||
|
clientConnection_ = make_unique<ClientConnectionWs>(io_context_, settings_.server);
|
||||||
|
else
|
||||||
|
clientConnection_ = make_unique<ClientConnectionTcp>(io_context_, settings_.server);
|
||||||
worker();
|
worker();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
/***
|
/***
|
||||||
This file is part of snapcast
|
This file is part of snapcast
|
||||||
Copyright (C) 2014-2024 Johannes Pohl
|
Copyright (C) 2014-2025 Johannes Pohl
|
||||||
|
|
||||||
This program is free software: you can redistribute it and/or modify
|
This program is free software: you can redistribute it and/or modify
|
||||||
it under the terms of the GNU General Public License as published by
|
it under the terms of the GNU General Public License as published by
|
||||||
|
@ -43,9 +43,12 @@ using namespace std::chrono_literals;
|
||||||
class Controller
|
class Controller
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
/// c'tor
|
||||||
Controller(boost::asio::io_context& io_context, const ClientSettings& settings); //, std::unique_ptr<MetadataAdapter> meta);
|
Controller(boost::asio::io_context& io_context, const ClientSettings& settings); //, std::unique_ptr<MetadataAdapter> meta);
|
||||||
|
/// Start thw work
|
||||||
void start();
|
void start();
|
||||||
// void stop();
|
// void stop();
|
||||||
|
/// @return list of supported audio backends
|
||||||
static std::vector<std::string> getSupportedPlayerNames();
|
static std::vector<std::string> getSupportedPlayerNames();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
// local headers
|
// local headers
|
||||||
#include "common/popl.hpp"
|
#include "common/popl.hpp"
|
||||||
|
#include "common/utils/string_utils.hpp"
|
||||||
#include "controller.hpp"
|
#include "controller.hpp"
|
||||||
|
|
||||||
#ifdef HAS_ALSA
|
#ifdef HAS_ALSA
|
||||||
|
@ -37,6 +38,7 @@
|
||||||
#include "common/aixlog.hpp"
|
#include "common/aixlog.hpp"
|
||||||
#include "common/snap_exception.hpp"
|
#include "common/snap_exception.hpp"
|
||||||
#include "common/str_compat.hpp"
|
#include "common/str_compat.hpp"
|
||||||
|
#include "common/stream_uri.hpp"
|
||||||
#include "common/version.hpp"
|
#include "common/version.hpp"
|
||||||
|
|
||||||
// 3rd party headers
|
// 3rd party headers
|
||||||
|
@ -202,6 +204,14 @@ int main(int argc, char** argv)
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!op.non_option_args().empty())
|
||||||
|
{
|
||||||
|
streamreader::StreamUri uri(op.non_option_args().front());
|
||||||
|
settings.server.host = uri.host;
|
||||||
|
settings.server.port = uri.port.value_or(settings.server.port);
|
||||||
|
settings.server.protocol = uri.scheme;
|
||||||
|
}
|
||||||
|
|
||||||
if (versionSwitch->is_set())
|
if (versionSwitch->is_set())
|
||||||
{
|
{
|
||||||
cout << "snapclient v" << version::code << (!version::rev().empty() ? (" (rev " + version::rev(8) + ")") : ("")) << "\n"
|
cout << "snapclient v" << version::code << (!version::rev().empty() ? (" (rev " + version::rev(8) + ")") : ("")) << "\n"
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
set(SOURCES resampler.cpp sample_format.cpp jwt.cpp base64.cpp
|
set(SOURCES resampler.cpp sample_format.cpp jwt.cpp base64.cpp stream_uri.cpp
|
||||||
utils/string_utils.cpp)
|
utils/string_utils.cpp)
|
||||||
|
|
||||||
if(NOT WIN32 AND NOT ANDROID)
|
if(NOT WIN32 AND NOT ANDROID)
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#ifndef NOMINMAX
|
#ifndef NOMINMAX
|
||||||
#define NOMINMAX
|
#define NOMINMAX
|
||||||
|
#include <optional>
|
||||||
#endif // NOMINMAX
|
#endif // NOMINMAX
|
||||||
|
|
||||||
// prototype/interface header file
|
// prototype/interface header file
|
||||||
|
@ -87,6 +88,12 @@ void StreamUri::parse(const std::string& stream_uri)
|
||||||
// pos: ^ or ^ or ^
|
// pos: ^ or ^ or ^
|
||||||
|
|
||||||
host = strutils::uriDecode(strutils::trim_copy(tmp.substr(0, pos)));
|
host = strutils::uriDecode(strutils::trim_copy(tmp.substr(0, pos)));
|
||||||
|
std::string str_port;
|
||||||
|
host = utils::string::split_left(host, ':', str_port);
|
||||||
|
port = std::atoi(str_port.c_str());
|
||||||
|
if (port == 0)
|
||||||
|
port = std::nullopt;
|
||||||
|
|
||||||
tmp = tmp.substr(pos);
|
tmp = tmp.substr(pos);
|
||||||
path = tmp;
|
path = tmp;
|
||||||
pos = std::min(path.find('?'), path.find('#'));
|
pos = std::min(path.find('?'), path.find('#'));
|
||||||
|
@ -166,9 +173,11 @@ std::string StreamUri::getQuery(const std::string& key, const std::string& def)
|
||||||
return def;
|
return def;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool StreamUri::operator==(const StreamUri& other) const
|
bool StreamUri::operator==(const StreamUri& other) const
|
||||||
{
|
{
|
||||||
return (other.scheme == scheme) && (other.host == host) && (other.path == path) && (other.query == query) && (other.fragment == fragment);
|
return (other.scheme == scheme) && (other.host == host) && (other.port == port) && (other.path == path) && (other.query == query) &&
|
||||||
|
(other.fragment == fragment);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace streamreader
|
} // namespace streamreader
|
|
@ -24,6 +24,7 @@
|
||||||
|
|
||||||
// standard headers
|
// standard headers
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <optional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
|
|
||||||
|
@ -54,6 +55,8 @@ struct StreamUri
|
||||||
|
|
||||||
/// the host component
|
/// the host component
|
||||||
std::string host;
|
std::string host;
|
||||||
|
/// the port
|
||||||
|
std::optional<size_t> port;
|
||||||
/// the path component
|
/// the path component
|
||||||
std::string path;
|
std::string path;
|
||||||
/// the query component: "key = value" pairs
|
/// the query component: "key = value" pairs
|
|
@ -17,7 +17,6 @@ set(SERVER_SOURCES
|
||||||
encoder/null_encoder.cpp
|
encoder/null_encoder.cpp
|
||||||
streamreader/control_error.cpp
|
streamreader/control_error.cpp
|
||||||
streamreader/stream_control.cpp
|
streamreader/stream_control.cpp
|
||||||
streamreader/stream_uri.cpp
|
|
||||||
streamreader/stream_manager.cpp
|
streamreader/stream_manager.cpp
|
||||||
streamreader/pcm_stream.cpp
|
streamreader/pcm_stream.cpp
|
||||||
streamreader/tcp_stream.cpp
|
streamreader/tcp_stream.cpp
|
||||||
|
|
|
@ -24,12 +24,12 @@
|
||||||
#include "common/json.hpp"
|
#include "common/json.hpp"
|
||||||
#include "common/message/codec_header.hpp"
|
#include "common/message/codec_header.hpp"
|
||||||
#include "common/sample_format.hpp"
|
#include "common/sample_format.hpp"
|
||||||
|
#include "common/stream_uri.hpp"
|
||||||
#include "encoder/encoder.hpp"
|
#include "encoder/encoder.hpp"
|
||||||
#include "jsonrpcpp.hpp"
|
#include "jsonrpcpp.hpp"
|
||||||
#include "properties.hpp"
|
#include "properties.hpp"
|
||||||
#include "server_settings.hpp"
|
#include "server_settings.hpp"
|
||||||
#include "stream_control.hpp"
|
#include "stream_control.hpp"
|
||||||
#include "stream_uri.hpp"
|
|
||||||
|
|
||||||
// 3rd party headers
|
// 3rd party headers
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
|
|
|
@ -17,13 +17,13 @@ endif()
|
||||||
set(TEST_SOURCES
|
set(TEST_SOURCES
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/test_main.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/test_main.cpp
|
||||||
${CMAKE_SOURCE_DIR}/common/jwt.cpp
|
${CMAKE_SOURCE_DIR}/common/jwt.cpp
|
||||||
|
${CMAKE_SOURCE_DIR}/common/stream_uri.cpp
|
||||||
${CMAKE_SOURCE_DIR}/common/base64.cpp
|
${CMAKE_SOURCE_DIR}/common/base64.cpp
|
||||||
${CMAKE_SOURCE_DIR}/common/utils/string_utils.cpp
|
${CMAKE_SOURCE_DIR}/common/utils/string_utils.cpp
|
||||||
${CMAKE_SOURCE_DIR}/server/authinfo.cpp
|
${CMAKE_SOURCE_DIR}/server/authinfo.cpp
|
||||||
${CMAKE_SOURCE_DIR}/server/streamreader/control_error.cpp
|
${CMAKE_SOURCE_DIR}/server/streamreader/control_error.cpp
|
||||||
${CMAKE_SOURCE_DIR}/server/streamreader/properties.cpp
|
${CMAKE_SOURCE_DIR}/server/streamreader/properties.cpp
|
||||||
${CMAKE_SOURCE_DIR}/server/streamreader/metadata.cpp
|
${CMAKE_SOURCE_DIR}/server/streamreader/metadata.cpp)
|
||||||
${CMAKE_SOURCE_DIR}/server/streamreader/stream_uri.cpp)
|
|
||||||
|
|
||||||
include_directories(SYSTEM ${Boost_INCLUDE_DIR})
|
include_directories(SYSTEM ${Boost_INCLUDE_DIR})
|
||||||
|
|
||||||
|
|
|
@ -23,12 +23,12 @@
|
||||||
#include "common/base64.h"
|
#include "common/base64.h"
|
||||||
#include "common/error_code.hpp"
|
#include "common/error_code.hpp"
|
||||||
#include "common/jwt.hpp"
|
#include "common/jwt.hpp"
|
||||||
|
#include "common/stream_uri.hpp"
|
||||||
#include "common/utils/string_utils.hpp"
|
#include "common/utils/string_utils.hpp"
|
||||||
#include "server/authinfo.hpp"
|
#include "server/authinfo.hpp"
|
||||||
#include "server/server_settings.hpp"
|
#include "server/server_settings.hpp"
|
||||||
#include "server/streamreader/control_error.hpp"
|
#include "server/streamreader/control_error.hpp"
|
||||||
#include "server/streamreader/properties.hpp"
|
#include "server/streamreader/properties.hpp"
|
||||||
#include "server/streamreader/stream_uri.hpp"
|
|
||||||
|
|
||||||
// 3rd party headers
|
// 3rd party headers
|
||||||
#include <catch2/catch_test_macros.hpp>
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
@ -232,9 +232,11 @@ TEST_CASE("Uri")
|
||||||
|
|
||||||
// uri = StreamUri("scheme:[//host[:port]][/]path[?query=none][#fragment]");
|
// uri = StreamUri("scheme:[//host[:port]][/]path[?query=none][#fragment]");
|
||||||
// Test with all fields
|
// Test with all fields
|
||||||
uri = StreamUri("scheme://host:port/path?query=none&key=value#fragment");
|
uri = StreamUri("scheme://host:42/path?query=none&key=value#fragment");
|
||||||
REQUIRE(uri.scheme == "scheme");
|
REQUIRE(uri.scheme == "scheme");
|
||||||
REQUIRE(uri.host == "host:port");
|
REQUIRE(uri.host == "host");
|
||||||
|
REQUIRE(uri.port.has_value());
|
||||||
|
REQUIRE(uri.port.value() == 42);
|
||||||
REQUIRE(uri.path == "/path");
|
REQUIRE(uri.path == "/path");
|
||||||
REQUIRE(uri.query["query"] == "none");
|
REQUIRE(uri.query["query"] == "none");
|
||||||
REQUIRE(uri.query["key"] == "value");
|
REQUIRE(uri.query["key"] == "value");
|
||||||
|
@ -243,9 +245,11 @@ TEST_CASE("Uri")
|
||||||
// Test with all fields, url encoded
|
// Test with all fields, url encoded
|
||||||
// "%21%23%24%25%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
|
// "%21%23%24%25%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
|
||||||
// "!#$%&'()*+,/:;=?@[]"
|
// "!#$%&'()*+,/:;=?@[]"
|
||||||
uri = StreamUri("scheme%26://%26host%3f:port/pa%2Bth?%21%23%24%25%26%27%28%29=%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D&key%2525=value#fragment%3f%21%3F");
|
uri = StreamUri("scheme%26://%26host%3f:23/pa%2Bth?%21%23%24%25%26%27%28%29=%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D&key%2525=value#fragment%3f%21%3F");
|
||||||
REQUIRE(uri.scheme == "scheme&");
|
REQUIRE(uri.scheme == "scheme&");
|
||||||
REQUIRE(uri.host == "&host?:port");
|
REQUIRE(uri.host == "&host?");
|
||||||
|
REQUIRE(uri.port.has_value());
|
||||||
|
REQUIRE(uri.port.value() == 23);
|
||||||
REQUIRE(uri.path == "/pa+th");
|
REQUIRE(uri.path == "/pa+th");
|
||||||
REQUIRE(uri.query["!#$%&'()"] == "*+,/:;=?@[]");
|
REQUIRE(uri.query["!#$%&'()"] == "*+,/:;=?@[]");
|
||||||
REQUIRE(uri.query["key%25"] == "value");
|
REQUIRE(uri.query["key%25"] == "value");
|
||||||
|
@ -283,6 +287,7 @@ TEST_CASE("Uri")
|
||||||
uri = StreamUri("spotify:///librespot?name=Spotify&username=EMAIL&password=string%26with%26ampersands&devicename=Snapcast&bitrate=320&killall=false");
|
uri = StreamUri("spotify:///librespot?name=Spotify&username=EMAIL&password=string%26with%26ampersands&devicename=Snapcast&bitrate=320&killall=false");
|
||||||
REQUIRE(uri.scheme == "spotify");
|
REQUIRE(uri.scheme == "spotify");
|
||||||
REQUIRE(uri.host.empty());
|
REQUIRE(uri.host.empty());
|
||||||
|
REQUIRE(!uri.port.has_value());
|
||||||
REQUIRE(uri.path == "/librespot");
|
REQUIRE(uri.path == "/librespot");
|
||||||
REQUIRE(uri.query["name"] == "Spotify");
|
REQUIRE(uri.query["name"] == "Spotify");
|
||||||
REQUIRE(uri.query["username"] == "EMAIL");
|
REQUIRE(uri.query["username"] == "EMAIL");
|
||||||
|
|
Loading…
Add table
Reference in a new issue