Make controller async, add client Mixer settings

Controller uses the asio io_context and thus uses one thread less.
This also fixes a lot of thread sanitizer issues.
This commit is contained in:
badaix 2020-04-29 17:31:51 +02:00
parent 4ce69d4bfb
commit 9bb1c4a041
22 changed files with 838 additions and 711 deletions

View file

@ -20,7 +20,6 @@
#include "common/aixlog.hpp"
#include "common/snap_exception.hpp"
#include "common/str_compat.hpp"
#include "message/factory.hpp"
#include "message/hello.hpp"
#include <iostream>
#include <mutex>
@ -28,10 +27,10 @@
using namespace std;
static constexpr auto LOG_TAG = "Connection";
ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string& host, size_t port)
: socket_(io_context_), active_(false), messageReceiver_(receiver), reqId_(1), host_(host), port_(port), readerThread_(nullptr),
sumTimeout_(chronos::msec(0))
ClientConnection::ClientConnection(boost::asio::io_context& io_context, const ClientSettings::Server& server)
: io_context_(io_context), resolver_(io_context_), socket_(io_context_), reqId_(1), server_(server), strand_(io_context_)
{
base_msg_size_ = base_message_.getSize();
buffer_.resize(base_msg_size_);
@ -40,20 +39,7 @@ ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string&
ClientConnection::~ClientConnection()
{
stop();
}
void ClientConnection::socketRead(void* _to, size_t _bytes)
{
size_t toRead = _bytes;
size_t len = 0;
do
{
len += socket_.read_some(boost::asio::buffer((char*)_to + len, toRead));
// cout << "len: " << len << ", error: " << error << endl;
toRead = _bytes - len;
} while (toRead > 0);
disconnect();
}
@ -67,159 +53,194 @@ std::string ClientConnection::getMacAddress()
#endif
if (mac.empty())
mac = "00:00:00:00:00:00";
LOG(INFO) << "My MAC: \"" << mac << "\", socket: " << socket_.native_handle() << "\n";
LOG(INFO, LOG_TAG) << "My MAC: \"" << mac << "\", socket: " << socket_.native_handle() << "\n";
return mac;
}
void ClientConnection::start()
void ClientConnection::connect(const ResultHandler& handler)
{
tcp::resolver resolver(io_context_);
tcp::resolver::query query(host_, cpt::to_string(port_), boost::asio::ip::resolver_query_base::numeric_service);
auto iterator = resolver.resolve(query);
LOG(DEBUG) << "Connecting\n";
// struct timeval tv;
// tv.tv_sec = 5;
// tv.tv_usec = 0;
// cout << "socket: " << socket->native_handle() << "\n";
// setsockopt(socket->native_handle(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
// setsockopt(socket->native_handle(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
socket_.connect(*iterator);
LOG(NOTICE) << "Connected to " << socket_.remote_endpoint().address().to_string() << endl;
active_ = true;
sumTimeout_ = chronos::msec(0);
readerThread_ = make_unique<thread>(&ClientConnection::reader, this);
tcp::resolver::query query(server_.host, cpt::to_string(server_.port), boost::asio::ip::resolver_query_base::numeric_service);
boost::system::error_code ec;
LOG(DEBUG, LOG_TAG) << "Resolving host IP\n";
auto iterator = resolver_.resolve(query, ec);
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to resolve host '" << server_.host << "', error: " << ec.message() << "\n";
handler(ec);
return;
}
LOG(DEBUG, LOG_TAG) << "Connecting\n";
socket_.connect(*iterator, ec);
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to connect to host '" << server_.host << "', error: " << ec.message() << "\n";
handler(ec);
return;
}
LOG(NOTICE, LOG_TAG) << "Connected to " << socket_.remote_endpoint().address().to_string() << endl;
handler(ec);
// getNextMessage();
#if 0
resolver_.async_resolve(query, host_, cpt::to_string(port_), [this, handler](const boost::system::error_code& ec, tcp::resolver::results_type results) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to resolve host '" << host_ << "', error: " << ec.message() << "\n";
handler(ec);
return;
}
resolver_.cancel();
socket_.async_connect(*results, [this, handler](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to connect to host '" << host_ << "', error: " << ec.message() << "\n";
handler(ec);
return;
}
LOG(NOTICE, LOG_TAG) << "Connected to " << socket_.remote_endpoint().address().to_string() << endl;
handler(ec);
getNextMessage();
});
});
#endif
}
void ClientConnection::stop()
void ClientConnection::disconnect()
{
active_ = false;
try
LOG(DEBUG, LOG_TAG) << "Disconnecting\n";
if (!socket_.is_open())
{
LOG(DEBUG, LOG_TAG) << "Not connected\n";
return;
}
boost::system::error_code ec;
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec)
LOG(ERROR) << "Error in socket shutdown: " << ec.message() << endl;
LOG(ERROR, LOG_TAG) << "Error in socket shutdown: " << ec.message() << endl;
socket_.close(ec);
if (ec)
LOG(ERROR) << "Error in socket close: " << ec.message() << endl;
if (readerThread_)
{
LOG(DEBUG) << "joining readerThread\n";
readerThread_->join();
}
}
catch (...)
{
}
readerThread_ = nullptr;
LOG(DEBUG) << "readerThread terminated\n";
LOG(ERROR, LOG_TAG) << "Error in socket close: " << ec.message() << endl;
pendingRequests_.clear();
LOG(DEBUG, LOG_TAG) << "Disconnected\n";
}
bool ClientConnection::send(const msg::BaseMessage* message)
void ClientConnection::sendNext()
{
// std::unique_lock<std::mutex> mlock(mutex_);
// LOG(DEBUG) << "send: " << message->type << ", size: " << message->getSize() << "\n";
std::lock_guard<std::mutex> socketLock(socketMutex_);
if (!socket_.is_open())
return false;
// LOG(DEBUG) << "send: " << message->type << ", size: " << message->getSize() << "\n";
auto& message = messages_.front();
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(socket_, streambuf);
return true;
message.msg->sent = t;
message.msg->serialize(stream);
boost::asio::async_write(socket_, streambuf, boost::asio::bind_executor(strand_, [this](boost::system::error_code ec, std::size_t length) {
std::ignore = length;
auto handler = messages_.front().handler;
messages_.pop_front();
if (handler)
handler(ec);
if (!messages_.empty())
sendNext();
}));
}
void ClientConnection::send(const msg::message_ptr& message, const ResultHandler& handler)
{
boost::asio::post(strand_, [this, message, handler]() {
messages_.push_back({message, handler});
if (messages_.size() > 1)
{
LOG(DEBUG, LOG_TAG) << "outstanding async_write\n";
return;
}
sendNext();
});
}
unique_ptr<msg::BaseMessage> ClientConnection::sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout)
void ClientConnection::sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler)
{
// LOG(INFO, LOG_TAG) << "Req: " << message->id << "\n";
boost::asio::post(strand_, [this, message, timeout, handler]() {
unique_ptr<msg::BaseMessage> response(nullptr);
if (++reqId_ >= 10000)
reqId_ = 1;
message->id = reqId_;
// LOG(INFO) << "Req: " << message->id << "\n";
shared_ptr<PendingRequest> pendingRequest = make_shared<PendingRequest>(reqId_);
{ // scope for lock
std::unique_lock<std::mutex> lock(pendingRequestsMutex_);
pendingRequests_.insert(pendingRequest);
send(message);
}
if ((response = pendingRequest->waitForResponse(std::chrono::milliseconds(timeout))) != nullptr)
{
sumTimeout_ = chronos::msec(0);
// LOG(INFO) << "Resp: " << pendingRequest->id << "\n";
}
else
{
sumTimeout_ += timeout;
LOG(WARNING) << "timeout while waiting for response to: " << reqId_ << ", timeout " << sumTimeout_.count() << "\n";
if (sumTimeout_ > chronos::sec(10))
throw SnapException("sum timeout exceeded 10s");
}
{ // scope for lock
std::unique_lock<std::mutex> lock(pendingRequestsMutex_);
pendingRequests_.erase(pendingRequest);
}
return response;
pendingRequests_.insert(make_unique<PendingRequest>(io_context_, strand_, reqId_, timeout, handler));
send(message, [handler](const boost::system::error_code& ec) {
if (ec)
handler(ec, nullptr);
});
});
}
void ClientConnection::getNextMessage()
void ClientConnection::getNextMessage(const MessageHandler<msg::BaseMessage>& handler)
{
socketRead(&buffer_[0], base_msg_size_);
base_message_.deserialize(buffer_.data());
// LOG(DEBUG) << "getNextMessage: " << base_message_.type << ", size: " << base_message_.size << ", id: " << base_message_.id
// << ", refers: " << base_message_.refersTo << "\n";
if (base_message_.size > buffer_.size())
buffer_.resize(base_message_.size);
// {
// std::lock_guard<std::mutex> socketLock(socketMutex_);
socketRead(buffer_.data(), base_message_.size);
tv t;
base_message_.received = t;
// }
{ // scope for lock
std::unique_lock<std::mutex> lock(pendingRequestsMutex_);
for (auto req : pendingRequests_)
boost::asio::async_read(socket_, boost::asio::buffer(buffer_, base_msg_size_),
boost::asio::bind_executor(strand_, [this, handler](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
if (req->id() == base_message_.refersTo)
{
auto response = msg::factory::createMessage(base_message_, buffer_.data());
req->setValue(std::move(response));
LOG(ERROR, LOG_TAG) << "Error reading message header of length " << length << ": " << ec.message() << "\n";
if (handler)
handler(ec, nullptr);
return;
}
base_message_.deserialize(buffer_.data());
tv t;
base_message_.received = t;
// LOG(DEBUG, LOG_TAG) << "getNextMessage: " << base_message_.type << ", size: " << base_message_.size
// << ", id: " << base_message_.id << ", refers: " << base_message_.refersTo << "\n";
if (base_message_.type > message_type::kLast)
{
LOG(ERROR, LOG_TAG) << "unknown message type received: " << base_message_.type << ", size: " << base_message_.size << "\n";
if (handler)
handler(boost::asio::error::invalid_argument, nullptr);
return;
}
else if (base_message_.size > msg::max_size)
{
LOG(ERROR, LOG_TAG) << "received message of type " << base_message_.type << " to large: " << base_message_.size << "\n";
if (handler)
handler(boost::asio::error::invalid_argument, nullptr);
return;
}
if (messageReceiver_ != nullptr)
messageReceiver_->onMessageReceived(this, base_message_, buffer_.data());
if (base_message_.size > buffer_.size())
buffer_.resize(base_message_.size);
boost::asio::async_read(
socket_, boost::asio::buffer(buffer_, base_message_.size),
boost::asio::bind_executor(strand_, [this, handler](boost::system::error_code ec, std::size_t length) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error reading message body of length " << length << ": " << ec.message() << "\n";
if (handler)
handler(ec, nullptr);
return;
}
auto iter = std::find_if(
pendingRequests_.begin(), pendingRequests_.end(),
[this](const std::unique_ptr<PendingRequest>& request) { return request->id() == base_message_.refersTo; });
auto response = msg::factory::createMessage(base_message_, buffer_.data());
if (iter != pendingRequests_.end())
{
(*iter)->setValue(std::move(response));
pendingRequests_.erase(iter);
getNextMessage(handler);
return;
}
void ClientConnection::reader()
{
try
{
while (active_)
{
getNextMessage();
}
}
catch (...)
{
if (messageReceiver_ != nullptr)
messageReceiver_->onException(this, std::current_exception());
}
active_ = false;
if (handler)
handler(ec, std::move(response));
}));
}));
}

