From 00fac3eccbf58d824e645e584ca005d5d222dba2 Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Tue, 9 Sep 2014 20:31:03 +0000 Subject: [PATCH] controller git-svn-id: svn://elaine/murooma/trunk@242 d8a302eb-03bc-478d-80e4-98257eca68ef --- client/Makefile | 3 +- client/controller.cpp | 61 ++++++++++++++++++++++++++++--------- client/controller.h | 12 ++++++-- client/decoder.h | 2 +- client/oggDecoder.cpp | 2 +- client/oggDecoder.h | 2 +- client/pcmDecoder.cpp | 2 +- client/pcmDecoder.h | 2 +- client/serverConnection.cpp | 54 ++++++-------------------------- client/serverConnection.h | 13 +++----- client/snapClient.cpp | 56 ++++++++++++++-------------------- common/message.h | 20 ++++++++++-- common/pcmChunk.cpp | 9 ++++-- common/pcmChunk.h | 1 + server/Makefile | 3 +- 15 files changed, 126 insertions(+), 116 deletions(-) diff --git a/client/Makefile b/client/Makefile index 2826c9d3..fe5bcefb 100644 --- a/client/Makefile +++ b/client/Makefile @@ -10,7 +10,8 @@ all: client client: $(OBJ) $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) - + strip $(BIN) + %.o: %.cpp $(CC) $(CFLAGS) -c $< -o $@ diff --git a/client/controller.cpp b/client/controller.cpp index 3277e550..f0944279 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -1,41 +1,63 @@ #include "controller.h" #include +#include #include +#include "oggDecoder.h" +#include "pcmDecoder.h" +#include "player.h" using namespace std; -Controller::Controller() : MessageReceiver(), active_(false) +Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL) { + decoder = new OggDecoder(); } -void Controller::onMessageReceived(BaseMessage* message) +void Controller::onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer) { - if (message->type == message_type::payload) + if (baseMessage.type == message_type::payload) { -/* if (decoder.decode((PcmChunk*)message)) - stream_->addChunk((PcmChunk*)message); - else -*/ delete message; -//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n"; + if ((stream != NULL) && (decoder != NULL)) + { + PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); + pcmChunk->deserialize(baseMessage, buffer); +//cout << "chunk: " << pcmChunk->payloadSize; + if (decoder->decode(pcmChunk)) + { + stream->addChunk(pcmChunk); +//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->tv_sec << ", usec: " << pcmChunk->tv_usec/1000 << ", type: " << pcmChunk->type << "\n"; + } + else + delete pcmChunk; + } } - else if (message->type == message_type::header) + else if (baseMessage.type == message_type::header) { -// decoder.setHeader((HeaderMessage*)message); + if (decoder != NULL) + { + HeaderMessage* headerMessage = new HeaderMessage(); + headerMessage->deserialize(baseMessage, buffer); + decoder->setHeader(headerMessage); + } } - else if (message->type == message_type::sampleformat) + else if (baseMessage.type == message_type::sampleformat) { - SampleFormat* sampleFormat = (SampleFormat*)message; + sampleFormat = new SampleFormat(); + sampleFormat->deserialize(baseMessage, buffer); cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - delete sampleFormat; } } -void Controller::start() +void Controller::start(std::string& _ip, size_t _port, int _bufferMs) { + bufferMs = _bufferMs; + + connection = new ServerConnection(); + connection->start(this, _ip, _port); controllerThread = new thread(&Controller::worker, this); } @@ -49,7 +71,17 @@ void Controller::stop() void Controller::worker() { +// Decoder* decoder; active_ = true; + + while (sampleFormat == NULL) + usleep(10000); + stream = new Stream(SampleFormat(*sampleFormat)); + stream->setBufferLen(bufferMs); + + Player player(stream); + player.start(); + while (active_) { usleep(10000); @@ -57,3 +89,4 @@ void Controller::worker() } + diff --git a/client/controller.h b/client/controller.h index 218c293d..1f435411 100644 --- a/client/controller.h +++ b/client/controller.h @@ -1,9 +1,10 @@ #ifndef CONTROLLER_H #define CONTROLLER_H -#include #include #include +#include "common/message.h" +#include "decoder.h" #include "serverConnection.h" @@ -11,14 +12,19 @@ class Controller : public MessageReceiver { public: Controller(); - void start(); + void start(std::string& _ip, size_t _port, int _bufferMs); void stop(); - virtual void onMessageReceived(BaseMessage* message); + virtual void onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer); private: void worker(); std::atomic active_; std::thread* controllerThread; + ServerConnection* connection; + SampleFormat* sampleFormat; + Decoder* decoder; + Stream* stream; + int bufferMs; }; diff --git a/client/decoder.h b/client/decoder.h index c19d2e48..7935d23c 100644 --- a/client/decoder.h +++ b/client/decoder.h @@ -6,7 +6,7 @@ class Decoder { public: - Decoder(); + Decoder() {}; virtual bool decode(PcmChunk* chunk) = 0; virtual bool setHeader(HeaderMessage* chunk) = 0; }; diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index 90a7a6f5..e9943c10 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -8,7 +8,7 @@ using namespace std; -OggDecoder::OggDecoder() +OggDecoder::OggDecoder() : Decoder() { ogg_sync_init(&oy); /* Now we can read pages */ convsize = 4096; diff --git a/client/oggDecoder.h b/client/oggDecoder.h index 11554401..73dc3d68 100644 --- a/client/oggDecoder.h +++ b/client/oggDecoder.h @@ -4,7 +4,7 @@ #include -class OggDecoder +class OggDecoder : public Decoder { public: OggDecoder(); diff --git a/client/pcmDecoder.cpp b/client/pcmDecoder.cpp index 3bd6e749..85eca67f 100644 --- a/client/pcmDecoder.cpp +++ b/client/pcmDecoder.cpp @@ -1,6 +1,6 @@ #include "pcmDecoder.h" -PcmDecoder::PcmDecoder() +PcmDecoder::PcmDecoder() : Decoder() { } diff --git a/client/pcmDecoder.h b/client/pcmDecoder.h index dceeafd2..87a32df9 100644 --- a/client/pcmDecoder.h +++ b/client/pcmDecoder.h @@ -3,7 +3,7 @@ #include "decoder.h" -class PcmDecoder +class PcmDecoder : public Decoder { public: PcmDecoder(); diff --git a/client/serverConnection.cpp b/client/serverConnection.cpp index 1a621f2e..0972c9d4 100644 --- a/client/serverConnection.cpp +++ b/client/serverConnection.cpp @@ -2,6 +2,8 @@ #include #include #include "common/log.h" +#include "common/message.h" +#include "common/headerMessage.h" #define PCM_DEVICE "default" @@ -9,7 +11,7 @@ using namespace std; -ServerConnection::ServerConnection(Stream* stream) : active_(false), stream_(stream) +ServerConnection::ServerConnection() : active_(false) { } @@ -47,51 +49,21 @@ void ServerConnection::stop() } -BaseMessage* ServerConnection::getNextMessage(tcp::socket* socket) +void ServerConnection::getNextMessage(tcp::socket* socket) { BaseMessage baseMessage; size_t baseMsgSize = baseMessage.getSize(); vector buffer(baseMsgSize); socketRead(socket, &buffer[0], baseMsgSize); - baseMessage.readVec(buffer); + baseMessage.deserialize(&buffer[0]); //cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; if (baseMessage.size > buffer.size()) buffer.resize(baseMessage.size); socketRead(socket, &buffer[0], baseMessage.size); - BaseMessage* message = NULL; - if (baseMessage.type == message_type::payload) - message = new PcmChunk(stream_->format, 0); - else if (baseMessage.type == message_type::header) - message = new HeaderMessage(); - else if (baseMessage.type == message_type::sampleformat) - message = new SampleFormat(); - if (message != NULL) - message->readVec(buffer); - return message; -} - - -void ServerConnection::onMessageReceived(BaseMessage* message) -{ - if (message->type == message_type::payload) - { - if (decoder.decode((PcmChunk*)message)) - stream_->addChunk((PcmChunk*)message); - else - delete message; -//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n"; - } - else if (message->type == message_type::header) - { - decoder.setHeader((HeaderMessage*)message); - } - else if (message->type == message_type::sampleformat) - { - SampleFormat* sampleFormat = (SampleFormat*)message; - cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - delete sampleFormat; - } + + if (messageReceiver != NULL) + messageReceiver->onMessageReceived(socket, baseMessage, &buffer[0]); } @@ -112,20 +84,12 @@ void ServerConnection::worker() while(active_) { - BaseMessage* message = getNextMessage(&s); - if (message == NULL) - continue; - - if (messageReceiver != NULL) - messageReceiver->onMessageReceived(message); - else - delete message; + getNextMessage(&s); } } catch (const std::exception& e) { cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - stream_->clearChunks(); usleep(500*1000); } } diff --git a/client/serverConnection.h b/client/serverConnection.h index 1f50bd0e..dc85ee30 100644 --- a/client/serverConnection.h +++ b/client/serverConnection.h @@ -6,8 +6,6 @@ #include #include #include "stream.h" -#include "oggDecoder.h" -#include "pcmDecoder.h" using boost::asio::ip::tcp; @@ -16,17 +14,16 @@ using boost::asio::ip::tcp; class MessageReceiver { public: - virtual void onMessageReceived(BaseMessage* message) = 0; + virtual void onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer) = 0; }; -class ServerConnection : public MessageReceiver +class ServerConnection { public: - ServerConnection(Stream* stream); + ServerConnection(); void start(MessageReceiver* receiver, const std::string& ip, size_t port); void stop(); - virtual void onMessageReceived(BaseMessage* message); private: void socketRead(tcp::socket* socket, void* to, size_t bytes); @@ -34,13 +31,11 @@ private: boost::asio::ip::tcp::endpoint endpt; MessageReceiver* messageReceiver; - BaseMessage* getNextMessage(tcp::socket* socket); + void getNextMessage(tcp::socket* socket); boost::asio::io_service io_service; tcp::resolver::iterator iterator; std::atomic active_; - Stream* stream_; std::thread* receiverThread; - OggDecoder decoder; }; diff --git a/client/snapClient.cpp b/client/snapClient.cpp index 869bb7d4..48392c0b 100644 --- a/client/snapClient.cpp +++ b/client/snapClient.cpp @@ -9,12 +9,8 @@ #include #include -#include "common/sampleFormat.h" #include "common/utils.h" #include "common/log.h" -#include "stream.h" -#include "player.h" -#include "serverConnection.h" #include "controller.h" @@ -27,32 +23,31 @@ namespace po = boost::program_options; int main (int argc, char *argv[]) { int deviceIdx; - Stream* stream; string ip; int bufferMs; size_t port; bool runAsDaemon; - string sampleFormat; - po::options_description desc("Allowed options"); - desc.add_options() - ("help,h", "produce help message") - ("port,p", po::value(&port)->default_value(98765), "port where the server listens on") - ("ip,i", po::value(&ip)->default_value("192.168.0.2"), "server IP") - ("soundcard,s", po::value(&deviceIdx)->default_value(-1), "index of the soundcard") - ("sampleformat,f", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") - ("buffer,b", po::value(&bufferMs)->default_value(300), "buffer size [ms]") - ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") - ; +// string sampleFormat; + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port,p", po::value(&port)->default_value(98765), "port where the server listens on") + ("ip,i", po::value(&ip)->default_value("192.168.0.2"), "server IP") + ("soundcard,s", po::value(&deviceIdx)->default_value(-1), "index of the soundcard") +// ("sampleformat,f", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") + ("buffer,b", po::value(&bufferMs)->default_value(300), "buffer size [ms]") + ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") + ; - po::variables_map vm; - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); - if (vm.count("help")) - { - cout << desc << "\n"; - return 1; - } + if (vm.count("help")) + { + cout << desc << "\n"; + return 1; + } std::clog.rdbuf(new Log("snapclient", LOG_DAEMON)); if (runAsDaemon) @@ -61,18 +56,13 @@ int main (int argc, char *argv[]) std::clog << kLogNotice << "daemon started" << std::endl; } - stream = new Stream(SampleFormat(sampleFormat)); - stream->setBufferLen(bufferMs); - - Player player(stream); - player.start(); - ServerConnection serverConnection(stream); - serverConnection.start(&serverConnection, ip, port); + Controller controller; + controller.start(ip, port, bufferMs); while(true) usleep(10000); - - return 0; + + return 0; } diff --git a/common/message.h b/common/message.h index 85e6e5ad..47f2ae98 100644 --- a/common/message.h +++ b/common/message.h @@ -17,6 +17,13 @@ public: }; +struct membuf : public std::basic_streambuf +{ + membuf(char* begin, char* end) { + this->setg(begin, begin, end); + } +}; + enum message_type { @@ -48,9 +55,18 @@ struct BaseMessage stream.read(reinterpret_cast(&size), sizeof(uint32_t)); } - virtual void readVec(std::vector& stream) + void deserialize(char* payload) { - vectorwrapbuf databuf(stream); + membuf databuf(payload, payload + sizeof(size)); + std::istream is(&databuf); + read(is); + } + + void deserialize(const BaseMessage& baseMessage, char* payload) + { + type = baseMessage.type; + size = baseMessage.size; + membuf databuf(payload, payload + size); std::istream is(&databuf); read(is); } diff --git a/common/pcmChunk.cpp b/common/pcmChunk.cpp index efae9270..9ec96cf2 100644 --- a/common/pcmChunk.cpp +++ b/common/pcmChunk.cpp @@ -7,10 +7,13 @@ using namespace std; -PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0) +PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate*sampleFormat.frameSize*ms / 1000), format(sampleFormat), idx(0) +{ +} + + +PcmChunk::PcmChunk() : WireChunk(), idx(0) { - payloadSize = format.rate*format.frameSize*ms / 1000; - payload = (char*)malloc(payloadSize); } diff --git a/common/pcmChunk.h b/common/pcmChunk.h index 41a5b371..c451942c 100644 --- a/common/pcmChunk.h +++ b/common/pcmChunk.h @@ -14,6 +14,7 @@ class PcmChunk : public WireChunk { public: PcmChunk(const SampleFormat& sampleFormat, size_t ms); + PcmChunk(); ~PcmChunk(); int readFrames(void* outputBuffer, size_t frameCount); diff --git a/server/Makefile b/server/Makefile index 13a769ca..6d8784c5 100644 --- a/server/Makefile +++ b/server/Makefile @@ -10,7 +10,8 @@ all: server server: $(OBJ) $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) - + strip $(BIN) + %.o: %.cpp $(CC) $(CFLAGS) -c $< -o $@