Rename MessageReceiver to StreamMessageReceiver

This commit is contained in:
badaix 2020-07-21 17:09:26 +02:00
parent d52015ff09
commit af93719490
16 changed files with 37 additions and 40 deletions

View file

@ -53,11 +53,6 @@ public:
/// Send a message to all connected clients /// Send a message to all connected clients
void send(const std::string& message, const ControlSession* excludeSession = nullptr); void send(const std::string& message, const ControlSession* excludeSession = nullptr);
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
std::string onMessageReceived(ControlSession* session, const std::string& message) override;
void onNewSession(const std::shared_ptr<ControlSession>& session) override;
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
private: private:
void startAccept(); void startAccept();
@ -65,6 +60,11 @@ private:
void handleAccept(tcp::socket socket, Args&&... args); void handleAccept(tcp::socket socket, Args&&... args);
void cleanup(); void cleanup();
/// Implementation of ControlMessageReceiver
std::string onMessageReceived(ControlSession* session, const std::string& message) override;
void onNewSession(const std::shared_ptr<ControlSession>& session) override;
void onNewSession(const std::shared_ptr<StreamSession>& session) override;
mutable std::recursive_mutex session_mutex_; mutable std::recursive_mutex session_mutex_;
std::vector<std::weak_ptr<ControlSession>> sessions_; std::vector<std::weak_ptr<ControlSession>> sessions_;

View file

@ -55,7 +55,7 @@ public:
class ControlSession : public std::enable_shared_from_this<ControlSession> class ControlSession : public std::enable_shared_from_this<ControlSession>
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to ControlMessageReceiver
ControlSession(ControlMessageReceiver* receiver) : message_receiver_(receiver) ControlSession(ControlMessageReceiver* receiver) : message_receiver_(receiver)
{ {
} }

View file

@ -39,7 +39,7 @@ namespace net = boost::asio; // from <boost/asio.hpp>
class ControlSessionHttp : public ControlSession class ControlSessionHttp : public ControlSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to ControlMessageReceiver
ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, const ServerSettings::Http& settings); ControlSessionHttp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket, const ServerSettings::Http& settings);
~ControlSessionHttp() override; ~ControlSessionHttp() override;
void start() override; void start() override;

View file

@ -31,7 +31,7 @@
class ControlSessionTcp : public ControlSession class ControlSessionTcp : public ControlSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to ControlMessageReceiver
ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket); ControlSessionTcp(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, tcp::socket&& socket);
~ControlSessionTcp() override; ~ControlSessionTcp() override;
void start() override; void start() override;

View file

@ -39,7 +39,7 @@ namespace net = boost::asio; // from <boost/asio.hpp>
class ControlSessionWebsocket : public ControlSession class ControlSessionWebsocket : public ControlSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to ControlMessageReceiver
ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream<beast::tcp_stream>&& socket); ControlSessionWebsocket(ControlMessageReceiver* receiver, boost::asio::io_context& ioc, websocket::stream<beast::tcp_stream>&& socket);
~ControlSessionWebsocket() override; ~ControlSessionWebsocket() override;
void start() override; void start() override;

View file

