init player when receiving a header message

This commit is contained in:
badaix 2016-01-31 23:56:28 +01:00
parent 94944681c2
commit 7b1323646f
12 changed files with 149 additions and 117 deletions

View file

@ -35,6 +35,7 @@ ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string&
ClientConnection::~ClientConnection()
{
stop();
}
@ -113,6 +114,7 @@ void ClientConnection::stop()
{
}
readerThread_ = NULL;
socket_.reset();
logD << "readerThread terminated\n";
}

View file

@ -16,7 +16,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
***/
#include "controller.h"
#include <iostream>
#include <string>
#include <memory>
@ -28,23 +27,23 @@
#include "timeProvider.h"
#include "common/log.h"
#include "common/snapException.h"
#include "message/serverSettings.h"
#include "message/time.h"
#include "message/request.h"
#include "message/hello.h"
#include "controller.h"
using namespace std;
Controller::Controller() : MessageReceiver(), active_(false), stream_(NULL), decoder_(NULL), player_(nullptr), asyncException_(false)
Controller::Controller() : MessageReceiver(), active_(false), latency_(0), stream_(nullptr), decoder_(nullptr), player_(nullptr), serverSettings_(nullptr), asyncException_(false)
{
}
void Controller::onException(ClientConnection* connection, const std::exception& exception)
{
logE << "onException: " << exception.what() << "\n";
exception_ = exception;
logE << "Controller::onException: " << exception.what() << "\n";
exception_ = exception.what();
asyncException_ = true;
}
@ -53,7 +52,7 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
{
if (baseMessage.type == message_type::kWireChunk)
{
if ((stream_ != NULL) && (decoder_ != NULL))
if (stream_ && decoder_)
{
msg::PcmChunk* pcmChunk = new msg::PcmChunk(sampleFormat_, 0);
pcmChunk->deserialize(baseMessage, buffer);
@ -79,15 +78,46 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
}
else if (baseMessage.type == message_type::kServerSettings)
{
msg::ServerSettings serverSettings;
serverSettings.deserialize(baseMessage, buffer);
logO << "ServerSettings - buffer: " << serverSettings.bufferMs << ", latency: " << serverSettings.latency << ", volume: " << serverSettings.volume << ", muted: " << serverSettings.muted << "\n";
if (player_ != nullptr)
serverSettings_.reset(new msg::ServerSettings());
serverSettings_->deserialize(baseMessage, buffer);
logO << "ServerSettings - buffer: " << serverSettings_->bufferMs << ", latency: " << serverSettings_->latency << ", volume: " << serverSettings_->volume << ", muted: " << serverSettings_->muted << "\n";
if (stream_ && player_)
{
player_->setVolume(serverSettings.volume / 100.);
player_->setMute(serverSettings.muted);
player_->setVolume(serverSettings_->volume / 100.);
player_->setMute(serverSettings_->muted);
stream_->setBufferLen(serverSettings_->bufferMs - serverSettings_->latency);
}
stream_->setBufferLen(serverSettings.bufferMs - serverSettings.latency);
}
else if (baseMessage.type == message_type::kHeader)
{
headerChunk_.reset(new msg::Header());
headerChunk_->deserialize(baseMessage, buffer);
logO << "Codec: " << headerChunk_->codec << "\n";
if (headerChunk_->codec == "pcm")
decoder_.reset(new PcmDecoder());
#ifndef ANDROID
if (headerChunk_->codec == "ogg")
decoder_.reset(new OggDecoder());
#endif
else if (headerChunk_->codec == "flac")
decoder_.reset(new FlacDecoder());
sampleFormat_ = decoder_->setHeader(headerChunk_.get());
logO << "sample rate: " << sampleFormat_.rate << "Hz\n";
logO << "bits/sample: " << sampleFormat_.bits << "\n";
logO << "channels : " << sampleFormat_.channels << "\n";
stream_.reset(new Stream(sampleFormat_));
stream_->setBufferLen(serverSettings_->bufferMs - latency_);
#ifndef ANDROID
player_.reset(new AlsaPlayer(pcmDevice_, stream_.get()));
#else
player_.reset(new OpenslPlayer(pcmDevice_, stream_.get()));
#endif
player_->setVolume(serverSettings_->volume / 100.);
player_->setMute(serverSettings_->muted);
player_->start();
}
if (baseMessage.type != message_type::kTime)
@ -114,8 +144,8 @@ void Controller::start(const PcmDevice& pcmDevice, const std::string& host, size
{
pcmDevice_ = pcmDevice;
latency_ = latency;
clientConnection_ = new ClientConnection(this, host, port);
controllerThread_ = new thread(&Controller::worker, this);
clientConnection_.reset(new ClientConnection(this, host, port));
controllerThread_ = thread(&Controller::worker, this);
}
@ -123,18 +153,14 @@ void Controller::stop()
{
logD << "Stopping Controller" << endl;
active_ = false;
controllerThread_->join();
controllerThread_.join();
clientConnection_->stop();
delete controllerThread_;
delete clientConnection_;
}
void Controller::worker()
{
active_ = true;
decoder_ = NULL;
stream_ = NULL;
while (active_)
{
@ -143,27 +169,7 @@ void Controller::worker()
clientConnection_->start();
msg::Hello hello(clientConnection_->getMacAddress());
msg::Request requestMsg(kServerSettings);
shared_ptr<msg::ServerSettings> serverSettings(NULL);
while (active_ && !(serverSettings = clientConnection_->sendReq<msg::ServerSettings>(&hello)));
logO << "ServerSettings - buffer: " << serverSettings->bufferMs << ", latency: " << serverSettings->latency << ", volume: " << serverSettings->volume << ", muted: " << serverSettings->muted << "\n";
requestMsg.request = kHeader;
shared_ptr<msg::Header> headerChunk(NULL);
while (active_ && !(headerChunk = clientConnection_->sendReq<msg::Header>(&requestMsg)));
logO << "Codec: " << headerChunk->codec << "\n";
if (headerChunk->codec == "pcm")
decoder_ = new PcmDecoder();
#ifndef ANDROID
if (headerChunk->codec == "ogg")
decoder_ = new OggDecoder();
#endif
else if (headerChunk->codec == "flac")
decoder_ = new FlacDecoder();
sampleFormat_ = decoder_->setHeader(headerChunk.get());
logO << "sample rate: " << sampleFormat_.rate << "Hz\n";
logO << "bits/sample: " << sampleFormat_.bits << "\n";
logO << "channels : " << sampleFormat_.channels << "\n";
clientConnection_->send(&hello);
msg::Request timeReq(kTime);
for (size_t n=0; n<50 && active_; ++n)
@ -178,25 +184,13 @@ void Controller::worker()
}
logO << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer<chronos::usec>().count() / 1000.f << "\n";
stream_ = new Stream(sampleFormat_);
stream_->setBufferLen(serverSettings->bufferMs - latency_);
#ifndef ANDROID
player_.reset(new AlsaPlayer(pcmDevice_, stream_));
#else
player_.reset(new OpenslPlayer(pcmDevice_, stream_));
#endif
player_->setVolume(serverSettings->volume / 100.);
player_->setMute(serverSettings->muted);
player_->start();
while (active_)
{
for (size_t n=0; n<10 && active_; ++n)
{
usleep(100*1000);
if (asyncException_)
throw exception_;
throw AsyncSnapException(exception_);
}
if (sendTimeSyncMessage(5000))
@ -207,18 +201,10 @@ void Controller::worker()
{
asyncException_ = false;
logS(kLogErr) << "Exception in Controller::worker(): " << e.what() << endl;
logO << "Stopping clientConnection" << endl;
clientConnection_->stop();
if (player_ != nullptr)
player_->stop();
logO << "Deleting stream" << endl;
if (stream_ != NULL)
delete stream_;
stream_ = NULL;
if (decoder_ != NULL)
delete decoder_;
decoder_ = NULL;
logO << "done" << endl;
player_.reset();
stream_.reset();
decoder_.reset();
for (size_t n=0; (n<10) && active_; ++n)
usleep(100*1000);
}

View file

@ -23,6 +23,7 @@
#include <atomic>
#include "decoder/decoder.h"
#include "message/message.h"
#include "message/serverSettings.h"
#include "player/pcmDevice.h"
#ifdef ANDROID
#include "player/openslPlayer.h"
@ -58,17 +59,19 @@ private:
void worker();
bool sendTimeSyncMessage(long after = 1000);
std::atomic<bool> active_;
std::thread* controllerThread_;
ClientConnection* clientConnection_;
Stream* stream_;
std::string ip_;
std::thread controllerThread_;
SampleFormat sampleFormat_;
Decoder* decoder_;
PcmDevice pcmDevice_;
size_t latency_;
std::unique_ptr<ClientConnection> clientConnection_;
std::unique_ptr<Stream> stream_;
std::unique_ptr<Decoder> decoder_;
std::unique_ptr<Player> player_;
std::shared_ptr<msg::ServerSettings> serverSettings_;
std::shared_ptr<msg::Header> headerChunk_;
std::exception exception_;
std::string exception_;
bool asyncException_;
};

View file

@ -58,14 +58,23 @@ public:
class ServerException : public SnapException
class AsyncSnapException : public SnapException
{
public:
ServerException(const char* text) : SnapException(text)
AsyncSnapException(const char* text) : SnapException(text)
{
}
virtual ~ServerException() throw()
AsyncSnapException(const std::string& text) : SnapException(text)
{
}
AsyncSnapException(const AsyncSnapException& e) : SnapException(e.what())
{
}
virtual ~AsyncSnapException() throw()
{
}
};

View file

@ -113,9 +113,12 @@ void ControlSession::reader()
while (active_)
{
asio::streambuf response;
asio::read_until(*socket_, response, "\r\n");
asio::read_until(*socket_, response, "\n");
std::string s((istreambuf_iterator<char>(&response)), istreambuf_iterator<char>());
s.resize(s.length() - 2);
size_t len = s.length() - 1;
if ((len >= 2) && s[len-2] == '\r')
--len;
s.resize(len);
if (messageReceiver_ != NULL)
messageReceiver_->onMessageReceived(this, s);

View file

@ -17,10 +17,12 @@
***/
#include "encoderFactory.h"
#include "common/utils.h"
#include "pcmEncoder.h"
#include "oggEncoder.h"
#include "flacEncoder.h"
#include "common/utils.h"
#include "common/snapException.h"
#include "common/log.h"
using namespace std;
@ -44,8 +46,7 @@ Encoder* EncoderFactory::createEncoder(const std::string& codecSettings) const
encoder = new FlacEncoder(codecOptions);
else
{
cout << "unknown codec: " << codec << "\n";
return NULL;
throw SnapException("unknown codec: " + codec);
}
return encoder;

View file

@ -86,7 +86,9 @@ const SampleFormat& PcmReader::getSampleFormat() const
void PcmReader::start()
{
logE << "PcmReader start: " << sampleFormat_.getFormat() << "\n";
//TODO: wrong encoder settings leads to: terminate called after throwing an instance of 'std::system_error' what(): Invalid argument
encoder_->init(this, sampleFormat_);
active_ = true;
readerThread_ = thread(&PcmReader::worker, this);
}

View file

@ -43,11 +43,11 @@ PcmReader* StreamManager::addStream(const std::string& uri)
if (readerUri.query.find("buffer_ms") == readerUri.query.end())
readerUri.query["buffer_ms"] = to_string(readBufferMs_);
logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: "
<< readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n";
// logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: "
// << readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n";
for (auto kv: readerUri.query)
logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n";
// for (auto kv: readerUri.query)
// logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n";
if (readerUri.scheme == "pipe")
{

View file

@ -39,17 +39,20 @@ StreamServer::~StreamServer()
{
}
/*
void StreamServer::send(const msg::BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
std::lock_guard<std::mutex> mlock(mutex_);
logE << "send: " << sessions_.size() << "\n";
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
logE << "send: " << (*it)->macAddress << ", " << !(*it)->active() << "\n";
if (!(*it)->active())
{
logS(kLogErr) << "Session inactive. Removing\n";
logE << "Session inactive. Removing\n";
// don't block: remove ServerSession in a thread
onDisconnect(it->get());
auto func = [](shared_ptr<StreamSession> s)->void{s->stop();};
@ -62,24 +65,25 @@ void StreamServer::send(const msg::BaseMessage* message)
}
/* for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (!(*it)->active())
onDisconnect(it->get());
}
*/
// for (auto it = sessions_.begin(); it != sessions_.end(); )
// {
// if (!(*it)->active())
// onDisconnect(it->get());
// }
std::shared_ptr<const msg::BaseMessage> shared_message(message);
for (auto s : sessions_)
s->add(shared_message);
}
*/
void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration)
{
logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n";
// logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n";
bool isDefaultStream(pcmReader == streamManager_->getDefaultStream().get());
std::shared_ptr<const msg::BaseMessage> shared_message(chunk);
std::lock_guard<std::mutex> mlock(sessionsMutex_);
for (auto s : sessions_)
{
if (isDefaultStream)//->getName() == "default")
@ -96,28 +100,36 @@ void StreamServer::onResync(const PcmReader* pcmReader, double ms)
void StreamServer::onDisconnect(StreamSession* streamSession)
{
logO << "onDisconnect: " << streamSession->macAddress << "\n";
if (streamSession->macAddress.empty())
return;
/* auto func = [](StreamSession* s)->void{s->stop();};
std::thread t(func, streamSession);
t.detach();
*/
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress);
/* // don't block: remove StreamSession in a thread
for (auto it = sessions_.begin(); it != sessions_.end(); )
std::lock_guard<std::mutex> mlock(sessionsMutex_);
std::shared_ptr<StreamSession> session = nullptr;
for (auto s: sessions_)
{
if (it->get() == streamSession)
if (s.get() == streamSession)
{
logO << "erase: " << (*it)->macAddress << "\n";
sessions_.erase(it);
session = s;
break;
}
}
*/
// notify controllers if not yet done
if (!clientInfo->connected)
if (session == nullptr)
return;
logO << "onDisconnect: " << session->macAddress << "\n";
ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress);
logE << "sessions: " << sessions_.size() << "\n";
// don't block: remove StreamSession in a thread
auto func = [](shared_ptr<StreamSession> s)->void{s->stop();};
std::thread t(func, session);
t.detach();
sessions_.erase(session);
logE << "sessions: " << sessions_.size() << "\n";
// notify controllers if not yet done
if (!clientInfo || !clientInfo->connected)
return;
clientInfo->connected = false;
gettimeofday(&clientInfo->lastSeen, NULL);
Config::instance().save();
@ -252,7 +264,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
}
else if (requestMsg.request == kHeader)
{
std::unique_lock<std::mutex> mlock(mutex_);
// std::lock_guard<std::mutex> mlock(mutex_);
//TODO: use the correct stream
msg::Header* headerChunk = streamManager_->getDefaultStream()->getHeader();
headerChunk->refersTo = requestMsg.id;
@ -267,7 +279,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
logO << "Hello from " << connection->macAddress << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() << "\n";
logD << "request kServerSettings: " << connection->macAddress << "\n";
std::unique_lock<std::mutex> mlock(mutex_);
// std::lock_guard<std::mutex> mlock(mutex_);
ClientInfoPtr clientInfo = Config::instance().getClientInfo(connection->macAddress, true);
if (clientInfo == nullptr)
{
@ -285,6 +297,11 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
connection->send(&serverSettings);
}
//TODO: use the correct stream
msg::Header* headerChunk = streamManager_->getDefaultStream()->getHeader();
connection->send(headerChunk);
ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress);
client->ipAddress = connection->getIP();
client->hostName = helloMsg.getHostName();
@ -294,6 +311,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
Config::instance().save();
json notification = JsonNotification::getJson("Client.OnConnect", client->toJson());
logO << notification.dump(4) << "\n";
controlServer_->send(notification.dump());
}
}
@ -301,8 +319,10 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
StreamSession* StreamServer::getStreamSession(const std::string& mac)
{
// logO << "getStreamSession: " << mac << "\n";
for (auto session: sessions_)
{
// logO << "getStreamSession, checking: " << session->macAddress << "\n";
if (session->macAddress == mac)
return session.get();
}
@ -327,7 +347,7 @@ void StreamServer::handleAccept(socket_ptr socket)
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<StreamSession> session = make_shared<StreamSession>(this, socket);
{
std::unique_lock<std::mutex> mlock(mutex_);
std::lock_guard<std::mutex> mlock(sessionsMutex_);
session->setBufferMs(settings_.bufferMs);
session->start();
sessions_.insert(session);
@ -338,12 +358,14 @@ void StreamServer::handleAccept(socket_ptr socket)
void StreamServer::start()
{
// throw SnapException("good");
controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this));
controlServer_->start();
streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs));
for (auto& streamUri: settings_.pcmStreams)
logE << "Stream: " << streamManager_->addStream(streamUri)->getUri().toJson() << "\n";
// throw SnapException("bad");
streamManager_->start();
@ -359,7 +381,7 @@ void StreamServer::stop()
streamManager_->stop();
std::unique_lock<std::mutex> mlock(mutex_);
// std::lock_guard<std::mutex> mlock(sessionsMutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)
session->stop();
}

