git-svn-id: svn://elaine/murooma/trunk@272 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-17 21:58:38 +00:00
parent 16ef1ae634
commit feabfee936
50 changed files with 1996 additions and 1984 deletions

View file

@ -1,92 +1,12 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<CodeBlocks_layout_file>
<ActiveTarget name="Release" />
<File name="server/controlServer.h" open="1" top="0" tabpos="4" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/requestMsg.h" open="1" top="0" tabpos="9" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/message.h" open="1" top="0" tabpos="16" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/pcmChunk.cpp" open="1" top="0" tabpos="6" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/log.cpp" open="1" top="0" tabpos="7" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/oggEncoder.h" open="1" top="0" tabpos="25" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/sampleFormat.cpp" open="1" top="0" tabpos="17" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/wireChunk.h" open="1" top="0" tabpos="26" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/serverConnection.cpp" open="1" top="0" tabpos="12" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/socketConnection.h" open="1" top="0" tabpos="15" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/headerMessage.h" open="1" top="0" tabpos="22" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/timeMsg.h" open="1" top="0" tabpos="10" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/socketConnection.cpp" open="1" top="0" tabpos="21" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/oggEncoder.cpp" open="1" top="0" tabpos="28" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="168" />
</Cursor>
</File>
<File name="server/pcmEncoder.cpp" open="1" top="0" tabpos="11" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/sampleFormat.h" open="1" top="0" tabpos="30" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/log.h" open="1" top="0" tabpos="13" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/pcmChunk.h" open="1" top="0" tabpos="8" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="common/sampleFormat.cpp" open="1" top="0" tabpos="17" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
@ -101,24 +21,34 @@
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/timeUtils.h" open="1" top="0" tabpos="1" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="common/headerMessage.h" open="1" top="0" tabpos="22" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/pcmEncoder.h" open="1" top="0" tabpos="23" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="common/sampleFormat.h" open="1" top="1" tabpos="48" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/serverSettings.h" open="1" top="0" tabpos="29" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="common/pcmChunk.h" open="1" top="0" tabpos="8" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/controlServer.cpp" open="1" top="0" tabpos="3" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="server/serverConnection.cpp" open="1" top="0" tabpos="12" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="215" topLine="82" />
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/requestMsg.h" open="1" top="0" tabpos="9" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/wireChunk.h" open="1" top="0" tabpos="26" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/queue.h" open="1" top="0" tabpos="14" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
@ -126,7 +56,12 @@
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/utils.h" open="1" top="0" tabpos="30" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="common/log.cpp" open="1" top="0" tabpos="7" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/oggEncoder.cpp" open="1" top="0" tabpos="27" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
@ -136,14 +71,54 @@
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/streamServer.cpp" open="1" top="0" tabpos="24" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="server/pcmEncoder.cpp" open="1" top="0" tabpos="11" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/pcmEncoder.h" open="1" top="0" tabpos="23" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/pcmChunk.cpp" open="1" top="0" tabpos="6" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/message.h" open="1" top="0" tabpos="16" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/snapServer.cpp" open="1" top="0" tabpos="2" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="4569" topLine="114" />
<Cursor1 position="3872" topLine="138" />
</Cursor>
</File>
<File name="server/streamServer.cpp" open="1" top="0" tabpos="24" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/controlServer.h" open="1" top="0" tabpos="4" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/timeUtils.h" open="1" top="0" tabpos="1" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/socketConnection.h" open="1" top="0" tabpos="15" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/log.h" open="1" top="0" tabpos="13" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/serverConnection.h" open="1" top="0" tabpos="18" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
@ -151,4 +126,29 @@
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/oggEncoder.h" open="1" top="0" tabpos="25" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="server/controlServer.cpp" open="1" top="0" tabpos="3" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="215" topLine="0" />
</Cursor>
</File>
<File name="common/timeMsg.h" open="1" top="0" tabpos="10" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/utils.h" open="1" top="0" tabpos="29" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/serverSettings.h" open="1" top="0" tabpos="28" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
</CodeBlocks_layout_file>

View file

@ -1,26 +1,81 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<CodeBlocks_layout_file>
<ActiveTarget name="Release" />
<File name="client/player.cpp" open="1" top="0" tabpos="16" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="client/oggDecoder.cpp" open="1" top="0" tabpos="14" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/snapClient.cpp" open="1" top="0" tabpos="2" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="1639" topLine="49" />
</Cursor>
</File>
<File name="client/clientConnection.h" open="1" top="0" tabpos="10" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/pcmDecoder.cpp" open="1" top="0" tabpos="7" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<File name="client/doubleBuffer.h" open="1" top="0" tabpos="3" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/stream.cpp" open="1" top="0" tabpos="1" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="767" topLine="20" />
</Cursor>
</File>
<File name="common/utils.h" open="0" top="0" tabpos="0" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="707" topLine="26" />
</Cursor>
</File>
<File name="client/stream.h" open="1" top="0" tabpos="4" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/clientConnection.cpp" open="1" top="0" tabpos="9" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/timeProvider.cpp" open="1" top="0" tabpos="8" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/streamClient.h" open="1" top="0" tabpos="5" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/controller.cpp" open="1" top="0" tabpos="6" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/timeUtils.h" open="0" top="0" tabpos="0" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="999" topLine="39" />
</Cursor>
</File>
<File name="client/controller.h" open="1" top="0" tabpos="13" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/player.h" open="1" top="1" tabpos="18" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/timeProvider.h" open="1" top="0" tabpos="15" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/pcmDecoder.h" open="1" top="0" tabpos="17" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
@ -31,74 +86,19 @@
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/timeProvider.cpp" open="1" top="0" tabpos="8" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/controller.cpp" open="1" top="0" tabpos="6" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/clientConnection.cpp" open="1" top="0" tabpos="9" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="common/timeUtils.h" open="0" top="0" tabpos="0" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="999" topLine="39" />
</Cursor>
</File>
<File name="client/timeProvider.h" open="1" top="0" tabpos="15" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/streamClient.h" open="1" top="0" tabpos="5" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/doubleBuffer.h" open="1" top="0" tabpos="3" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/stream.h" open="1" top="0" tabpos="4" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/snapClient.cpp" open="1" top="0" tabpos="2" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="1647" topLine="39" />
</Cursor>
</File>
<File name="client/controller.h" open="1" top="0" tabpos="13" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/oggDecoder.cpp" open="1" top="0" tabpos="14" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/stream.cpp" open="1" top="1" tabpos="1" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="767" topLine="20" />
</Cursor>
</File>
<File name="client/player.h" open="1" top="0" tabpos="18" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="11" />
</Cursor>
</File>
<File name="client/oggDecoder.h" open="1" top="0" tabpos="12" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/player.cpp" open="1" top="0" tabpos="16" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
<File name="client/pcmDecoder.cpp" open="1" top="0" tabpos="7" split="0" active="1" splitpos="0" zoom_1="0" zoom_2="0">
<Cursor>
<Cursor1 position="0" topLine="0" />
</Cursor>
</File>
</CodeBlocks_layout_file>

View file

@ -15,49 +15,52 @@ ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string
void ClientConnection::start()
{
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
iterator = resolver.resolve(query);
SocketConnection::start();
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
iterator = resolver.resolve(query);
SocketConnection::start();
}
void ClientConnection::worker()
{
active_ = true;
while (active_)
{
connected_ = false;
try
{
{
active_ = true;
while (active_)
{
connected_ = false;
try
{
{
// std::unique_lock<std::mutex> mlock(mutex_);
cout << "connecting\n";
socket.reset(new tcp::socket(io_service));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
socket->connect(*iterator);
connected_ = true;
cout << "connected\n";
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
}
while(active_)
{
cout << ".";
getNextMessage();
cout << "|";
cout.flush();
}
}
catch (const std::exception& e)
{
connected_ = false;
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
usleep(1000*1000);
}
}
cout << "connecting\n";
socket.reset(new tcp::socket(io_service));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
cout << "socket: " << socket->native() << "\n";
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
socket->connect(*iterator);
connected_ = true;
cout << "connected\n";
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
}
while(active_)
{
// cout << ".";
// cout.flush();
getNextMessage();
// cout << "|";
// cout.flush();
}
}
catch (const std::exception& e)
{
connected_ = false;
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
usleep(1000*1000);
}
}
}

View file

@ -11,15 +11,15 @@ using boost::asio::ip::tcp;
class ClientConnection : public SocketConnection
{
public:
ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
virtual void start();
ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
virtual void start();
protected:
virtual void worker();
virtual void worker();
private:
std::string ip;
size_t port;
std::string ip;
size_t port;
};

View file

@ -21,96 +21,96 @@ Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL)
void Controller::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
{
if (baseMessage.type == message_type::payload)
{
if ((stream != NULL) && (decoder != NULL))
{
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
pcmChunk->deserialize(baseMessage, buffer);
if (baseMessage.type == message_type::payload)
{
if ((stream != NULL) && (decoder != NULL))
{
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
pcmChunk->deserialize(baseMessage, buffer);
//cout << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
if (decoder->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
}
else
delete pcmChunk;
}
}
}
else
delete pcmChunk;
}
}
}
void Controller::start(const std::string& _ip, size_t _port, int _bufferMs)
{
bufferMs = _bufferMs;
ip = _ip;
bufferMs = _bufferMs;
ip = _ip;
controlConnection = new ClientConnection(this, ip, _port);
controlConnection->start();
controlConnection = new ClientConnection(this, ip, _port);
controlConnection->start();
controllerThread = new thread(&Controller::worker, this);
controllerThread = new thread(&Controller::worker, this);
}
void Controller::stop()
{
active_ = false;
active_ = false;
}
void Controller::worker()
{
// Decoder* decoder;
active_ = true;
decoder = NULL;
active_ = true;
decoder = NULL;
while (active_)
{
try
{
RequestMsg requestMsg("serverSettings");
shared_ptr<ServerSettings> serverSettings(NULL);
while (!(serverSettings = controlConnection->sendReq<ServerSettings>(&requestMsg, 1000)));
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
while (active_)
{
try
{
RequestMsg requestMsg("serverSettings");
shared_ptr<ServerSettings> serverSettings(NULL);
while (!(serverSettings = controlConnection->sendReq<ServerSettings>(&requestMsg, 1000)));
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
requestMsg.request = "sampleFormat";
while (!(sampleFormat = controlConnection->sendReq<SampleFormat>(&requestMsg, 1000)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
requestMsg.request = "sampleFormat";
while (!(sampleFormat = controlConnection->sendReq<SampleFormat>(&requestMsg, 1000)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
decoder = new OggDecoder();
if (decoder != NULL)
{
requestMsg.request = "headerChunk";
shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = controlConnection->sendReq<HeaderMessage>(&requestMsg, 1000)));
decoder->setHeader(headerChunk.get());
}
decoder = new OggDecoder();
if (decoder != NULL)
{
requestMsg.request = "headerChunk";
shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = controlConnection->sendReq<HeaderMessage>(&requestMsg, 1000)));
decoder->setHeader(headerChunk.get());
}
RequestMsg timeReq("time");
for (size_t n=0; n<10; ++n)
{
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&timeReq, 2000);
if (reply)
{
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
usleep(1000);
}
}
RequestMsg timeReq("time");
for (size_t n=0; n<10; ++n)
{
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&timeReq, 2000);
if (reply)
{
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
usleep(1000);
}
}
streamClient->start();
stream = new Stream(*sampleFormat);
stream->setBufferLen(bufferMs);
streamClient->start();
stream = new Stream(*sampleFormat);
stream->setBufferLen(bufferMs);
Player player(stream);
player.start();
Player player(stream);
player.start();
try
{
while (active_)
{
usleep(1000000);
try
{
while (active_)
{
usleep(1000000);
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&timeReq, 1000);
if (reply)
{
@ -119,31 +119,32 @@ void Controller::worker()
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
cout << TimeProvider::getInstance().getDiffToServer() << "\n";
}
}
}
catch (const std::exception& e)
{
cout << "Stopping player\n";
player.stop();
cout << "Stopping streamClient\n";
streamClient->stop();
delete streamClient;
streamClient = NULL;
delete stream;
stream = NULL;
cout << "done\n";
throw e;
}
}
catch (const std::exception& e)
{
cout << "Exception in Controller::worker(): " << e.what() << "\n";
if (decoder != NULL)
delete decoder;
decoder = NULL;
usleep(1000000);
}
}
}
}
catch (const std::exception& e)
{
cout << "Stopping player\n";
player.stop();
cout << "Stopping streamClient\n";
streamClient->stop();
delete streamClient;
streamClient = NULL;
cout << "Deleting stream\n";
delete stream;
stream = NULL;
cout << "done\n";
throw e;
}
}
catch (const std::exception& e)
{
cout << "Exception in Controller::worker(): " << e.what() << "\n";
if (decoder != NULL)
delete decoder;
decoder = NULL;
usleep(1000000);
}
}
}

View file

@ -13,22 +13,22 @@
class Controller : public MessageReceiver
{
public:
Controller();
void start(const std::string& _ip, size_t _port, int _bufferMs);
void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
Controller();
void start(const std::string& _ip, size_t _port, int _bufferMs);
void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
private:
void worker();
std::atomic<bool> active_;
std::thread* controllerThread;
StreamClient* streamClient;
ClientConnection* controlConnection;
Stream* stream;
int bufferMs;
std::string ip;
std::shared_ptr<SampleFormat> sampleFormat;
Decoder* decoder;
void worker();
std::atomic<bool> active_;
std::thread* controllerThread;
StreamClient* streamClient;
ClientConnection* controlConnection;
Stream* stream;
int bufferMs;
std::string ip;
std::shared_ptr<SampleFormat> sampleFormat;
Decoder* decoder;
};

View file

@ -6,10 +6,10 @@
class Decoder
{
public:
Decoder() {};
virtual ~Decoder() {};
virtual bool decode(PcmChunk* chunk) = 0;
virtual bool setHeader(HeaderMessage* chunk) = 0;
Decoder() {};
virtual ~Decoder() {};
virtual bool decode(PcmChunk* chunk) = 0;
virtual bool setHeader(HeaderMessage* chunk) = 0;
};