View file

@ -19,11 +19,15 @@
#ifndef CLIENT_CONNECTION_H
#define CLIENT_CONNECTION_H
#include "client_settings.hpp"
#include "common/time_defs.hpp"
#include "message/factory.hpp"
#include "message/message.hpp"
#include <atomic>
#include <boost/asio.hpp>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <set>
@ -36,33 +40,42 @@ using boost::asio::ip::tcp;
class ClientConnection;
template <typename Message>
using MessageHandler = std::function<void(const boost::system::error_code&, std::unique_ptr<Message>)>;
/// Used to synchronize server requests (wait for server response)
class PendingRequest
{
public:
PendingRequest(uint16_t reqId) : id_(reqId)
PendingRequest(boost::asio::io_context& io_context, boost::asio::io_context::strand& strand, uint16_t reqId, const chronos::usec& timeout,
const MessageHandler<msg::BaseMessage>& handler)
: id_(reqId), timer_(io_context), strand_(strand), handler_(handler)
{
future_ = promise_.get_future();
timer_.expires_after(timeout);
timer_.async_wait(boost::asio::bind_executor(strand_, [this](boost::system::error_code ec) {
if (!handler_)
return;
if (!ec)
{
handler_(boost::asio::error::timed_out, nullptr);
handler_ = nullptr;
}
else if (ec != boost::asio::error::operation_aborted)
handler_(ec, nullptr);
}));
};
template <typename Rep, typename Period>
std::unique_ptr<msg::BaseMessage> waitForResponse(const std::chrono::duration<Rep, Period>& timeout)
virtual ~PendingRequest()
{
try
{
if (future_.wait_for(timeout) == std::future_status::ready)
return future_.get();
}
catch (...)
{
}
return nullptr;
handler_ = nullptr;
timer_.cancel();
}
void setValue(std::unique_ptr<msg::BaseMessage> value)
{
promise_.set_value(std::move(value));
timer_.cancel();
if (handler_)
handler_({}, std::move(value));
}
uint16_t id() const
@ -72,21 +85,12 @@ public:
private:
uint16_t id_;
std::promise<std::unique_ptr<msg::BaseMessage>> promise_;
std::future<std::unique_ptr<msg::BaseMessage>> future_;
boost::asio::steady_timer timer_;
boost::asio::io_context::strand& strand_;
MessageHandler<msg::BaseMessage> handler_;
};
/// Interface: callback for a received message and error reporting
class MessageReceiver
{
public:
virtual ~MessageReceiver() = default;
virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0;
virtual void onException(ClientConnection* connection, std::exception_ptr exception) = 0;
};
/// Endpoint of the server connection
/**
@ -97,63 +101,70 @@ public:
class ClientConnection
{
public:
/// ctor. Received message from the server are passed to MessageReceiver
ClientConnection(MessageReceiver* receiver, const std::string& host, size_t port);
using ResultHandler = std::function<void(const boost::system::error_code&)>;
/// c'tor
ClientConnection(boost::asio::io_context& io_context, const ClientSettings::Server& server);
/// d'tor
virtual ~ClientConnection();
virtual void start();
virtual void stop();
virtual bool send(const msg::BaseMessage* message);
/// async connect
/// @param handler async result handler
void connect(const ResultHandler& handler);
/// disconnect the socket
void disconnect();
/// async send a message
/// @param message the message
/// @param handler the result handler
void send(const msg::message_ptr& message, const ResultHandler& handler);
/// Send request to the server and wait for answer
virtual std::unique_ptr<msg::BaseMessage> sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000));
/// @param message the message
/// @param timeout the send timeout
/// @param handler async result handler with the response message or error
//template <>
void sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler);
/// Send request to the server and wait for answer of type T
template <typename T>
std::unique_ptr<T> sendReq(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000))
/// @sa sendRequest with templated response message
template <typename Message>
void sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<Message>& handler)
{
std::unique_ptr<msg::BaseMessage> response = sendRequest(message, timeout);
if (!response)
return nullptr;
T* tmp = dynamic_cast<T*>(response.get());
std::unique_ptr<T> result;
if (tmp != nullptr)
{
response.release();
result.reset(tmp);
}
return result;
sendRequest(message, timeout, [handler](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
if (ec)
handler(ec, nullptr);
else
handler(ec, msg::message_cast<Message>(std::move(response)));
});
}
std::string getMacAddress();
virtual bool active() const
{
return active_;
}
/// async get the next message
/// @param handler the next received message or error
void getNextMessage(const MessageHandler<msg::BaseMessage>& handler);
protected:
virtual void reader();
void socketRead(void* to, size_t bytes);
void getNextMessage();
void sendNext();
msg::BaseMessage base_message_;
std::vector<char> buffer_;
size_t base_msg_size_;
boost::asio::io_context io_context_;
mutable std::mutex socketMutex_;
boost::asio::io_context& io_context_;
tcp::resolver resolver_;
tcp::socket socket_;
std::atomic<bool> active_;
MessageReceiver* messageReceiver_;
mutable std::mutex pendingRequestsMutex_;
std::set<std::shared_ptr<PendingRequest>> pendingRequests_;
std::set<std::unique_ptr<PendingRequest>> pendingRequests_;
uint16_t reqId_;
std::string host_;
size_t port_;
std::unique_ptr<std::thread> readerThread_;
chronos::msec sumTimeout_;
ClientSettings::Server server_;
boost::asio::io_context::strand strand_;
struct PendingMessage
{
msg::message_ptr msg;
ResultHandler handler;
};
std::deque<PendingMessage> messages_;
};

View file

@ -34,19 +34,33 @@ struct ClientSettings
shared
};
struct ServerSettings
struct Mixer
{
enum class Mode
{
hardware,
software,
script
};
Mode mode{Mode::software};
std::string parameter{""};
};
struct Server
{
std::string host{""};
size_t port{1704};
};
struct PlayerSettings
struct Player
{
std::string player_name{""};
int latency{0};
PcmDevice pcm_device;
SampleFormat sample_format;
SharingMode sharing_mode{SharingMode::shared};
Mixer mixer;
};
struct Logging
@ -58,8 +72,8 @@ struct ClientSettings
size_t instance{1};
std::string host_id;
ServerSettings server;
PlayerSettings player;
Server server;
Player player;
Logging logging;
};

View file

@ -31,8 +31,10 @@
#if defined(HAS_OPUS)
#include "decoder/opus_decoder.hpp"
#endif
#include "browseZeroConf/browse_mdns.hpp"
#include "common/aixlog.hpp"
#include "common/snap_exception.hpp"
#include "message/client_settings.hpp"
#include "message/hello.hpp"
#include "message/time.hpp"
#include "time_provider.hpp"
@ -44,51 +46,51 @@
using namespace std;
static constexpr auto LOG_TAG = "Controller";
Controller::Controller(const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta)
: MessageReceiver(), settings_(settings), active_(false), stream_(nullptr), decoder_(nullptr), player_(nullptr), meta_(std::move(meta)),
serverSettings_(nullptr), async_exception_(nullptr)
Controller::Controller(boost::asio::io_context& io_context, const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta)
: io_context_(io_context), timer_(io_context), settings_(settings), stream_(nullptr), decoder_(nullptr), player_(nullptr), meta_(std::move(meta)),
serverSettings_(nullptr)
{
}
void Controller::onException(ClientConnection* /*connection*/, std::exception_ptr exception)
void Controller::getNextMessage()
{
LOG(ERROR) << "Controller::onException\n";
async_exception_ = exception;
}
void Controller::onMessageReceived(ClientConnection* /*connection*/, const msg::BaseMessage& baseMessage, char* buffer)
clientConnection_->getNextMessage([this](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response) {
if (!ec)
{
std::lock_guard<std::mutex> lock(receiveMutex_);
if (baseMessage.type == message_type::kWireChunk)
if (response->type == message_type::kWireChunk)
{
if (stream_ && decoder_)
{
auto pcmChunk = make_unique<msg::PcmChunk>(sampleFormat_, 0);
pcmChunk->deserialize(baseMessage, buffer);
// LOG(DEBUG) << "chunk: " << pcmChunk->payloadSize << ", sampleFormat: " << sampleFormat_.getFormat() << "\n";
// auto wireChunk = msg::message_cast<msg::WireChunk>(std::move(response));
auto pcmChunk = msg::message_cast<msg::PcmChunk>(std::move(response));
pcmChunk->format = sampleFormat_;
// std::make_unique<msg::PcmChunk>(sampleFormat_, *wireChunk);
// pcmChunk->deserialize(baseMessage, buffer);
// LOG(TRACE, LOG_TAG) << "chunk: " << pcmChunk->payloadSize << ", sampleFormat: " << sampleFormat_.getFormat() << "\n";
if (decoder_->decode(pcmChunk.get()))
{
// TODO: do decoding in thread?
stream_->addChunk(move(pcmChunk));
// LOG(DEBUG) << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec <<
// ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
// LOG(TRACE, LOG_TAG) << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->durationMs()
// << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec / 1000 << ", type: " <<
// pcmChunk->type
// << "\n";
stream_->addChunk(std::move(pcmChunk));
}
}
}
else if (baseMessage.type == message_type::kTime)
else if (response->type == message_type::kTime)
{
msg::Time reply;
reply.deserialize(baseMessage, buffer);
TimeProvider::getInstance().setDiff(reply.latency, reply.received - reply.sent); // ToServer(diff / 2);
// should not be called, because timeSync messages are sent as request, so that the response will go there
LOG(DEBUG, LOG_TAG) << "Received time sync message\n";
auto reply = msg::message_cast<msg::Time>(std::move(response));
TimeProvider::getInstance().setDiff(reply->latency, reply->received - reply->sent); // ToServer(diff / 2);
}
else if (baseMessage.type == message_type::kServerSettings)
else if (response->type == message_type::kServerSettings)
{
serverSettings_ = make_unique<msg::ServerSettings>();
serverSettings_->deserialize(baseMessage, buffer);
LOG(INFO) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
serverSettings_ = msg::message_cast<msg::ServerSettings>(std::move(response));
LOG(INFO, LOG_TAG) << "ServerSettings - buffer: " << serverSettings_->getBufferMs() << ", latency: " << serverSettings_->getLatency()
<< ", volume: " << serverSettings_->getVolume() << ", muted: " << serverSettings_->isMuted() << "\n";
if (stream_ && player_)
{
@ -97,12 +99,10 @@ void Controller::onMessageReceived(ClientConnection* /*connection*/, const msg::
stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
}
}
else if (baseMessage.type == message_type::kCodecHeader)
else if (response->type == message_type::kCodecHeader)
{
headerChunk_ = make_unique<msg::CodecHeader>();
headerChunk_->deserialize(baseMessage, buffer);
LOG(INFO) << "Codec: " << headerChunk_->codec << "\n";
headerChunk_ = msg::message_cast<msg::CodecHeader>(std::move(response));
LOG(INFO, LOG_TAG) << "Codec: " << headerChunk_->codec << "\n";
decoder_.reset(nullptr);
stream_ = nullptr;
player_.reset(nullptr);
@ -125,156 +125,223 @@ void Controller::onMessageReceived(ClientConnection* /*connection*/, const msg::
throw SnapException("codec not supported: \"" + headerChunk_->codec + "\"");
sampleFormat_ = decoder_->setHeader(headerChunk_.get());
LOG(NOTICE) << TAG("state") << "sampleformat: " << sampleFormat_.getFormat() << "\n";
LOG(NOTICE, LOG_TAG) << TAG("state") << "sampleformat: " << sampleFormat_.getFormat() << "\n";
stream_ = make_shared<Stream>(sampleFormat_, settings_.player.sample_format);
stream_->setBufferLen(std::max(0, serverSettings_->getBufferMs() - serverSettings_->getLatency() - settings_.player.latency));
const auto& pcm_device = settings_.player.pcm_device;
const auto& player_settings = settings_.player;
const auto& player_name = settings_.player.player_name;
player_ = nullptr;
#ifdef HAS_ALSA
if (!player_ && (player_name.empty() || (player_name == "alsa")))
player_ = make_unique<AlsaPlayer>(pcm_device, stream_);
player_ = make_unique<AlsaPlayer>(io_context_, player_settings, stream_);
#endif
#ifdef HAS_OBOE
if (!player_ && (player_name.empty() || (player_name == "oboe")))
player_ = make_unique<OboePlayer>(pcm_device, stream_);
player_ = make_unique<OboePlayer>(io_context_, player_settings, stream_);
#endif
#ifdef HAS_OPENSL
if (!player_ && (player_name.empty() || (player_name == "opensl")))
player_ = make_unique<OpenslPlayer>(pcm_device, stream_);
player_ = make_unique<OpenslPlayer>(io_context_, player_settings, stream_);
#endif
#ifdef HAS_COREAUDIO
if (!player_ && (player_name.empty() || (player_name == "coreaudio")))
player_ = make_unique<CoreAudioPlayer>(pcm_device, stream_);
player_ = make_unique<CoreAudioPlayer>(io_context_, player_settings, stream_);
#endif
#ifdef HAS_WASAPI
if (!player_ && (player_name.empty() || (player_name == "wasapi")))
player_ = make_unique<WASAPIPlayer>(pcm_device, stream_, settings_.player.sharing_mode);
player_ = make_unique<WASAPIPlayer>(io_context_, player_settings, stream_);
#endif
if (!player_)
throw SnapException("No audio player support");
player_->setVolumeCallback([this](double volume, bool muted) {
auto settings = std::make_shared<msg::ClientSettings>();
settings->setVolume(static_cast<uint16_t>(volume * 100.));
settings->setMuted(muted);
clientConnection_->send(settings, [this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to send client settings, error: " << ec.message() << "\n";
reconnect();
return;
}
});
});
player_->setVolume(serverSettings_->getVolume() / 100.);
player_->setMute(serverSettings_->isMuted());
player_->start();
}
else if (baseMessage.type == message_type::kStreamTags)
else if (response->type == message_type::kStreamTags)
{
if (meta_)
{
msg::StreamTags streamTags_;
streamTags_.deserialize(baseMessage, buffer);
meta_->push(streamTags_.msg);
auto stream_tags = msg::message_cast<msg::StreamTags>(std::move(response));
meta_->push(stream_tags->msg);
}
}
// if (baseMessage.type != message_type::kTime)
// if (sendTimeSyncMessage(1000))
// LOG(DEBUG) << "time sync onMessageReceived\n";
getNextMessage();
}
bool Controller::sendTimeSyncMessage(const std::chrono::milliseconds& after)
else
{
static chronos::time_point_clk lastTimeSync(chronos::clk::now());
auto now = chronos::clk::now();
if (lastTimeSync + after > now)
return false;
lastTimeSync = now;
msg::Time timeReq;
clientConnection_->send(&timeReq);
return true;
reconnect();
}
});
}
void Controller::sendTimeSyncMessage(int quick_syncs)
{
auto timeReq = std::make_shared<msg::Time>();
clientConnection_->sendRequest<msg::Time>(
timeReq, 2s, [this, quick_syncs](const boost::system::error_code& ec, const std::unique_ptr<msg::Time>& response) mutable {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Time sync request failed: " << ec.message() << "\n";
reconnect();
return;
}
else
{
TimeProvider::getInstance().setDiff(response->latency, response->received - response->sent);
}
std::chrono::microseconds next = 1s;
if (quick_syncs > 0)
{
if (--quick_syncs == 0)
LOG(INFO, LOG_TAG) << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f
<< "\n";
next = 100us;
}
timer_.expires_after(next);
timer_.async_wait([this, quick_syncs](const boost::system::error_code& ec) {
if (!ec)
{
sendTimeSyncMessage(quick_syncs);
}
});
});
}
void Controller::browseMdns(const MdnsHandler& handler)
{
#if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
try
{
BrowseZeroConf browser;
mDNSResult avahiResult;
if (browser.browse("_snapcast._tcp", avahiResult, 1000))
{
string host = avahiResult.ip;
uint16_t port = avahiResult.port;
if (avahiResult.ip_version == IPVersion::IPv6)
host += "%" + cpt::to_string(avahiResult.iface_idx);
handler({}, host, port);
return;
}
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Exception: " << e.what() << std::endl;
}
timer_.expires_after(500ms);
timer_.async_wait([this, handler](const boost::system::error_code& ec) {
if (!ec)
{
browseMdns(handler);
}
else
{
handler(ec, "", 0);
}
});
#else
handler(boost::asio::error::operation_not_supported, "", 0);
#endif
}
void Controller::start()
{
clientConnection_ = make_unique<ClientConnection>(this, settings_.server.host, settings_.server.port);
controllerThread_ = thread(&Controller::worker, this);
}
void Controller::run()
if (settings_.server.host.empty())
{
clientConnection_ = make_unique<ClientConnection>(this, settings_.server.host, settings_.server.port);
browseMdns([this](const boost::system::error_code& ec, const std::string& host, uint16_t port) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to browse MDNS, error: " << ec.message() << "\n";
}
else
{
settings_.server.host = host;
settings_.server.port = port;
LOG(INFO, LOG_TAG) << "Found server " << settings_.server.host << ":" << settings_.server.port << "\n";
clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
worker();
// controllerThread_ = thread(&Controller::worker, this);
}
void Controller::stop()
});
}
else
{
LOG(DEBUG) << "Stopping Controller" << endl;
active_ = false;
controllerThread_.join();
clientConnection_->stop();
clientConnection_ = make_unique<ClientConnection>(io_context_, settings_.server);
worker();
}
}
// void Controller::stop()
// {
// LOG(DEBUG, LOG_TAG) << "Stopping\n";
// timer_.cancel();
// }
void Controller::reconnect()
{
timer_.cancel();
clientConnection_->disconnect();
player_.reset();
stream_.reset();
decoder_.reset();
timer_.expires_after(1s);
timer_.async_wait([this](const boost::system::error_code& ec) {
if (!ec)
{
worker();
}
});
}
void Controller::worker()
{
active_ = true;
while (active_)
clientConnection_->connect([this](const boost::system::error_code& ec) {
if (!ec)
{
try
{
clientConnection_->start();
LOG(INFO, LOG_TAG) << "Connected!\n";
string macAddress = clientConnection_->getMacAddress();
if (settings_.host_id.empty())
settings_.host_id = ::getHostId(macAddress);
/// Say hello to the server
msg::Hello hello(macAddress, settings_.host_id, settings_.instance);
clientConnection_->send(&hello);
// Say hello to the server
auto hello = std::make_shared<msg::Hello>(macAddress, settings_.host_id, settings_.instance);
clientConnection_->send(hello, [this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Failed to send hello, error: " << ec.message() << "\n";
reconnect();
return;
}
});
/// Do initial time sync with the server
msg::Time timeReq;
for (size_t n = 0; n < 50 && active_; ++n)
// // Do initial time sync with the server
sendTimeSyncMessage(50);
getNextMessage();
}
else
{
if (async_exception_)
{
LOG(DEBUG) << "Async exception\n";
std::rethrow_exception(async_exception_);
LOG(ERROR, LOG_TAG) << "Error: " << ec.message() << "\n";
reconnect();
}
auto reply = clientConnection_->sendReq<msg::Time>(&timeReq, chronos::msec(2000));
if (reply)
{
TimeProvider::getInstance().setDiff(reply->latency, reply->received - reply->sent);
chronos::usleep(100);
}
}
LOG(INFO) << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f << "\n";
/// Main loop
while (active_)
{
if (async_exception_)
{
LOG(DEBUG) << "Async exception\n";
std::rethrow_exception(async_exception_);
}
if (sendTimeSyncMessage(1000ms))
LOG(DEBUG) << "time sync main loop\n";
this_thread::sleep_for(100ms);
}
}
catch (const std::exception& e)
{
async_exception_ = nullptr;
LOG(ERROR) << "Exception in Controller::worker(): " << e.what() << endl;
clientConnection_->stop();
player_.reset();
stream_.reset();
decoder_.reset();
for (size_t n = 0; (n < 10) && active_; ++n)
chronos::sleep(100);
}
}
LOG(DEBUG) << "Thread stopped\n";
});
}

