server Settings

git-svn-id: svn://elaine/murooma/trunk@255 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-12 15:04:17 +00:00
parent f7cf7a2537
commit 979f6460d3
11 changed files with 122 additions and 48 deletions

View file

@ -5,12 +5,12 @@
#include "oggDecoder.h" #include "oggDecoder.h"
#include "pcmDecoder.h" #include "pcmDecoder.h"
#include "player.h" #include "player.h"
#include "common/serverSettings.h"
using namespace std; using namespace std;
Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL) Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL), sampleFormat(NULL)
{ {
decoder = new OggDecoder(); decoder = new OggDecoder();
} }
@ -18,6 +18,7 @@ Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL)
void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
{ {
//cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
if (baseMessage.type == message_type::payload) if (baseMessage.type == message_type::payload)
{ {
if ((stream != NULL) && (decoder != NULL)) if ((stream != NULL) && (decoder != NULL))
@ -49,18 +50,23 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa
sampleFormat->deserialize(baseMessage, buffer); sampleFormat->deserialize(baseMessage, buffer);
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
} }
else if (baseMessage.type == message_type::serversettings)
{
ServerSettings* serverSettings = new ServerSettings();
serverSettings->deserialize(baseMessage, buffer);
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
}
} }
void Controller::start(std::string& _ip, size_t _port, int _bufferMs) void Controller::start(const std::string& _ip, size_t _port, int _bufferMs)
{ {
bufferMs = _bufferMs; bufferMs = _bufferMs;
ip = _ip;
connection = new ClientConnection(this, _ip, _port); controlConnection = new ClientConnection(this, ip, _port);
connection->start(); controlConnection->start();
// controlConnection = new ClientConnection(this, _ip, _port + 1);
// controlConnection->start();
controllerThread = new thread(&Controller::worker, this); controllerThread = new thread(&Controller::worker, this);
} }
@ -77,10 +83,13 @@ void Controller::worker()
// Decoder* decoder; // Decoder* decoder;
active_ = true; active_ = true;
while (sampleFormat == NULL) while ((sampleFormat == NULL) && (streamClient == NULL))
{ {
usleep(10000); usleep(10000);
} }
streamClient->start();
stream = new Stream(SampleFormat(*sampleFormat)); stream = new Stream(SampleFormat(*sampleFormat));
stream->setBufferLen(bufferMs); stream->setBufferLen(bufferMs);

View file

@ -5,6 +5,7 @@
#include <atomic> #include <atomic>
#include "common/message.h" #include "common/message.h"
#include "common/socketConnection.h" #include "common/socketConnection.h"
#include "streamClient.h"
#include "decoder.h" #include "decoder.h"
#include "stream.h" #include "stream.h"
@ -13,7 +14,7 @@ class Controller : public MessageReceiver
{ {
public: public:
Controller(); Controller();
void start(std::string& _ip, size_t _port, int _bufferMs); void start(const std::string& _ip, size_t _port, int _bufferMs);
void stop(); void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
@ -21,12 +22,13 @@ private:
void worker(); void worker();
std::atomic<bool> active_; std::atomic<bool> active_;
std::thread* controllerThread; std::thread* controllerThread;
ClientConnection* connection; StreamClient* streamClient;
ClientConnection* controlConnection; ClientConnection* controlConnection;
SampleFormat* sampleFormat; SampleFormat* sampleFormat;
Decoder* decoder; Decoder* decoder;
Stream* stream; Stream* stream;
int bufferMs; int bufferMs;
std::string ip;
}; };

View file

@ -30,7 +30,8 @@ enum message_type
base = 0, base = 0,
header = 1, header = 1,
payload = 2, payload = 2,
sampleformat = 3 sampleformat = 3,
serversettings = 4
}; };

43
common/serverSettings.h Normal file
View file

@ -0,0 +1,43 @@
#ifndef SERVER_SETTINGS_H
#define SERVER_SETTINGS_H
#include "message.h"
class ServerSettings : public BaseMessage
{
public:
ServerSettings(size_t _port = 0) : BaseMessage(message_type::serversettings), port(_port)
{
}
virtual ~ServerSettings()
{
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
virtual uint32_t getSize()
{
return sizeof(int32_t);
}
int32_t port;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
};
#endif

View file

@ -50,8 +50,10 @@ void SocketConnection::stop()
void SocketConnection::send(BaseMessage* message) void SocketConnection::send(BaseMessage* message)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
if (!connected()) if (!connected())
return; return;
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
boost::asio::streambuf streambuf; boost::asio::streambuf streambuf;
std::ostream stream(&streambuf); std::ostream stream(&streambuf);
message->serialize(stream); message->serialize(stream);
@ -61,12 +63,13 @@ void SocketConnection::send(BaseMessage* message)
void SocketConnection::getNextMessage() void SocketConnection::getNextMessage()
{ {
//cout << "getNextMessage\n";
BaseMessage baseMessage; BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize); vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize); socketRead(&buffer[0], baseMsgSize);
baseMessage.deserialize(&buffer[0]); baseMessage.deserialize(&buffer[0]);
//cout << "type: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; //cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
if (baseMessage.size > buffer.size()) if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size); buffer.resize(baseMessage.size);
socketRead(&buffer[0], baseMessage.size); socketRead(&buffer[0], baseMessage.size);
@ -102,7 +105,7 @@ void ClientConnection::worker()
{ {
{ {
// std::unique_lock<std::mutex> mlock(mutex_); // std::unique_lock<std::mutex> mlock(mutex_);
cout << "connecting\n"; //cout << "connecting\n";
socket.reset(new tcp::socket(io_service)); socket.reset(new tcp::socket(io_service));
struct timeval tv; struct timeval tv;
tv.tv_sec = 5; tv.tv_sec = 5;
@ -110,7 +113,7 @@ cout << "connecting\n";
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
socket->connect(*iterator); socket->connect(*iterator);
connected_ = true; connected_ = true;
cout << "connected\n"; //cout << "connected\n";
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
} }
while(active_) while(active_)

View file

@ -38,7 +38,8 @@ public:
virtual bool connected() virtual bool connected()
{ {
return (connected_ && socket); return (socket != 0);
// return (connected_ && socket);
} }
protected: protected:

View file

@ -2,7 +2,7 @@
#include <iostream> #include <iostream>
ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL) ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL), sampleFormat(NULL)
{ {
} }
@ -20,10 +20,13 @@ void ControlServer::acceptor()
{ {
socket_ptr sock(new tcp::socket(io_service_)); socket_ptr sock(new tcp::socket(io_service_));
a.accept(*sock); a.accept(*sock);
cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n";
ServerConnection* session = new ServerConnection(this, sock); ServerConnection* session = new ServerConnection(this, sock);
sessions.insert(shared_ptr<ServerConnection>(session)); sessions.insert(shared_ptr<ServerConnection>(session));
session->start(); session->start();
session->send(serverSettings);
session->send(sampleFormat);
session->send(headerChunk);
} }
} }
@ -40,4 +43,28 @@ void ControlServer::stop()
} }
void ControlServer::setHeader(HeaderMessage* header)
{
if (header)
headerChunk = header;
}
void ControlServer::setFormat(SampleFormat* format)
{
if (format)
sampleFormat = format;
}
void ControlServer::setServerSettings(ServerSettings* settings)
{
if (settings)
serverSettings = settings;
}