View file

@ -8,69 +8,69 @@ template <class T>
class DoubleBuffer
{
public:
DoubleBuffer(size_t size = 10) : bufferSize(size)
{
}
DoubleBuffer(size_t size = 10) : bufferSize(size)
{
}
inline void add(const T& element)
{
buffer.push_back(element);
if (buffer.size() > bufferSize)
buffer.pop_front();
}
inline void add(const T& element)
{
buffer.push_back(element);
if (buffer.size() > bufferSize)
buffer.pop_front();
}
T median() const
{
if (buffer.empty())
return 0;
std::deque<T> tmpBuffer(buffer.begin(), buffer.end());
std::sort(tmpBuffer.begin(), tmpBuffer.end());
return tmpBuffer[tmpBuffer.size() / 2];
}
T median() const
{
if (buffer.empty())
return 0;
std::deque<T> tmpBuffer(buffer.begin(), buffer.end());
std::sort(tmpBuffer.begin(), tmpBuffer.end());
return tmpBuffer[tmpBuffer.size() / 2];
}
double mean() const
{
if (buffer.empty())
return 0;
double mean = 0.;
for (size_t n=0; n<buffer.size(); ++n)
mean += (float)buffer[n] / (float)buffer.size();
return mean;
}
double mean() const
{
if (buffer.empty())
return 0;
double mean = 0.;
for (size_t n=0; n<buffer.size(); ++n)
mean += (float)buffer[n] / (float)buffer.size();
return mean;
}
T percentil(unsigned int percentil) const
{
if (buffer.empty())
return 0;
std::deque<T> tmpBuffer(buffer.begin(), buffer.end());
std::sort(tmpBuffer.begin(), tmpBuffer.end());
return tmpBuffer[(size_t)(tmpBuffer.size() * ((float)percentil / (float)100))];
}
T percentil(unsigned int percentil) const
{
if (buffer.empty())
return 0;
std::deque<T> tmpBuffer(buffer.begin(), buffer.end());
std::sort(tmpBuffer.begin(), tmpBuffer.end());
return tmpBuffer[(size_t)(tmpBuffer.size() * ((float)percentil / (float)100))];
}
inline bool full() const
{
return (buffer.size() == bufferSize);
}
inline bool full() const
{
return (buffer.size() == bufferSize);
}
inline void clear()
{
buffer.clear();
}
inline void clear()
{
buffer.clear();
}
inline size_t size() const
{
return buffer.size();
}
inline size_t size() const
{
return buffer.size();
}
void setSize(size_t size)
{
bufferSize = size;
}
void setSize(size_t size)
{
bufferSize = size;
}
private:
size_t bufferSize;
std::deque<T> buffer;
size_t bufferSize;
std::deque<T> buffer;
};

View file

@ -10,205 +10,205 @@ using namespace std;
OggDecoder::OggDecoder() : Decoder()
{
ogg_sync_init(&oy); /* Now we can read pages */
convsize = 4096;
convbuffer = (ogg_int16_t*)malloc(convsize * sizeof(ogg_int16_t));
ogg_sync_init(&oy); /* Now we can read pages */
convsize = 4096;
convbuffer = (ogg_int16_t*)malloc(convsize * sizeof(ogg_int16_t));
}
OggDecoder::~OggDecoder()
{
// ogg_sync_init(&oy); /* Now we can read pages */
delete convbuffer;
delete convbuffer;
}
bool OggDecoder::decode(PcmChunk* chunk)
{
/* grab some data at the head of the stream. We want the first page
(which is guaranteed to be small and only contain the Vorbis
stream initial header) We need the first page to get the stream
serialno. */
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
/* grab some data at the head of the stream. We want the first page
(which is guaranteed to be small and only contain the Vorbis
stream initial header) We need the first page to get the stream
serialno. */
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
chunk->payloadSize = 0;
convsize=4096;//bytes/vi.channels;
/* The rest is just a straight decode loop until end of stream */
// while(!eos){
while(true)
{
int result=ogg_sync_pageout(&oy,&og);
if (result==0)
break; /* need more data */
if(result<0)
{
/* missing or corrupt data at this page position */
fprintf(stderr,"Corrupt or missing data in bitstream; continuing...\n");
continue;
}
chunk->payloadSize = 0;
convsize=4096;//bytes/vi.channels;
/* The rest is just a straight decode loop until end of stream */
// while(!eos){
while(true)
{
int result=ogg_sync_pageout(&oy,&og);
if (result==0)
break; /* need more data */
if(result<0)
{
/* missing or corrupt data at this page position */
fprintf(stderr,"Corrupt or missing data in bitstream; continuing...\n");
continue;
}
ogg_stream_pagein(&os,&og); /* can safely ignore errors at
ogg_stream_pagein(&os,&og); /* can safely ignore errors at
this point */
while(1)
{
result=ogg_stream_packetout(&os,&op);
while(1)
{
result=ogg_stream_packetout(&os,&op);
if(result==0)
break; /* need more data */
if(result<0)
continue; /* missing or corrupt data at this page position */
/* no reason to complain; already complained above */
/* we have a packet. Decode it */
float **pcm;
int samples;
if(result==0)
break; /* need more data */
if(result<0)
continue; /* missing or corrupt data at this page position */
/* no reason to complain; already complained above */
/* we have a packet. Decode it */
float **pcm;
int samples;
if(vorbis_synthesis(&vb,&op)==0) /* test for success! */
vorbis_synthesis_blockin(&vd,&vb);
/*
if(vorbis_synthesis(&vb,&op)==0) /* test for success! */
vorbis_synthesis_blockin(&vd,&vb);
/*
**pcm is a multichannel float vector. In stereo, for
example, pcm[0] is left, and pcm[1] is right. samples is
the size of each channel. Convert the float values
(-1.<=range<=1.) to whatever PCM format and write it out */
**pcm is a multichannel float vector. In stereo, for
example, pcm[0] is left, and pcm[1] is right. samples is
the size of each channel. Convert the float values
(-1.<=range<=1.) to whatever PCM format and write it out */
while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0)
{
int bout=(samples<convsize?samples:convsize);
//cout << "samples: " << samples << ", convsize: " << convsize << "\n";
/* convert floats to 16 bit signed ints (host order) and
interleave */
for(int i=0; i<vi.channels; i++)
{
ogg_int16_t *ptr=convbuffer+i;
float *mono=pcm[i];
for(int j=0; j<bout; j++)
{
int val=floor(mono[j]*32767.f+.5f);
/* might as well guard against clipping */
if(val>32767)
val=32767;
else if(val<-32768)
val=-32768;
*ptr=val;
ptr+=vi.channels;
}
}
while((samples=vorbis_synthesis_pcmout(&vd,&pcm))>0)
{
int bout=(samples<convsize?samples:convsize);
//cout << "samples: " << samples << ", convsize: " << convsize << "\n";
/* convert floats to 16 bit signed ints (host order) and
interleave */
for(int i=0; i<vi.channels; i++)
{
ogg_int16_t *ptr=convbuffer+i;
float *mono=pcm[i];
for(int j=0; j<bout; j++)
{
int val=floor(mono[j]*32767.f+.5f);
/* might as well guard against clipping */
if(val>32767)
val=32767;
else if(val<-32768)
val=-32768;
*ptr=val;
ptr+=vi.channels;
}
}
size_t oldSize = chunk->payloadSize;
size_t size = 2*vi.channels * bout;
chunk->payloadSize += size;
chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize);
memcpy(chunk->payload + oldSize, convbuffer, size);
/* tell libvorbis how many samples we actually consumed */
vorbis_synthesis_read(&vd,bout);
}
}
}
// if(ogg_page_eos(&og))eos=1;
// ogg_stream_clear(&os);
// vorbis_comment_clear(&vc);
// vorbis_info_clear(&vi); /* must be called last */
return true;
size_t oldSize = chunk->payloadSize;
size_t size = 2*vi.channels * bout;
chunk->payloadSize += size;
chunk->payload = (char*)realloc(chunk->payload, chunk->payloadSize);
memcpy(chunk->payload + oldSize, convbuffer, size);
/* tell libvorbis how many samples we actually consumed */
vorbis_synthesis_read(&vd,bout);
}
}
}
// if(ogg_page_eos(&og))eos=1;
// ogg_stream_clear(&os);
// vorbis_comment_clear(&vc);
// vorbis_info_clear(&vi); /* must be called last */
return true;
}
bool OggDecoder::setHeader(HeaderMessage* chunk)
{
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
bytes = chunk->payloadSize;
buffer=ogg_sync_buffer(&oy, bytes);
memcpy(buffer, chunk->payload, bytes);
ogg_sync_wrote(&oy,bytes);
if(ogg_sync_pageout(&oy,&og)!=1)
{
fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n");
return false;
}
if(ogg_sync_pageout(&oy,&og)!=1)
{
fprintf(stderr,"Input does not appear to be an Ogg bitstream.\n");
return false;
}
ogg_stream_init(&os,ogg_page_serialno(&og));
ogg_stream_init(&os,ogg_page_serialno(&og));
vorbis_info_init(&vi);
vorbis_comment_init(&vc);
if(ogg_stream_pagein(&os,&og)<0)
{
fprintf(stderr,"Error reading first page of Ogg bitstream data.\n");
return false;
}
vorbis_info_init(&vi);
vorbis_comment_init(&vc);
if(ogg_stream_pagein(&os,&og)<0)
{
fprintf(stderr,"Error reading first page of Ogg bitstream data.\n");
return false;
}
if(ogg_stream_packetout(&os,&op)!=1)
{
fprintf(stderr,"Error reading initial header packet.\n");
return false;
}
if(ogg_stream_packetout(&os,&op)!=1)
{
fprintf(stderr,"Error reading initial header packet.\n");
return false;
}
if(vorbis_synthesis_headerin(&vi,&vc,&op)<0)
{
fprintf(stderr,"This Ogg bitstream does not contain Vorbis audio data.\n");
return false;
}
if(vorbis_synthesis_headerin(&vi,&vc,&op)<0)
{
fprintf(stderr,"This Ogg bitstream does not contain Vorbis audio data.\n");
return false;
}
int i(0);
while(i<2)
{
while(i<2)
{
int result=ogg_sync_pageout(&oy,&og);
if(result==0)
break; /* Need more data */
/* Don't complain about missing or corrupt data yet. We'll
catch it at the packet output phase */
if(result==1)
{
ogg_stream_pagein(&os,&og); /* we can ignore any errors here as they'll also become apparent at packetout */
while(i<2)
{
result=ogg_stream_packetout(&os,&op);
if(result==0)
break;
if(result<0)
{
/* Uh oh; data at some point was corrupted or missing!
We can't tolerate that in a header. Die. */
fprintf(stderr,"Corrupt secondary header. Exiting.\n");
return false;
}
result=vorbis_synthesis_headerin(&vi,&vc,&op);
if(result<0)
{
fprintf(stderr,"Corrupt secondary header. Exiting.\n");
return false;
}
i++;
}
}
}
}
int i(0);
while(i<2)
{
while(i<2)
{
int result=ogg_sync_pageout(&oy,&og);
if(result==0)
break; /* Need more data */
/* Don't complain about missing or corrupt data yet. We'll
catch it at the packet output phase */
if(result==1)
{
ogg_stream_pagein(&os,&og); /* we can ignore any errors here as they'll also become apparent at packetout */
while(i<2)
{
result=ogg_stream_packetout(&os,&op);
if(result==0)
break;
if(result<0)
{
/* Uh oh; data at some point was corrupted or missing!
We can't tolerate that in a header. Die. */
fprintf(stderr,"Corrupt secondary header. Exiting.\n");
return false;
}
result=vorbis_synthesis_headerin(&vi,&vc,&op);
if(result<0)
{
fprintf(stderr,"Corrupt secondary header. Exiting.\n");
return false;
}
i++;
}
}
}
}
/* Throw the comments plus a few lines about the bitstream we're decoding */
char **ptr=vc.user_comments;
while(*ptr)
{
fprintf(stderr,"%s\n",*ptr);
++ptr;
}
fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate);
fprintf(stderr,"Encoded by: %s\n\n",vc.vendor);
/* Throw the comments plus a few lines about the bitstream we're decoding */
char **ptr=vc.user_comments;
while(*ptr)
{
fprintf(stderr,"%s\n",*ptr);
++ptr;
}
fprintf(stderr,"\nBitstream is %d channel, %ldHz\n",vi.channels,vi.rate);
fprintf(stderr,"Encoded by: %s\n\n",vc.vendor);
/* OK, got and parsed all three headers. Initialize the Vorbis
packet->PCM decoder. */
if(vorbis_synthesis_init(&vd,&vi)==0) /* central decode state */
vorbis_block_init(&vd,&vb); /* local state for most of the decode
/* OK, got and parsed all three headers. Initialize the Vorbis
packet->PCM decoder. */
if(vorbis_synthesis_init(&vd,&vi)==0) /* central decode state */
vorbis_block_init(&vd,&vb); /* local state for most of the decode
so multiple block decodes can
proceed in parallel. We could init
multiple vorbis_block structures
for vd here */
return false;
return false;
}

View file

@ -7,31 +7,31 @@
class OggDecoder : public Decoder
{
public:
OggDecoder();
virtual ~OggDecoder();
virtual bool decode(PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk);
OggDecoder();
virtual ~OggDecoder();
virtual bool decode(PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk);
private:
bool decodePayload(PcmChunk* chunk);
bool decodePayload(PcmChunk* chunk);
ogg_sync_state oy; /* sync and verify incoming physical bitstream */
ogg_stream_state os; /* take physical pages, weld into a logical
ogg_sync_state oy; /* sync and verify incoming physical bitstream */
ogg_stream_state os; /* take physical pages, weld into a logical
stream of packets */
ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */
ogg_packet op; /* one raw packet of data for decode */
ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */
ogg_packet op; /* one raw packet of data for decode */
vorbis_info vi; /* struct that stores all the static vorbis bitstream
vorbis_info vi; /* struct that stores all the static vorbis bitstream
settings */
vorbis_comment vc; /* struct that stores all the bitstream user comments */
vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */
vorbis_block vb; /* local working space for packet->PCM decode */
vorbis_comment vc; /* struct that stores all the bitstream user comments */
vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */
vorbis_block vb; /* local working space for packet->PCM decode */
ogg_int16_t* convbuffer; /* take 8k out of the data segment, not the stack */
int convsize;
ogg_int16_t* convbuffer; /* take 8k out of the data segment, not the stack */
int convsize;
char *buffer;
int bytes;
char *buffer;
int bytes;
};

