diff --git a/client/Makefile b/client/Makefile index 03be1dc3..a5d8659f 100644 --- a/client/Makefile +++ b/client/Makefile @@ -3,7 +3,8 @@ TARGET = snapclient SHELL = /bin/bash CXX = /usr/bin/g++ -CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. # -static-libgcc -static-libstdc++ +CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. +#CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. -static-libgcc -static-libstdc++ LDFLAGS = -lrt -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -lFLAC -lavahi-client -lavahi-common OBJ = snapClient.o stream.o alsaPlayer.o clientConnection.o timeProvider.o decoder/oggDecoder.o decoder/pcmDecoder.o decoder/flacDecoder.o controller.o browseAvahi.o ../message/pcmChunk.o ../common/log.o ../message/sampleFormat.o diff --git a/server/Makefile b/server/Makefile index 98e91998..80c181c6 100644 --- a/server/Makefile +++ b/server/Makefile @@ -6,7 +6,7 @@ CXX = /usr/bin/g++ CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common -OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o clientSession.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o +OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o BIN = snapserver all: $(TARGET) diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 24d429a5..b398fd6f 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -70,18 +70,18 @@ void StreamServer::onResync(const PipeReader* pipeReader, double ms) } -void StreamServer::onDisconnect(ClientSession* clientSession) +void StreamServer::onDisconnect(StreamSession* streamSession) { - logO << "onDisconnect: " << clientSession->macAddress << "\n"; - auto func = [](ClientSession* s)->void{s->stop();}; - std::thread t(func, clientSession); + logO << "onDisconnect: " << streamSession->macAddress << "\n"; + auto func = [](StreamSession* s)->void{s->stop();}; + std::thread t(func, streamSession); t.detach(); - ClientInfoPtr clientInfo = Config::instance().getClientInfo(clientSession->macAddress); - // don't block: remove ClientSession in a thread + ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress); + // don't block: remove StreamSession in a thread for (auto it = sessions_.begin(); it != sessions_.end(); ) { - if (it->get() == clientSession) + if (it->get() == streamSession) { logO << "erase: " << (*it)->macAddress << "\n"; sessions_.erase(it); @@ -169,7 +169,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: serverSettings.muted = clientInfo->volume.muted; serverSettings.latency = clientInfo->latency; - ClientSession* session = getClientSession(request.getParam("client").get()); + StreamSession* session = getStreamSession(request.getParam("client").get()); if (session != NULL) session->send(&serverSettings); @@ -192,7 +192,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std:: } -void StreamServer::onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer) +void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) { // logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (baseMessage.type == message_type::kRequest) @@ -267,7 +267,7 @@ void StreamServer::onMessageReceived(ClientSession* connection, const msg::BaseM } -ClientSession* StreamServer::getClientSession(const std::string& mac) +StreamSession* StreamServer::getStreamSession(const std::string& mac) { for (auto session: sessions_) { @@ -293,7 +293,7 @@ void StreamServer::handleAccept(socket_ptr socket) setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; - shared_ptr session = make_shared(this, socket); + shared_ptr session = make_shared(this, socket); { std::unique_lock mlock(mutex_); session->setBufferMs(settings_.bufferMs); diff --git a/server/streamServer.h b/server/streamServer.h index 0c6aa1b5..6448ef04 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -27,7 +27,7 @@ #include #include -#include "clientSession.h" +#include "streamSession.h" #include "pipeReader.h" #include "common/queue.h" #include "message/message.h" @@ -66,7 +66,7 @@ struct StreamServerSettings /// Forwars PCM data to the connected clients /** * Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream. - * Accepts and holds client connections (ClientSession) + * Accepts and holds client connections (StreamSession) * Receives (via the MessageReceiver interface) and answers messages from the clients * Forwards PCM data to the clients */ @@ -83,9 +83,10 @@ public: void send(const msg::BaseMessage* message); /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived - virtual void onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer); - virtual void onDisconnect(ClientSession* connection); + virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer); + virtual void onDisconnect(StreamSession* connection); + /// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived virtual void onMessageReceived(ControlSession* connection, const std::string& message); /// Implementation of PipeListener @@ -95,17 +96,15 @@ public: private: void startAccept(); void handleAccept(socket_ptr socket); -// void acceptor(); - ClientSession* getClientSession(const std::string& mac); + StreamSession* getStreamSession(const std::string& mac); mutable std::mutex mutex_; PipeReader* pipeReader_; - std::set> sessions_; + std::set> sessions_; boost::asio::io_service* io_service_; std::shared_ptr acceptor_; StreamServerSettings settings_; msg::SampleFormat sampleFormat_; -// std::thread acceptThread_; Queue> messages_; std::unique_ptr controlServer_; }; diff --git a/server/clientSession.cpp b/server/streamSession.cpp similarity index 84% rename from server/clientSession.cpp rename to server/streamSession.cpp index 9ea1046f..beace3a6 100644 --- a/server/clientSession.cpp +++ b/server/streamSession.cpp @@ -16,10 +16,11 @@ along with this program. If not, see . ***/ +#include "streamSession.h" + #include #include #include -#include "clientSession.h" #include "common/log.h" #include "message/pcmChunk.h" @@ -27,28 +28,28 @@ using namespace std; -ClientSession::ClientSession(MessageReceiver* receiver, std::shared_ptr socket) : messageReceiver_(receiver) +StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr socket) : messageReceiver_(receiver) { socket_ = socket; } -ClientSession::~ClientSession() +StreamSession::~StreamSession() { stop(); } -void ClientSession::start() +void StreamSession::start() { active_ = true; streamActive_ = false; - readerThread_ = new thread(&ClientSession::reader, this); - writerThread_ = new thread(&ClientSession::writer, this); + readerThread_ = new thread(&StreamSession::reader, this); + writerThread_ = new thread(&StreamSession::writer, this); } -void ClientSession::stop() +void StreamSession::stop() { std::unique_lock mlock(mutex_); setActive(false); @@ -81,11 +82,11 @@ void ClientSession::stop() readerThread_ = NULL; writerThread_ = NULL; socket_ = NULL; - logD << "ClientSession stopped\n"; + logD << "StreamSession stopped\n"; } -void ClientSession::socketRead(void* _to, size_t _bytes) +void StreamSession::socketRead(void* _to, size_t _bytes) { size_t read = 0; do @@ -97,7 +98,7 @@ void ClientSession::socketRead(void* _to, size_t _bytes) } -void ClientSession::add(const shared_ptr& message) +void StreamSession::add(const shared_ptr& message) { if (!message || !streamActive_) return; @@ -108,7 +109,7 @@ void ClientSession::add(const shared_ptr& message) } -bool ClientSession::send(const msg::BaseMessage* message) const +bool StreamSession::send(const msg::BaseMessage* message) const { // logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; std::unique_lock mlock(mutex_); @@ -125,7 +126,7 @@ bool ClientSession::send(const msg::BaseMessage* message) const } -void ClientSession::getNextMessage() +void StreamSession::getNextMessage() { msg::BaseMessage baseMessage; size_t baseMsgSize = baseMessage.getSize(); @@ -144,7 +145,7 @@ void ClientSession::getNextMessage() } -void ClientSession::reader() +void StreamSession::reader() { try { @@ -155,13 +156,13 @@ void ClientSession::reader() } catch (const std::exception& e) { - logS(kLogErr) << "Exception in ClientSession::reader(): " << e.what() << endl; + logS(kLogErr) << "Exception in StreamSession::reader(): " << e.what() << endl; } setActive(false); } -void ClientSession::writer() +void StreamSession::writer() { try { @@ -192,13 +193,13 @@ void ClientSession::writer() } catch (const std::exception& e) { - logS(kLogErr) << "Exception in ClientSession::writer(): " << e.what() << endl; + logS(kLogErr) << "Exception in StreamSession::writer(): " << e.what() << endl; } setActive(false); } -void ClientSession::setActive(bool active) +void StreamSession::setActive(bool active) { if (active_ && !active && (messageReceiver_ != NULL)) messageReceiver_->onDisconnect(this); diff --git a/server/clientSession.h b/server/streamSession.h similarity index 89% rename from server/clientSession.h rename to server/streamSession.h index 34642773..2b5250cd 100644 --- a/server/clientSession.h +++ b/server/streamSession.h @@ -16,8 +16,8 @@ along with this program. If not, see . ***/ -#ifndef SERVER_SESSION_H -#define SERVER_SESSION_H +#ifndef STREAM_SESSION_H +#define STREAM_SESSION_H #include #include @@ -34,15 +34,15 @@ using boost::asio::ip::tcp; -class ClientSession; +class StreamSession; /// Interface: callback for a received message. class MessageReceiver { public: - virtual void onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; - virtual void onDisconnect(ClientSession* connection) = 0; + virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; + virtual void onDisconnect(StreamSession* connection) = 0; }; @@ -52,12 +52,12 @@ public: * Messages are sent to the client with the "send" method. * Received messages from the client are passed to the MessageReceiver callback */ -class ClientSession +class StreamSession { public: /// ctor. Received message from the client are passed to MessageReceiver - ClientSession(MessageReceiver* receiver, std::shared_ptr socket); - ~ClientSession(); + StreamSession(MessageReceiver* receiver, std::shared_ptr socket); + ~StreamSession(); void start(); void stop();