diff --git a/server/Makefile b/server/Makefile index 10308067..db1f03d8 100644 --- a/server/Makefile +++ b/server/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -OBJ = snapServer.o pcmEncoder.o oggEncoder.o ../common/pcmChunk.o ../common/sampleFormat.o +OBJ = snapServer.o streamServer.o controlServer.o pcmEncoder.o oggEncoder.o ../common/pcmChunk.o ../common/sampleFormat.o BIN = snapserver all: server diff --git a/server/controlServer.cpp b/server/controlServer.cpp new file mode 100644 index 00000000..85d77a7b --- /dev/null +++ b/server/controlServer.cpp @@ -0,0 +1,85 @@ +#include "controlServer.h" + + +ControlSession::ControlSession(socket_ptr sock) : active_(false), socket_(sock) +{ +} + + +void ControlSession::sender() +{ + try + { + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + for (;;) + { + shared_ptr message(messages.pop()); + message->serialize(stream); + boost::asio::write(*socket_, streambuf); + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + active_ = false; + } +} + +void ControlSession::start() +{ + active_ = true; + senderThread = new thread(&ControlSession::sender, this); +// readerThread.join(); +} + +void ControlSession::send(BaseMessage* message) +{ + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + message->serialize(stream); + boost::asio::write(*socket_, streambuf); +} + + +bool ControlSession::isActive() const +{ + return active_; +} + + + + +ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL) +{ +} + + +void ControlServer::acceptor() +{ + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + a.accept(*sock); + cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + ControlSession* session = new ControlSession(sock); + sessions.insert(shared_ptr(session)); + session->start(); + } +} + + +void ControlServer::start() +{ + acceptThread = new thread(&ControlServer::acceptor, this); +} + + +void ControlServer::stop() +{ +// acceptThread->join(); +} + + + diff --git a/server/controlServer.h b/server/controlServer.h new file mode 100644 index 00000000..90fb43fd --- /dev/null +++ b/server/controlServer.h @@ -0,0 +1,64 @@ +#ifndef CONTROL_SERVER_H +#define CONTROL_SERVER_H + +#include +#include +#include +#include +#include +#include +#include "common/timeUtils.h" +#include "common/queue.h" +#include "common/message.h" +#include "common/headerMessage.h" +#include "common/sampleFormat.h" + + +using boost::asio::ip::tcp; +typedef std::shared_ptr socket_ptr; +using namespace std; + + + +class ControlSession +{ +public: + ControlSession(socket_ptr sock); + + void start(); + void send(BaseMessage* message); + bool isActive() const; + +private: + void sender(); + bool active_; + socket_ptr socket_; + thread* senderThread; + Queue> messages; +}; + + + +class ControlServer +{ +public: + ControlServer(unsigned short port); + + void start(); + void stop(); + +private: + void acceptor(); + set> sessions; + boost::asio::io_service io_service_; + unsigned short port_; + shared_ptr headerChunk; + shared_ptr sampleFormat; + thread* acceptThread; +}; + + + +#endif + + diff --git a/server/snapServer.cpp b/server/snapServer.cpp index 930ab367..ca5dee6f 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -1,186 +1,23 @@ -#include -#include -#include #include #include -#include -#include // localtime -#include // stringstream -#include -#include #include -#include -#include #include "common/timeUtils.h" -#include "common/queue.h" #include "common/signalHandler.h" #include "common/utils.h" #include "common/sampleFormat.h" #include "../server/pcmEncoder.h" #include "../server/oggEncoder.h" #include "common/message.h" - - -using boost::asio::ip::tcp; -namespace po = boost::program_options; - - -typedef boost::shared_ptr socket_ptr; -using namespace std; -using namespace std::chrono; +#include "streamServer.h" +#include "controlServer.h" bool g_terminated = false; +namespace po = boost::program_options; +using namespace std; -class Session -{ -public: - Session(socket_ptr sock) : active_(false), socket_(sock) - { - } - - void sender() - { - try - { - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - for (;;) - { - shared_ptr message(messages.pop()); - message->serialize(stream); - boost::asio::write(*socket_, streambuf); - } - } - catch (std::exception& e) - { - std::cerr << "Exception in thread: " << e.what() << "\n"; - active_ = false; - } - } - - void start() - { - active_ = true; - senderThread = new thread(&Session::sender, this); -// readerThread.join(); - } - - void send(shared_ptr message) - { - if (!message) - return; - - while (messages.size() > 100)//* chunk->getDuration() > 10000) - messages.pop(); - messages.push(message); - } - - bool isActive() const - { - return active_; - } - -private: - bool active_; - socket_ptr socket_; - thread* senderThread; - Queue> messages; -}; - - -class Server -{ -public: - Server(unsigned short port) : port_(port), headerChunk(NULL) - { - } - - void acceptor() - { - tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); - for (;;) - { - socket_ptr sock(new tcp::socket(io_service_)); - a.accept(*sock); - cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; - Session* session = new Session(sock); - session->send(sampleFormat); - session->send(headerChunk); - session->start(); - sessions.insert(shared_ptr(session)); - } - } - - void setHeader(shared_ptr header) - { - if (header) - headerChunk = header; - } - - void setFormat(SampleFormat& format) - { - sampleFormat = shared_ptr(new SampleFormat(format)); - } - - void send(shared_ptr message) - { - for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) - { - if (!(*it)->isActive()) - { - cout << "Session inactive. Removing\n"; - sessions.erase(it++); - } - else - ++it; - } - - for (auto s : sessions) - s->send(message); - } - - void start() - { - acceptThread = new thread(&Server::acceptor, this); - } - - void stop() - { -// acceptThread->join(); - } - -private: - set> sessions; - boost::asio::io_service io_service_; - unsigned short port_; - shared_ptr headerChunk; - shared_ptr sampleFormat; - thread* acceptThread; -}; - - -class ServerException : public std::exception -{ -public: - ServerException(const std::string& what) : what_(what) - { - } - - virtual ~ServerException() throw() - { - } - - virtual const char* what() const throw() - { - return what_.c_str(); - } - -private: - std::string what_; -}; int main(int argc, char* argv[]) @@ -224,7 +61,7 @@ int main(int argc, char* argv[]) openlog ("firstdaemon", LOG_PID, LOG_DAEMON); using namespace std; // For atoi. - Server* server = new Server(port); + StreamServer* server = new StreamServer(port); server->start(); timeval tvChunk; diff --git a/server/streamServer.cpp b/server/streamServer.cpp new file mode 100644 index 00000000..306bb90b --- /dev/null +++ b/server/streamServer.cpp @@ -0,0 +1,119 @@ +#include "streamServer.h" + + + +StreamSession::StreamSession(socket_ptr sock) : active_(false), socket_(sock) +{ +} + + +void StreamSession::sender() +{ + try + { + boost::asio::streambuf streambuf; + std::ostream stream(&streambuf); + for (;;) + { + shared_ptr message(messages.pop()); + message->serialize(stream); + boost::asio::write(*socket_, streambuf); + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + active_ = false; + } +} + +void StreamSession::start() +{ + active_ = true; + senderThread = new thread(&StreamSession::sender, this); +// readerThread.join(); +} + +void StreamSession::send(shared_ptr message) +{ + if (!message) + return; + + while (messages.size() > 100)//* chunk->getDuration() > 10000) + messages.pop(); + messages.push(message); +} + +bool StreamSession::isActive() const +{ + return active_; +} + + + +StreamServer::StreamServer(unsigned short port) : port_(port), headerChunk(NULL) +{ +} + + +void StreamServer::acceptor() +{ + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + a.accept(*sock); + cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + StreamSession* session = new StreamSession(sock); + session->send(sampleFormat); + session->send(headerChunk); + session->start(); + sessions.insert(shared_ptr(session)); + } +} + + +void StreamServer::setHeader(shared_ptr header) +{ + if (header) + headerChunk = header; +} + + +void StreamServer::setFormat(SampleFormat& format) +{ + sampleFormat = shared_ptr(new SampleFormat(format)); +} + + +void StreamServer::send(shared_ptr message) +{ + for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) + { + if (!(*it)->isActive()) + { + cout << "Session inactive. Removing\n"; + sessions.erase(it++); + } + else + ++it; + } + + for (auto s : sessions) + s->send(message); +} + + +void StreamServer::start() +{ + acceptThread = new thread(&StreamServer::acceptor, this); +} + + +void StreamServer::stop() +{ +// acceptThread->join(); +} + + + diff --git a/server/streamServer.h b/server/streamServer.h new file mode 100644 index 00000000..162a7b1c --- /dev/null +++ b/server/streamServer.h @@ -0,0 +1,88 @@ +#ifndef STREAM_SERVER_H +#define STREAM_SERVER_H + +#include +#include +#include +#include +#include +#include +#include "common/timeUtils.h" +#include "common/queue.h" +#include "common/message.h" +#include "common/headerMessage.h" +#include "common/sampleFormat.h" + + +using boost::asio::ip::tcp; +typedef std::shared_ptr socket_ptr; +using namespace std; + + + +class StreamSession +{ +public: + StreamSession(socket_ptr sock); + + void start(); + void send(shared_ptr message); + bool isActive() const; + +private: + void sender(); + bool active_; + socket_ptr socket_; + thread* senderThread; + Queue> messages; +}; + + + +class StreamServer +{ +public: + StreamServer(unsigned short port); + + void setHeader(shared_ptr header); + void setFormat(SampleFormat& format); + void send(shared_ptr message); + + void start(); + void stop(); + +private: + void acceptor(); + set> sessions; + boost::asio::io_service io_service_; + unsigned short port_; + shared_ptr headerChunk; + shared_ptr sampleFormat; + thread* acceptThread; +}; + + +class ServerException : public std::exception +{ +public: + ServerException(const std::string& what) : what_(what) + { + } + + virtual ~ServerException() throw() + { + } + + virtual const char* what() const throw() + { + return what_.c_str(); + } + +private: + std::string what_; +}; + + +#endif + +