View file

@ -7,17 +7,17 @@ PcmDecoder::PcmDecoder() : Decoder()
bool PcmDecoder::decode(PcmChunk* chunk)
{
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)
wireChunk->payload[n] *= 1;
*/
return true;
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)
wireChunk->payload[n] *= 1;
*/
return true;
}
bool PcmDecoder::setHeader(HeaderMessage* chunk)
{
return true;
return true;
}

View file

@ -6,9 +6,9 @@
class PcmDecoder : public Decoder
{
public:
PcmDecoder();
virtual bool decode(PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk);
PcmDecoder();
virtual bool decode(PcmChunk* chunk);
virtual bool setHeader(HeaderMessage* chunk);
};

View file

@ -15,137 +15,137 @@ Player::Player(Stream* stream) : active_(false), stream_(stream)
void Player::start()
{
unsigned int pcm, tmp, rate;
int channels;
snd_pcm_hw_params_t *params;
int buff_size;
unsigned int pcm, tmp, rate;
int channels;
snd_pcm_hw_params_t *params;
int buff_size;
rate = stream_->format.rate;
channels = stream_->format.channels;
rate = stream_->format.rate;
channels = stream_->format.channels;
/* Open the PCM device in playback mode */
if ((pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, SND_PCM_STREAM_PLAYBACK, 0)) < 0)
cout << "ERROR: Can't open " << PCM_DEVICE << " PCM device. " << snd_strerror(pcm) << "\n";
/* Open the PCM device in playback mode */
if ((pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, SND_PCM_STREAM_PLAYBACK, 0)) < 0)
cout << "ERROR: Can't open " << PCM_DEVICE << " PCM device. " << snd_strerror(pcm) << "\n";
/* struct snd_pcm_playback_info_t pinfo;
if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 )
fprintf( stderr, "Error: playback info error: %s\n", snd_strerror( err ) );
printf("buffer: '%d'\n", pinfo.buffer_size);
*/
/* Allocate parameters object and fill it with default values*/
snd_pcm_hw_params_alloca(&params);
/* struct snd_pcm_playback_info_t pinfo;
if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 )
fprintf( stderr, "Error: playback info error: %s\n", snd_strerror( err ) );
printf("buffer: '%d'\n", pinfo.buffer_size);
*/
/* Allocate parameters object and fill it with default values*/
snd_pcm_hw_params_alloca(&params);
snd_pcm_hw_params_any(pcm_handle, params);
snd_pcm_hw_params_any(pcm_handle, params);
/* Set parameters */
if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
cout << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n";
/* Set parameters */
if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
cout << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0)
cout << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0)
cout << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0)
cout << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0)
cout << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0)
cout << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0)
cout << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n";
unsigned int buffer_time;
snd_pcm_hw_params_get_buffer_time_max(params, &buffer_time, 0);
if (buffer_time > BUFFER_TIME)
buffer_time = BUFFER_TIME;
unsigned int buffer_time;
snd_pcm_hw_params_get_buffer_time_max(params, &buffer_time, 0);
if (buffer_time > BUFFER_TIME)
buffer_time = BUFFER_TIME;
unsigned int period_time = buffer_time / 4;
unsigned int period_time = buffer_time / 4;
snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0);
snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0);
snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0);
snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0);
// long unsigned int periodsize = stream_->format.msRate() * 50;//2*rate/50;
// if ((pcm = snd_pcm_hw_params_set_buffer_size_near(pcm_handle, params, &periodsize)) < 0)
// cout << "Unable to set buffer size " << (long int)periodsize << ": " << snd_strerror(pcm) << "\n";
/* Write parameters */
if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0)
cout << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n";
/* Write parameters */
if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0)
cout << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n";
/* Resume information */
cout << "PCM name: " << snd_pcm_name(pcm_handle) << "\n";
cout << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n";
snd_pcm_hw_params_get_channels(params, &tmp);
cout << "channels: " << tmp << "\n";
/* Resume information */
cout << "PCM name: " << snd_pcm_name(pcm_handle) << "\n";
cout << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n";
snd_pcm_hw_params_get_channels(params, &tmp);
cout << "channels: " << tmp << "\n";
if (tmp == 1)
printf("(mono)\n");
else if (tmp == 2)
printf("(stereo)\n");
if (tmp == 1)
printf("(mono)\n");
else if (tmp == 2)
printf("(stereo)\n");
snd_pcm_hw_params_get_rate(params, &tmp, 0);
cout << "rate: " << tmp << " bps\n";
snd_pcm_hw_params_get_rate(params, &tmp, 0);
cout << "rate: " << tmp << " bps\n";
/* Allocate buffer to hold single period */
snd_pcm_hw_params_get_period_size(params, &frames, 0);
cout << "frames: " << frames << "\n";
/* Allocate buffer to hold single period */
snd_pcm_hw_params_get_period_size(params, &frames, 0);
cout << "frames: " << frames << "\n";
buff_size = frames * channels * 2 /* 2 -> sample size */;
buff = (char *) malloc(buff_size);
buff_size = frames * channels * 2 /* 2 -> sample size */;
buff = (char *) malloc(buff_size);
snd_pcm_hw_params_get_period_time(params, &tmp, NULL);
cout << "period time: " << tmp << "\n";
snd_pcm_hw_params_get_period_time(params, &tmp, NULL);
cout << "period time: " << tmp << "\n";
snd_pcm_sw_params_t *swparams;
snd_pcm_sw_params_alloca(&swparams);
snd_pcm_sw_params_current(pcm_handle, swparams);
snd_pcm_sw_params_t *swparams;
snd_pcm_sw_params_alloca(&swparams);
snd_pcm_sw_params_current(pcm_handle, swparams);
snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames);
snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames);
snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames);
snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames);
// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames);
snd_pcm_sw_params(pcm_handle, swparams);
snd_pcm_sw_params(pcm_handle, swparams);
playerThread = new thread(&Player::worker, this);
playerThread = new thread(&Player::worker, this);
}
void Player::stop()
{
active_ = false;
playerThread->join();
delete playerThread;
active_ = false;
playerThread->join();
delete playerThread;
}
void Player::worker()
{
unsigned int pcm;
snd_pcm_sframes_t avail;
snd_pcm_sframes_t delay;
active_ = true;
while (active_)
{
snd_pcm_avail_delay(pcm_handle, &avail, &delay);
unsigned int pcm;
snd_pcm_sframes_t avail;
snd_pcm_sframes_t delay;
active_ = true;
while (active_)
{
snd_pcm_avail_delay(pcm_handle, &avail, &delay);
if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500))
{
if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE)
{
printf("XRUN.\n");
snd_pcm_prepare(pcm_handle);
}
else if (pcm < 0)
{
printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm));
}
}
}
if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500))
{
if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE)
{
printf("XRUN.\n");
snd_pcm_prepare(pcm_handle);
}
else if (pcm < 0)
{
printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm));
}
}
}
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);
free(buff);
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);
free(buff);
}

View file

@ -11,18 +11,18 @@
class Player
{
public:
Player(Stream* stream);
void start();
void stop();
Player(Stream* stream);
void start();
void stop();
private:
void worker();
snd_pcm_t* pcm_handle;
snd_pcm_uframes_t frames;
char *buff;
std::atomic<bool> active_;
Stream* stream_;
std::thread* playerThread;
void worker();
snd_pcm_t* pcm_handle;
snd_pcm_uframes_t frames;
char *buff;
std::atomic<bool> active_;
Stream* stream_;
std::thread* playerThread;
};

View file

@ -22,47 +22,47 @@ namespace po = boost::program_options;
int main (int argc, char *argv[])
{
int deviceIdx;
string ip;
int bufferMs;
size_t port;
bool runAsDaemon;
int deviceIdx;
string ip;
int bufferMs;
size_t port;
bool runAsDaemon;
// string sampleFormat;
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("port,p", po::value<size_t>(&port)->default_value(98765), "port where the server listens on")
("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP")
("soundcard,s", po::value<int>(&deviceIdx)->default_value(-1), "index of the soundcard")
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("port,p", po::value<size_t>(&port)->default_value(98765), "port where the server listens on")
("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP")
("soundcard,s", po::value<int>(&deviceIdx)->default_value(-1), "index of the soundcard")
// ("sampleformat,f", po::value<string>(&sampleFormat)->default_value("48000:16:2"), "sample format")
("buffer,b", po::value<int>(&bufferMs)->default_value(300), "buffer size [ms]")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
;
("buffer,b", po::value<int>(&bufferMs)->default_value(300), "buffer size [ms]")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help"))
{
cout << desc << "\n";
return 1;
}
if (vm.count("help"))
{
cout << desc << "\n";
return 1;
}
std::clog.rdbuf(new Log("snapclient", LOG_DAEMON));
if (runAsDaemon)
{
daemonize();
std::clog << kLogNotice << "daemon started" << std::endl;
}
std::clog.rdbuf(new Log("snapclient", LOG_DAEMON));
if (runAsDaemon)
{
daemonize();
std::clog << kLogNotice << "daemon started" << std::endl;
}
Controller controller;
controller.start(ip, port, bufferMs);
Controller controller;
controller.start(ip, port, bufferMs);
while(true)
usleep(10000);
while(true)
usleep(10000);
return 0;
return 0;
}

View file

