renamed recevier to serverConnection

git-svn-id: svn://elaine/murooma/trunk@235 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-06 08:37:22 +00:00
parent 670f8e44ca
commit 18f3c5178f
4 changed files with 50 additions and 30 deletions

View file

@ -3,7 +3,7 @@ CC = /usr/bin/g++
CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -g -D_REENTRANT -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lasound -logg -lvorbis -lvorbisenc
OBJ = snapClient.o stream.o player.o receiver.o oggDecoder.o pcmDecoder.o ../common/message.o ../common/log.o ../common/sampleFormat.o OBJ = snapClient.o stream.o player.o serverConnection.o oggDecoder.o pcmDecoder.o ../common/message.o ../common/log.o ../common/sampleFormat.o
BIN = snapclient BIN = snapclient
all: client all: client

View file

@ -1,9 +1,7 @@
#include "receiver.h" #include "serverConnection.h"
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
#include <iostream> #include <iostream>
#include "common/log.h" #include "common/log.h"
#include "oggDecoder.h"
#include "pcmDecoder.h"
#define PCM_DEVICE "default" #define PCM_DEVICE "default"
@ -11,12 +9,12 @@
using namespace std; using namespace std;
Receiver::Receiver(Stream* stream) : active_(false), stream_(stream) ServerConnection::ServerConnection(Stream* stream) : active_(false), stream_(stream)
{ {
} }
void Receiver::socketRead(tcp::socket* socket, void* to, size_t bytes) void ServerConnection::socketRead(tcp::socket* socket, void* to, size_t bytes)
{ {
size_t toRead = bytes; size_t toRead = bytes;
size_t len = 0; size_t len = 0;
@ -29,22 +27,23 @@ void Receiver::socketRead(tcp::socket* socket, void* to, size_t bytes)
} }
void Receiver::start(const std::string& ip, int port) void ServerConnection::start(MessageReceiver* receiver, const std::string& ip, int port)
{ {
messageReceiver = receiver;
tcp::resolver resolver(io_service); tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port)); tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
iterator = resolver.resolve(query); iterator = resolver.resolve(query);
receiverThread = new thread(&Receiver::worker, this); receiverThread = new thread(&ServerConnection::worker, this);
} }
void Receiver::stop() void ServerConnection::stop()
{ {
active_ = false; active_ = false;
} }
BaseMessage* Receiver::getNextMessage(tcp::socket* socket) BaseMessage* ServerConnection::getNextMessage(tcp::socket* socket)
{ {
BaseMessage baseMessage; BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
@ -67,10 +66,26 @@ BaseMessage* Receiver::getNextMessage(tcp::socket* socket)
} }
void Receiver::worker() void ServerConnection::onMessageReceived(BaseMessage* message)
{
if (message->type == message_type::payload)
{
if (decoder.decode((PcmChunk*)message))
stream_->addChunk((PcmChunk*)message);
else
delete message;
//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n";
}
else if (message->type == message_type::header)
{
decoder.setHeader((HeaderMessage*)message);
}
}
void ServerConnection::worker()
{ {
active_ = true; active_ = true;
OggDecoder decoder;
while (active_) while (active_)
{ {
try try
@ -89,18 +104,10 @@ void Receiver::worker()
if (message == NULL) if (message == NULL)
continue; continue;
if (message->type == message_type::payload) if (messageReceiver != NULL)
{ messageReceiver->onMessageReceived(message);
if (decoder.decode((PcmChunk*)message))
stream_->addChunk((PcmChunk*)message);
else else
delete message; delete message;
//cout << ", decoded: " << chunk->payloadSize << ", Duration: " << chunk->getDuration() << ", sec: " << chunk->tv_sec << ", usec: " << chunk->tv_usec/1000 << ", type: " << chunk->type << "\n";
}
else if (message->type == message_type::header)
{
decoder.setHeader((HeaderMessage*)message);
}
} }
} }
catch (const std::exception& e) catch (const std::exception& e)

View file

@ -6,18 +6,30 @@
#include <atomic> #include <atomic>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include "stream.h" #include "stream.h"
#include "oggDecoder.h"
#include "pcmDecoder.h"
using boost::asio::ip::tcp; using boost::asio::ip::tcp;
class Receiver class MessageReceiver
{ {
public: public:
Receiver(Stream* stream); virtual void onMessageReceived(BaseMessage* message) = 0;
void start(const std::string& ip, int port); };
class ServerConnection : public MessageReceiver
{
public:
ServerConnection(Stream* stream);
void start(MessageReceiver* receiver, const std::string& ip, int port);
void stop(); void stop();
virtual void onMessageReceived(BaseMessage* message);
private: private:
MessageReceiver* messageReceiver;
BaseMessage* getNextMessage(tcp::socket* socket); BaseMessage* getNextMessage(tcp::socket* socket);
void socketRead(tcp::socket* socket, void* to, size_t bytes); void socketRead(tcp::socket* socket, void* to, size_t bytes);
void worker(); void worker();
@ -26,6 +38,7 @@ private:
std::atomic<bool> active_; std::atomic<bool> active_;
Stream* stream_; Stream* stream_;
std::thread* receiverThread; std::thread* receiverThread;
OggDecoder decoder;
}; };

View file

@ -14,7 +14,7 @@
#include "common/log.h" #include "common/log.h"
#include "stream.h" #include "stream.h"
#include "player.h" #include "player.h"
#include "receiver.h" #include "serverConnection.h"
@ -65,8 +65,8 @@ int main (int argc, char *argv[])
Player player(stream); Player player(stream);
player.start(); player.start();
Receiver receiver(stream); ServerConnection serverConnection(stream);
receiver.start(ip, port); serverConnection.start(&serverConnection, ip, port);
while(true) while(true)
usleep(1000); usleep(1000);