View file

@ -54,29 +54,27 @@ using namespace std::chrono_literals;
* Decodes audio (message_type::kWireChunk) and feeds PCM to the audio stream buffer
* Does timesync with the server
*/
class Controller : public MessageReceiver
class Controller
{
public:
Controller(const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta);
Controller(boost::asio::io_context& io_context, const ClientSettings& settings, std::unique_ptr<MetadataAdapter> meta);
void start();
void run();
void stop();
/// Implementation of MessageReceiver.
/// ClientConnection passes messages from the server through these callbacks
void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
/// Implementation of MessageReceiver.
/// Used for async exception reporting
void onException(ClientConnection* connection, std::exception_ptr exception) override;
// void stop();
private:
using MdnsHandler = std::function<void(const boost::system::error_code&, const std::string&, uint16_t)>;
void worker();
bool sendTimeSyncMessage(const std::chrono::milliseconds& after = 1000ms);
void reconnect();
void browseMdns(const MdnsHandler& handler);
void getNextMessage();
void sendTimeSyncMessage(int quick_syncs);
boost::asio::io_context& io_context_;
boost::asio::steady_timer timer_;
ClientSettings settings_;
std::string meta_callback_;
std::atomic<bool> active_;
std::thread controllerThread_;
SampleFormat sampleFormat_;
std::unique_ptr<ClientConnection> clientConnection_;
std::shared_ptr<Stream> stream_;
@ -85,9 +83,6 @@ private:
std::unique_ptr<MetadataAdapter> meta_;
std::unique_ptr<msg::ServerSettings> serverSettings_;
std::unique_ptr<msg::CodecHeader> headerChunk_;
std::mutex receiveMutex_;
std::exception_ptr async_exception_;
};