@ -9,34 +9,34 @@ using namespace std;
Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0)
{
buffer.setSize(500);
shortBuffer.setSize(100);
miniBuffer.setSize(20);
cardBuffer.setSize(50);
bufferMs = 500;
buffer.setSize(500);
shortBuffer.setSize(100);
miniBuffer.setSize(20);
cardBuffer.setSize(50);
bufferMs = 500;
}
void Stream::setBufferLen(size_t bufferLenMs)
{
bufferMs = bufferLenMs;
bufferMs = bufferLenMs;
}
void Stream::clearChunks()
{
while (chunks.size() > 0)
chunks.pop();
while (chunks.size() > 0)
chunks.pop();
}
void Stream::addChunk(PcmChunk* chunk)
{
while (chunks.size() * chunk->getDuration() > 10000)
chunks.pop();
chunks.push(shared_ptr<PcmChunk>(chunk));
while (chunks.size() * chunk->getDuration() > 10000)
chunks.pop();
chunks.push(shared_ptr<PcmChunk>(chunk));
// cout << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n";
}
@ -44,11 +44,11 @@ void Stream::addChunk(PcmChunk* chunk)
time_point_ms Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer)
{
if (!chunk)
chunk = chunks.pop();
time_point_ms tp = chunk->timePoint();
memset(outputBuffer, 0, framesPerBuffer * format.frameSize);
return tp;
if (!chunk)
chunk = chunks.pop();
time_point_ms tp = chunk->timePoint();
memset(outputBuffer, 0, framesPerBuffer * format.frameSize);
return tp;
}
@ -72,204 +72,204 @@ time_point_ms Stream::seekTo(const time_point_ms& to)
time_point_ms Stream::seek(long ms)
{
if (!chunk)
chunk = chunks.pop();
if (!chunk)
chunk = chunks.pop();
if (ms <= 0)
return chunk->timePoint();
if (ms <= 0)
return chunk->timePoint();
// time_point_ms tp = chunk->timePoint();
while (ms > chunk->getTimeLeft())
{
chunk = chunks.pop();
ms -= min(ms, (long)chunk->getTimeLeft());
}
chunk->seek(ms * format.msRate());
return chunk->timePoint();
while (ms > chunk->getTimeLeft())
{
chunk = chunks.pop();
ms -= min(ms, (long)chunk->getTimeLeft());
}
chunk->seek(ms * format.msRate());
return chunk->timePoint();
}
time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction)
{
if (!chunk)
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
if (!chunk)
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
time_point_ms tp = chunk->timePoint();
int read = 0;
int toRead = framesPerBuffer + correction*format.msRate();
char* buffer;
time_point_ms tp = chunk->timePoint();
int read = 0;
int toRead = framesPerBuffer + correction*format.msRate();
char* buffer;
if (correction != 0)
{
int msBuffer = floor(framesPerBuffer / format.msRate());
if (abs(correction) > msBuffer / 2)
correction = copysign(msBuffer / 2, correction);
buffer = (char*)malloc(toRead * format.frameSize);
}
else
buffer = (char*)outputBuffer;
if (correction != 0)
{
int msBuffer = floor(framesPerBuffer / format.msRate());
if (abs(correction) > msBuffer / 2)
correction = copysign(msBuffer / 2, correction);
buffer = (char*)malloc(toRead * format.frameSize);
}
else
buffer = (char*)outputBuffer;
while (read < toRead)
{
read += chunk->readFrames(buffer + read*format.frameSize, toRead - read);
if (chunk->isEndOfChunk())
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
}
while (read < toRead)
{
read += chunk->readFrames(buffer + read*format.frameSize, toRead - read);
if (chunk->isEndOfChunk())
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
}
if (correction != 0)
{
float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_);
std::cout << "correction: " << correction << ", factor: " << factor << "\n";
float idx = 0;
for (size_t n=0; n<framesPerBuffer; ++n)
{
size_t index(floor(idx));// = (int)(ceil(n*factor));
memcpy((char*)outputBuffer + n*format.frameSize, buffer + index*format.frameSize, format.frameSize);
idx += factor;
}
free(buffer);
}
if (correction != 0)
{
float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_);
std::cout << "correction: " << correction << ", factor: " << factor << "\n";
float idx = 0;
for (size_t n=0; n<framesPerBuffer; ++n)
{
size_t index(floor(idx));// = (int)(ceil(n*factor));
memcpy((char*)outputBuffer + n*format.frameSize, buffer + index*format.frameSize, format.frameSize);
idx += factor;
}
free(buffer);
}
return tp;
return tp;
}
void Stream::updateBuffers(int age)
{
buffer.add(age);
miniBuffer.add(age);
shortBuffer.add(age);
buffer.add(age);
miniBuffer.add(age);
shortBuffer.add(age);
}
void Stream::resetBuffers()
{
buffer.clear();
miniBuffer.clear();
shortBuffer.clear();
buffer.clear();
miniBuffer.clear();
shortBuffer.clear();
}
bool Stream::getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout)
{
//cout << "framesPerBuffer: " << framesPerBuffer << "\tms: " << framesPerBuffer*2 / PLAYER_CHUNK_MS_SIZE << "\t" << PLAYER_CHUNK_SIZE << "\n";
int msBuffer = framesPerBuffer / format_.msRate();
int msBuffer = framesPerBuffer / format_.msRate();
//cout << msBuffer << " ms, " << framesPerBuffer << "\t" << format_.rate/1000 << "\n";
int ticks = 0;
long currentTick = getTickCount();
if (lastTick == 0)
lastTick = currentTick;
ticks = currentTick - lastTick;
lastTick = currentTick;
int ticks = 0;
long currentTick = getTickCount();
if (lastTick == 0)
lastTick = currentTick;
ticks = currentTick - lastTick;
lastTick = currentTick;
int correction = 0;
if (sleep != 0)
{
resetBuffers();
if (sleep < -msBuffer/2)
{
cout << "Sleep " << sleep;
sleep = PcmChunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
std::cerr << " after: " << sleep << ", chunks: " << chunks.size() << "\n";
int correction = 0;
if (sleep != 0)
{
resetBuffers();
if (sleep < -msBuffer/2)
{
cout << "Sleep " << sleep;
sleep = PcmChunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
std::cerr << " after: " << sleep << ", chunks: " << chunks.size() << "\n";
// std::clog << kLogNotice << "sleep: " << sleep << std::endl;
// if (sleep > -msBuffer/2)
// sleep = 0;
if (sleep < -msBuffer/2)
return true;
}
else if (sleep > msBuffer/2)
{
/* cout << "Sleep " << sleep;
time_point_ms ms(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()));
ms -= std::chrono::milliseconds((long int)(bufferMs - outputBufferDacTime));
cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n";
sleep = Chunk::getAge(seekTo(ms)) - bufferMs + outputBufferDacTime;
cout << " after: " << sleep << "\n";
*/
if (!chunk)
chunk = chunks.pop();
while (sleep > chunk->getDuration())
{
chunk = chunks.pop();
sleep = chunk->getAge() - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
if (sleep < -msBuffer/2)
return true;
}
else if (sleep > msBuffer/2)
{
/* cout << "Sleep " << sleep;
time_point_ms ms(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()));
ms -= std::chrono::milliseconds((long int)(bufferMs - outputBufferDacTime));
cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n";
sleep = Chunk::getAge(seekTo(ms)) - bufferMs + outputBufferDacTime;
cout << " after: " << sleep << "\n";
*/
if (!chunk)
chunk = chunks.pop();
while (sleep > chunk->getDuration())
{
chunk = chunks.pop();
sleep = chunk->getAge() - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
// cout << "chunk->getAge() > chunk->getDuration(): " << chunk->getAge() - bufferMs + outputBufferDacTime << " > " << chunk->getDuration() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime << ", needed: " << msBuffer << ", sleep: " << sleep << "\n";
usleep(1000);
}
cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n";
sleep = 0;
}
else if (sleep < 0)
{
++sleep;
correction = -1;
}
else if (sleep > 0)
{
--sleep;
correction = 1;
}
}
usleep(1000);
}
cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n";
sleep = 0;
}
else if (sleep < 0)
{
++sleep;
correction = -1;
}
else if (sleep > 0)
{
--sleep;
correction = 1;
}
}
long age(0);
try
{
age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
}
catch(int e)
{
return false;
}
long age(0);
try
{
age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
}
catch(int e)
{
return false;
}
if (sleep == 0)
{
if (buffer.full() && (abs(median) > 1))
{
cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n";
sleep = median;
}
else if (shortBuffer.full() && (abs(shortMedian) > 5))
{
cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n";
sleep = shortMedian;
}
else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50))
{
cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n";
sleep = miniBuffer.mean();
}
else if (abs(age) > 200)
{
cout << "age > 50: " << age << "\n";
sleep = age;
}
}
if (sleep == 0)
{
if (buffer.full() && (abs(median) > 1))
{
cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n";
sleep = median;
}
else if (shortBuffer.full() && (abs(shortMedian) > 5))
{
cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n";
sleep = shortMedian;
}
else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50))
{
cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n";
sleep = miniBuffer.mean();
}
else if (abs(age) > 200)
{
cout << "age > 50: " << age << "\n";
sleep = age;
}
}
if (sleep != 0)
std::cerr << "Sleep: " << sleep << "\n";
if (sleep != 0)
std::cerr << "Sleep: " << sleep << "\n";
// std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n";
if (ticks > 2)
{
if (ticks > 2)
{
// cout << age << "\n";
updateBuffers(age);
}
time_t now = time(NULL);
if (now != lastUpdate)
{
lastUpdate = now;
median = buffer.median();
shortMedian = shortBuffer.median();
std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n";
}
return true;
updateBuffers(age);
}
time_t now = time(NULL);
if (now != lastUpdate)
{
lastUpdate = now;
median = buffer.median();
shortMedian = shortBuffer.median();
std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n";
}
return true;
}

View file

@ -18,37 +18,37 @@
class Stream
{
public:
Stream(const SampleFormat& format);
void addChunk(PcmChunk* chunk);
void clearChunks();
bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout);
void setBufferLen(size_t bufferLenMs);
const SampleFormat& format;
Stream(const SampleFormat& format);
void addChunk(PcmChunk* chunk);
void clearChunks();
bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout);
void setBufferLen(size_t bufferLenMs);
const SampleFormat& format;
private:
time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0);
time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer);
time_point_ms seek(long ms);
time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0);
time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer);
time_point_ms seek(long ms);
// time_point_ms seekTo(const time_point_ms& to);
void updateBuffers(int age);
void resetBuffers();
void updateBuffers(int age);
void resetBuffers();
SampleFormat format_;
SampleFormat format_;
long lastTick;
long sleep;
long lastTick;
long sleep;
Queue<std::shared_ptr<PcmChunk>> chunks;
DoubleBuffer<long> cardBuffer;
DoubleBuffer<long> miniBuffer;
DoubleBuffer<long> buffer;
DoubleBuffer<long> shortBuffer;
std::shared_ptr<PcmChunk> chunk;
Queue<std::shared_ptr<PcmChunk>> chunks;
DoubleBuffer<long> cardBuffer;
DoubleBuffer<long> miniBuffer;
DoubleBuffer<long> buffer;
DoubleBuffer<long> shortBuffer;
std::shared_ptr<PcmChunk> chunk;
int median;
int shortMedian;
time_t lastUpdate;
int bufferMs;
int median;
int shortMedian;
time_t lastUpdate;
int bufferMs;
};

View file

@ -10,8 +10,8 @@ using boost::asio::ip::tcp;
class StreamClient : public ClientConnection
{
public:
StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
virtual ~StreamClient();
StreamClient(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
virtual ~StreamClient();
};

View file

@ -3,26 +3,26 @@
TimeProvider::TimeProvider() : diffToServer(0)
{
diffBuffer.setSize(60);
diffBuffer.setSize(60);
}
void TimeProvider::setDiffToServer(double ms)
{
diffBuffer.add(ms * 1000);
diffToServer = diffBuffer.median();
diffBuffer.add(ms * 1000);
diffToServer = diffBuffer.median();
}
long TimeProvider::getDiffToServer()
{
return diffToServer;
return diffToServer;
}
long TimeProvider::getDiffToServerMs()
{
return diffToServer / 1000;
return diffToServer / 1000;
}

View file

@ -6,26 +6,26 @@
class TimeProvider
{
public:
static TimeProvider& getInstance()
{
static TimeProvider instance;
return instance;
}
static TimeProvider& getInstance()
{
static TimeProvider instance;
return instance;
}
void setDiffToServer(double ms);
long getDiffToServer();
long getDiffToServerMs();
void setDiffToServer(double ms);
long getDiffToServer();
long getDiffToServerMs();
private:
TimeProvider(); // Constructor? (the {} brackets) are needed here.
// Dont forget to declare these two. You want to make sure they
// are unaccessable otherwise you may accidently get copies of
// your singleton appearing.
TimeProvider(TimeProvider const&); // Don't Implement
void operator=(TimeProvider const&); // Don't implement
TimeProvider(); // Constructor? (the {} brackets) are needed here.
// Dont forget to declare these two. You want to make sure they
// are unaccessable otherwise you may accidently get copies of
// your singleton appearing.
TimeProvider(TimeProvider const&); // Don't Implement
void operator=(TimeProvider const&); // Don't implement
DoubleBuffer<long> diffBuffer;
long diffToServer;
DoubleBuffer<long> diffBuffer;
long diffToServer;
};

View file

@ -8,37 +8,37 @@
class HeaderMessage : public BaseMessage
{
public:
HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size)
{
payload = (char*)malloc(size);
}
HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size)
{
payload = (char*)malloc(size);
}
virtual ~HeaderMessage()
{
free(payload);
}
virtual ~HeaderMessage()
{
free(payload);
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
virtual uint32_t getSize()
{
return sizeof(uint32_t) + payloadSize;
}
virtual uint32_t getSize()
{
return sizeof(uint32_t) + payloadSize;
}
uint32_t payloadSize;
char* payload;
uint32_t payloadSize;
char* payload;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}
};

View file

@ -2,47 +2,47 @@
Log::Log(std::string ident, int facility)
{
facility_ = facility;
priority_ = LOG_DEBUG;
strncpy(ident_, ident.c_str(), sizeof(ident_));
ident_[sizeof(ident_)-1] = '\0';
facility_ = facility;
priority_ = LOG_DEBUG;
strncpy(ident_, ident.c_str(), sizeof(ident_));
ident_[sizeof(ident_)-1] = '\0';
openlog(ident_, LOG_PID, facility_);
openlog(ident_, LOG_PID, facility_);
}
int Log::sync()
{
if (buffer_.length())
{
if (priority_ == dbg)
std::cout << buffer_.c_str();
else
syslog(priority_, "%s", buffer_.c_str());
buffer_.erase();
priority_ = LOG_DEBUG; // default to debug for each message
}
return 0;
if (buffer_.length())
{
if (priority_ == dbg)
std::cout << buffer_.c_str();
else
syslog(priority_, "%s", buffer_.c_str());
buffer_.erase();
priority_ = LOG_DEBUG; // default to debug for each message
}
return 0;
}
int Log::overflow(int c)
{
if (c != EOF)
{
buffer_ += static_cast<char>(c);
}
else
{
sync();
}
return c;
if (c != EOF)
{
buffer_ += static_cast<char>(c);
}
else
{
sync();
}
return c;
}
std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority)
{
static_cast<Log *>(os.rdbuf())->priority_ = (int)log_priority;
if (log_priority == dbg)
os.flush();
return os;
static_cast<Log *>(os.rdbuf())->priority_ = (int)log_priority;
if (log_priority == dbg)
os.flush();
return os;
}

View file

@ -9,15 +9,15 @@
enum LogPriority
{
kLogEmerg = LOG_EMERG, // system is unusable
kLogAlert = LOG_ALERT, // action must be taken immediately
kLogCrit = LOG_CRIT, // critical conditions
kLogErr = LOG_ERR, // error conditions
kLogWarning = LOG_WARNING, // warning conditions
kLogNotice = LOG_NOTICE, // normal, but significant, condition
kLogInfo = LOG_INFO, // informational message
kLogDebug = LOG_DEBUG, // debug-level message
dbg
kLogEmerg = LOG_EMERG, // system is unusable
kLogAlert = LOG_ALERT, // action must be taken immediately
kLogCrit = LOG_CRIT, // critical conditions
kLogErr = LOG_ERR, // error conditions
kLogWarning = LOG_WARNING, // warning conditions
kLogNotice = LOG_NOTICE, // normal, but significant, condition
kLogInfo = LOG_INFO, // informational message
kLogDebug = LOG_DEBUG, // debug-level message
dbg
};
std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority);
@ -25,18 +25,18 @@ std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority);
class Log : public std::basic_streambuf<char, std::char_traits<char> >
{
public:
explicit Log(std::string ident, int facility);
explicit Log(std::string ident, int facility);
protected:
int sync();
int overflow(int c);
int sync();
int overflow(int c);
private:
friend std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority);
std::string buffer_;
int facility_;
int priority_;
char ident_[50];
friend std::ostream& operator<< (std::ostream& os, const LogPriority& log_priority);
std::string buffer_;
int facility_;
int priority_;
char ident_[50];
};

View file