View file

@ -12,6 +12,7 @@
#include "common/message.h" #include "common/message.h"
#include "common/headerMessage.h" #include "common/headerMessage.h"
#include "common/sampleFormat.h" #include "common/sampleFormat.h"
#include "common/serverSettings.h"
#include "common/socketConnection.h" #include "common/socketConnection.h"
@ -28,14 +29,18 @@ public:
void start(); void start();
void stop(); void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
void setHeader(HeaderMessage* header);
void setFormat(SampleFormat* format);
void setServerSettings(ServerSettings* settings);
private: private:
void acceptor(); void acceptor();
set<shared_ptr<ServerConnection>> sessions; set<shared_ptr<ServerConnection>> sessions;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
unsigned short port_; unsigned short port_;
shared_ptr<HeaderMessage> headerChunk; HeaderMessage* headerChunk;
shared_ptr<SampleFormat> sampleFormat; SampleFormat* sampleFormat;
ServerSettings* serverSettings;
thread* acceptThread; thread* acceptThread;
}; };

View file

@ -61,11 +61,6 @@ int main(int argc, char* argv[])
openlog ("firstdaemon", LOG_PID, LOG_DAEMON); openlog ("firstdaemon", LOG_PID, LOG_DAEMON);
using namespace std; // For atoi. using namespace std; // For atoi.
StreamServer* server = new StreamServer(port);
server->start();
ControlServer* controlServer = new ControlServer(port + 1);
controlServer->start();
timeval tvChunk; timeval tvChunk;
gettimeofday(&tvChunk, NULL); gettimeofday(&tvChunk, NULL);
@ -86,9 +81,15 @@ size_t duration = 50;
return 1; return 1;
} }
server->setFormat(format); std::auto_ptr<ServerSettings> serverSettings(new ServerSettings(port + 1));
shared_ptr<HeaderMessage> header(encoder->getHeader()); ControlServer* controlServer = new ControlServer(port);
server->setHeader(header); controlServer->setServerSettings(serverSettings.get());
controlServer->setFormat(&format);
controlServer->setHeader(encoder->getHeader());
controlServer->start();
StreamServer* server = new StreamServer(port + 1);
server->start();
while (!g_terminated) while (!g_terminated)
{ {

View file

@ -57,32 +57,17 @@ void StreamServer::acceptor()
a.accept(*sock); a.accept(*sock);
cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n";
StreamSession* session = new StreamSession(sock); StreamSession* session = new StreamSession(sock);
session->send(sampleFormat);
session->send(headerChunk);
session->start();
sessions.insert(shared_ptr<StreamSession>(session)); sessions.insert(shared_ptr<StreamSession>(session));
session->start();
} }
} }
void StreamServer::setHeader(shared_ptr<HeaderMessage> header)
{
if (header)
headerChunk = header;
}
void StreamServer::setFormat(SampleFormat& format)
{
sampleFormat = shared_ptr<SampleFormat>(new SampleFormat(format));
}
void StreamServer::send(shared_ptr<BaseMessage> message) void StreamServer::send(shared_ptr<BaseMessage> message)
{ {
for (std::set<shared_ptr<StreamSession>>::iterator it = sessions.begin(); it != sessions.end(); ) for (std::set<shared_ptr<StreamSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{ {
if (!(*it)->isActive()) if (!(*it)->active())
{ {
cout << "Session inactive. Removing\n"; cout << "Session inactive. Removing\n";
sessions.erase(it++); sessions.erase(it++);

View file

@ -39,9 +39,6 @@ class StreamServer
{ {
public: public:
StreamServer(unsigned short port); StreamServer(unsigned short port);
void setHeader(shared_ptr<HeaderMessage> header);
void setFormat(SampleFormat& format);
void send(shared_ptr<BaseMessage> message); void send(shared_ptr<BaseMessage> message);
void start(); void start();