controller

git-svn-id: svn://elaine/murooma/trunk@242 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-09 20:31:03 +00:00
parent e342a4108e
commit 00fac3eccb
15 changed files with 126 additions and 116 deletions

View file

@ -10,6 +10,7 @@ all: client
client: $(OBJ) client: $(OBJ)
$(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS)
strip $(BIN)
%.o: %.cpp %.o: %.cpp
$(CC) $(CFLAGS) -c $< -o $@ $(CC) $(CFLAGS) -c $< -o $@

View file

@ -1,41 +1,63 @@
#include "controller.h" #include "controller.h"
#include <iostream> #include <iostream>
#include <string>
#include <unistd.h> #include <unistd.h>
#include "oggDecoder.h"
#include "pcmDecoder.h"
#include "player.h"
using namespace std; using namespace std;
Controller::Controller() : MessageReceiver(), active_(false) Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL)
{ {
decoder = new OggDecoder();
} }
void Controller::onMessageReceived(BaseMessage* message) void Controller::onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer)
{ {
if (message->type == message_type::payload) if (baseMessage.type == message_type::payload)
{ {
/* if (decoder.decode((PcmChunk*)message)) if ((stream != NULL) && (decoder != NULL))
stream_->addChunk((PcmChunk*)message); {
else PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
*/ delete message; pcmChunk->deserialize(baseMessage, buffer);
//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n"; //cout << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->tv_sec << ", usec: " << pcmChunk->tv_usec/1000 << ", type: " << pcmChunk->type << "\n";
}
else
delete pcmChunk;
}
} }
else if (message->type == message_type::header) else if (baseMessage.type == message_type::header)
{ {
// decoder.setHeader((HeaderMessage*)message); if (decoder != NULL)
{
HeaderMessage* headerMessage = new HeaderMessage();
headerMessage->deserialize(baseMessage, buffer);
decoder->setHeader(headerMessage);
}
} }
else if (message->type == message_type::sampleformat) else if (baseMessage.type == message_type::sampleformat)
{ {
SampleFormat* sampleFormat = (SampleFormat*)message; sampleFormat = new SampleFormat();
sampleFormat->deserialize(baseMessage, buffer);
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
delete sampleFormat;
} }
} }
void Controller::start() void Controller::start(std::string& _ip, size_t _port, int _bufferMs)
{ {
bufferMs = _bufferMs;
connection = new ServerConnection();
connection->start(this, _ip, _port);
controllerThread = new thread(&Controller::worker, this); controllerThread = new thread(&Controller::worker, this);
} }
@ -49,7 +71,17 @@ void Controller::stop()
void Controller::worker() void Controller::worker()
{ {
// Decoder* decoder;
active_ = true; active_ = true;
while (sampleFormat == NULL)
usleep(10000);
stream = new Stream(SampleFormat(*sampleFormat));
stream->setBufferLen(bufferMs);
Player player(stream);
player.start();
while (active_) while (active_)
{ {
usleep(10000); usleep(10000);
@ -57,3 +89,4 @@ void Controller::worker()
} }

View file

@ -1,9 +1,10 @@
#ifndef CONTROLLER_H #ifndef CONTROLLER_H
#define CONTROLLER_H #define CONTROLLER_H
#include <string>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include "common/message.h"
#include "decoder.h"
#include "serverConnection.h" #include "serverConnection.h"
@ -11,14 +12,19 @@ class Controller : public MessageReceiver
{ {
public: public:
Controller(); Controller();
void start(); void start(std::string& _ip, size_t _port, int _bufferMs);
void stop(); void stop();
virtual void onMessageReceived(BaseMessage* message); virtual void onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer);
private: private:
void worker(); void worker();
std::atomic<bool> active_; std::atomic<bool> active_;
std::thread* controllerThread; std::thread* controllerThread;
ServerConnection* connection;
SampleFormat* sampleFormat;
Decoder* decoder;
Stream* stream;
int bufferMs;
}; };

View file

@ -6,7 +6,7 @@
class Decoder class Decoder
{ {
public: public:
Decoder(); Decoder() {};
virtual bool decode(PcmChunk* chunk) = 0; virtual bool decode(PcmChunk* chunk) = 0;
virtual bool setHeader(HeaderMessage* chunk) = 0; virtual bool setHeader(HeaderMessage* chunk) = 0;
}; };

View file

@ -8,7 +8,7 @@
using namespace std; using namespace std;
OggDecoder::OggDecoder() OggDecoder::OggDecoder() : Decoder()
{ {
ogg_sync_init(&oy); /* Now we can read pages */ ogg_sync_init(&oy); /* Now we can read pages */
convsize = 4096; convsize = 4096;

View file

@ -4,7 +4,7 @@
#include <vorbis/codec.h> #include <vorbis/codec.h>
class OggDecoder class OggDecoder : public Decoder
{ {
public: public:
OggDecoder(); OggDecoder();

View file

@ -1,6 +1,6 @@
#include "pcmDecoder.h" #include "pcmDecoder.h"
PcmDecoder::PcmDecoder() PcmDecoder::PcmDecoder() : Decoder()
{ {
} }

View file

@ -3,7 +3,7 @@
#include "decoder.h" #include "decoder.h"
class PcmDecoder class PcmDecoder : public Decoder
{ {
public: public:
PcmDecoder(); PcmDecoder();

View file

@ -2,6 +2,8 @@
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <iostream> #include <iostream>
#include "common/log.h" #include "common/log.h"
#include "common/message.h"
#include "common/headerMessage.h"
#define PCM_DEVICE "default" #define PCM_DEVICE "default"
@ -9,7 +11,7 @@
using namespace std; using namespace std;
ServerConnection::ServerConnection(Stream* stream) : active_(false), stream_(stream) ServerConnection::ServerConnection() : active_(false)
{ {
} }
@ -47,51 +49,21 @@ void ServerConnection::stop()
} }
BaseMessage* ServerConnection::getNextMessage(tcp::socket* socket) void ServerConnection::getNextMessage(tcp::socket* socket)
{ {
BaseMessage baseMessage; BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize); vector<char> buffer(baseMsgSize);
socketRead(socket, &buffer[0], baseMsgSize); socketRead(socket, &buffer[0], baseMsgSize);
baseMessage.readVec(buffer); baseMessage.deserialize(&buffer[0]);
//cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; //cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
if (baseMessage.size > buffer.size()) if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size); buffer.resize(baseMessage.size);
socketRead(socket, &buffer[0], baseMessage.size); socketRead(socket, &buffer[0], baseMessage.size);
BaseMessage* message = NULL;
if (baseMessage.type == message_type::payload)
message = new PcmChunk(stream_->format, 0);
else if (baseMessage.type == message_type::header)
message = new HeaderMessage();
else if (baseMessage.type == message_type::sampleformat)
message = new SampleFormat();
if (message != NULL)
message->readVec(buffer);
return message;
}
if (messageReceiver != NULL)
void ServerConnection::onMessageReceived(BaseMessage* message) messageReceiver->onMessageReceived(socket, baseMessage, &buffer[0]);
{
if (message->type == message_type::payload)
{
if (decoder.decode((PcmChunk*)message))
stream_->addChunk((PcmChunk*)message);
else
delete message;
//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n";
}
else if (message->type == message_type::header)
{
decoder.setHeader((HeaderMessage*)message);
}
else if (message->type == message_type::sampleformat)
{
SampleFormat* sampleFormat = (SampleFormat*)message;
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
delete sampleFormat;
}
} }
@ -112,20 +84,12 @@ void ServerConnection::worker()
while(active_) while(active_)
{ {
BaseMessage* message = getNextMessage(&s); getNextMessage(&s);
if (message == NULL)
continue;
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(message);
else
delete message;
} }
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
stream_->clearChunks();
usleep(500*1000); usleep(500*1000);
} }
} }