@ -12,171 +12,171 @@ template<typename CharT, typename TraitsT = std::char_traits<CharT> >
class vectorwrapbuf : public std::basic_streambuf<CharT, TraitsT>
{
public:
vectorwrapbuf(std::vector<CharT> &vec)
{
this->setg(vec.data(), vec.data(), vec.data() + vec.size());
}
vectorwrapbuf(std::vector<CharT> &vec)
{
this->setg(vec.data(), vec.data(), vec.data() + vec.size());
}
};
struct membuf : public std::basic_streambuf<char>
{
membuf(char* begin, char* end)
{
this->setg(begin, begin, end);
}
membuf(char* begin, char* end)
{
this->setg(begin, begin, end);
}
};
enum message_type
{
base = 0,
header = 1,
payload = 2,
sampleformat = 3,
serversettings = 4,
timemsg = 5,
requestmsg = 6
base = 0,
header = 1,
payload = 2,
sampleformat = 3,
serversettings = 4,
timemsg = 5,
requestmsg = 6
};
struct tv
{
tv()
{
timeval t;
gettimeofday(&t, NULL);
sec = t.tv_sec;
usec = t.tv_usec;
}
tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {};
tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {};
tv()
{
timeval t;
gettimeofday(&t, NULL);
sec = t.tv_sec;
usec = t.tv_usec;
}
tv(timeval tv) : sec(tv.tv_sec), usec(tv.tv_usec) {};
tv(int32_t _sec, int32_t _usec) : sec(_sec), usec(_usec) {};
int32_t sec;
int32_t usec;
/*
5.3 - 6.2 = -0.9
-1
0.1
int32_t sec;
int32_t usec;
/*
5.3 - 6.2 = -0.9
-1
0.1
5.3 - 6.4 = -1.1
-1
-0.1
*/
5.3 - 6.4 = -1.1
-1
-0.1
*/
//(timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec)
tv operator-(const tv& other)
{
tv result(*this);
result.sec -= other.sec;
result.usec -= other.usec;
if (result.usec > 0)
{
result.sec += 1;
result.usec = 1000000 - result.usec;
}
else if (result.usec < 0)
{
result.usec *= -1;
}
/* else if (result.usec >= 1000000)
{
result.usec -= 1000000;
result.sec += 1;
}
*/ return result;
}
tv operator-(const tv& other)
{
tv result(*this);
result.sec -= other.sec;
result.usec -= other.usec;
if (result.usec > 0)
{
result.sec += 1;
result.usec = 1000000 - result.usec;
}
else if (result.usec < 0)
{
result.usec *= -1;
}
/* else if (result.usec >= 1000000)
{
result.usec -= 1000000;
result.sec += 1;
}
*/ return result;
}
};
struct BaseMessage
{
BaseMessage() : type(base), id(0), refersTo(0)
{
}
BaseMessage() : type(base), id(0), refersTo(0)
{
}
BaseMessage(message_type type_) : type(type_), id(0), refersTo(0)
{
}
BaseMessage(message_type type_) : type(type_), id(0), refersTo(0)
{
}
virtual ~BaseMessage()
{
}
virtual ~BaseMessage()
{
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t));
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.read(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char*>(&size), sizeof(uint32_t));
}
void deserialize(char* payload)
{
membuf databuf(payload, payload + BaseMessage::getSize());
std::istream is(&databuf);
read(is);
}
void deserialize(char* payload)
{
membuf databuf(payload, payload + BaseMessage::getSize());
std::istream is(&databuf);
read(is);
}
void deserialize(const BaseMessage& baseMessage, char* payload)
{
type = baseMessage.type;
id = baseMessage.id;
refersTo = baseMessage.refersTo;
sent = baseMessage.sent;
received = baseMessage.received;
size = baseMessage.size;
membuf databuf(payload, payload + size);
std::istream is(&databuf);
read(is);
}
void deserialize(const BaseMessage& baseMessage, char* payload)
{
type = baseMessage.type;
id = baseMessage.id;
refersTo = baseMessage.refersTo;
sent = baseMessage.sent;
received = baseMessage.received;
size = baseMessage.size;
membuf databuf(payload, payload + size);
std::istream is(&databuf);
read(is);
}
virtual void serialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
size = getSize();
stream.write(reinterpret_cast<char*>(&size), sizeof(uint32_t));
doserialize(stream);
}
virtual void serialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char*>(&type), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&id), sizeof(uint16_t));
stream.write(reinterpret_cast<char*>(&refersTo), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&sent.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&sent.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&received.usec), sizeof(int32_t));
size = getSize();
stream.write(reinterpret_cast<char*>(&size), sizeof(uint32_t));
doserialize(stream);
}
virtual uint32_t getSize()
{
return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t);
};
virtual uint32_t getSize()
{
return 3*sizeof(uint16_t) + 2*sizeof(tv) + sizeof(uint32_t);
};
uint16_t type;
uint16_t id;
uint16_t refersTo;
tv sent;
tv received;
uint32_t size;
uint16_t type;
uint16_t id;
uint16_t refersTo;
tv sent;
tv received;
uint32_t size;
protected:
virtual void doserialize(std::ostream& stream)
{
};
virtual void doserialize(std::ostream& stream)
{
};
};
struct SerializedMessage
{
~SerializedMessage()
{
free(buffer);
}
~SerializedMessage()
{
free(buffer);
}
BaseMessage message;
char* buffer;
BaseMessage message;
char* buffer;
};

View file

@ -24,56 +24,56 @@ PcmChunk::~PcmChunk()
bool PcmChunk::isEndOfChunk() const
{
return idx >= getFrameCount();
return idx >= getFrameCount();
}
double PcmChunk::getFrameCount() const
{
return (payloadSize / format.frameSize);
return (payloadSize / format.frameSize);
}
double PcmChunk::getDuration() const
{
return getFrameCount() / format.msRate();
return getFrameCount() / format.msRate();
}
double PcmChunk::getTimeLeft() const
{
return (getFrameCount() - idx) / format.msRate();
return (getFrameCount() - idx) / format.msRate();
}
int PcmChunk::seek(int frames)
{
idx += frames;
if (idx > getFrameCount())
idx = getFrameCount();
if (idx < 0)
idx = 0;
return idx;
idx += frames;
if (idx > getFrameCount())
idx = getFrameCount();
if (idx < 0)
idx = 0;
return idx;
}
int PcmChunk::readFrames(void* outputBuffer, size_t frameCount)
{
//logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl;
int result = frameCount;
if (idx + frameCount > (payloadSize / format.frameSize))
result = (payloadSize / format.frameSize) - idx;
int result = frameCount;
if (idx + frameCount > (payloadSize / format.frameSize))
result = (payloadSize / format.frameSize) - idx;
//logd << ", from: " << format.frameSize*idx << ", to: " << format.frameSize*idx + format.frameSize*result;
if (outputBuffer != NULL)
memcpy((char*)outputBuffer, (char*)(payload) + format.frameSize*idx, format.frameSize*result);
if (outputBuffer != NULL)
memcpy((char*)outputBuffer, (char*)(payload) + format.frameSize*idx, format.frameSize*result);
idx += result;
idx += result;
//logd << ", new idx: " << idx << ", result: " << result << ", wireChunk->length: " << wireChunk->length << ", format.frameSize: " << format.frameSize << "\n";//std::endl;
return result;
return result;
}

View file

@ -13,57 +13,57 @@ typedef std::chrono::time_point<std::chrono::high_resolution_clock, std::chrono:
class PcmChunk : public WireChunk
{
public:
PcmChunk(const SampleFormat& sampleFormat, size_t ms);
PcmChunk();
~PcmChunk();
PcmChunk(const SampleFormat& sampleFormat, size_t ms);
PcmChunk();
~PcmChunk();
int readFrames(void* outputBuffer, size_t frameCount);
bool isEndOfChunk() const;
int readFrames(void* outputBuffer, size_t frameCount);
bool isEndOfChunk() const;
inline time_point_ms timePoint() const
{
time_point_ms tp;
std::chrono::milliseconds::rep relativeIdxTp = ((double)idx / ((double)format.rate/1000.));
return
tp +
std::chrono::seconds(timestamp.sec) +
std::chrono::milliseconds(timestamp.usec / 1000) +
std::chrono::milliseconds(relativeIdxTp);
}
inline time_point_ms timePoint() const
{
time_point_ms tp;
std::chrono::milliseconds::rep relativeIdxTp = ((double)idx / ((double)format.rate/1000.));
return
tp +
std::chrono::seconds(timestamp.sec) +
std::chrono::milliseconds(timestamp.usec / 1000) +
std::chrono::milliseconds(relativeIdxTp);
}
template<typename T>
inline T getAge() const
{
return getAge<T>(timePoint());
}
template<typename T>
inline T getAge() const
{
return getAge<T>(timePoint());
}
inline long getAge() const
{
return getAge<std::chrono::milliseconds>().count();
}
inline long getAge() const
{
return getAge<std::chrono::milliseconds>().count();
}
inline static long getAge(const time_point_ms& time_point)
{
return getAge<std::chrono::milliseconds>(time_point).count();
}
inline static long getAge(const time_point_ms& time_point)
{
return getAge<std::chrono::milliseconds>(time_point).count();
}
template<typename T, typename U>
static inline T getAge(const std::chrono::time_point<U>& time_point)
{
return std::chrono::duration_cast<T>(std::chrono::high_resolution_clock::now() - time_point);
}
template<typename T, typename U>
static inline T getAge(const std::chrono::time_point<U>& time_point)
{
return std::chrono::duration_cast<T>(std::chrono::high_resolution_clock::now() - time_point);
}
int seek(int frames);
double getDuration() const;
double getDurationUs() const;
double getTimeLeft() const;
double getFrameCount() const;
int seek(int frames);
double getDuration() const;
double getDurationUs() const;
double getTimeLeft() const;
double getFrameCount() const;
SampleFormat format;
SampleFormat format;
private:
// SampleFormat format_;
uint32_t idx;
uint32_t idx;
};

View file

@ -11,81 +11,81 @@ class Queue
{
public:
T pop()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
auto val = queue_.front();
queue_.pop();
return val;
}
T pop()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
auto val = queue_.front();
queue_.pop();
return val;
}
T front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
cond_.wait(mlock);
T front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
cond_.wait(mlock);
return queue_.front();
}
return queue_.front();
}
bool try_pop(T& item, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> mlock(mutex_);
bool try_pop(T& item, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> mlock(mutex_);
if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
return false;
if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
return false;
item = std::move(queue_.front());
queue_.pop();
item = std::move(queue_.front());
queue_.pop();
return true;
}
return true;
}
void pop(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
item = queue_.front();
queue_.pop();
}
void pop(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
item = queue_.front();
queue_.pop();
}
void push(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
cond_.notify_one();
}
void push(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
cond_.notify_one();
}
void push(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(std::move(item));
mlock.unlock();
cond_.notify_one();
}
void push(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(std::move(item));
mlock.unlock();
cond_.notify_one();
}
size_t size() const
{
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.size();
}
size_t size() const
{
std::unique_lock<std::mutex> mlock(mutex_);
return queue_.size();
}
Queue()=default;
Queue(const Queue&) = delete; // disable copying
Queue& operator=(const Queue&) = delete; // disable assignment
Queue()=default;
Queue(const Queue&) = delete; // disable copying
Queue& operator=(const Queue&) = delete; // disable assignment
private:
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_;
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_;
};

View file

@ -8,40 +8,40 @@
class RequestMsg : public BaseMessage
{
public:
RequestMsg() : BaseMessage(message_type::requestmsg)
{
}
RequestMsg() : BaseMessage(message_type::requestmsg)
{
}
RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request)
{
}
RequestMsg(const std::string& _request) : BaseMessage(message_type::requestmsg), request(_request)
{
}
virtual ~RequestMsg()
{
}
virtual ~RequestMsg()
{
}
virtual void read(std::istream& stream)
{
int16_t size;
stream.read(reinterpret_cast<char *>(&size), sizeof(int16_t));
request.resize(size);
stream.read(&request[0], size);
}
virtual void read(std::istream& stream)
{
int16_t size;
stream.read(reinterpret_cast<char *>(&size), sizeof(int16_t));
request.resize(size);
stream.read(&request[0], size);
}
virtual uint32_t getSize()
{
return sizeof(int16_t) + request.size();
}
virtual uint32_t getSize()
{
return sizeof(int16_t) + request.size();
}
std::string request;
std::string request;
protected:
virtual void doserialize(std::ostream& stream)
{
int16_t size(request.size());
stream.write(reinterpret_cast<char *>(&size), sizeof(int16_t));
stream.write(request.c_str(), size);
}
virtual void doserialize(std::ostream& stream)
{
int16_t size(request.size());
stream.write(reinterpret_cast<char *>(&size), sizeof(int16_t));
stream.write(request.c_str(), size);
}
};

View file

@ -12,37 +12,37 @@ SampleFormat::SampleFormat() : BaseMessage(message_type::sampleformat)
SampleFormat::SampleFormat(const std::string& format) : BaseMessage(message_type::sampleformat)
{
setFormat(format);
setFormat(format);
}
SampleFormat::SampleFormat(uint16_t sampleRate, uint16_t bitsPerSample, uint16_t channelCount) : BaseMessage(message_type::sampleformat)
{
setFormat(sampleRate, bitsPerSample, channelCount);
setFormat(sampleRate, bitsPerSample, channelCount);
}
void SampleFormat::setFormat(const std::string& format)
{
std::vector<std::string> strs;
boost::split(strs, format, boost::is_any_of(":"));
if (strs.size() == 3)
setFormat(
boost::lexical_cast<uint16_t>(strs[0]),
boost::lexical_cast<uint16_t>(strs[1]),
boost::lexical_cast<uint16_t>(strs[2]));
std::vector<std::string> strs;
boost::split(strs, format, boost::is_any_of(":"));
if (strs.size() == 3)
setFormat(
boost::lexical_cast<uint16_t>(strs[0]),
boost::lexical_cast<uint16_t>(strs[1]),
boost::lexical_cast<uint16_t>(strs[2]));
}
void SampleFormat::setFormat(uint16_t rate, uint16_t bits, uint16_t channels)
{
this->rate = rate;
this->bits = bits;
this->channels = channels;
sampleSize = bits / 8;
if (bits == 24)
sampleSize = 4;
frameSize = channels*sampleSize;
this->rate = rate;
this->bits = bits;
this->channels = channels;
sampleSize = bits / 8;
if (bits == 24)
sampleSize = 4;
frameSize = channels*sampleSize;
}

View file

