Renamed ClientSession to StreamSession

This commit is contained in:
badaix 2015-10-25 12:42:16 +01:00
parent cdf0ac11c9
commit b99cf2f2fc
6 changed files with 47 additions and 46 deletions

View file

@ -3,7 +3,8 @@ TARGET = snapclient
SHELL = /bin/bash SHELL = /bin/bash
CXX = /usr/bin/g++ CXX = /usr/bin/g++
CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. # -static-libgcc -static-libstdc++ CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I..
#CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. -static-libgcc -static-libstdc++
LDFLAGS = -lrt -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -lFLAC -lavahi-client -lavahi-common LDFLAGS = -lrt -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -lFLAC -lavahi-client -lavahi-common
OBJ = snapClient.o stream.o alsaPlayer.o clientConnection.o timeProvider.o decoder/oggDecoder.o decoder/pcmDecoder.o decoder/flacDecoder.o controller.o browseAvahi.o ../message/pcmChunk.o ../common/log.o ../message/sampleFormat.o OBJ = snapClient.o stream.o alsaPlayer.o clientConnection.o timeProvider.o decoder/oggDecoder.o decoder/pcmDecoder.o decoder/flacDecoder.o controller.o browseAvahi.o ../message/pcmChunk.o ../common/log.o ../message/sampleFormat.o

View file

@ -6,7 +6,7 @@ CXX = /usr/bin/g++
CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common
OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o clientSession.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o OBJ = snapServer.o config.o controlServer.o controlSession.o streamServer.o streamSession.o json/jsonrpc.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o
BIN = snapserver BIN = snapserver
all: $(TARGET) all: $(TARGET)

View file