View file

@ -6,8 +6,6 @@
#include <atomic> #include <atomic>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "stream.h" #include "stream.h"
#include "oggDecoder.h"
#include "pcmDecoder.h"
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
@ -16,17 +14,16 @@ using boost::asio::ip::tcp;
class MessageReceiver class MessageReceiver
{ {
public: public:
virtual void onMessageReceived(BaseMessage* message) = 0; virtual void onMessageReceived(tcp::socket* socket, const BaseMessage& baseMessage, char* buffer) = 0;
}; };
class ServerConnection : public MessageReceiver class ServerConnection
{ {
public: public:
ServerConnection(Stream* stream); ServerConnection();
void start(MessageReceiver* receiver, const std::string& ip, size_t port); void start(MessageReceiver* receiver, const std::string& ip, size_t port);
void stop(); void stop();
virtual void onMessageReceived(BaseMessage* message);
private: private:
void socketRead(tcp::socket* socket, void* to, size_t bytes); void socketRead(tcp::socket* socket, void* to, size_t bytes);
@ -34,13 +31,11 @@ private:
boost::asio::ip::tcp::endpoint endpt; boost::asio::ip::tcp::endpoint endpt;
MessageReceiver* messageReceiver; MessageReceiver* messageReceiver;
BaseMessage* getNextMessage(tcp::socket* socket); void getNextMessage(tcp::socket* socket);
boost::asio::io_service io_service; boost::asio::io_service io_service;
tcp::resolver::iterator iterator; tcp::resolver::iterator iterator;
std::atomic<bool> active_; std::atomic<bool> active_;
Stream* stream_;
std::thread* receiverThread; std::thread* receiverThread;
OggDecoder decoder;
}; };

View file

