basic telnet like control server

This commit is contained in:
badaix 2015-08-27 23:20:10 +02:00
parent 635daabd8c
commit 504884feb9
14 changed files with 7887 additions and 119 deletions

View file

@ -23,6 +23,7 @@
#include "clientConnection.h" #include "clientConnection.h"
#include "common/utils.h" #include "common/utils.h"
#include "common/snapException.h" #include "common/snapException.h"
#include "message/hello.h"
using namespace std; using namespace std;
@ -69,7 +70,10 @@ void ClientConnection::start()
// setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); // setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
// setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); // setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
socket_->connect(*iterator); socket_->connect(*iterator);
logO << "My MAC: \"" << getMacAddress(socket_->native()) << "\"\n"; logO << "My MAC: \"" << getMacAddress(socket_->native()) << "\"\n";
msg::Hello hello(getMacAddress(socket_->native()));
send(&hello);
connected_ = true; connected_ = true;
logS(kLogNotice) << "Connected to " << socket_->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "Connected to " << socket_->remote_endpoint().address().to_string() << endl;
active_ = true; active_ = true;

View file

@ -77,7 +77,7 @@ int main (int argc, char *argv[])
("version,v", "show version number") ("version,v", "show version number")
("list,l", po::bool_switch(&listPcmDevices)->default_value(false), "list pcm devices") ("list,l", po::bool_switch(&listPcmDevices)->default_value(false), "list pcm devices")
("ip,i", po::value<string>(&ip), "server IP") ("ip,i", po::value<string>(&ip), "server IP")
("port,p", po::value<size_t>(&port)->default_value(98765), "server port") ("port,p", po::value<size_t>(&port)->default_value(1704), "server port")
("soundcard,s", po::value<string>(&soundcard)->default_value("default"), "index or name of the soundcard") ("soundcard,s", po::value<string>(&soundcard)->default_value("default"), "index or name of the soundcard")
("daemon,d", po::value<int>(&runAsDaemon)->implicit_value(-3), "daemonize, optional process priority [-20..19]") ("daemon,d", po::value<int>(&runAsDaemon)->implicit_value(-3), "daemonize, optional process priority [-20..19]")
("latency", po::value<size_t>(&latency)->default_value(0), "latency of the soundcard") ("latency", po::value<size_t>(&latency)->default_value(0), "latency of the soundcard")

73
message/hello.h Normal file
View file

@ -0,0 +1,73 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef HELLO_MSG_H
#define HELLO_MSG_H
#include "message.h"
#include <string>
namespace msg
{
class Hello : public BaseMessage
{
public:
Hello() : BaseMessage(message_type::kHello), macAddress("")
{
}
Hello(const std::string& _macAddress) : BaseMessage(message_type::kHello), macAddress(_macAddress)
{
}
virtual ~Hello()
{
}
virtual void read(std::istream& stream)
{
int16_t size;
stream.read(reinterpret_cast<char *>(&size), sizeof(int16_t));
macAddress.resize(size);
stream.read(&macAddress[0], size);
}
virtual uint32_t getSize() const
{
return sizeof(int16_t) + macAddress.size();
}
std::string macAddress;
protected:
virtual void doserialize(std::ostream& stream) const
{
int16_t size(macAddress.size());
stream.write(reinterpret_cast<const char *>(&size), sizeof(int16_t));
stream.write(macAddress.c_str(), size);
}
};
}
#endif

View file

@ -56,7 +56,8 @@ enum message_type
kTime = 5, kTime = 5,
kRequest = 6, kRequest = 6,
kAck = 7, kAck = 7,
kCommand = 8 kCommand = 8,
kHello = 9
}; };

View file