View file

@ -28,92 +28,36 @@ using namespace std;
static constexpr auto LOG_TAG = "Alsa";
AlsaPlayer::AlsaPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) : Player(pcmDevice, stream), handle_(nullptr), ctl_(nullptr)
AlsaPlayer::AlsaPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream), handle_(nullptr), ctl_(nullptr), sd_(io_context)
{
}
// typedef enum
// {
// AUDIO_VOLUME_SET,
// AUDIO_VOLUME_GET,
// } audio_volume_action;
// int audio_volume(audio_volume_action action, long& outvol)
// {
// static const char* mix_name = "Master";
// static const char* card = "default";
// static int mix_index = 0;
// long pmin, pmax;
// long get_vol, set_vol;
// float f_multi;
// if (action == AUDIO_VOLUME_GET)
// {
// if (snd_mixer_selem_get_playback_volume(elem, SND_MIXER_SCHN_MONO, &outvol) < 0)
// {
// snd_mixer_close(handle);
// return -6;
// }
// LOG(INFO, LOG_TAG) << "Get volume " << outvol << " with status " << ret << "\n";
// // make the value bound to 100
// outvol -= minv;
// maxv -= minv;
// minv = 0;
// outvol = 100 * outvol / maxv; // make the value bound from 0 to 100
// int val;
// if (snd_mixer_selem_get_playback_switch(elem, SND_MIXER_SCHN_MONO, &val) == 0)
// LOG(INFO, LOG_TAG) << "switch: " << val << "\n";
// }
// // else if (action == AUDIO_VOLUME_SET)
// // {
// // if (*outvol < 0 || *outvol > VOLUME_BOUND) // out of bounds
// // return -7;
// // *outvol = (*outvol * (maxv - minv) / (100 - 1)) + minv;
// // if (snd_mixer_selem_set_playback_volume(elem, 0, *outvol) < 0)
// // {
// // snd_mixer_close(handle);
// // return -8;
// // }
// // if (snd_mixer_selem_set_playback_volume(elem, 1, *outvol) < 0)
// // {
// // snd_mixer_close(handle);
// // return -9;
// // }
// // fprintf(stderr, "Set volume %i with status %i\n", *outvol, ret);
// // }
// snd_mixer_close(handle);
// return 0;
// }
void AlsaPlayer::setVolume(double volume)
{
std::lock_guard<std::mutex> lock(mutex_);
int err = 0;
snd_mixer_elem_t* elem(nullptr);
snd_mixer_t* mixer(nullptr);
// boost::system::error_code ec;
// sd_.cancel(ec);
// if (ctl_)
// snd_ctl_subscribe_events(ctl_, 0);
last_change_ = std::chrono::steady_clock::now();
try
{
openMixer(&elem, &mixer);
// make the value bound to 100
long minv, maxv;
if ((err = snd_mixer_selem_get_playback_volume_range(elem, &minv, &maxv)) < 0)
throw SnapException(std::string("Failed to get playback volume range, error: ") + snd_strerror(err));
LOG(INFO, LOG_TAG) << "Mixer volume range [" << minv << ", " << maxv << "]\n";
volume = volume * (maxv - minv) + minv;
std::cerr << "vol: " << volume << "\n";
if ((err = snd_mixer_selem_set_playback_volume(elem, SND_MIXER_SCHN_FRONT_LEFT, volume)) < 0)
throw SnapException(std::string("Failed to get playback volume, error: ") + snd_strerror(err));
if ((err = snd_mixer_selem_set_playback_volume(elem, SND_MIXER_SCHN_FRONT_RIGHT, volume)) < 0)
throw SnapException(std::string("Failed to get playback volume, error: ") + snd_strerror(err));
auto mixer_volume = volume * (maxv - minv) + minv;
LOG(DEBUG, LOG_TAG) << "Mixer volume range [" << minv << ", " << maxv << "], volume: " << volume << ", mixer volume: " << mixer_volume << "\n";
if ((err = snd_mixer_selem_set_playback_volume_all(elem, mixer_volume)) < 0)
throw SnapException(std::string("Failed to set playback volume, error: ") + snd_strerror(err));
}
catch (const std::exception& e)
{
@ -121,6 +65,11 @@ void AlsaPlayer::setVolume(double volume)
}
if (mixer != nullptr)
snd_mixer_close(mixer);
// if (ctl_)
// {
// snd_ctl_subscribe_events(ctl_, 1);
// waitForEvent();
// }
}
@ -138,21 +87,20 @@ bool AlsaPlayer::getVolume(double& volume, bool& muted)
if ((err = snd_mixer_selem_get_playback_volume(elem, SND_MIXER_SCHN_MONO, &vol)) < 0)
throw SnapException(std::string("Failed to get playback volume, error: ") + snd_strerror(err));
std::cerr << "vol: " << vol << "\n";
// make the value bound to 100
// make the value bound to 1
long minv, maxv;
if ((err = snd_mixer_selem_get_playback_volume_range(elem, &minv, &maxv)) < 0)
throw SnapException(std::string("Failed to get playback volume range, error: ") + snd_strerror(err));
LOG(INFO, LOG_TAG) << "Mixer volume range [" << minv << ", " << maxv << "]\n";
vol -= minv;
maxv = maxv - minv;
volume = 100. * static_cast<double>(vol) / static_cast<double>(maxv);
volume = static_cast<double>(vol) / static_cast<double>(maxv);
int val;
if ((err = snd_mixer_selem_get_playback_switch(elem, SND_MIXER_SCHN_MONO, &val)) < 0)
return false;
muted = (val == 0);
LOG(DEBUG, LOG_TAG) << "Get volume, mixer volume range [" << minv << ", " << maxv << "], volume: " << volume << ", muted: " << muted << "\n";
if (mixer != nullptr)
snd_mixer_close(mixer);
return true;
@ -170,7 +118,6 @@ bool AlsaPlayer::getVolume(double& volume, bool& muted)
void AlsaPlayer::openMixer(snd_mixer_elem_t** elem, snd_mixer_t** mixer)
{
snd_mixer_selem_id_t* sid;
long minv_, maxv_;
snd_mixer_selem_id_alloca(&sid);
std::string mix_name = "Master";
@ -182,8 +129,8 @@ void AlsaPlayer::openMixer(snd_mixer_elem_t** elem, snd_mixer_t** mixer)
int err;
if ((err = snd_mixer_open(mixer, 0)) < 0)
throw SnapException(std::string("Failed to open mixer, error: ") + snd_strerror(err));
if ((err = snd_mixer_attach(*mixer, pcmDevice_.name.c_str())) < 0)
throw SnapException("Failed to attach mixer to " + pcmDevice_.name + ", error: " + snd_strerror(err));
if ((err = snd_mixer_attach(*mixer, settings_.pcm_device.name.c_str())) < 0)
throw SnapException("Failed to attach mixer to " + settings_.pcm_device.name + ", error: " + snd_strerror(err));
if ((err = snd_mixer_selem_register(*mixer, NULL, NULL)) < 0)
throw SnapException(std::string("Failed to register selem, error: ") + snd_strerror(err));
if ((err = snd_mixer_load(*mixer)) < 0)
@ -193,15 +140,60 @@ void AlsaPlayer::openMixer(snd_mixer_elem_t** elem, snd_mixer_t** mixer)
throw SnapException(std::string("Failed to find selem, error: ") + snd_strerror(err));
}
void AlsaPlayer::waitForEvent()
{
sd_.async_wait(boost::asio::posix::stream_descriptor::wait_read, [this](const boost::system::error_code& ec) {
if (ec)
{
LOG(DEBUG, LOG_TAG) << "waitForEvent error: " << ec.message() << "\n";
return;
}
std::lock_guard<std::mutex> lock(mutex_);
unsigned short revents;
snd_ctl_poll_descriptors_revents(ctl_, fd_.get(), 1, &revents);
if (revents & POLLIN)
{
snd_ctl_event_t* event;
snd_ctl_event_alloca(&event);
if ((snd_ctl_read(ctl_, event) >= 0) && (snd_ctl_event_get_type(event) == SND_CTL_EVENT_ELEM))
{
auto now = std::chrono::steady_clock::now();
if (now - last_change_ < 1s)
{
LOG(DEBUG, LOG_TAG) << "Last volume change by server: " << std::chrono::duration_cast<std::chrono::milliseconds>(now - last_change_).count()
<< " ms => ignoring volume change\n";
waitForEvent();
return;
}
double volume;
bool muted;
if (getVolume(volume, muted))
{
LOG(DEBUG, LOG_TAG) << "Volume: " << volume << ", muted: " << muted << "\n";
notifyVolumeChange(volume, muted);
}
}
}
waitForEvent();
});
}
void AlsaPlayer::initMixer()
{
std::lock_guard<std::mutex> lock(mutex_);
int err;
if ((err = snd_ctl_open(&ctl_, pcmDevice_.name.c_str(), SND_CTL_READONLY)) < 0)
throw SnapException("Can't open control for " + pcmDevice_.name + ", error: " + snd_strerror(err));
if ((err = snd_ctl_open(&ctl_, settings_.pcm_device.name.c_str(), SND_CTL_READONLY)) < 0)
throw SnapException("Can't open control for " + settings_.pcm_device.name + ", error: " + snd_strerror(err));
if ((err = snd_ctl_subscribe_events(ctl_, 1)) < 0)
throw SnapException("Can't subscribe for events for " + pcmDevice_.name + ", error: " + snd_strerror(err));
fd_ = (pollfd*)malloc(sizeof(struct pollfd));
snd_ctl_poll_descriptors(ctl_, fd_, 1);
throw SnapException("Can't subscribe for events for " + settings_.pcm_device.name + ", error: " + snd_strerror(err));
fd_ = std::make_unique<pollfd>();
snd_ctl_poll_descriptors(ctl_, fd_.get(), 1);
sd_ = boost::asio::posix::stream_descriptor(io_context_, fd_->fd);
waitForEvent();
}
@ -216,8 +208,8 @@ void AlsaPlayer::initAlsa()
channels = format.channels();
/* Open the PCM device in playback mode */
if ((err = snd_pcm_open(&handle_, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0)
throw SnapException("Can't open " + pcmDevice_.name + ", error: " + snd_strerror(err));
if ((err = snd_pcm_open(&handle_, settings_.pcm_device.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0)
throw SnapException("Can't open " + settings_.pcm_device.name + ", error: " + snd_strerror(err));
/* struct snd_pcm_playback_info_t pinfo;
if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 )
@ -323,7 +315,7 @@ void AlsaPlayer::initAlsa()
// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames_);
snd_pcm_sw_params(handle_, swparams);
initMixer();
// initMixer();
}
@ -335,6 +327,7 @@ void AlsaPlayer::uninitAlsa()
snd_pcm_close(handle_);
handle_ = nullptr;
}
std::lock_guard<std::mutex> lock(mutex_);
if (ctl_ != nullptr)
{
snd_ctl_close(ctl_);
@ -358,11 +351,22 @@ AlsaPlayer::~AlsaPlayer()
void AlsaPlayer::stop()
{
if (sd_.is_open())
{
boost::system::error_code ec;
sd_.cancel(ec);
}
Player::stop();
uninitAlsa();
}
bool AlsaPlayer::needsThread() const
{
return true;
}
void AlsaPlayer::worker()
{
snd_pcm_sframes_t pcm;
@ -370,7 +374,6 @@ void AlsaPlayer::worker()
snd_pcm_sframes_t framesAvail;
long lastChunkTick = chronos::getTickCount();
const SampleFormat& format = stream_->getFormat();
while (active_)
{
if (handle_ == nullptr)
@ -388,27 +391,6 @@ void AlsaPlayer::worker()
continue;
}
auto err = poll(fd_, 1, 0);
if (err > 0)
{
unsigned short revents;
snd_ctl_poll_descriptors_revents(ctl_, fd_, 1, &revents);
if (revents & POLLIN)
{
snd_ctl_event_t* event;
snd_ctl_event_alloca(&event);
if ((snd_ctl_read(ctl_, event) >= 0) && (snd_ctl_event_get_type(event) == SND_CTL_EVENT_ELEM))
{
LOG(INFO, LOG_TAG) << "event\n";
double volume;
bool muted;
if (getVolume(volume, muted))
LOG(INFO, LOG_TAG) << "Volume: " << volume << ", muted: " << muted << "\n";
}
}
}
int wait_result = snd_pcm_wait(handle_, 100);
if (wait_result == -EPIPE)
{
@ -459,7 +441,7 @@ void AlsaPlayer::worker()
if (buffer_.size() < static_cast<size_t>(framesAvail * format.frameSize()))
{
LOG(INFO, LOG_TAG) << "Resizing buffer from " << buffer_.size() << " to " << framesAvail * format.frameSize() << "\n";
LOG(DEBUG, LOG_TAG) << "Resizing buffer from " << buffer_.size() << " to " << framesAvail * format.frameSize() << "\n";
buffer_.resize(framesAvail * format.frameSize());
}
if (stream_->getPlayerChunk(buffer_.data(), delay, framesAvail))

View file

@ -30,7 +30,7 @@
class AlsaPlayer : public Player
{
public:
AlsaPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
AlsaPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
~AlsaPlayer() override;
/// Set audio volume in range [0..1]
@ -43,6 +43,7 @@ public:
protected:
void worker() override;
bool needsThread() const override;
private:
void initAlsa();
@ -50,14 +51,17 @@ private:
void initMixer();
bool getVolume(double& volume, bool& muted);
void openMixer(snd_mixer_elem_t** elem, snd_mixer_t** mixer);
void waitForEvent();
snd_pcm_t* handle_;
snd_ctl_t* ctl_;
pollfd* fd_;
std::unique_ptr<pollfd> fd_;
std::vector<char> buffer_;
snd_pcm_uframes_t frames_;
boost::asio::posix::stream_descriptor sd_;
std::chrono::time_point<std::chrono::steady_clock> last_change_;
std::mutex mutex_;
};

View file

@ -32,7 +32,8 @@ void callback(void* custom_data, AudioQueueRef queue, AudioQueueBufferRef buffer
}
CoreAudioPlayer::CoreAudioPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) : Player(pcmDevice, stream), ms_(100), pubStream_(stream)
CoreAudioPlayer::CoreAudioPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream), ms_(100), pubStream_(stream)
{
}
@ -130,6 +131,12 @@ void CoreAudioPlayer::playerCallback(AudioQueueRef queue, AudioQueueBufferRef bu
}
bool CoreAudioPlayer::needsThread() const
{
return true;
}
void CoreAudioPlayer::worker()
{
while (active_)

View file

@ -37,14 +37,16 @@
class CoreAudioPlayer : public Player
{
public:
CoreAudioPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
CoreAudioPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
virtual ~CoreAudioPlayer();
void playerCallback(AudioQueueRef queue, AudioQueueBufferRef bufferRef);
static std::vector<PcmDevice> pcm_list(void);
protected:
virtual void worker();
void worker() override;
bool needsThread() const override;
void initAudioQueue();
void uninitAudioQueue(AudioQueueRef queue);

View file

@ -30,7 +30,8 @@ static constexpr auto LOG_TAG = "OboePlayer";
static constexpr double kDefaultLatency = 50;
OboePlayer::OboePlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream) : Player(pcmDevice, stream)
OboePlayer::OboePlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream)
{
LOG(DEBUG, LOG_TAG) << "Contructor\n";
LOG(INFO, LOG_TAG) << "Init start\n";
@ -89,6 +90,12 @@ OboePlayer::~OboePlayer()
}
bool OboePlayer::needsThread() const
{
return false;
}
double OboePlayer::getCurrentOutputLatencyMillis() const
{
// Get the time that a known audio frame was presented for playing
@ -159,8 +166,3 @@ void OboePlayer::stop()
if (result != oboe::Result::OK)
LOG(ERROR, LOG_TAG) << "Error in requestStop: " << oboe::convertToText(result) << "\n";
}
void OboePlayer::worker()
{
}

View file

@ -34,7 +34,7 @@ typedef int (*AndroidAudioCallback)(short* buffer, int num_samples);
class OboePlayer : public Player, public oboe::AudioStreamCallback
{
public:
OboePlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
OboePlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
virtual ~OboePlayer();
void start() override;
@ -44,7 +44,7 @@ protected:
oboe::DataCallbackResult onAudioReady(oboe::AudioStream* oboeStream, void* audioData, int32_t numFrames) override;
double getCurrentOutputLatencyMillis() const;
void worker() override;
bool needsThread() const override;
oboe::ManagedStream out_stream_;
std::unique_ptr<oboe::LatencyTuner> mLatencyTuner;

View file

@ -49,8 +49,8 @@ static void bqPlayerCallback(SLAndroidSimpleBufferQueueItf bq, void* context)
}
OpenslPlayer::OpenslPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream)
: Player(pcmDevice, stream), engineObject(NULL), engineEngine(NULL), outputMixObject(NULL), bqPlayerObject(NULL), bqPlayerPlay(NULL),
OpenslPlayer::OpenslPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream), engineObject(NULL), engineEngine(NULL), outputMixObject(NULL), bqPlayerObject(NULL), bqPlayerPlay(NULL),
bqPlayerBufferQueue(NULL), bqPlayerVolume(NULL), curBuffer(0), ms_(50), buff_size(0), pubStream_(stream)
{
initOpensl();
@ -142,6 +142,11 @@ std::string OpenslPlayer::resultToString(SLresult result) const
}
bool OpenslPlayer::needsThread() const
{
return false;
}
void OpenslPlayer::throwUnsuccess(const std::string& phase, const std::string& what, SLresult result)
{
@ -370,8 +375,3 @@ void OpenslPlayer::stop()
(*bqPlayerBufferQueue)->Clear(bqPlayerBufferQueue);
throwUnsuccess(kPhaseStop, "PlayerPlay::SetPlayState", result);
}
void OpenslPlayer::worker()
{
}

View file

@ -35,7 +35,7 @@ typedef int (*AndroidAudioCallback)(short* buffer, int num_samples);
class OpenslPlayer : public Player
{
public:
OpenslPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
OpenslPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
virtual ~OpenslPlayer();
void start() override;
@ -47,7 +47,7 @@ protected:
void initOpensl();
void uninitOpensl();
void worker() override;
bool needsThread() const override;
void throwUnsuccess(const std::string& phase, const std::string& what, SLresult result);
std::string resultToString(SLresult result) const;

View file

@ -26,39 +26,47 @@
using namespace std;
Player::Player(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream)
: active_(false), stream_(stream), pcmDevice_(pcmDevice), volume_(1.0), muted_(false), volCorrection_(1.0)
Player::Player(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: io_context_(io_context), active_(false), stream_(stream), settings_(settings), volume_(1.0), muted_(false), volCorrection_(1.0)
{
}
void Player::start()
{
active_ = true;
playerThread_ = thread(&Player::worker, this);
}
Player::~Player()
{
stop();
}
void Player::start()
{
active_ = true;
if (needsThread())
playerThread_ = thread(&Player::worker, this);
}
void Player::stop()
{
if (active_)
{
active_ = false;
if (playerThread_.joinable())
playerThread_.join();
}
}
void Player::worker()
{
}
void Player::adjustVolume(char* buffer, size_t frames)
{
// if (settings_.mixer.mode != ClientSettings::Mixer::Mode::software)
// return;
double volume = volume_;
if (muted_)
volume = 0.;

View file

@ -19,11 +19,15 @@
#ifndef PLAYER_H
#define PLAYER_H
#include "client_settings.hpp"
#include "common/aixlog.hpp"
#include "common/endian.hpp"
#include "pcm_device.hpp"
#include "stream.hpp"
#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <string>
#include <thread>
#include <vector>
@ -35,8 +39,10 @@
*/
class Player
{
using volume_callback = std::function<void(double volume, bool muted)>;
public:
Player(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream);
Player(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
virtual ~Player();
/// Set audio volume in range [0..1]
@ -44,13 +50,36 @@ public:
virtual void setMute(bool mute);
virtual void start();
virtual void stop();
void setVolumeCallback(const volume_callback& callback)
{
onVolumeChanged_ = callback;
}
protected:
virtual void worker() = 0;
virtual void worker();
virtual bool needsThread() const = 0;
void setVolume_poly(double volume, double exp);
void setVolume_exp(double volume, double base);
void adjustVolume(char* buffer, size_t frames);
void notifyVolumeChange(double volume, bool muted) const
{
if (onVolumeChanged_)
onVolumeChanged_(volume, muted);
}
boost::asio::io_context& io_context_;
std::atomic<bool> active_;
std::shared_ptr<Stream> stream_;
std::thread playerThread_;
ClientSettings::Player settings_;
double volume_;
bool muted_;
double volCorrection_;
volume_callback onVolumeChanged_;
private:
template <typename T>
void adjustVolume(char* buffer, size_t count, double volume)
{
@ -58,16 +87,6 @@ protected:
for (size_t n = 0; n < count; ++n)
bufferT[n] = endian::swap<T>(static_cast<T>(endian::swap<T>(bufferT[n]) * volume));
}
void adjustVolume(char* buffer, size_t frames);
std::atomic<bool> active_;
std::shared_ptr<Stream> stream_;
std::thread playerThread_;
PcmDevice pcmDevice_;
double volume_;
bool muted_;
double volCorrection_;
};

View file

@ -68,8 +68,8 @@ EXTERN_C const PROPERTYKEY DECLSPEC_SELECTANY PKEY_Device_FriendlyName = {{0xa45
throw SnapException(ss.str()); \
}
WASAPIPlayer::WASAPIPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream, ClientSettings::SharingMode mode)
: Player(pcmDevice, stream), mode_(mode)
WASAPIPlayer::WASAPIPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream)
: Player(io_context, settings, stream)
{
HRESULT hr = CoInitializeEx(NULL, COINIT_MULTITHREADED);
CHECK_HR(hr);
@ -189,7 +189,7 @@ void WASAPIPlayer::worker()
// Register the default playback device (eRender for playback)
IMMDevicePtr device = nullptr;
if (pcmDevice_.idx == 0)
if (settings_.pcm_device.idx == 0)
{
hr = deviceEnumerator->GetDefaultAudioEndpoint(eRender, eConsole, &device);
CHECK_HR(hr);
@ -200,7 +200,7 @@ void WASAPIPlayer::worker()
hr = deviceEnumerator->EnumAudioEndpoints(eRender, DEVICE_STATE_ACTIVE, &devices);
CHECK_HR(hr);
devices->Item(pcmDevice_.idx - 1, &device);
devices->Item(settings_.pcm_device.idx - 1, &device);
}
IPropertyStorePtr properties = nullptr;
@ -218,7 +218,7 @@ void WASAPIPlayer::worker()
hr = device->Activate(IID_IAudioClient, CLSCTX_SERVER, NULL, (void**)&audioClient);
CHECK_HR(hr);
if (mode_ == ClientSettings::SharingMode::exclusive)
if (settings_.sharing_mode == ClientSettings::SharingMode::exclusive)
{
hr = audioClient->IsFormatSupported(AUDCLNT_SHAREMODE_EXCLUSIVE, &(waveformatExtended->Format), NULL);
CHECK_HR(hr);
@ -246,10 +246,10 @@ void WASAPIPlayer::worker()
hr = audioClient->GetDevicePeriod(NULL, &hnsRequestedDuration);
CHECK_HR(hr);
LOG(INFO, LOG_TAG) << "Initializing WASAPI in " << (mode_ == ClientSettings::SharingMode::shared ? "shared" : "exclusive") << " mode\n";
LOG(INFO, LOG_TAG) << "Initializing WASAPI in " << (settings_.sharing_mode == ClientSettings::SharingMode::shared ? "shared" : "exclusive") << " mode\n";
_AUDCLNT_SHAREMODE share_mode = mode_ == ClientSettings::SharingMode::shared ? AUDCLNT_SHAREMODE_SHARED : AUDCLNT_SHAREMODE_EXCLUSIVE;
DWORD stream_flags = mode_ == ClientSettings::SharingMode::shared
_AUDCLNT_SHAREMODE share_mode = settings_.sharing_mode == ClientSettings::SharingMode::shared ? AUDCLNT_SHAREMODE_SHARED : AUDCLNT_SHAREMODE_EXCLUSIVE;
DWORD stream_flags = settings_.sharing_mode == ClientSettings::SharingMode::shared
? AUDCLNT_STREAMFLAGS_EVENTCALLBACK | AUDCLNT_STREAMFLAGS_AUTOCONVERTPCM | AUDCLNT_STREAMFLAGS_SRC_DEFAULT_QUALITY
: AUDCLNT_STREAMFLAGS_EVENTCALLBACK;
@ -340,7 +340,7 @@ void WASAPIPlayer::worker()
clock->GetPosition(&position, NULL);
UINT32 padding = 0;
if (mode_ == ClientSettings::SharingMode::shared)
if (settings_.sharing_mode == ClientSettings::SharingMode::shared)
{
hr = audioClient->GetCurrentPadding(&padding);
CHECK_HR(hr);

View file

@ -93,17 +93,20 @@ public:
class WASAPIPlayer : public Player
{
public:
WASAPIPlayer(const PcmDevice& pcmDevice, std::shared_ptr<Stream> stream, ClientSettings::SharingMode mode);
WASAPIPlayer(boost::asio::io_context& io_context, const ClientSettings::Player& settings, std::shared_ptr<Stream> stream);
virtual ~WASAPIPlayer();
static std::vector<PcmDevice> pcm_list(void);
protected:
virtual void worker();
virtual bool needsThread() const override
{
return true;
}
private:
AudioSessionEventListener* audioEventListener_;
ClientSettings::SharingMode mode_;
};
#endif

View file

@ -19,10 +19,10 @@
#include <chrono>
#include <iostream>
#ifndef WINDOWS
#include <signal.h>
#include <sys/resource.h>
#endif
#include "browseZeroConf/browse_mdns.hpp"
#include "common/popl.hpp"
#include "controller.hpp"
@ -34,7 +34,6 @@
#endif
#include "client_settings.hpp"
#include "common/aixlog.hpp"
#include "common/signal_handler.hpp"
#include "common/snap_exception.hpp"
#include "common/str_compat.hpp"
#include "common/utils.hpp"
@ -46,6 +45,8 @@ using namespace popl;
using namespace std::chrono_literals;
static constexpr auto LOG_TAG = "Snapclient";
PcmDevice getPcmDevice(const std::string& soundcard)
{
#if defined(HAS_ALSA) || defined(WINDOWS)
@ -78,6 +79,26 @@ PcmDevice getPcmDevice(const std::string& soundcard)
return pcmDevice;
}
#ifdef WINDOWS
// hack to avoid case destinction in the signal handler
#define SIGHUP SIGINT
const char* strsignal(int sig)
{
switch (sig)
{
case SIGTERM:
return "SIGTERM";
case SIGINT:
return "SIGINT";
case SIGBREAK:
return "SIGBREAK";
case SIGABRT:
return "SIGABRT";
default:
return "Unhandled";
}
}
#endif
int main(int argc, char** argv)
{
@ -236,7 +257,7 @@ int main(int argc, char** argv)
group = user_group[1];
}
daemon = std::make_unique<Daemon>(user, group, pidFile);
LOG(NOTICE) << "daemonizing" << std::endl;
LOG(NOTICE, LOG_TAG) << "daemonizing" << std::endl;
daemon->daemonize();
if (processPriority < -20)
processPriority = -20;
@ -244,7 +265,7 @@ int main(int argc, char** argv)
processPriority = 19;
if (processPriority != 0)
setpriority(PRIO_PROCESS, 0, processPriority);
LOG(NOTICE) << "daemon started" << std::endl;
LOG(NOTICE, LOG_TAG) << "daemon started" << std::endl;
}
#endif
@ -277,71 +298,31 @@ int main(int argc, char** argv)
}
#endif
bool active = true;
std::shared_ptr<Controller> controller;
auto signal_handler = install_signal_handler(
{
#ifndef WINDOWS // no sighup on windows
SIGHUP,
#endif
SIGTERM, SIGINT},
[&active, &controller](int signal, const std::string& strsignal) {
LOG(INFO) << "Received signal " << signal << ": " << strsignal << "\n";
active = false;
if (controller)
{
LOG(INFO) << "Stopping controller\n";
controller->stop();
}
boost::asio::io_context io_context;
// Construct a signal set registered for process termination.
boost::asio::signal_set signals(io_context, SIGHUP, SIGINT, SIGTERM);
signals.async_wait([&](const boost::system::error_code& ec, int signal) {
if (!ec)
LOG(INFO, LOG_TAG) << "Received signal " << signal << ": " << strsignal(signal) << "\n";
else
LOG(INFO, LOG_TAG) << "Failed to wait for signal, error: " << ec.message() << "\n";
io_context.stop();
});
if (settings.server.host.empty())
{
#if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
BrowseZeroConf browser;
mDNSResult avahiResult;
while (active)
{
signal_handler.wait_for(500ms);
if (!active)
break;
try
{
if (browser.browse("_snapcast._tcp", avahiResult, 5000))
{
settings.server.host = avahiResult.ip;
settings.server.port = avahiResult.port;
if (avahiResult.ip_version == IPVersion::IPv6)
settings.server.host += "%" + cpt::to_string(avahiResult.iface_idx);
LOG(INFO) << "Found server " << settings.server.host << ":" << settings.server.port << "\n";
break;
}
}
catch (const std::exception& e)
{
LOG(ERROR) << "Exception: " << e.what() << std::endl;
}
}
#endif
}
if (active)
{
// Setup metadata handling
auto meta(metaStderr ? std::make_unique<MetaStderrAdapter>() : std::make_unique<MetadataAdapter>());
controller = make_shared<Controller>(settings, std::move(meta));
LOG(INFO) << "Latency: " << settings.player.latency << "\n";
controller->run();
// signal_handler.wait();
// controller->stop();
}
auto controller = make_shared<Controller>(io_context, settings, std::move(meta));
controller->start();
// std::thread t([&] { io_context.run(); });
io_context.run();
// t.join();
}
catch (const std::exception& e)
{
LOG(FATAL) << "Exception: " << e.what() << std::endl;
LOG(FATAL, LOG_TAG) << "Exception: " << e.what() << std::endl;
exitcode = EXIT_FAILURE;
}
LOG(NOTICE) << "daemon terminated." << endl;
LOG(NOTICE, LOG_TAG) << "daemon terminated." << endl;
exit(exitcode);
}

View file

@ -0,0 +1,62 @@
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef CLIENT_SETTINGS_H
#define CLIENT_SETTINGS_H
#include "json_message.hpp"
namespace msg
{
class ClientSettings : public JsonMessage
{
public:
ClientSettings() : JsonMessage(message_type::kClientSettings)
{
setVolume(100);
setMuted(false);
}
~ClientSettings() override = default;
uint16_t getVolume()
{
return get("volume", 100);
}
bool isMuted()
{
return get("muted", false);
}
void setVolume(uint16_t volume)
{
msg["volume"] = volume;
}
void setMuted(bool muted)
{
msg["muted"] = muted;
}
};
}
#endif

View file

@ -19,12 +19,13 @@
#ifndef MESSAGE_FACTORY_HPP
#define MESSAGE_FACTORY_HPP
#include "client_settings.hpp"
#include "codec_header.hpp"
#include "hello.hpp"
#include "server_settings.hpp"
#include "stream_tags.hpp"
#include "time.hpp"
#include "wire_chunk.hpp"
#include "pcm_chunk.hpp"
#include "common/str_compat.hpp"
#include "common/utils.hpp"
@ -34,6 +35,21 @@
namespace msg
{
template <typename ToType>
static std::unique_ptr<ToType> message_cast(std::unique_ptr<msg::BaseMessage> message)
{
ToType* tmp = dynamic_cast<ToType*>(message.get());
std::unique_ptr<ToType> result;
if (tmp != nullptr)
{
message.release();
result.reset(tmp);
return result;
}
return nullptr;
}
namespace factory
{
@ -47,7 +63,6 @@ static std::unique_ptr<T> createMessage(const BaseMessage& base_message, char* b
return result;
}
static std::unique_ptr<BaseMessage> createMessage(const BaseMessage& base_message, char* buffer)
{
std::unique_ptr<BaseMessage> result;
@ -64,7 +79,11 @@ static std::unique_ptr<BaseMessage> createMessage(const BaseMessage& base_messag
case kTime:
return createMessage<Time>(base_message, buffer);
case kWireChunk:
return createMessage<WireChunk>(base_message, buffer);
// this is kind of cheated to safe the convertion from WireChunk to PcmChunk
// the user of the factory must be aware that a PcmChunk will be created
return createMessage<PcmChunk>(base_message, buffer);
case kClientSettings:
return createMessage<ClientSettings>(base_message, buffer);
default:
return nullptr;
}

View file

@ -60,9 +60,10 @@ enum message_type
kTime = 4,
kHello = 5,
kStreamTags = 6,
kClientSettings = 7,
kFirst = kBase,
kLast = kStreamTags
kLast = kClientSettings
};

View file

@ -1,71 +0,0 @@
/***
This file is part of snapcast
Copyright (C) 2014-2020 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef SIGNAL_HANDLER_HPP
#define SIGNAL_HANDLER_HPP
#include <functional>
#include <future>
#include <set>
#include <signal.h>
#ifdef WINDOWS
const char* strsignal(int sig)
{
switch (sig)
{
case SIGTERM:
return "SIGTERM";
case SIGINT:
return "SIGINT";
case SIGBREAK:
return "SIGBREAK";
case SIGABRT:
return "SIGABRT";
default:
return "Unhandled";
}
}
#endif
using signal_callback = std::function<void(int signal, const std::string& name)>;
static std::future<int> install_signal_handler(std::set<int> signals, const signal_callback& on_signal = nullptr)
{
static std::promise<int> promise;
std::future<int> future = promise.get_future();
static signal_callback callback = on_signal;
for (auto signal : signals)
{
::signal(signal, [](int sig) {
if (callback)
callback(sig, strsignal(sig));
try
{
promise.set_value(sig);
}
catch (const std::future_error&)
{
}
});
}
return future;
}
#endif