From 8039c3d0231c811ad051da0d3e8c77f20285aa4f Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Wed, 10 Sep 2014 21:20:22 +0000 Subject: [PATCH] socket stuff git-svn-id: svn://elaine/murooma/trunk@252 d8a302eb-03bc-478d-80e4-98257eca68ef --- client/Makefile | 2 +- client/controller.cpp | 9 +++- client/controller.h | 4 +- common/message.h | 14 +++--- {client => common}/socketConnection.cpp | 37 ++++++++------- {client => common}/socketConnection.h | 4 +- server/Makefile | 2 +- server/controlServer.cpp | 60 ++++--------------------- server/controlServer.h | 26 ++--------- server/snapServer.cpp | 3 ++ 10 files changed, 56 insertions(+), 105 deletions(-) rename {client => common}/socketConnection.cpp (78%) rename {client => common}/socketConnection.h (96%) diff --git a/client/Makefile b/client/Makefile index df47ee60..a28b2ddb 100644 --- a/client/Makefile +++ b/client/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc -OBJ = snapClient.o stream.o player.o socketConnection.o oggDecoder.o pcmDecoder.o controller.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o +OBJ = snapClient.o stream.o player.o ../common/socketConnection.o oggDecoder.o pcmDecoder.o controller.o ../common/pcmChunk.o ../common/log.o ../common/sampleFormat.o BIN = snapclient all: client diff --git a/client/controller.cpp b/client/controller.cpp index 5eba84ed..0b90dd88 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -59,6 +59,9 @@ void Controller::start(std::string& _ip, size_t _port, int _bufferMs) connection = new ClientConnection(this, _ip, _port); connection->start(); +// controlConnection = new ClientConnection(this, _ip, _port + 1); +// controlConnection->start(); + controllerThread = new thread(&Controller::worker, this); } @@ -75,7 +78,9 @@ void Controller::worker() active_ = true; while (sampleFormat == NULL) + { usleep(10000); + } stream = new Stream(SampleFormat(*sampleFormat)); stream->setBufferLen(bufferMs); @@ -84,7 +89,9 @@ void Controller::worker() while (active_) { - usleep(10000); + usleep(100000); +// BaseMessage msg; +// controlConnection->send(&msg); } } diff --git a/client/controller.h b/client/controller.h index bbcc244f..c319b2a6 100644 --- a/client/controller.h +++ b/client/controller.h @@ -4,8 +4,9 @@ #include #include #include "common/message.h" +#include "common/socketConnection.h" #include "decoder.h" -#include "socketConnection.h" +#include "stream.h" class Controller : public MessageReceiver @@ -21,6 +22,7 @@ private: std::atomic active_; std::thread* controllerThread; ClientConnection* connection; + ClientConnection* controlConnection; SampleFormat* sampleFormat; Decoder* decoder; Stream* stream; diff --git a/common/message.h b/common/message.h index f312310a..30d9566e 100644 --- a/common/message.h +++ b/common/message.h @@ -27,23 +27,23 @@ struct membuf : public std::basic_streambuf enum message_type { - header = 0, - payload = 1, - sampleformat = 2 + base = 0, + header = 1, + payload = 2, + sampleformat = 3 }; struct BaseMessage { - BaseMessage() + BaseMessage() : type(base) { } - BaseMessage(message_type type_) + BaseMessage(message_type type_) : type(type_) { - type = type_; - }; + } virtual ~BaseMessage() { diff --git a/client/socketConnection.cpp b/common/socketConnection.cpp similarity index 78% rename from client/socketConnection.cpp rename to common/socketConnection.cpp index 8e976299..41554d48 100644 --- a/client/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -2,7 +2,7 @@ #include #include #include -#include "common/log.h" +#include "log.h" #define PCM_DEVICE "default" @@ -23,7 +23,7 @@ SocketConnection::~SocketConnection() void SocketConnection::socketRead(void* _to, size_t _bytes) { - std::unique_lock mlock(mutex_); +// std::unique_lock mlock(mutex_); size_t toRead = _bytes; size_t len = 0; do @@ -52,11 +52,8 @@ void SocketConnection::send(BaseMessage* message) std::unique_lock mlock(mutex_); boost::asio::streambuf streambuf; std::ostream stream(&streambuf); - for (;;) - { - message->serialize(stream); - boost::asio::write(*socket.get(), streambuf); - } + message->serialize(stream); + boost::asio::write(*socket.get(), streambuf); } @@ -94,19 +91,21 @@ void ClientConnection::worker() while (active_) { try - { - tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); - iterator = resolver.resolve(query); - - socket.reset(new tcp::socket(io_service)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - socket->connect(*iterator); - std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; + { + { + std::unique_lock mlock(mutex_); + tcp::resolver resolver(io_service); + tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); + iterator = resolver.resolve(query); + socket.reset(new tcp::socket(io_service)); + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + socket->connect(*iterator); + std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; + } while(active_) { getNextMessage(); diff --git a/client/socketConnection.h b/common/socketConnection.h similarity index 96% rename from client/socketConnection.h rename to common/socketConnection.h index e79f6a7f..757158ac 100644 --- a/client/socketConnection.h +++ b/common/socketConnection.h @@ -4,9 +4,9 @@ #include #include #include +#include #include -#include "stream.h" -#include "common/message.h" +#include "message.h" using boost::asio::ip::tcp; diff --git a/server/Makefile b/server/Makefile index db1f03d8..1c67a119 100644 --- a/server/Makefile +++ b/server/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -OBJ = snapServer.o streamServer.o controlServer.o pcmEncoder.o oggEncoder.o ../common/pcmChunk.o ../common/sampleFormat.o +OBJ = snapServer.o streamServer.o controlServer.o pcmEncoder.o oggEncoder.o ../common/log.o ../common/socketConnection.o ../common/pcmChunk.o ../common/sampleFormat.o BIN = snapserver all: server diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 85d77a7b..123236c0 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -1,53 +1,5 @@ #include "controlServer.h" - - -ControlSession::ControlSession(socket_ptr sock) : active_(false), socket_(sock) -{ -} - - -void ControlSession::sender() -{ - try - { - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - for (;;) - { - shared_ptr message(messages.pop()); - message->serialize(stream); - boost::asio::write(*socket_, streambuf); - } - } - catch (std::exception& e) - { - std::cerr << "Exception in thread: " << e.what() << "\n"; - active_ = false; - } -} - -void ControlSession::start() -{ - active_ = true; - senderThread = new thread(&ControlSession::sender, this); -// readerThread.join(); -} - -void ControlSession::send(BaseMessage* message) -{ - boost::asio::streambuf streambuf; - std::ostream stream(&streambuf); - message->serialize(stream); - boost::asio::write(*socket_, streambuf); -} - - -bool ControlSession::isActive() const -{ - return active_; -} - - +#include ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL) @@ -55,6 +7,12 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL } +void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) +{ + cout << "onMessageReceived: " << baseMessage.type << ", " << baseMessage.size << "\n"; +} + + void ControlServer::acceptor() { tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); @@ -63,8 +21,8 @@ void ControlServer::acceptor() socket_ptr sock(new tcp::socket(io_service_)); a.accept(*sock); cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; - ControlSession* session = new ControlSession(sock); - sessions.insert(shared_ptr(session)); + ServerConnection* session = new ServerConnection(this, sock); + sessions.insert(shared_ptr(session)); session->start(); } } diff --git a/server/controlServer.h b/server/controlServer.h index 90fb43fd..00c796f7 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -12,6 +12,7 @@ #include "common/message.h" #include "common/headerMessage.h" #include "common/sampleFormat.h" +#include "common/socketConnection.h" using boost::asio::ip::tcp; @@ -19,37 +20,18 @@ typedef std::shared_ptr socket_ptr; using namespace std; - -class ControlSession -{ -public: - ControlSession(socket_ptr sock); - - void start(); - void send(BaseMessage* message); - bool isActive() const; - -private: - void sender(); - bool active_; - socket_ptr socket_; - thread* senderThread; - Queue> messages; -}; - - - -class ControlServer +class ControlServer : public MessageReceiver { public: ControlServer(unsigned short port); void start(); void stop(); + virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); private: void acceptor(); - set> sessions; + set> sessions; boost::asio::io_service io_service_; unsigned short port_; shared_ptr headerChunk; diff --git a/server/snapServer.cpp b/server/snapServer.cpp index ca5dee6f..1d261a94 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -64,6 +64,9 @@ int main(int argc, char* argv[]) StreamServer* server = new StreamServer(port); server->start(); + ControlServer* controlServer = new ControlServer(port + 1); + controlServer->start(); + timeval tvChunk; gettimeofday(&tvChunk, NULL); long nextTick = getTickCount();