@ -1,4 +1,4 @@
VERSION = 0.3.1 VERSION = 0.3.90
TARGET = snapserver TARGET = snapserver
SHELL = /bin/bash SHELL = /bin/bash
@ -6,7 +6,7 @@ CC = /usr/bin/g++
CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I.. CFLAGS = -std=c++0x -Wall -Wno-unused-function -O3 -pthread -DVERSION=\"$(VERSION)\" -I..
LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common LDFLAGS = -lrt -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg -lFLAC -lavahi-client -lavahi-common
OBJ = snapServer.o controlServer.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o serverSession.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o OBJ = snapServer.o controlServer.o controlSession.o streamServer.o encoder/encoderFactory.o encoder/flacEncoder.o encoder/pcmEncoder.o encoder/oggEncoder.o serverSession.o publishAvahi.o pipeReader.o ../common/log.o ../message/pcmChunk.o ../message/sampleFormat.o
BIN = snapserver BIN = snapserver
all: $(TARGET) all: $(TARGET)

View file

@ -27,18 +27,18 @@
using namespace std; using namespace std;
ControlServer::ControlServer(const ControlServerSettings& controlServerSettings) : settings_(controlServerSettings), sampleFormat_(controlServerSettings.sampleFormat) ControlServer::ControlServer(size_t port) : port_(port)
{ {
serverSettings_.bufferMs = settings_.bufferMs;
} }
ControlServer::~ControlServer() ControlServer::~ControlServer()
{ {
stop();
} }
void ControlServer::send(const msg::BaseMessage* message) void ControlServer::send(const std::string& message)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ) for (auto it = sessions_.begin(); it != sessions_.end(); )
@ -47,7 +47,7 @@ void ControlServer::send(const msg::BaseMessage* message)
{ {
logS(kLogErr) << "Session inactive. Removing\n"; logS(kLogErr) << "Session inactive. Removing\n";
// don't block: remove ServerSession in a thread // don't block: remove ServerSession in a thread
auto func = [](shared_ptr<ServerSession> s)->void{s->stop();}; auto func = [](shared_ptr<ControlSession> s)->void{s->stop();};
std::thread t(func, *it); std::thread t(func, *it);
t.detach(); t.detach();
sessions_.erase(it++); sessions_.erase(it++);
@ -56,73 +56,27 @@ void ControlServer::send(const msg::BaseMessage* message)
++it; ++it;
} }
std::shared_ptr<const msg::BaseMessage> shared_message(message);
for (auto s : sessions_) for (auto s : sessions_)
s->add(shared_message); s->add(message);
} }
void ControlServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration) void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message)
{ {
// logO << "onChunkRead " << duration << "ms\n"; logO << "received: " << message << "\n";
send(chunk); if (message == "quit")
}
void ControlServer::onResync(const PipeReader* pipeReader, double ms)
{ {
logO << "onResync " << ms << "ms\n"; for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
}
void ControlServer::onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
// logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; if (it->get() == connection)
if (baseMessage.type == message_type::kRequest)
{ {
msg::Request requestMsg; sessions_.erase(it);
requestMsg.deserialize(baseMessage, buffer); break;
// logO << "request: " << requestMsg.request << "\n";
if (requestMsg.request == kTime)
{
msg::Time timeMsg;
timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
// logD << "Latency: " << timeMsg.latency << ", refers to: " << timeMsg.refersTo << "\n";
connection->send(&timeMsg);
}
else if (requestMsg.request == kServerSettings)
{
std::unique_lock<std::mutex> mlock(mutex_);
serverSettings_.refersTo = requestMsg.id;
connection->send(&serverSettings_);
}
else if (requestMsg.request == kSampleFormat)
{
std::unique_lock<std::mutex> mlock(mutex_);
sampleFormat_.refersTo = requestMsg.id;
connection->send(&sampleFormat_);
}
else if (requestMsg.request == kHeader)
{
std::unique_lock<std::mutex> mlock(mutex_);
msg::Header* headerChunk = pipeReader_->getHeader();
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
} }
} }
else if (baseMessage.type == message_type::kCommand)
{
msg::Command commandMsg;
commandMsg.deserialize(baseMessage, buffer);
if (commandMsg.command == "startStream")
{
msg::Ack ackMsg;
ackMsg.refersTo = commandMsg.id;
connection->send(&ackMsg);
connection->setStreamActive(true);
}
} }
else
connection->send("echo " + message);
} }
@ -142,10 +96,9 @@ void ControlServer::handleAccept(socket_ptr socket)
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<ServerSession> session = make_shared<ServerSession>(this, socket); shared_ptr<ControlSession> session = make_shared<ControlSession>(this, socket);
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
session->setBufferMs(settings_.bufferMs);
session->start(); session->start();
sessions_.insert(session); sessions_.insert(session);
} }
@ -155,9 +108,7 @@ void ControlServer::handleAccept(socket_ptr socket)
void ControlServer::start() void ControlServer::start()
{ {
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs); acceptor_ = make_shared<tcp::acceptor>(io_service_, tcp::endpoint(tcp::v4(), port_));
pipeReader_->start();
acceptor_ = make_shared<tcp::acceptor>(io_service_, tcp::endpoint(tcp::v4(), settings_.port));
startAccept(); startAccept();
acceptThread_ = thread(&ControlServer::acceptor, this); acceptThread_ = thread(&ControlServer::acceptor, this);
} }
@ -168,7 +119,6 @@ void ControlServer::stop()
acceptor_->cancel(); acceptor_->cancel();
io_service_.stop(); io_service_.stop();
acceptThread_.join(); acceptThread_.join();
pipeReader_->stop();
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ++it) for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
(*it)->stop(); (*it)->stop();