@ -29,12 +29,12 @@
#include "common/queue.h" #include "common/queue.h"
#include "common/sample_format.hpp" #include "common/sample_format.hpp"
#include "control_server.hpp" #include "control_server.hpp"
#include "stream_server.hpp"
#include "jsonrpcpp.hpp" #include "jsonrpcpp.hpp"
#include "message/codec_header.hpp" #include "message/codec_header.hpp"
#include "message/message.hpp" #include "message/message.hpp"
#include "message/server_settings.hpp" #include "message/server_settings.hpp"
#include "server_settings.hpp" #include "server_settings.hpp"
#include "stream_server.hpp"
#include "stream_session.hpp" #include "stream_session.hpp"
#include "streamreader/stream_manager.hpp" #include "streamreader/stream_manager.hpp"
@ -49,10 +49,10 @@ using session_ptr = std::shared_ptr<StreamSession>;
/** /**
* Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream. * Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream.
* Accepts and holds client connections (StreamSession) * Accepts and holds client connections (StreamSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients * Receives (via the StreamMessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients * Forwards PCM data to the clients
*/ */
class Server : public MessageReceiver, public ControlMessageReceiver, public PcmListener class Server : public StreamMessageReceiver, public ControlMessageReceiver, public PcmListener
{ {
public: public:
Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings); Server(boost::asio::io_context& io_context, const ServerSettings& serverSettings);
@ -61,16 +61,13 @@ public:
void start(); void start();
void stop(); void stop();
/// Send a message to all connceted clients private:
// void send(const msg::BaseMessage* message); /// Implementation of StreamMessageReceiver
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override; void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
void onDisconnect(StreamSession* connection) override; void onDisconnect(StreamSession* connection) override;
/// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived /// Implementation of ControllMessageReceiver
std::string onMessageReceived(ControlSession* connection, const std::string& message) override; std::string onMessageReceived(ControlSession* connection, const std::string& message) override;
// TODO Refactor: ControlServer implements ControlMessageReceiver, calling this one.
void onNewSession(const std::shared_ptr<ControlSession>& session) override void onNewSession(const std::shared_ptr<ControlSession>& session) override
{ {
std::ignore = session; std::ignore = session;

View file

@ -30,8 +30,8 @@
#include "common/utils/string_utils.hpp" #include "common/utils/string_utils.hpp"
#include "encoder/encoder_factory.hpp" #include "encoder/encoder_factory.hpp"
#include "message/message.hpp" #include "message/message.hpp"
#include "server_settings.hpp"
#include "server.hpp" #include "server.hpp"
#include "server_settings.hpp"
#if defined(HAS_AVAHI) || defined(HAS_BONJOUR) #if defined(HAS_AVAHI) || defined(HAS_BONJOUR)
#include "publishZeroConf/publish_mdns.hpp" #include "publishZeroConf/publish_mdns.hpp"
#endif #endif

View file

@ -33,7 +33,7 @@ using json = nlohmann::json;
static constexpr auto LOG_TAG = "StreamServer"; static constexpr auto LOG_TAG = "StreamServer";
StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver) StreamServer::StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver)
: io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver) : io_context_(io_context), config_timer_(io_context), settings_(serverSettings), messageReceiver_(messageReceiver)
{ {
} }

View file

@ -48,13 +48,13 @@ using session_ptr = std::shared_ptr<StreamSession>;
/** /**
* Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream. * Reads PCM data using PipeStream, implements PcmListener to get the (encoded) PCM stream.
* Accepts and holds client connections (StreamSession) * Accepts and holds client connections (StreamSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients * Receives (via the StreamMessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients * Forwards PCM data to the clients
*/ */
class StreamServer : public MessageReceiver class StreamServer : public StreamMessageReceiver
{ {
public: public:
StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, MessageReceiver* messageReceiver = nullptr); StreamServer(boost::asio::io_context& io_context, const ServerSettings& serverSettings, StreamMessageReceiver* messageReceiver = nullptr);
virtual ~StreamServer(); virtual ~StreamServer();
void start(); void start();
@ -75,7 +75,7 @@ private:
void handleAccept(tcp::socket socket); void handleAccept(tcp::socket socket);
void cleanup(); void cleanup();
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived /// Implementation of StreamMessageReceiver
void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override; void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) override;
void onDisconnect(StreamSession* connection) override; void onDisconnect(StreamSession* connection) override;
@ -88,7 +88,7 @@ private:
ServerSettings settings_; ServerSettings settings_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_; Queue<std::shared_ptr<msg::BaseMessage>> messages_;
MessageReceiver* messageReceiver_; StreamMessageReceiver* messageReceiver_;
}; };

View file