@ -8,56 +8,56 @@
class SampleFormat : public BaseMessage
{
public:
SampleFormat();
SampleFormat(const std::string& format);
SampleFormat(uint16_t rate, uint16_t bits, uint16_t channels);
SampleFormat();
SampleFormat(const std::string& format);
SampleFormat(uint16_t rate, uint16_t bits, uint16_t channels);
void setFormat(const std::string& format);
void setFormat(uint16_t rate, uint16_t bits, uint16_t channels);
void setFormat(const std::string& format);
void setFormat(uint16_t rate, uint16_t bits, uint16_t channels);
uint16_t rate;
uint16_t bits;
uint16_t channels;
uint16_t rate;
uint16_t bits;
uint16_t channels;
uint16_t sampleSize;
uint16_t frameSize;
uint16_t sampleSize;
uint16_t frameSize;
float msRate() const
{
return (float)rate/1000.f;
}
float msRate() const
{
return (float)rate/1000.f;
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&rate), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&bits), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&channels), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&sampleSize), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&frameSize), sizeof(uint16_t));
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&rate), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&bits), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&channels), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&sampleSize), sizeof(uint16_t));
stream.read(reinterpret_cast<char *>(&frameSize), sizeof(uint16_t));
}
virtual uint32_t getSize()
{
return 5*sizeof(int16_t);
}
virtual uint32_t getSize()
{
return 5*sizeof(int16_t);
}
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&rate), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&bits), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&channels), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&sampleSize), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&frameSize), sizeof(uint16_t));
}
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&rate), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&bits), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&channels), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&sampleSize), sizeof(uint16_t));
stream.write(reinterpret_cast<char *>(&frameSize), sizeof(uint16_t));
}
/*private:
uint16_t rate_;
uint16_t bits_;
uint16_t channels_;
uint16_t bytes_;
uint16_t frameSize_;
*/
/*private:
uint16_t rate_;
uint16_t bits_;
uint16_t channels_;
uint16_t bytes_;
uint16_t frameSize_;
*/
};

View file

@ -8,31 +8,31 @@
class ServerSettings : public BaseMessage
{
public:
ServerSettings(size_t _port = 0) : BaseMessage(message_type::serversettings), port(_port)
{
}
ServerSettings(size_t _port = 0) : BaseMessage(message_type::serversettings), port(_port)
{
}
virtual ~ServerSettings()
{
}
virtual ~ServerSettings()
{
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
virtual uint32_t getSize()
{
return sizeof(int32_t);
}
virtual uint32_t getSize()
{
return sizeof(int32_t);
}
int32_t port;
int32_t port;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&port), sizeof(int32_t));
}
};

View file

@ -9,19 +9,19 @@ extern bool g_terminated;
void signal_handler(int sig)
{
switch(sig)
{
case SIGHUP:
syslog(LOG_WARNING, "Received SIGHUP signal.");
break;
case SIGTERM:
syslog(LOG_WARNING, "Received SIGTERM signal.");
g_terminated = true;
break;
default:
syslog(LOG_WARNING, "Unhandled signal ");
break;
}
switch(sig)
{
case SIGHUP:
syslog(LOG_WARNING, "Received SIGHUP signal.");
break;
case SIGTERM:
syslog(LOG_WARNING, "Received SIGTERM signal.");
g_terminated = true;
break;
default:
syslog(LOG_WARNING, "Unhandled signal ");
break;
}
}
#endif

View file

@ -23,27 +23,35 @@ SocketConnection::~SocketConnection()
void SocketConnection::socketRead(void* _to, size_t _bytes)
{
// std::unique_lock<std::mutex> mlock(mutex_);
size_t toRead = _bytes;
size_t len = 0;
do
{
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead));
toRead = _bytes - len;
}
while (toRead > 0);
size_t toRead = _bytes;
size_t len = 0;
do
{
// cout << "/";
// cout.flush();
boost::system::error_code error;
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error);
//cout << "len: " << len << ", error: " << error << endl;
toRead = _bytes - len;
// cout << "\\";
// cout.flush();
}
while (toRead > 0);
}
void SocketConnection::start()
{
receiverThread = new thread(&SocketConnection::worker, this);
receiverThread = new thread(&SocketConnection::worker, this);
}
void SocketConnection::stop()
{
active_ = false;
receiverThread->join();
active_ = false;
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
socket->close();
receiverThread->join();
}
@ -51,87 +59,87 @@ bool SocketConnection::send(BaseMessage* message)
{
// std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
if (!connected())
return false;
if (!connected())
return false;
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
return true;
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
return true;
}
shared_ptr<SerializedMessage> SocketConnection::sendRequest(BaseMessage* message, size_t timeout)
{
shared_ptr<SerializedMessage> response(NULL);
if (++reqId == 0)
++reqId;
message->id = reqId;
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId));
shared_ptr<SerializedMessage> response(NULL);
if (++reqId == 0)
++reqId;
message->id = reqId;
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId));
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.insert(pendingRequest);
}
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.insert(pendingRequest);
}
// std::mutex mtx;
std::unique_lock<std::mutex> lck(m);
send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{
response = pendingRequest->response;
}
else
{
cout << "timeout while waiting for response to: " << reqId << "\n";
}
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.erase(pendingRequest);
}
return response;
std::unique_lock<std::mutex> lck(m);
send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{
response = pendingRequest->response;
}
else
{
cout << "timeout while waiting for response to: " << reqId << "\n";
}
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.erase(pendingRequest);
}
return response;
}
void SocketConnection::getNextMessage()
{
//cout << "getNextMessage\n";
BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize);
baseMessage.deserialize(&buffer[0]);
BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize);
socketRead(&buffer[0], baseMsgSize);
baseMessage.deserialize(&buffer[0]);
//cout << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << "\n";
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
socketRead(&buffer[0], baseMessage.size);
tv t;
baseMessage.received = t;
if (baseMessage.size > buffer.size())
buffer.resize(baseMessage.size);
socketRead(&buffer[0], baseMessage.size);
tv t;
baseMessage.received = t;
{
std::unique_lock<std::mutex> mlock(mutex_);
for (auto req: pendingRequests)
{
if (req->id == baseMessage.refersTo)
{
{
std::unique_lock<std::mutex> mlock(mutex_);
for (auto req: pendingRequests)
{
if (req->id == baseMessage.refersTo)
{
//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec);
//cout << "latency: " << latency << "\n";
req->response.reset(new SerializedMessage());
req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size);
std::unique_lock<std::mutex> lck(m);
req->cv.notify_one();
return;
}
}
}
req->response.reset(new SerializedMessage());
req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size);
std::unique_lock<std::mutex> lck(m);
req->cv.notify_one();
return;
}
}
}
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);
}

View file

@ -20,71 +20,71 @@ class SocketConnection;
struct PendingRequest
{
PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {};
PendingRequest(uint16_t reqId) : id(reqId), response(NULL) {};
uint16_t id;
std::shared_ptr<SerializedMessage> response;
std::condition_variable cv;
uint16_t id;
std::shared_ptr<SerializedMessage> response;
std::condition_variable cv;
};
class MessageReceiver
{
public:
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
};
class SocketConnection
{
public:
SocketConnection(MessageReceiver* _receiver);
virtual ~SocketConnection();
virtual void start();
virtual void stop();
virtual bool send(BaseMessage* _message);
virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, size_t timeout);
SocketConnection(MessageReceiver* _receiver);
virtual ~SocketConnection();
virtual void start();
virtual void stop();
virtual bool send(BaseMessage* _message);
virtual std::shared_ptr<SerializedMessage> sendRequest(BaseMessage* message, size_t timeout);
template <typename T>
std::shared_ptr<T> sendReq(BaseMessage* message, size_t timeout)
{
std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout);
if (!reply)
return NULL;
std::shared_ptr<T> msg(new T);
msg->deserialize(reply->message, reply->buffer);
return msg;
}
template <typename T>
std::shared_ptr<T> sendReq(BaseMessage* message, size_t timeout)
{
std::shared_ptr<SerializedMessage> reply = sendRequest(message, timeout);
if (!reply)
return NULL;
std::shared_ptr<T> msg(new T);
msg->deserialize(reply->message, reply->buffer);
return msg;
}
virtual bool active()
{
return active_;
}
virtual bool active()
{
return active_;
}
virtual bool connected()
{
return (socket != 0);
virtual bool connected()
{
return (socket != 0);
// return (connected_ && socket);
}
}
protected:
virtual void worker() = 0;
virtual void worker() = 0;
void socketRead(void* _to, size_t _bytes);
std::shared_ptr<tcp::socket> socket;
void socketRead(void* _to, size_t _bytes);
std::shared_ptr<tcp::socket> socket;
// boost::asio::ip::tcp::endpoint endpt;
std::atomic<bool> active_;
std::atomic<bool> connected_;
MessageReceiver* messageReceiver;
void getNextMessage();
boost::asio::io_service io_service;
tcp::resolver::iterator iterator;
std::thread* receiverThread;
mutable std::mutex mutex_;
std::mutex m;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
std::atomic<bool> active_;
std::atomic<bool> connected_;
MessageReceiver* messageReceiver;
void getNextMessage();
boost::asio::io_service io_service;
tcp::resolver::iterator iterator;
std::thread* receiverThread;
mutable std::mutex mutex_;
std::mutex m;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
};

View file

@ -7,31 +7,31 @@
class TimeMsg : public BaseMessage
{
public:
TimeMsg() : BaseMessage(message_type::timemsg)
{
}
TimeMsg() : BaseMessage(message_type::timemsg)
{
}
virtual ~TimeMsg()
{
}
virtual ~TimeMsg()
{
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&latency), sizeof(double));
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&latency), sizeof(double));
}
virtual uint32_t getSize()
{
return sizeof(double);
}
virtual uint32_t getSize()
{
return sizeof(double);
}
double latency;
double latency;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&latency), sizeof(double));
}
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&latency), sizeof(double));
}
};

View file

@ -48,32 +48,32 @@ inline static long getAge(const time_point_ms& time_point)
static void addMs(timeval& tv, int ms)
{
if (ms < 0)
{
timeval t;
t.tv_sec = -ms / 1000;
t.tv_usec = (-ms % 1000) * 1000;
timersub(&tv, &t, &tv);
return;
}
tv.tv_usec += ms*1000;
tv.tv_sec += (tv.tv_usec / 1000000);
tv.tv_usec %= 1000000;
if (ms < 0)
{
timeval t;
t.tv_sec = -ms / 1000;
t.tv_usec = (-ms % 1000) * 1000;
timersub(&tv, &t, &tv);
return;
}
tv.tv_usec += ms*1000;
tv.tv_sec += (tv.tv_usec / 1000000);
tv.tv_usec %= 1000000;
}
static void addUs(timeval& tv, int us)
{
if (us < 0)
{
timeval t;
t.tv_sec = -us / 1000000;
t.tv_usec = (-us % 1000000);
timersub(&tv, &t, &tv);
return;
}
tv.tv_usec += us;
tv.tv_sec += (tv.tv_usec / 1000000);
tv.tv_usec %= 1000000;
if (us < 0)
{
timeval t;
t.tv_sec = -us / 1000000;
t.tv_usec = (-us % 1000000);
timersub(&tv, &t, &tv);
return;
}
tv.tv_usec += us;
tv.tv_sec += (tv.tv_usec / 1000000);
tv.tv_usec %= 1000000;
}
@ -86,17 +86,17 @@ static void addUs(timeval& tv, int us)
static long getTickCount()
{
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
return now.tv_sec*1000 + now.tv_nsec / 1000000;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
return now.tv_sec*1000 + now.tv_nsec / 1000000;
}
static long getuTickCount()
{
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
return now.tv_sec*1000000 + now.tv_nsec / 1000;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
return now.tv_sec*1000000 + now.tv_nsec / 1000;
}

View file

@ -15,81 +15,81 @@
// trim from start
static inline std::string &ltrim(std::string &s)
{
s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun<int, int>(std::isspace))));
return s;
s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::ptr_fun<int, int>(std::isspace))));
return s;
}
// trim from end
static inline std::string &rtrim(std::string &s)
{
s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun<int, int>(std::isspace))).base(), s.end());
return s;
s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::ptr_fun<int, int>(std::isspace))).base(), s.end());
return s;
}
// trim from both ends
static inline std::string &trim(std::string &s)
{
return ltrim(rtrim(s));
return ltrim(rtrim(s));
}
static std::string getMacAddress()
{
std::ifstream t("/sys/class/net/eth0/address");
std::string str((std::istreambuf_iterator<char>(t)),
std::istreambuf_iterator<char>());
return trim(str);
std::ifstream t("/sys/class/net/eth0/address");
std::string str((std::istreambuf_iterator<char>(t)),
std::istreambuf_iterator<char>());
return trim(str);
}
std::vector<std::string> split(const std::string& str)
{
std::istringstream iss(str);
std::vector<std::string> splitStr;
std::copy(std::istream_iterator<std::string>(iss), std::istream_iterator<std::string>(), std::back_inserter<std::vector<std::string> >(splitStr));
return splitStr;
std::istringstream iss(str);
std::vector<std::string> splitStr;
std::copy(std::istream_iterator<std::string>(iss), std::istream_iterator<std::string>(), std::back_inserter<std::vector<std::string> >(splitStr));
return splitStr;
}
static void daemonize()
{
/* Our process ID and Session ID */
pid_t pid, sid;
/* Our process ID and Session ID */
pid_t pid, sid;
/* Fork off the parent process */
pid = fork();
if (pid < 0)
exit(EXIT_FAILURE);
/* Fork off the parent process */
pid = fork();
if (pid < 0)
exit(EXIT_FAILURE);
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0)
exit(EXIT_SUCCESS);
/* If we got a good PID, then
we can exit the parent process. */
if (pid > 0)
exit(EXIT_SUCCESS);
/* Change the file mode mask */
umask(0);
/* Change the file mode mask */
umask(0);
/* Open any logs here */
/* Open any logs here */
/* Create a new SID for the child process */
sid = setsid();
if (sid < 0)
{
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Create a new SID for the child process */
sid = setsid();
if (sid < 0)
{
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Change the current working directory */
if ((chdir("/")) < 0)
{
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Change the current working directory */
if ((chdir("/")) < 0)
{
/* Log the failure */
exit(EXIT_FAILURE);
}
/* Close out the standard file descriptors */
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
/* Close out the standard file descriptors */
close(STDIN_FILENO);
close(STDOUT_FILENO);
close(STDERR_FILENO);
}

View file

@ -13,42 +13,42 @@
class WireChunk : public BaseMessage
{
public:
WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size)
{
payload = (char*)malloc(size);
}
WireChunk(size_t size = 0) : BaseMessage(message_type::payload), payloadSize(size)
{
payload = (char*)malloc(size);
}
virtual ~WireChunk()
{
free(payload);
}
virtual ~WireChunk()
{
free(payload);
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
virtual void read(std::istream& stream)
{
stream.read(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize);
}
virtual uint32_t getSize()
{
return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize;
}
virtual uint32_t getSize()
{
return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize;
}
tv timestamp;
uint32_t payloadSize;
char* payload;
tv timestamp;
uint32_t payloadSize;
char* payload;
protected:
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}
virtual void doserialize(std::ostream& stream)
{
stream.write(reinterpret_cast<char *>(&timestamp.sec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&timestamp.usec), sizeof(int32_t));
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize);
}
};