View file

@ -27,7 +27,7 @@
#include <sstream> #include <sstream>
#include <mutex> #include <mutex>
#include "serverSession.h" #include "controlSession.h"
#include "pipeReader.h" #include "pipeReader.h"
#include "common/queue.h" #include "common/queue.h"
#include "message/message.h" #include "message/message.h"
@ -40,65 +40,35 @@ using boost::asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr; typedef std::shared_ptr<tcp::socket> socket_ptr;
struct ControlServerSettings /// Telnet like remote control
{
ControlServerSettings() :
port(98765),
fifoName("/tmp/snapfifo"),
codec("flac"),
bufferMs(1000),
sampleFormat("44100:16:2"),
pipeReadMs(20)
{
}
size_t port;
std::string fifoName;
std::string codec;
int32_t bufferMs;
msg::SampleFormat sampleFormat;
size_t pipeReadMs;
};
/// Forwars PCM data to the connected clients
/** /**
* Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream. * Telnet like remote control
* Accepts and holds client connections (ServerSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients
*/ */
class ControlServer : public MessageReceiver, PipeListener class ControlServer : public ControlMessageReceiver
{ {
public: public:
ControlServer(const ControlServerSettings& controlServerSettings); ControlServer(size_t port);
virtual ~ControlServer(); virtual ~ControlServer();
void start(); void start();
void stop(); void stop();
/// Send a message to all connceted clients /// Send a message to all connceted clients
void send(const msg::BaseMessage* message); void send(const std::string& message);
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer); virtual void onMessageReceived(ControlSession* connection, const std::string& message);
/// Implementation of PipeListener
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration);
virtual void onResync(const PipeReader* pipeReader, double ms);
private: private:
void startAccept(); void startAccept();
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
void acceptor(); void acceptor();
mutable std::mutex mutex_; mutable std::mutex mutex_;
PipeReader* pipeReader_; size_t port_;
std::set<std::shared_ptr<ServerSession>> sessions_; std::set<std::shared_ptr<ControlSession>> sessions_;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
std::shared_ptr<tcp::acceptor> acceptor_; std::shared_ptr<tcp::acceptor> acceptor_;
ControlServerSettings settings_;
msg::SampleFormat sampleFormat_;
msg::ServerSettings serverSettings_;
std::thread acceptThread_; std::thread acceptThread_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_; Queue<std::shared_ptr<msg::BaseMessage>> messages_;
}; };

153
server/controlSession.cpp Normal file
View file