@ -70,18 +70,18 @@ void StreamServer::onResync(const PipeReader* pipeReader, double ms)
} }
void StreamServer::onDisconnect(ClientSession* clientSession) void StreamServer::onDisconnect(StreamSession* streamSession)
{ {
logO << "onDisconnect: " << clientSession->macAddress << "\n"; logO << "onDisconnect: " << streamSession->macAddress << "\n";
auto func = [](ClientSession* s)->void{s->stop();}; auto func = [](StreamSession* s)->void{s->stop();};
std::thread t(func, clientSession); std::thread t(func, streamSession);
t.detach(); t.detach();
ClientInfoPtr clientInfo = Config::instance().getClientInfo(clientSession->macAddress); ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress);
// don't block: remove ClientSession in a thread // don't block: remove StreamSession in a thread
for (auto it = sessions_.begin(); it != sessions_.end(); ) for (auto it = sessions_.begin(); it != sessions_.end(); )
{ {
if (it->get() == clientSession) if (it->get() == streamSession)
{ {
logO << "erase: " << (*it)->macAddress << "\n"; logO << "erase: " << (*it)->macAddress << "\n";
sessions_.erase(it); sessions_.erase(it);
@ -169,7 +169,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
serverSettings.muted = clientInfo->volume.muted; serverSettings.muted = clientInfo->volume.muted;
serverSettings.latency = clientInfo->latency; serverSettings.latency = clientInfo->latency;
ClientSession* session = getClientSession(request.getParam("client").get<string>()); StreamSession* session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL) if (session != NULL)
session->send(&serverSettings); session->send(&serverSettings);
@ -192,7 +192,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
} }
void StreamServer::onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer) void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
// logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; // logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::kRequest) if (baseMessage.type == message_type::kRequest)
@ -267,7 +267,7 @@ void StreamServer::onMessageReceived(ClientSession* connection, const msg::BaseM
} }
ClientSession* StreamServer::getClientSession(const std::string& mac) StreamSession* StreamServer::getStreamSession(const std::string& mac)
{ {
for (auto session: sessions_) for (auto session: sessions_)
{ {
@ -293,7 +293,7 @@ void StreamServer::handleAccept(socket_ptr socket)
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<ClientSession> session = make_shared<ClientSession>(this, socket); shared_ptr<StreamSession> session = make_shared<StreamSession>(this, socket);
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
session->setBufferMs(settings_.bufferMs); session->setBufferMs(settings_.bufferMs);

View file

@ -27,7 +27,7 @@
#include <sstream> #include <sstream>
#include <mutex> #include <mutex>
#include "clientSession.h" #include "streamSession.h"
#include "pipeReader.h" #include "pipeReader.h"
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
@ -66,7 +66,7 @@ struct StreamServerSettings
/// Forwars PCM data to the connected clients /// Forwars PCM data to the connected clients
/** /**
* Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream. * Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream.
* Accepts and holds client connections (ClientSession) * Accepts and holds client connections (StreamSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients * Receives (via the MessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients * Forwards PCM data to the clients
*/ */
@ -83,9 +83,10 @@ public:
void send(const msg::BaseMessage* message); void send(const msg::BaseMessage* message);
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
virtual void onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer);
virtual void onDisconnect(ClientSession* connection); virtual void onDisconnect(StreamSession* connection);
/// Implementation of ControllMessageReceiver::onMessageReceived, called by ControlServer::onMessageReceived
virtual void onMessageReceived(ControlSession* connection, const std::string& message); virtual void onMessageReceived(ControlSession* connection, const std::string& message);
/// Implementation of PipeListener /// Implementation of PipeListener
@ -95,17 +96,15 @@ public:
private: private:
void startAccept(); void startAccept();
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
// void acceptor(); StreamSession* getStreamSession(const std::string& mac);
ClientSession* getClientSession(const std::string& mac);
mutable std::mutex mutex_; mutable std::mutex mutex_;
PipeReader* pipeReader_; PipeReader* pipeReader_;
std::set<std::shared_ptr<ClientSession>> sessions_; std::set<std::shared_ptr<StreamSession>> sessions_;
boost::asio::io_service* io_service_; boost::asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_; std::shared_ptr<tcp::acceptor> acceptor_;
StreamServerSettings settings_; StreamServerSettings settings_;
msg::SampleFormat sampleFormat_; msg::SampleFormat sampleFormat_;
// std::thread acceptThread_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_; Queue<std::shared_ptr<msg::BaseMessage>> messages_;
std::unique_ptr<ControlServer> controlServer_; std::unique_ptr<ControlServer> controlServer_;
}; };

View file

@ -16,10 +16,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#include "streamSession.h"
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <iostream> #include <iostream>
#include <mutex> #include <mutex>
#include "clientSession.h"
#include "common/log.h" #include "common/log.h"
#include "message/pcmChunk.h" #include "message/pcmChunk.h"
@ -27,28 +28,28 @@ using namespace std;
ClientSession::ClientSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver) StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
{ {
socket_ = socket; socket_ = socket;
} }
ClientSession::~ClientSession() StreamSession::~StreamSession()
{ {
stop(); stop();
} }
void ClientSession::start() void StreamSession::start()
{ {
active_ = true; active_ = true;
streamActive_ = false; streamActive_ = false;
readerThread_ = new thread(&ClientSession::reader, this); readerThread_ = new thread(&StreamSession::reader, this);
writerThread_ = new thread(&ClientSession::writer, this); writerThread_ = new thread(&StreamSession::writer, this);
} }
void ClientSession::stop() void StreamSession::stop()
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
setActive(false); setActive(false);
@ -81,11 +82,11 @@ void ClientSession::stop()
readerThread_ = NULL; readerThread_ = NULL;
writerThread_ = NULL; writerThread_ = NULL;
socket_ = NULL; socket_ = NULL;
logD << "ClientSession stopped\n"; logD << "StreamSession stopped\n";
} }
void ClientSession::socketRead(void* _to, size_t _bytes) void StreamSession::socketRead(void* _to, size_t _bytes)
{ {
size_t read = 0; size_t read = 0;
do do
@ -97,7 +98,7 @@ void ClientSession::socketRead(void* _to, size_t _bytes)
} }
void ClientSession::add(const shared_ptr<const msg::BaseMessage>& message) void StreamSession::add(const shared_ptr<const msg::BaseMessage>& message)
{ {
if (!message || !streamActive_) if (!message || !streamActive_)
return; return;
@ -108,7 +109,7 @@ void ClientSession::add(const shared_ptr<const msg::BaseMessage>& message)
} }
bool ClientSession::send(const msg::BaseMessage* message) const bool StreamSession::send(const msg::BaseMessage* message) const
{ {
// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; // logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
@ -125,7 +126,7 @@ bool ClientSession::send(const msg::BaseMessage* message) const
} }
void ClientSession::getNextMessage() void StreamSession::getNextMessage()
{ {
msg::BaseMessage baseMessage; msg::BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
@ -144,7 +145,7 @@ void ClientSession::getNextMessage()
} }
void ClientSession::reader() void StreamSession::reader()
{ {
try try
{ {
@ -155,13 +156,13 @@ void ClientSession::reader()
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
logS(kLogErr) << "Exception in ClientSession::reader(): " << e.what() << endl; logS(kLogErr) << "Exception in StreamSession::reader(): " << e.what() << endl;
} }
setActive(false); setActive(false);
} }
void ClientSession::writer() void StreamSession::writer()
{ {
try try
{ {
@ -192,13 +193,13 @@ void ClientSession::writer()
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
logS(kLogErr) << "Exception in ClientSession::writer(): " << e.what() << endl; logS(kLogErr) << "Exception in StreamSession::writer(): " << e.what() << endl;
} }
setActive(false); setActive(false);
} }
void ClientSession::setActive(bool active) void StreamSession::setActive(bool active)
{ {
if (active_ && !active && (messageReceiver_ != NULL)) if (active_ && !active && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this); messageReceiver_->onDisconnect(this);

View file

@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#ifndef SERVER_SESSION_H #ifndef STREAM_SESSION_H
#define SERVER_SESSION_H #define STREAM_SESSION_H
#include <string> #include <string>
#include <thread> #include <thread>
@ -34,15 +34,15 @@
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
class ClientSession; class StreamSession;
/// Interface: callback for a received message. /// Interface: callback for a received message.
class MessageReceiver class MessageReceiver
{ {
public: public:
virtual void onMessageReceived(ClientSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0; virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) = 0;
virtual void onDisconnect(ClientSession* connection) = 0; virtual void onDisconnect(StreamSession* connection) = 0;
}; };
@ -52,12 +52,12 @@ public:
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the MessageReceiver callback * Received messages from the client are passed to the MessageReceiver callback
*/ */
class ClientSession class StreamSession
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to MessageReceiver
ClientSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket); StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket);
~ClientSession(); ~StreamSession();
void start(); void start();
void stop(); void stop();