View file

@ -79,7 +79,7 @@ public:
void stop();
/// Send a message to all connceted clients
void send(const msg::BaseMessage* message);
// void send(const msg::BaseMessage* message);
/// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived
virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer);
@ -96,7 +96,7 @@ private:
void startAccept();
void handleAccept(socket_ptr socket);
StreamSession* getStreamSession(const std::string& mac);
mutable std::mutex mutex_;
mutable std::mutex sessionsMutex_;
std::set<std::shared_ptr<StreamSession>> sessions_;
asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_;

View file

@ -49,8 +49,8 @@ void StreamSession::start()
void StreamSession::stop()
{
std::unique_lock<std::mutex> mlock(mutex_);
setActive(false);
std::unique_lock<std::mutex> mlock(mutex_);
try
{
std::error_code ec;
@ -121,7 +121,8 @@ void StreamSession::setBufferMs(size_t bufferMs)
bool StreamSession::send(const msg::BaseMessage* message) const
{
// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
//TODO on exception: set active = false
// logO << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_);
if (!socket_ || !active_)
return false;
@ -211,6 +212,7 @@ void StreamSession::writer()
void StreamSession::setActive(bool active)
{
std::lock_guard<std::mutex> mlock(activeMutex_);
if (active_ && !active && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this);
active_ = active;

View file

@ -86,7 +86,9 @@ protected:
void writer();
void setActive(bool active);
mutable std::mutex activeMutex_;
std::atomic<bool> active_;
mutable std::mutex mutex_;
std::thread* readerThread_;
std::thread* writerThread_;