@ -0,0 +1,153 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <mutex>
#include "controlSession.h"
#include "common/log.h"
#include "message/pcmChunk.h"
using namespace std;
ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
{
socket_ = socket;
}
ControlSession::~ControlSession()
{
stop();
}
void ControlSession::start()
{
active_ = true;
readerThread_ = new thread(&ControlSession::reader, this);
writerThread_ = new thread(&ControlSession::writer, this);
}
void ControlSession::stop()
{
std::unique_lock<std::mutex> mlock(mutex_);
active_ = false;
try
{
boost::system::error_code ec;
if (socket_)
{
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec << "\n";
socket_->close(ec);
if (ec) logE << "Error in socket close: " << ec << "\n";
}
if (readerThread_)
{
logD << "joining readerThread\n";
readerThread_->join();
delete readerThread_;
}
if (writerThread_)
{
logD << "joining writerThread\n";
writerThread_->join();
delete writerThread_;
}
}
catch(...)
{
}
readerThread_ = NULL;
writerThread_ = NULL;
socket_ = NULL;
logD << "ControlSession stopped\n";
}
void ControlSession::add(const std::string& message)
{
messages_.push(message);
}
bool ControlSession::send(const std::string& message) const
{
// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_);
if (!socket_ || !active_)
return false;
boost::asio::streambuf streambuf;
std::ostream request_stream(&streambuf);
request_stream << message << "\r\n";
boost::asio::write(*socket_.get(), streambuf);
// logO << "done: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
return true;
}
void ControlSession::reader()
{
active_ = true;
try
{
while (active_)
{
boost::asio::streambuf response;
boost::asio::read_until(*socket_, response, "\r\n");
std::istream response_stream(&response);
std::string strResponse;
response_stream >> strResponse;
if (messageReceiver_ != NULL)
messageReceiver_->onMessageReceived(this, strResponse);
}
}
catch (const std::exception& e)
{
logS(kLogErr) << "Exception in ControlSession::reader(): " << e.what() << endl;
}
active_ = false;
}
void ControlSession::writer()
{
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
string message;
while (active_)
{
if (messages_.try_pop(message, std::chrono::milliseconds(500)))
send(message);
}
}
catch (const std::exception& e)
{
logS(kLogErr) << "Exception in ControlSession::writer(): " << e.what() << endl;
}
active_ = false;
}

94
server/controlSession.h Normal file
View file

@ -0,0 +1,94 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef CONTROL_SESSION_H
#define CONTROL_SESSION_H
#include <string>
#include <thread>
#include <atomic>
#include <mutex>
#include <memory>
#include <boost/asio.hpp>
#include <condition_variable>
#include <set>
#include "message/message.h"
#include "common/queue.h"
using boost::asio::ip::tcp;
class ControlSession;
/// Interface: callback for a received message.
class ControlMessageReceiver
{
public:
virtual void onMessageReceived(ControlSession* connection, const std::string& message) = 0;
};
/// Endpoint for a connected client.
/**
* Endpoint for a connected client.
* Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the MessageReceiver callback
*/
class ControlSession
{
public:
/// ctor. Received message from the client are passed to MessageReceiver
ControlSession(ControlMessageReceiver* receiver, std::shared_ptr<tcp::socket> socket);
~ControlSession();
void start();
void stop();
/// Sends a message to the client (synchronous)
bool send(const std::string& message) const;
/// Sends a message to the client (asynchronous)
void add(const std::string& message);
bool active() const
{
return active_;
}
protected:
void reader();
void writer();
std::atomic<bool> active_;
mutable std::mutex mutex_;
std::thread* readerThread_;
std::thread* writerThread_;
std::shared_ptr<tcp::socket> socket_;
ControlMessageReceiver* messageReceiver_;
Queue<std::string> messages_;
};
#endif

7198
server/json.hpp Normal file

File diff suppressed because it is too large Load diff

View file

@ -16,8 +16,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
***/ ***/
#ifndef SERVER_CONNECTION_H #ifndef SERVER_SESSION_H
#define SERVER_CONNECTION_H #define SERVER_SESSION_H
#include <string> #include <string>
#include <thread> #include <thread>
@ -84,6 +84,13 @@ public:
bufferMs_ = bufferMs; bufferMs_ = bufferMs;
} }
std::string macAddress;
std::string getIP()
{
return socket_->remote_endpoint().address().to_string();
}
protected: protected:
void socketRead(void* _to, size_t _bytes); void socketRead(void* _to, size_t _bytes);
void getNextMessage(); void getNextMessage();

View file