View file

@ -12,63 +12,63 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
void ControlServer::onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer)
{
// cout << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::requestmsg)
{
RequestMsg requestMsg;
requestMsg.deserialize(baseMessage, buffer);
cout << "request: " << requestMsg.request << "\n";
if (requestMsg.request == "time")
{
if (baseMessage.type == message_type::requestmsg)
{
RequestMsg requestMsg;
requestMsg.deserialize(baseMessage, buffer);
cout << "request: " << requestMsg.request << "\n";
if (requestMsg.request == "time")
{
// timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec);
TimeMsg timeMsg;
timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
TimeMsg timeMsg;
timeMsg.refersTo = requestMsg.id;
timeMsg.latency = (requestMsg.received.sec - requestMsg.sent.sec) + (requestMsg.received.usec - requestMsg.sent.usec) / 1000000.;
// tv diff = timeMsg.received - timeMsg.sent;
// cout << "Latency: " << diff.sec << "." << diff.usec << "\n";
connection->send(&timeMsg);
}
else if (requestMsg.request == "serverSettings")
{
serverSettings->refersTo = requestMsg.id;
connection->send(serverSettings);
}
else if (requestMsg.request == "sampleFormat")
{
sampleFormat->refersTo = requestMsg.id;
connection->send(sampleFormat);
}
else if (requestMsg.request == "headerChunk")
{
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
}
}
connection->send(&timeMsg);
}
else if (requestMsg.request == "serverSettings")
{
serverSettings->refersTo = requestMsg.id;
connection->send(serverSettings);
}
else if (requestMsg.request == "sampleFormat")
{
sampleFormat->refersTo = requestMsg.id;
connection->send(sampleFormat);
}
else if (requestMsg.request == "headerChunk")
{
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
}
}
}
void ControlServer::acceptor()
{
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
a.accept(*sock);
cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n";
ServerConnection* session = new ServerConnection(this, sock);
sessions.insert(shared_ptr<ServerConnection>(session));
session->start();
}
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
a.accept(*sock);
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n";
ServerConnection* session = new ServerConnection(this, sock);
sessions.insert(shared_ptr<ServerConnection>(session));
session->start();
}
}
void ControlServer::start()
{
acceptThread = new thread(&ControlServer::acceptor, this);
acceptThread = new thread(&ControlServer::acceptor, this);
}
@ -80,23 +80,23 @@ void ControlServer::stop()
void ControlServer::setHeader(HeaderMessage* header)
{
if (header)
headerChunk = header;
if (header)
headerChunk = header;
}
void ControlServer::setFormat(SampleFormat* format)
{
if (format)
sampleFormat = format;
if (format)
sampleFormat = format;
}
void ControlServer::setServerSettings(ServerSettings* settings)
{
if (settings)
serverSettings = settings;
if (settings)
serverSettings = settings;
}

View file

@ -25,24 +25,24 @@ using namespace std;
class ControlServer : public MessageReceiver
{
public:
ControlServer(unsigned short port);
ControlServer(unsigned short port);
void start();
void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
void setHeader(HeaderMessage* header);
void setFormat(SampleFormat* format);
void setServerSettings(ServerSettings* settings);
void start();
void stop();
virtual void onMessageReceived(SocketConnection* connection, const BaseMessage& baseMessage, char* buffer);
void setHeader(HeaderMessage* header);
void setFormat(SampleFormat* format);
void setServerSettings(ServerSettings* settings);
private:
void acceptor();
set<shared_ptr<ServerConnection>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
HeaderMessage* headerChunk;
SampleFormat* sampleFormat;
ServerSettings* serverSettings;
thread* acceptThread;
void acceptor();
set<shared_ptr<ServerConnection>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
HeaderMessage* headerChunk;
SampleFormat* sampleFormat;
ServerSettings* serverSettings;
thread* acceptThread;
};

View file

@ -8,18 +8,18 @@
class Encoder
{
public:
Encoder(const SampleFormat& format) : sampleFormat(format)
{
}
Encoder(const SampleFormat& format) : sampleFormat(format)
{
}
virtual double encode(PcmChunk* chunk) = 0;
virtual HeaderMessage* getHeader()
{
return NULL;
}
virtual double encode(PcmChunk* chunk) = 0;
virtual HeaderMessage* getHeader()
{
return NULL;
}
protected:
SampleFormat sampleFormat;
SampleFormat sampleFormat;
};

View file

@ -7,183 +7,183 @@ using namespace std;
OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format), headerChunk(NULL)
{
init();
lastGranulepos = -1;
init();
lastGranulepos = -1;
}
HeaderMessage* OggEncoder::getHeader()
{
return headerChunk;
return headerChunk;
}
double OggEncoder::encode(PcmChunk* chunk)
{
double res = 0;
if (tv_sec == 0)
{
tv_sec = chunk->timestamp.sec;
tv_usec = chunk->timestamp.usec;
}
double res = 0;
if (tv_sec == 0)
{
tv_sec = chunk->timestamp.sec;
tv_usec = chunk->timestamp.usec;
}
//cout << "-> pcm: " << wireChunk->length << endl;
int bytes = chunk->payloadSize / 4;
float **buffer=vorbis_analysis_buffer(&vd, bytes);
int bytes = chunk->payloadSize / 4;
float **buffer=vorbis_analysis_buffer(&vd, bytes);
/* uninterleave samples */
for(int i=0; i<bytes; i++)
{
buffer[0][i]=((chunk->payload[i*4+1]<<8)|
(0x00ff&(int)chunk->payload[i*4]))/32768.f;
buffer[1][i]=((chunk->payload[i*4+3]<<8)|
(0x00ff&(int)chunk->payload[i*4+2]))/32768.f;
}
/* uninterleave samples */
for(int i=0; i<bytes; i++)
{
buffer[0][i]=((chunk->payload[i*4+1]<<8)|
(0x00ff&(int)chunk->payload[i*4]))/32768.f;
buffer[1][i]=((chunk->payload[i*4+3]<<8)|
(0x00ff&(int)chunk->payload[i*4+2]))/32768.f;
}
/* tell the library how much we actually submitted */
vorbis_analysis_wrote(&vd, bytes);
/* tell the library how much we actually submitted */
vorbis_analysis_wrote(&vd, bytes);
/* vorbis does some data preanalysis, then divvies up blocks for
more involved (potentially parallel) processing. Get a single
block for encoding now */
size_t pos = 0;
while(vorbis_analysis_blockout(&vd,&vb)==1)
{
/* analysis, assume we want to use bitrate management */
vorbis_analysis(&vb,NULL);
vorbis_bitrate_addblock(&vb);
/* vorbis does some data preanalysis, then divvies up blocks for
more involved (potentially parallel) processing. Get a single
block for encoding now */
size_t pos = 0;
while(vorbis_analysis_blockout(&vd,&vb)==1)
{
/* analysis, assume we want to use bitrate management */
vorbis_analysis(&vb,NULL);
vorbis_bitrate_addblock(&vb);
while(vorbis_bitrate_flushpacket(&vd,&op))
{
/* weld the packet into the bitstream */
ogg_stream_packetin(&os,&op);
while(vorbis_bitrate_flushpacket(&vd,&op))
{
/* weld the packet into the bitstream */
ogg_stream_packetin(&os,&op);
/* write out pages (if any) */
while(true)
{
/* write out pages (if any) */
while(true)
{
// int result = ogg_stream_pageout(&os,&og);
int result = ogg_stream_flush(&os,&og);
if (result == 0)
break;
res = true;
int result = ogg_stream_flush(&os,&og);
if (result == 0)
break;
res = true;
size_t nextLen = pos + og.header_len + og.body_len;
if (chunk->payloadSize < nextLen)
chunk->payload = (char*)realloc(chunk->payload, nextLen);
size_t nextLen = pos + og.header_len + og.body_len;
if (chunk->payloadSize < nextLen)
chunk->payload = (char*)realloc(chunk->payload, nextLen);
memcpy(chunk->payload + pos, og.header, og.header_len);
pos += og.header_len;
memcpy(chunk->payload + pos, og.body, og.body_len);
pos += og.body_len;
}
}
}
if (res)
{
if (lastGranulepos == -1)
res = os.granulepos;
else
res = os.granulepos - lastGranulepos;
res /= 48.;
lastGranulepos = os.granulepos;
chunk->payload = (char*)realloc(chunk->payload, pos);
chunk->payloadSize = pos;
tv_sec = 0;
tv_usec = 0;
}
return res;
memcpy(chunk->payload + pos, og.header, og.header_len);
pos += og.header_len;
memcpy(chunk->payload + pos, og.body, og.body_len);
pos += og.body_len;
}
}
}
if (res)
{
if (lastGranulepos == -1)
res = os.granulepos;
else
res = os.granulepos - lastGranulepos;
res /= 48.;
lastGranulepos = os.granulepos;
chunk->payload = (char*)realloc(chunk->payload, pos);
chunk->payloadSize = pos;
tv_sec = 0;
tv_usec = 0;
}
return res;
}
void OggEncoder::init()
{
/********** Encode setup ************/
tv_sec = 0;
tv_usec = 0;
/********** Encode setup ************/
tv_sec = 0;
tv_usec = 0;
vorbis_info_init(&vi);
vorbis_info_init(&vi);
/* choose an encoding mode. A few possibilities commented out, one
actually used: */
/* choose an encoding mode. A few possibilities commented out, one
actually used: */
/*********************************************************************
Encoding using a VBR quality mode. The usable range is -.1
(lowest quality, smallest file) to 1. (highest quality, largest file).
Example quality mode .4: 44kHz stereo coupled, roughly 128kbps VBR
/*********************************************************************
Encoding using a VBR quality mode. The usable range is -.1
(lowest quality, smallest file) to 1. (highest quality, largest file).
Example quality mode .4: 44kHz stereo coupled, roughly 128kbps VBR
ret = vorbis_encode_init_vbr(&vi,2,44100,.4);
ret = vorbis_encode_init_vbr(&vi,2,44100,.4);
---------------------------------------------------------------------
---------------------------------------------------------------------
Encoding using an average bitrate mode (ABR).
example: 44kHz stereo coupled, average 128kbps VBR
Encoding using an average bitrate mode (ABR).
example: 44kHz stereo coupled, average 128kbps VBR
ret = vorbis_encode_init(&vi,2,44100,-1,128000,-1);
ret = vorbis_encode_init(&vi,2,44100,-1,128000,-1);
---------------------------------------------------------------------
---------------------------------------------------------------------
Encode using a quality mode, but select that quality mode by asking for
an approximate bitrate. This is not ABR, it is true VBR, but selected
using the bitrate interface, and then turning bitrate management off:
Encode using a quality mode, but select that quality mode by asking for
an approximate bitrate. This is not ABR, it is true VBR, but selected
using the bitrate interface, and then turning bitrate management off:
ret = ( vorbis_encode_setup_managed(&vi,2,44100,-1,128000,-1) ||
vorbis_encode_ctl(&vi,OV_ECTL_RATEMANAGE2_SET,NULL) ||
vorbis_encode_setup_init(&vi));
ret = ( vorbis_encode_setup_managed(&vi,2,44100,-1,128000,-1) ||
vorbis_encode_ctl(&vi,OV_ECTL_RATEMANAGE2_SET,NULL) ||
vorbis_encode_setup_init(&vi));
*********************************************************************/
*********************************************************************/
ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.7);
ret=vorbis_encode_init_vbr(&vi, sampleFormat.channels, sampleFormat.rate, 0.7);
/* do not continue if setup failed; this can happen if we ask for a
mode that libVorbis does not support (eg, too low a bitrate, etc,
will return 'OV_EIMPL') */
/* do not continue if setup failed; this can happen if we ask for a
mode that libVorbis does not support (eg, too low a bitrate, etc,
will return 'OV_EIMPL') */
if(ret)exit(1);
if(ret)exit(1);
/* add a comment */
vorbis_comment_init(&vc);
vorbis_comment_add_tag(&vc,"ENCODER","snapstream");
/* add a comment */
vorbis_comment_init(&vc);
vorbis_comment_add_tag(&vc,"ENCODER","snapstream");
/* set up the analysis state and auxiliary encoding storage */
vorbis_analysis_init(&vd,&vi);
vorbis_block_init(&vd,&vb);
/* set up the analysis state and auxiliary encoding storage */
vorbis_analysis_init(&vd,&vi);
vorbis_block_init(&vd,&vb);
/* set up our packet->stream encoder */
/* pick a random serial number; that way we can more likely build
chained streams just by concatenation */
srand(time(NULL));
ogg_stream_init(&os,rand());
/* set up our packet->stream encoder */
/* pick a random serial number; that way we can more likely build
chained streams just by concatenation */
srand(time(NULL));
ogg_stream_init(&os,rand());
/* Vorbis streams begin with three headers; the initial header (with
most of the codec setup parameters) which is mandated by the Ogg
bitstream spec. The second header holds any comment fields. The
third header holds the bitstream codebook. We merely need to
make the headers, then pass them to libvorbis one at a time;
libvorbis handles the additional Ogg bitstream constraints */
/* Vorbis streams begin with three headers; the initial header (with
most of the codec setup parameters) which is mandated by the Ogg
bitstream spec. The second header holds any comment fields. The
third header holds the bitstream codebook. We merely need to
make the headers, then pass them to libvorbis one at a time;
libvorbis handles the additional Ogg bitstream constraints */
vorbis_analysis_headerout(&vd,&vc,&header,&header_comm,&header_code);
ogg_stream_packetin(&os,&header);
ogg_stream_packetin(&os,&header_comm);
ogg_stream_packetin(&os,&header_code);
vorbis_analysis_headerout(&vd,&vc,&header,&header_comm,&header_code);
ogg_stream_packetin(&os,&header);
ogg_stream_packetin(&os,&header_comm);
ogg_stream_packetin(&os,&header_code);
/* This ensures the actual
* audio data will start on a new page, as per spec
*/
/* This ensures the actual
* audio data will start on a new page, as per spec
*/
// while(!eos){
size_t pos(0);
headerChunk = new HeaderMessage();
while (true)
{
int result=ogg_stream_flush(&os,&og);
if (result == 0)
break;
headerChunk->payloadSize += og.header_len + og.body_len;
headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize);
cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n";
memcpy(headerChunk->payload + pos, og.header, og.header_len);
pos += og.header_len;
memcpy(headerChunk->payload + pos, og.body, og.body_len);
pos += og.body_len;
}
size_t pos(0);
headerChunk = new HeaderMessage();
while (true)
{
int result=ogg_stream_flush(&os,&og);
if (result == 0)
break;
headerChunk->payloadSize += og.header_len + og.body_len;
headerChunk->payload = (char*)realloc(headerChunk->payload, headerChunk->payloadSize);
cout << "HeadLen: " << og.header_len << ", bodyLen: " << og.body_len << ", result: " << result << "\n";
memcpy(headerChunk->payload + pos, og.header, og.header_len);
pos += og.header_len;
memcpy(headerChunk->payload + pos, og.body, og.body_len);
pos += og.body_len;
}
// fwrite(og.header,1,og.header_len,stdout);

