diff --git a/client/client_connection.cpp b/client/client_connection.cpp index 5c4f9ec8..3e537673 100644 --- a/client/client_connection.cpp +++ b/client/client_connection.cpp @@ -20,6 +20,7 @@ #include "common/aixlog.hpp" #include "common/snap_exception.hpp" #include "common/str_compat.hpp" +#include "message/factory.hpp" #include "message/hello.hpp" #include #include @@ -32,6 +33,8 @@ ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string& : socket_(io_context_), active_(false), messageReceiver_(receiver), reqId_(1), host_(host), port_(port), readerThread_(nullptr), sumTimeout_(chronos::msec(0)) { + base_msg_size_ = base_message_.getSize(); + buffer_.resize(base_msg_size_); } @@ -130,9 +133,10 @@ bool ClientConnection::send(const msg::BaseMessage* message) } -unique_ptr ClientConnection::sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout) + +unique_ptr ClientConnection::sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout) { - unique_ptr response(nullptr); + unique_ptr response(nullptr); if (++reqId_ >= 10000) reqId_ = 1; message->id = reqId_; @@ -168,32 +172,26 @@ unique_ptr ClientConnection::sendRequest(const msg::Base void ClientConnection::getNextMessage() { - msg::BaseMessage baseMessage; - size_t baseMsgSize = baseMessage.getSize(); - vector buffer(baseMsgSize); - socketRead(&buffer[0], baseMsgSize); - baseMessage.deserialize(&buffer[0]); + socketRead(&buffer_[0], base_msg_size_); + base_message_.deserialize(buffer_.data()); // LOG(DEBUG) << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << // baseMessage.refersTo << "\n"; - if (baseMessage.size > buffer.size()) - buffer.resize(baseMessage.size); + if (base_message_.size > buffer_.size()) + buffer_.resize(base_message_.size); // { // std::lock_guard socketLock(socketMutex_); - socketRead(&buffer[0], baseMessage.size); + socketRead(buffer_.data(), base_message_.size); tv t; - baseMessage.received = t; + base_message_.received = t; // } { // scope for lock std::unique_lock lock(pendingRequestsMutex_); for (auto req : pendingRequests_) { - if (req->id() == baseMessage.refersTo) + if (req->id() == base_message_.refersTo) { - auto response = make_unique(); - response->message = baseMessage; - response->buffer = (char*)malloc(baseMessage.size); - memcpy(response->buffer, &buffer[0], baseMessage.size); + auto response = msg::factory::createMessage(base_message_, buffer_.data()); req->setValue(std::move(response)); return; } @@ -201,7 +199,7 @@ void ClientConnection::getNextMessage() } if (messageReceiver_ != nullptr) - messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]); + messageReceiver_->onMessageReceived(this, base_message_, buffer_.data()); } diff --git a/client/client_connection.hpp b/client/client_connection.hpp index 7f15e5ad..17a9d8ca 100644 --- a/client/client_connection.hpp +++ b/client/client_connection.hpp @@ -47,7 +47,7 @@ public: }; template - std::unique_ptr waitForResponse(const std::chrono::duration& timeout) + std::unique_ptr waitForResponse(const std::chrono::duration& timeout) { try { @@ -60,7 +60,7 @@ public: return nullptr; } - void setValue(std::unique_ptr value) + void setValue(std::unique_ptr value) { promise_.set_value(std::move(value)); } @@ -73,8 +73,8 @@ public: private: uint16_t id_; - std::promise> promise_; - std::future> future_; + std::promise> promise_; + std::future> future_; }; @@ -109,18 +109,24 @@ public: virtual bool send(const msg::BaseMessage* message); /// Send request to the server and wait for answer - virtual std::unique_ptr sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); + virtual std::unique_ptr sendRequest(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); /// Send request to the server and wait for answer of type T template std::unique_ptr sendReq(const msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)) { - std::unique_ptr reply = sendRequest(message, timeout); - if (!reply) + std::unique_ptr response = sendRequest(message, timeout); + if (!response) return nullptr; - std::unique_ptr msg(new T); - msg->deserialize(reply->message, reply->buffer); - return msg; + + T* tmp = dynamic_cast(response.get()); + std::unique_ptr result; + if (tmp != nullptr) + { + response.release(); + result.reset(tmp); + } + return result; } std::string getMacAddress(); @@ -136,6 +142,10 @@ protected: void socketRead(void* to, size_t bytes); void getNextMessage(); + msg::BaseMessage base_message_; + std::vector buffer_; + size_t base_msg_size_; + boost::asio::io_context io_context_; mutable std::mutex socketMutex_; tcp::socket socket_; diff --git a/client/controller.cpp b/client/controller.cpp index 9d032374..a7b74b38 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -178,6 +178,16 @@ void Controller::start(const PcmDevice& pcmDevice, const std::string& host, size } +void Controller::run(const PcmDevice& pcmDevice, const std::string& host, size_t port, int latency) +{ + pcmDevice_ = pcmDevice; + latency_ = latency; + clientConnection_.reset(new ClientConnection(this, host, port)); + worker(); + // controllerThread_ = thread(&Controller::worker, this); +} + + void Controller::stop() { LOG(DEBUG) << "Stopping Controller" << endl; diff --git a/client/controller.hpp b/client/controller.hpp index d377d893..a590f64f 100644 --- a/client/controller.hpp +++ b/client/controller.hpp @@ -50,6 +50,7 @@ class Controller : public MessageReceiver public: Controller(const std::string& clientId, size_t instance, std::shared_ptr meta); void start(const PcmDevice& pcmDevice, const std::string& host, size_t port, int latency); + void run(const PcmDevice& pcmDevice, const std::string& host, size_t port, int latency); void stop(); /// Implementation of MessageReceiver. diff --git a/client/snapclient.cpp b/client/snapclient.cpp index 52c957ad..6a275a70 100644 --- a/client/snapclient.cpp +++ b/client/snapclient.cpp @@ -215,10 +215,16 @@ int main(int argc, char** argv) #endif bool active = true; + std::shared_ptr controller; auto signal_handler = install_signal_handler({SIGHUP, SIGTERM, SIGINT}, - [&active](int signal, const std::string& strsignal) { + [&active, &controller](int signal, const std::string& strsignal) { SLOG(INFO) << "Received signal " << signal << ": " << strsignal << "\n"; active = false; + if (controller) + { + LOG(INFO) << "Stopping controller\n"; + controller->stop(); + } }); if (host.empty()) { @@ -258,11 +264,11 @@ int main(int argc, char** argv) if (metaStderr) meta.reset(new MetaStderrAdapter); - std::unique_ptr controller(new Controller(hostIdValue->value(), instance, meta)); + controller = make_shared(hostIdValue->value(), instance, meta); LOG(INFO) << "Latency: " << latency << "\n"; - controller->start(pcmDevice, host, port, latency); - signal_handler.wait(); - controller->stop(); + controller->run(pcmDevice, host, port, latency); + // signal_handler.wait(); + // controller->stop(); } } catch (const std::exception& e) diff --git a/common/message/factory.hpp b/common/message/factory.hpp new file mode 100644 index 00000000..340c09d5 --- /dev/null +++ b/common/message/factory.hpp @@ -0,0 +1,77 @@ +/*** + This file is part of snapcast + Copyright (C) 2014-2019 Johannes Pohl + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +***/ + +#ifndef MESSAGE_FACTORY_HPP +#define MESSAGE_FACTORY_HPP + +#include "codec_header.hpp" +#include "hello.hpp" +#include "server_settings.hpp" +#include "stream_tags.hpp" +#include "time.hpp" +#include "wire_chunk.hpp" + +#include "common/str_compat.hpp" +#include "common/utils.hpp" +#include "json_message.hpp" +#include + + +namespace msg +{ +namespace factory +{ + +template +static std::unique_ptr createMessage(const BaseMessage& base_message, char* buffer) +{ + std::unique_ptr result = std::make_unique(); + if (!result) + return nullptr; + result->deserialize(base_message, buffer); + return result; +} + + +static std::unique_ptr createMessage(const BaseMessage& base_message, char* buffer) +{ + std::unique_ptr result; + switch (base_message.type) + { + case kCodecHeader: + return createMessage(base_message, buffer); + case kHello: + return createMessage(base_message, buffer); + case kServerSettings: + return createMessage(base_message, buffer); + case kStreamTags: + return createMessage(base_message, buffer); + case kTime: + return createMessage