@ -29,7 +29,7 @@
#include "message/sampleFormat.h" #include "message/sampleFormat.h"
#include "message/message.h" #include "message/message.h"
#include "encoder/encoderFactory.h" #include "encoder/encoderFactory.h"
#include "controlServer.h" #include "streamServer.h"
#include "publishAvahi.h" #include "publishAvahi.h"
@ -46,7 +46,7 @@ int main(int argc, char* argv[])
{ {
try try
{ {
ControlServerSettings settings; StreamServerSettings settings;
int runAsDaemon; int runAsDaemon;
string sampleFormat; string sampleFormat;
@ -122,14 +122,14 @@ int main(int argc, char* argv[])
if (settings.bufferMs < 400) if (settings.bufferMs < 400)
settings.bufferMs = 400; settings.bufferMs = 400;
settings.sampleFormat = sampleFormat; settings.sampleFormat = sampleFormat;
std::unique_ptr<ControlServer> controlServer(new ControlServer(settings)); std::unique_ptr<StreamServer> streamServer(new StreamServer(settings));
controlServer->start(); streamServer->start();
while (!g_terminated) while (!g_terminated)
usleep(100*1000); usleep(100*1000);
logO << "Stopping controlServer" << endl; logO << "Stopping streamServer" << endl;
controlServer->stop(); streamServer->stop();
logO << "done" << endl; logO << "done" << endl;
} }
catch (const std::exception& e) catch (const std::exception& e)

206
server/streamServer.cpp Normal file
View file

@ -0,0 +1,206 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "streamServer.h"
#include "message/time.h"
#include "message/ack.h"
#include "message/request.h"
#include "message/command.h"
#include "message/hello.h"
#include "common/log.h"
#include "json.hpp"
#include <iostream>
using namespace std;
using json = nlohmann::json;
StreamServer::StreamServer(const StreamServerSettings& streamServerSettings) : settings_(streamServerSettings), sampleFormat_(streamServerSettings.sampleFormat)
{
serverSettings_.bufferMs = settings_.bufferMs;
}
StreamServer::~StreamServer()
{
}
void StreamServer::send(const msg::BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (!(*it)->active())
{
logS(kLogErr) << "Session inactive. Removing\n";
// don't block: remove ServerSession in a thread
auto func = [](shared_ptr<ServerSession> s)->void{s->stop();};
std::thread t(func, *it);
t.detach();
controlServer->send("Client gone: " + (*it)->macAddress);
sessions_.erase(it++);
}
else
++it;
}
std::shared_ptr<const msg::BaseMessage> shared_message(message);
for (auto s : sessions_)
s->add(shared_message);
}
void StreamServer::onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration)
{
// logO << "onChunkRead " << duration << "ms\n";
send(chunk);
}
void StreamServer::onResync(const PipeReader* pipeReader, double ms)
{
logO << "onResync " << ms << "ms\n";
}
void StreamServer::onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{
// logO << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::kRequest)
{
msg::Request requestMsg;
requestMsg.deserialize(baseMessage, buffer);
// logO << "request: " << requestMsg.request << "\n";
if (requestMsg.request == kTime)
{
msg::Time timeMsg;
timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
// logD << "Latency: " << timeMsg.latency << ", refers to: " << timeMsg.refersTo << "\n";
connection->send(&timeMsg);
}
else if (requestMsg.request == kServerSettings)
{
std::unique_lock<std::mutex> mlock(mutex_);
serverSettings_.refersTo = requestMsg.id;
connection->send(&serverSettings_);
}
else if (requestMsg.request == kSampleFormat)
{
std::unique_lock<std::mutex> mlock(mutex_);
sampleFormat_.refersTo = requestMsg.id;
connection->send(&sampleFormat_);
}
else if (requestMsg.request == kHeader)
{
std::unique_lock<std::mutex> mlock(mutex_);
msg::Header* headerChunk = pipeReader_->getHeader();
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
}
}
else if (baseMessage.type == message_type::kCommand)
{
msg::Command commandMsg;
commandMsg.deserialize(baseMessage, buffer);
if (commandMsg.command == "startStream")
{
msg::Ack ackMsg;
ackMsg.refersTo = commandMsg.id;
connection->send(&ackMsg);
connection->setStreamActive(true);
}
}
else if (baseMessage.type == message_type::kHello)
{
msg::Hello helloMsg;
helloMsg.deserialize(baseMessage, buffer);
connection->macAddress = helloMsg.macAddress;
logO << "Hello from " << connection->macAddress << "\n";
json j = {
{"event", "newConnection"},
{"client", {
{"ip", connection->getIP()},
{"mac", connection->macAddress}
}}
};
controlServer->send(j.dump());
}
}
void StreamServer::startAccept()
{
socket_ptr socket = make_shared<tcp::socket>(io_service_);
acceptor_->async_accept(*socket, bind(&StreamServer::handleAccept, this, socket));
}
void StreamServer::handleAccept(socket_ptr socket)
{
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<ServerSession> session = make_shared<ServerSession>(this, socket);
{
std::unique_lock<std::mutex> mlock(mutex_);
session->setBufferMs(settings_.bufferMs);
session->start();
sessions_.insert(session);
}
startAccept();
}
void StreamServer::start()
{
controlServer.reset(new ControlServer(settings_.port + 1));
controlServer->start();
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs);
pipeReader_->start();
acceptor_ = make_shared<tcp::acceptor>(io_service_, tcp::endpoint(tcp::v4(), settings_.port));
startAccept();
acceptThread_ = thread(&StreamServer::acceptor, this);
}
void StreamServer::stop()
{
acceptor_->cancel();
io_service_.stop();
acceptThread_.join();
pipeReader_->stop();
std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
(*it)->stop();
}
void StreamServer::acceptor()
{
io_service_.run();
}