View file

@ -7,37 +7,37 @@
class OggEncoder : public Encoder
{
public:
OggEncoder(const SampleFormat& format);
virtual double encode(PcmChunk* chunk);
virtual HeaderMessage* getHeader();
OggEncoder(const SampleFormat& format);
virtual double encode(PcmChunk* chunk);
virtual HeaderMessage* getHeader();
private:
void init();
void init();
ogg_stream_state os; /* take physical pages, weld into a logical
ogg_stream_state os; /* take physical pages, weld into a logical
stream of packets */
ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */
ogg_packet op; /* one raw packet of data for decode */
ogg_page og; /* one Ogg bitstream page. Vorbis packets are inside */
ogg_packet op; /* one raw packet of data for decode */
vorbis_info vi; /* struct that stores all the static vorbis bitstream
vorbis_info vi; /* struct that stores all the static vorbis bitstream
settings */
vorbis_comment vc; /* struct that stores all the user comments */
vorbis_comment vc; /* struct that stores all the user comments */
vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */
vorbis_block vb; /* local working space for packet->PCM decode */
vorbis_dsp_state vd; /* central working state for the packet->PCM decoder */
vorbis_block vb; /* local working space for packet->PCM decode */
ogg_packet header;
ogg_packet header_comm;
ogg_packet header_code;
ogg_packet header;
ogg_packet header_comm;
ogg_packet header_code;
ogg_int64_t lastGranulepos;
HeaderMessage* headerChunk;
ogg_int64_t lastGranulepos;
HeaderMessage* headerChunk;
int eos=0,ret;
int i, founddata;
int eos=0,ret;
int i, founddata;
int32_t tv_sec;
int32_t tv_usec;
int32_t tv_sec;
int32_t tv_usec;
};

View file

@ -7,11 +7,11 @@ PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format)
double PcmEncoder::encode(PcmChunk* chunk)
{
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)
wireChunk->payload[n] *= 1;
*/
return chunk->getDuration();
/* WireChunk* wireChunk = chunk->wireChunk;
for (size_t n=0; n<wireChunk->length; ++n)
wireChunk->payload[n] *= 1;
*/
return chunk->getDuration();
}

View file

@ -6,8 +6,8 @@
class PcmEncoder : public Encoder
{
public:
PcmEncoder(const SampleFormat& format);
virtual double encode(PcmChunk* chunk);
PcmEncoder(const SampleFormat& format);
virtual double encode(PcmChunk* chunk);
};

View file

@ -11,25 +11,25 @@ using namespace std;
ServerConnection::ServerConnection(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket) : SocketConnection(_receiver)
{
socket = _socket;
socket = _socket;
}
void ServerConnection::worker()
{
active_ = true;
try
{
while (active_)
{
getNextMessage();
}
}
catch (const std::exception& e)
{
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
}
active_ = false;
active_ = true;
try
{
while (active_)
{
getNextMessage();
}
}
catch (const std::exception& e)
{
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
}
active_ = false;
}

View file

@ -12,10 +12,10 @@ using boost::asio::ip::tcp;
class ServerConnection : public SocketConnection
{
public:
ServerConnection(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket);
ServerConnection(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket);
protected:
virtual void worker();
virtual void worker();
};

View file

@ -22,133 +22,133 @@ using namespace std;
int main(int argc, char* argv[])
{
try
{
string sampleFormat;
try
{
string sampleFormat;
size_t port;
string fifoName;
string codec;
bool runAsDaemon;
size_t port;
string fifoName;
string codec;
bool runAsDaemon;
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("port,p", po::value<size_t>(&port)->default_value(98765), "port to listen on")
("sampleformat,s", po::value<string>(&sampleFormat)->default_value("48000:16:2"), "sample format")
("codec,c", po::value<string>(&codec)->default_value("ogg"), "transport codec [ogg|pcm]")
("fifo,f", po::value<string>(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
;
po::options_description desc("Allowed options");
desc.add_options()
("help,h", "produce help message")
("port,p", po::value<size_t>(&port)->default_value(98765), "port to listen on")
("sampleformat,s", po::value<string>(&sampleFormat)->default_value("48000:16:2"), "sample format")
("codec,c", po::value<string>(&codec)->default_value("ogg"), "transport codec [ogg|pcm]")
("fifo,f", po::value<string>(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
;
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
if (vm.count("help"))
{
cout << desc << "\n";
return 1;
}
if (vm.count("help"))
{
cout << desc << "\n";
return 1;
}
if (runAsDaemon)
{
daemonize();
syslog (LOG_NOTICE, "First daemon started.");
}
if (runAsDaemon)
{
daemonize();
syslog (LOG_NOTICE, "First daemon started.");
}
openlog ("firstdaemon", LOG_PID, LOG_DAEMON);
openlog ("firstdaemon", LOG_PID, LOG_DAEMON);
using namespace std; // For atoi.
using namespace std; // For atoi.
timeval tvChunk;
gettimeofday(&tvChunk, NULL);
long nextTick = getTickCount();
timeval tvChunk;
gettimeofday(&tvChunk, NULL);
long nextTick = getTickCount();
mkfifo(fifoName.c_str(), 0777);
SampleFormat format(sampleFormat);
size_t duration = 50;
mkfifo(fifoName.c_str(), 0777);
SampleFormat format(sampleFormat);
size_t duration = 50;
//size_t chunkSize = duration*format.rate*format.frameSize / 1000;
std::auto_ptr<Encoder> encoder;
if (codec == "ogg")
encoder.reset(new OggEncoder(sampleFormat));
else if (codec == "pcm")
encoder.reset(new PcmEncoder(sampleFormat));
else
{
cout << "unknown codec: " << codec << "\n";
return 1;
}
std::auto_ptr<Encoder> encoder;
if (codec == "ogg")
encoder.reset(new OggEncoder(sampleFormat));
else if (codec == "pcm")
encoder.reset(new PcmEncoder(sampleFormat));
else
{
cout << "unknown codec: " << codec << "\n";
return 1;
}
std::auto_ptr<ServerSettings> serverSettings(new ServerSettings(port + 1));
ControlServer* controlServer = new ControlServer(port);
controlServer->setServerSettings(serverSettings.get());
controlServer->setFormat(&format);
controlServer->setHeader(encoder->getHeader());
controlServer->start();
std::auto_ptr<ServerSettings> serverSettings(new ServerSettings(port + 1));
ControlServer* controlServer = new ControlServer(port);
controlServer->setServerSettings(serverSettings.get());
controlServer->setFormat(&format);
controlServer->setHeader(encoder->getHeader());
controlServer->start();
StreamServer* server = new StreamServer(port + 1);
server->start();
StreamServer* server = new StreamServer(port + 1);
server->start();
while (!g_terminated)
{
int fd = open(fifoName.c_str(), O_RDONLY);
try
{
shared_ptr<PcmChunk> chunk;//(new WireChunk());
while (true)//cin.good())
{
chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize;
int len = 0;
do
{
int count = read(fd, chunk->payload + len, toRead - len);
if (count <= 0)
throw ServerException("count = " + boost::lexical_cast<string>(count));
while (!g_terminated)
{
int fd = open(fifoName.c_str(), O_RDONLY);
try
{
shared_ptr<PcmChunk> chunk;//(new WireChunk());
while (true)//cin.good())
{
chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize;
int len = 0;
do
{
int count = read(fd, chunk->payload + len, toRead - len);
if (count <= 0)
throw ServerException("count = " + boost::lexical_cast<string>(count));
len += count;
}
while (len < toRead);
len += count;
}
while (len < toRead);
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
double chunkDuration = encoder->encode(chunk.get());
if (chunkDuration > 0)
server->send(chunk);
chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec;
double chunkDuration = encoder->encode(chunk.get());
if (chunkDuration > 0)
server->send(chunk);
//cout << chunk->tv_sec << ", " << chunk->tv_usec / 1000 << "\n";
// addUs(tvChunk, 1000*chunk->getDuration());
addUs(tvChunk, chunkDuration * 1000);
nextTick += duration;
long currentTick = getTickCount();
if (nextTick > currentTick)
{
usleep((nextTick - currentTick) * 1000);
}
else
{
gettimeofday(&tvChunk, NULL);
nextTick = getTickCount();
}
}
}
catch(const std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
close(fd);
}
addUs(tvChunk, chunkDuration * 1000);
nextTick += duration;
long currentTick = getTickCount();
if (nextTick > currentTick)
{
usleep((nextTick - currentTick) * 1000);
}
else
{
gettimeofday(&tvChunk, NULL);
nextTick = getTickCount();
}
}
}
catch(const std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
close(fd);
}
server->stop();
}
catch (const std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
server->stop();
}
catch (const std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
syslog (LOG_NOTICE, "First daemon terminated.");
closelog();
syslog (LOG_NOTICE, "First daemon terminated.");
closelog();
}

View file

@ -9,34 +9,34 @@ StreamSession::StreamSession(std::shared_ptr<tcp::socket> _socket) : ServerConne
void StreamSession::worker()
{
active_ = true;
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
for (;;)
{
shared_ptr<BaseMessage> message(messages.pop());
ServerConnection::send(message.get());
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
active_ = false;
}
active_ = false;
active_ = true;
try
{
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
for (;;)
{
shared_ptr<BaseMessage> message(messages.pop());
ServerConnection::send(message.get());
}
}
catch (std::exception& e)
{
std::cerr << "Exception in thread: " << e.what() << "\n";
active_ = false;
}
active_ = false;
}
void StreamSession::send(shared_ptr<BaseMessage> message)
{
if (!message)
return;
if (!message)
return;
while (messages.size() > 100)// chunk->getDuration() > 10000)
messages.pop();
messages.push(message);
while (messages.size() > 100)// chunk->getDuration() > 10000)
messages.pop();
messages.push(message);
}
@ -50,45 +50,45 @@ StreamServer::StreamServer(unsigned short port) : port_(port), headerChunk(NULL)
void StreamServer::acceptor()
{
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
a.accept(*sock);
cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n";
StreamSession* session = new StreamSession(sock);
sessions.insert(shared_ptr<StreamSession>(session));
session->start();
}
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
a.accept(*sock);
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n";
StreamSession* session = new StreamSession(sock);
sessions.insert(shared_ptr<StreamSession>(session));
session->start();
}
}
void StreamServer::send(shared_ptr<BaseMessage> message)
{
for (std::set<shared_ptr<StreamSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{
if (!(*it)->active())
{
cout << "Session inactive. Removing\n";
sessions.erase(it++);
}
else
++it;
}
for (std::set<shared_ptr<StreamSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{
if (!(*it)->active())
{
cout << "Session inactive. Removing\n";
sessions.erase(it++);
}
else
++it;
}
for (auto s : sessions)
s->send(message);
for (auto s : sessions)
s->send(message);
}
void StreamServer::start()
{
acceptThread = new thread(&StreamServer::acceptor, this);
acceptThread = new thread(&StreamServer::acceptor, this);
}

View file

@ -25,13 +25,13 @@ using namespace std;
class StreamSession : public ServerConnection
{
public:
StreamSession(socket_ptr sock);
void send(shared_ptr<BaseMessage> message);
StreamSession(socket_ptr sock);
void send(shared_ptr<BaseMessage> message);
protected:
virtual void worker();
thread* senderThread;
Queue<shared_ptr<BaseMessage>> messages;
virtual void worker();
thread* senderThread;
Queue<shared_ptr<BaseMessage>> messages;
};
@ -39,41 +39,41 @@ protected:
class StreamServer
{
public:
StreamServer(unsigned short port);
void send(shared_ptr<BaseMessage> message);
StreamServer(unsigned short port);
void send(shared_ptr<BaseMessage> message);
void start();
void stop();
void start();
void stop();
private:
void acceptor();
set<shared_ptr<StreamSession>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
shared_ptr<HeaderMessage> headerChunk;
shared_ptr<SampleFormat> sampleFormat;
thread* acceptThread;
void acceptor();
set<shared_ptr<StreamSession>> sessions;
boost::asio::io_service io_service_;
unsigned short port_;
shared_ptr<HeaderMessage> headerChunk;
shared_ptr<SampleFormat> sampleFormat;
thread* acceptThread;
};
class ServerException : public std::exception
{
public:
ServerException(const std::string& what) : what_(what)
{
}
ServerException(const std::string& what) : what_(what)
{
}
virtual ~ServerException() throw()
{
}
virtual ~ServerException() throw()
{
}
virtual const char* what() const throw()
{
return what_.c_str();
}
virtual const char* what() const throw()
{
return what_.c_str();
}
private:
std::string what_;
std::string what_;
};