@ -9,12 +9,8 @@
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include "common/sampleFormat.h"
#include "common/utils.h" #include "common/utils.h"
#include "common/log.h" #include "common/log.h"
#include "stream.h"
#include "player.h"
#include "serverConnection.h"
#include "controller.h" #include "controller.h"
@ -27,32 +23,31 @@ namespace po = boost::program_options;
int main (int argc, char *argv[]) int main (int argc, char *argv[])
{ {
int deviceIdx; int deviceIdx;
Stream* stream;
string ip; string ip;
int bufferMs; int bufferMs;
size_t port; size_t port;
bool runAsDaemon; bool runAsDaemon;
string sampleFormat; // string sampleFormat;
po::options_description desc("Allowed options"); po::options_description desc("Allowed options");
desc.add_options() desc.add_options()
("help,h", "produce help message") ("help,h", "produce help message")
("port,p", po::value<size_t>(&port)->default_value(98765), "port where the server listens on") ("port,p", po::value<size_t>(&port)->default_value(98765), "port where the server listens on")
("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP") ("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP")
("soundcard,s", po::value<int>(&deviceIdx)->default_value(-1), "index of the soundcard") ("soundcard,s", po::value<int>(&deviceIdx)->default_value(-1), "index of the soundcard")
("sampleformat,f", po::value<string>(&sampleFormat)->default_value("48000:16:2"), "sample format") // ("sampleformat,f", po::value<string>(&sampleFormat)->default_value("48000:16:2"), "sample format")
("buffer,b", po::value<int>(&bufferMs)->default_value(300), "buffer size [ms]") ("buffer,b", po::value<int>(&bufferMs)->default_value(300), "buffer size [ms]")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
; ;
po::variables_map vm; po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm); po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm); po::notify(vm);
if (vm.count("help")) if (vm.count("help"))
{ {
cout << desc << "\n"; cout << desc << "\n";
return 1; return 1;
} }
std::clog.rdbuf(new Log("snapclient", LOG_DAEMON)); std::clog.rdbuf(new Log("snapclient", LOG_DAEMON));
if (runAsDaemon) if (runAsDaemon)
@ -61,18 +56,13 @@ int main (int argc, char *argv[])
std::clog << kLogNotice << "daemon started" << std::endl; std::clog << kLogNotice << "daemon started" << std::endl;
} }
stream = new Stream(SampleFormat(sampleFormat)); Controller controller;
stream->setBufferLen(bufferMs); controller.start(ip, port, bufferMs);
Player player(stream);
player.start();
ServerConnection serverConnection(stream);
serverConnection.start(&serverConnection, ip, port);
while(true) while(true)
usleep(10000); usleep(10000);
return 0; return 0;
} }

View file

@ -17,6 +17,13 @@ public:
}; };
struct membuf : public std::basic_streambuf<char>
{
membuf(char* begin, char* end) {
this->setg(begin, begin, end);
}
};
enum message_type enum message_type
{ {
@ -48,9 +55,18 @@ struct BaseMessage
stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t)); stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t));
} }
virtual void readVec(std::vector<char>& stream) void deserialize(char* payload)
{ {
vectorwrapbuf<char> databuf(stream); membuf databuf(payload, payload + sizeof(size));
std::istream is(&databuf);
read(is);
}
void deserialize(const BaseMessage& baseMessage, char* payload)
{
type = baseMessage.type;
size = baseMessage.size;
membuf databuf(payload, payload + size);
std::istream is(&databuf); std::istream is(&databuf);
read(is); read(is);
} }

View file

@ -7,10 +7,13 @@
using namespace std; using namespace std;
PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(), format(sampleFormat), idx(0) PcmChunk::PcmChunk(const SampleFormat& sampleFormat, size_t ms) : WireChunk(sampleFormat.rate*sampleFormat.frameSize*ms / 1000), format(sampleFormat), idx(0)
{
}
PcmChunk::PcmChunk() : WireChunk(), idx(0)
{ {
payloadSize = format.rate*format.frameSize*ms / 1000;
payload = (char*)malloc(payloadSize);
} }

View file

@ -14,6 +14,7 @@ class PcmChunk : public WireChunk
{ {
public: public:
PcmChunk(const SampleFormat& sampleFormat, size_t ms); PcmChunk(const SampleFormat& sampleFormat, size_t ms);
PcmChunk();
~PcmChunk(); ~PcmChunk();
int readFrames(void* outputBuffer, size_t frameCount); int readFrames(void* outputBuffer, size_t frameCount);

View file

@ -10,6 +10,7 @@ all: server
server: $(OBJ) server: $(OBJ)
$(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS)
strip $(BIN)
%.o: %.cpp %.o: %.cpp
$(CC) $(CFLAGS) -c $< -o $@ $(CC) $(CFLAGS) -c $< -o $@