112
server/streamServer.h Normal file
View file

@ -0,0 +1,112 @@
/***
This file is part of snapcast
Copyright (C) 2015 Johannes Pohl
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#ifndef STREAM_SERVER_H
#define STREAM_SERVER_H
#include <boost/asio.hpp>
#include <vector>
#include <thread>
#include <memory>
#include <set>
#include <sstream>
#include <mutex>
#include "serverSession.h"
#include "pipeReader.h"
#include "common/queue.h"
#include "message/message.h"
#include "message/header.h"
#include "message/sampleFormat.h"
#include "message/serverSettings.h"
#include "controlServer.h"
using boost::asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr;
struct StreamServerSettings
{
StreamServerSettings() :
port(1704),
fifoName("/tmp/snapfifo"),
codec("flac"),
bufferMs(1000),
sampleFormat("44100:16:2"),
pipeReadMs(20)
{
}
size_t port;
std::string fifoName;
std::string codec;
int32_t bufferMs;
msg::SampleFormat sampleFormat;
size_t pipeReadMs;
};
/// Forwars PCM data to the connected clients
/**
* Reads PCM data using PipeReader, implements PipeListener to get the (encoded) PCM stream.
* Accepts and holds client connections (ServerSession)
* Receives (via the MessageReceiver interface) and answers messages from the clients
* Forwards PCM data to the clients
*/
class StreamServer : public MessageReceiver, PipeListener
{
public:
StreamServer(const StreamServerSettings& streamServerSettings);
virtual ~StreamServer();
void start();
void stop();
/// Send a message to all connceted clients
void send(const msg::BaseMessage* message);
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
virtual void onMessageReceived(ServerSession* connection, const msg::BaseMessage& baseMessage, char* buffer);
/// Implementation of PipeListener
virtual void onChunkRead(const PipeReader* pipeReader, const msg::PcmChunk* chunk, double duration);
virtual void onResync(const PipeReader* pipeReader, double ms);
private:
void startAccept();
void handleAccept(socket_ptr socket);
void acceptor();
mutable std::mutex mutex_;
PipeReader* pipeReader_;
std::set<std::shared_ptr<ServerSession>> sessions_;
boost::asio::io_service io_service_;
std::shared_ptr<tcp::acceptor> acceptor_;
StreamServerSettings settings_;
msg::SampleFormat sampleFormat_;
msg::ServerSettings serverSettings_;
std::thread acceptThread_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
std::unique_ptr<ControlServer> controlServer;
};
#endif