@ -29,7 +29,7 @@ using namespace streamreader;
static constexpr auto LOG_TAG = "StreamSession"; static constexpr auto LOG_TAG = "StreamSession";
StreamSession::StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver) : messageReceiver_(receiver), pcmStream_(nullptr), strand_(ioc) StreamSession::StreamSession(boost::asio::io_context& ioc, StreamMessageReceiver* receiver) : messageReceiver_(receiver), pcmStream_(nullptr), strand_(ioc)
{ {
base_msg_size_ = baseMessage_.getSize(); base_msg_size_ = baseMessage_.getSize();
buffer_.resize(base_msg_size_); buffer_.resize(base_msg_size_);

View file

@ -40,7 +40,7 @@ class StreamSession;
/// Interface: callback for a received message. /// Interface: callback for a received message.
class MessageReceiver class StreamMessageReceiver
{ {
public: public:
virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0;
@ -110,13 +110,13 @@ using WriteHandler = std::function<void(boost::system::error_code ec, std::size_
/** /**
* Endpoint for a connected client. * Endpoint for a connected client.
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the MessageReceiver callback * Received messages from the client are passed to the StreamMessageReceiver callback
*/ */
class StreamSession : public std::enable_shared_from_this<StreamSession> class StreamSession : public std::enable_shared_from_this<StreamSession>
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to StreamMessageReceiver
StreamSession(boost::asio::io_context& ioc, MessageReceiver* receiver); StreamSession(boost::asio::io_context& ioc, StreamMessageReceiver* receiver);
virtual ~StreamSession() = default; virtual ~StreamSession() = default;
virtual std::string getIP() = 0; virtual std::string getIP() = 0;
@ -124,7 +124,7 @@ public:
virtual void start() = 0; virtual void start() = 0;
virtual void stop() = 0; virtual void stop() = 0;
void setMessageReceiver(MessageReceiver* receiver) void setMessageReceiver(StreamMessageReceiver* receiver)
{ {
messageReceiver_ = receiver; messageReceiver_ = receiver;
} }
@ -153,7 +153,7 @@ protected:
msg::BaseMessage baseMessage_; msg::BaseMessage baseMessage_;
std::vector<char> buffer_; std::vector<char> buffer_;
size_t base_msg_size_; size_t base_msg_size_;
MessageReceiver* messageReceiver_; StreamMessageReceiver* messageReceiver_;
size_t bufferMs_; size_t bufferMs_;
streamreader::PcmStreamPtr pcmStream_; streamreader::PcmStreamPtr pcmStream_;
boost::asio::io_context::strand strand_; boost::asio::io_context::strand strand_;

View file

@ -29,7 +29,7 @@ using namespace streamreader;
static constexpr auto LOG_TAG = "StreamSessionTCP"; static constexpr auto LOG_TAG = "StreamSessionTCP";
StreamSessionTcp::StreamSessionTcp(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket) StreamSessionTcp::StreamSessionTcp(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, tcp::socket&& socket)
: StreamSession(ioc, receiver), socket_(std::move(socket)) : StreamSession(ioc, receiver), socket_(std::move(socket))
{ {
} }

View file

@ -28,13 +28,13 @@ using boost::asio::ip::tcp;
/** /**
* Endpoint for a connected client. * Endpoint for a connected client.
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the MessageReceiver callback * Received messages from the client are passed to the StreamMessageReceiver callback
*/ */
class StreamSessionTcp : public StreamSession class StreamSessionTcp : public StreamSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to StreamMessageReceiver
StreamSessionTcp(boost::asio::io_context& ioc, MessageReceiver* receiver, tcp::socket&& socket); StreamSessionTcp(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, tcp::socket&& socket);
~StreamSessionTcp() override; ~StreamSessionTcp() override;
void start() override; void start() override;
void stop() override; void stop() override;

View file

@ -26,7 +26,7 @@ using namespace std;
static constexpr auto LOG_TAG = "StreamSessionWS"; static constexpr auto LOG_TAG = "StreamSessionWS";
StreamSessionWebsocket::StreamSessionWebsocket(boost::asio::io_context& ioc, MessageReceiver* receiver, websocket::stream<beast::tcp_stream>&& socket) StreamSessionWebsocket::StreamSessionWebsocket(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, websocket::stream<beast::tcp_stream>&& socket)
: StreamSession(ioc, receiver), ws_(std::move(socket)) : StreamSession(ioc, receiver), ws_(std::move(socket))
{ {
LOG(DEBUG, LOG_TAG) << "StreamSessionWS\n"; LOG(DEBUG, LOG_TAG) << "StreamSessionWS\n";

View file

@ -39,8 +39,8 @@ namespace net = boost::asio; // from <boost/asio.hpp>
class StreamSessionWebsocket : public StreamSession class StreamSessionWebsocket : public StreamSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to StreamMessageReceiver
StreamSessionWebsocket(boost::asio::io_context& ioc, MessageReceiver* receiver, websocket::stream<beast::tcp_stream>&& socket); StreamSessionWebsocket(boost::asio::io_context& ioc, StreamMessageReceiver* receiver, websocket::stream<beast::tcp_stream>&& socket);
~StreamSessionWebsocket() override; ~StreamSessionWebsocket() override;
void start() override; void start() override;
void stop() override; void stop() override;