diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index 35d40502..cb442c45 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -35,6 +35,7 @@ ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string& ClientConnection::~ClientConnection() { + stop(); } @@ -113,6 +114,7 @@ void ClientConnection::stop() { } readerThread_ = NULL; + socket_.reset(); logD << "readerThread terminated\n"; } diff --git a/client/controller.cpp b/client/controller.cpp index 31077260..8f67c4e5 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -16,7 +16,6 @@ along with this program. If not, see . ***/ -#include "controller.h" #include #include #include @@ -28,23 +27,23 @@ #include "timeProvider.h" #include "common/log.h" #include "common/snapException.h" -#include "message/serverSettings.h" #include "message/time.h" #include "message/request.h" #include "message/hello.h" +#include "controller.h" using namespace std; -Controller::Controller() : MessageReceiver(), active_(false), stream_(NULL), decoder_(NULL), player_(nullptr), asyncException_(false) +Controller::Controller() : MessageReceiver(), active_(false), latency_(0), stream_(nullptr), decoder_(nullptr), player_(nullptr), serverSettings_(nullptr), asyncException_(false) { } void Controller::onException(ClientConnection* connection, const std::exception& exception) { - logE << "onException: " << exception.what() << "\n"; - exception_ = exception; + logE << "Controller::onException: " << exception.what() << "\n"; + exception_ = exception.what(); asyncException_ = true; } @@ -53,7 +52,7 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base { if (baseMessage.type == message_type::kWireChunk) { - if ((stream_ != NULL) && (decoder_ != NULL)) + if (stream_ && decoder_) { msg::PcmChunk* pcmChunk = new msg::PcmChunk(sampleFormat_, 0); pcmChunk->deserialize(baseMessage, buffer); @@ -79,15 +78,46 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base } else if (baseMessage.type == message_type::kServerSettings) { - msg::ServerSettings serverSettings; - serverSettings.deserialize(baseMessage, buffer); - logO << "ServerSettings - buffer: " << serverSettings.bufferMs << ", latency: " << serverSettings.latency << ", volume: " << serverSettings.volume << ", muted: " << serverSettings.muted << "\n"; - if (player_ != nullptr) + serverSettings_.reset(new msg::ServerSettings()); + serverSettings_->deserialize(baseMessage, buffer); + logO << "ServerSettings - buffer: " << serverSettings_->bufferMs << ", latency: " << serverSettings_->latency << ", volume: " << serverSettings_->volume << ", muted: " << serverSettings_->muted << "\n"; + if (stream_ && player_) { - player_->setVolume(serverSettings.volume / 100.); - player_->setMute(serverSettings.muted); + player_->setVolume(serverSettings_->volume / 100.); + player_->setMute(serverSettings_->muted); + stream_->setBufferLen(serverSettings_->bufferMs - serverSettings_->latency); } - stream_->setBufferLen(serverSettings.bufferMs - serverSettings.latency); + } + else if (baseMessage.type == message_type::kHeader) + { + headerChunk_.reset(new msg::Header()); + headerChunk_->deserialize(baseMessage, buffer); + + logO << "Codec: " << headerChunk_->codec << "\n"; + if (headerChunk_->codec == "pcm") + decoder_.reset(new PcmDecoder()); +#ifndef ANDROID + if (headerChunk_->codec == "ogg") + decoder_.reset(new OggDecoder()); +#endif + else if (headerChunk_->codec == "flac") + decoder_.reset(new FlacDecoder()); + sampleFormat_ = decoder_->setHeader(headerChunk_.get()); + logO << "sample rate: " << sampleFormat_.rate << "Hz\n"; + logO << "bits/sample: " << sampleFormat_.bits << "\n"; + logO << "channels : " << sampleFormat_.channels << "\n"; + + stream_.reset(new Stream(sampleFormat_)); + stream_->setBufferLen(serverSettings_->bufferMs - latency_); + +#ifndef ANDROID + player_.reset(new AlsaPlayer(pcmDevice_, stream_.get())); +#else + player_.reset(new OpenslPlayer(pcmDevice_, stream_.get())); +#endif + player_->setVolume(serverSettings_->volume / 100.); + player_->setMute(serverSettings_->muted); + player_->start(); } if (baseMessage.type != message_type::kTime) @@ -114,8 +144,8 @@ void Controller::start(const PcmDevice& pcmDevice, const std::string& host, size { pcmDevice_ = pcmDevice; latency_ = latency; - clientConnection_ = new ClientConnection(this, host, port); - controllerThread_ = new thread(&Controller::worker, this); + clientConnection_.reset(new ClientConnection(this, host, port)); + controllerThread_ = thread(&Controller::worker, this); } @@ -123,18 +153,14 @@ void Controller::stop() { logD << "Stopping Controller" << endl; active_ = false; - controllerThread_->join(); + controllerThread_.join(); clientConnection_->stop(); - delete controllerThread_; - delete clientConnection_; } void Controller::worker() { active_ = true; - decoder_ = NULL; - stream_ = NULL; while (active_) { @@ -143,27 +169,7 @@ void Controller::worker() clientConnection_->start(); msg::Hello hello(clientConnection_->getMacAddress()); - msg::Request requestMsg(kServerSettings); - shared_ptr serverSettings(NULL); - while (active_ && !(serverSettings = clientConnection_->sendReq(&hello))); - logO << "ServerSettings - buffer: " << serverSettings->bufferMs << ", latency: " << serverSettings->latency << ", volume: " << serverSettings->volume << ", muted: " << serverSettings->muted << "\n"; - - requestMsg.request = kHeader; - shared_ptr headerChunk(NULL); - while (active_ && !(headerChunk = clientConnection_->sendReq(&requestMsg))); - logO << "Codec: " << headerChunk->codec << "\n"; - if (headerChunk->codec == "pcm") - decoder_ = new PcmDecoder(); -#ifndef ANDROID - if (headerChunk->codec == "ogg") - decoder_ = new OggDecoder(); -#endif - else if (headerChunk->codec == "flac") - decoder_ = new FlacDecoder(); - sampleFormat_ = decoder_->setHeader(headerChunk.get()); - logO << "sample rate: " << sampleFormat_.rate << "Hz\n"; - logO << "bits/sample: " << sampleFormat_.bits << "\n"; - logO << "channels : " << sampleFormat_.channels << "\n"; + clientConnection_->send(&hello); msg::Request timeReq(kTime); for (size_t n=0; n<50 && active_; ++n) @@ -178,25 +184,13 @@ void Controller::worker() } logO << "diff to server [ms]: " << (float)TimeProvider::getInstance().getDiffToServer().count() / 1000.f << "\n"; - stream_ = new Stream(sampleFormat_); - stream_->setBufferLen(serverSettings->bufferMs - latency_); - -#ifndef ANDROID - player_.reset(new AlsaPlayer(pcmDevice_, stream_)); -#else - player_.reset(new OpenslPlayer(pcmDevice_, stream_)); -#endif - player_->setVolume(serverSettings->volume / 100.); - player_->setMute(serverSettings->muted); - player_->start(); - while (active_) { for (size_t n=0; n<10 && active_; ++n) { usleep(100*1000); if (asyncException_) - throw exception_; + throw AsyncSnapException(exception_); } if (sendTimeSyncMessage(5000)) @@ -207,18 +201,10 @@ void Controller::worker() { asyncException_ = false; logS(kLogErr) << "Exception in Controller::worker(): " << e.what() << endl; - logO << "Stopping clientConnection" << endl; clientConnection_->stop(); - if (player_ != nullptr) - player_->stop(); - logO << "Deleting stream" << endl; - if (stream_ != NULL) - delete stream_; - stream_ = NULL; - if (decoder_ != NULL) - delete decoder_; - decoder_ = NULL; - logO << "done" << endl; + player_.reset(); + stream_.reset(); + decoder_.reset(); for (size_t n=0; (n<10) && active_; ++n) usleep(100*1000); } diff --git a/client/controller.h b/client/controller.h index 67aeae9f..703d0936 100644 --- a/client/controller.h +++ b/client/controller.h @@ -23,6 +23,7 @@ #include #include "decoder/decoder.h" #include "message/message.h" +#include "message/serverSettings.h" #include "player/pcmDevice.h" #ifdef ANDROID #include "player/openslPlayer.h" @@ -58,17 +59,19 @@ private: void worker(); bool sendTimeSyncMessage(long after = 1000); std::atomic active_; - std::thread* controllerThread_; - ClientConnection* clientConnection_; - Stream* stream_; - std::string ip_; + std::thread controllerThread_; SampleFormat sampleFormat_; - Decoder* decoder_; PcmDevice pcmDevice_; size_t latency_; + std::unique_ptr clientConnection_; + std::unique_ptr stream_; + std::unique_ptr decoder_; std::unique_ptr player_; + std::shared_ptr serverSettings_; + std::shared_ptr headerChunk_; - std::exception exception_; + + std::string exception_; bool asyncException_; }; diff --git a/common/snapException.h b/common/snapException.h index bdb36455..63eddee3 100644 --- a/common/snapException.h +++ b/common/snapException.h @@ -58,14 +58,23 @@ public: -class ServerException : public SnapException +class AsyncSnapException : public SnapException { public: - ServerException(const char* text) : SnapException(text) + AsyncSnapException(const char* text) : SnapException(text) { } - virtual ~ServerException() throw() + AsyncSnapException(const std::string& text) : SnapException(text) + { + } + + AsyncSnapException(const AsyncSnapException& e) : SnapException(e.what()) + { + } + + + virtual ~AsyncSnapException() throw() { } }; diff --git a/server/controlSession.cpp b/server/controlSession.cpp index d1107b73..bd317ddb 100644 --- a/server/controlSession.cpp +++ b/server/controlSession.cpp @@ -113,9 +113,12 @@ void ControlSession::reader() while (active_) { asio::streambuf response; - asio::read_until(*socket_, response, "\r\n"); + asio::read_until(*socket_, response, "\n"); std::string s((istreambuf_iterator(&response)), istreambuf_iterator()); - s.resize(s.length() - 2); + size_t len = s.length() - 1; + if ((len >= 2) && s[len-2] == '\r') + --len; + s.resize(len); if (messageReceiver_ != NULL) messageReceiver_->onMessageReceived(this, s); diff --git a/server/encoder/encoderFactory.cpp b/server/encoder/encoderFactory.cpp index 1dfb5f75..dc6341e8 100644 --- a/server/encoder/encoderFactory.cpp +++ b/server/encoder/encoderFactory.cpp @@ -17,10 +17,12 @@ ***/ #include "encoderFactory.h" -#include "common/utils.h" #include "pcmEncoder.h" #include "oggEncoder.h" #include "flacEncoder.h" +#include "common/utils.h" +#include "common/snapException.h" +#include "common/log.h" using namespace std; @@ -44,8 +46,7 @@ Encoder* EncoderFactory::createEncoder(const std::string& codecSettings) const encoder = new FlacEncoder(codecOptions); else { - cout << "unknown codec: " << codec << "\n"; - return NULL; + throw SnapException("unknown codec: " + codec); } return encoder; diff --git a/server/pcmreader/pcmReader.cpp b/server/pcmreader/pcmReader.cpp index 3b1fc155..8ac4b417 100644 --- a/server/pcmreader/pcmReader.cpp +++ b/server/pcmreader/pcmReader.cpp @@ -86,7 +86,9 @@ const SampleFormat& PcmReader::getSampleFormat() const void PcmReader::start() { logE << "PcmReader start: " << sampleFormat_.getFormat() << "\n"; +//TODO: wrong encoder settings leads to: terminate called after throwing an instance of 'std::system_error' what(): Invalid argument encoder_->init(this, sampleFormat_); + active_ = true; readerThread_ = thread(&PcmReader::worker, this); } diff --git a/server/pcmreader/streamManager.cpp b/server/pcmreader/streamManager.cpp index d7dfe558..62b6be3e 100644 --- a/server/pcmreader/streamManager.cpp +++ b/server/pcmreader/streamManager.cpp @@ -43,11 +43,11 @@ PcmReader* StreamManager::addStream(const std::string& uri) if (readerUri.query.find("buffer_ms") == readerUri.query.end()) readerUri.query["buffer_ms"] = to_string(readBufferMs_); - logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: " - << readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n"; +// logE << "\nURI: " << readerUri.uri << "\nscheme: " << readerUri.scheme << "\nhost: " +// << readerUri.host << "\npath: " << readerUri.path << "\nfragment: " << readerUri.fragment << "\n"; - for (auto kv: readerUri.query) - logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n"; +// for (auto kv: readerUri.query) +// logE << "key: '" << kv.first << "' value: '" << kv.second << "'\n"; if (readerUri.scheme == "pipe") { diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 4514caf1..53e287b6 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -39,17 +39,20 @@ StreamServer::~StreamServer() { } - +/* void StreamServer::send(const msg::BaseMessage* message) { - std::unique_lock mlock(mutex_); + std::lock_guard mlock(mutex_); + logE << "send: " << sessions_.size() << "\n"; for (auto it = sessions_.begin(); it != sessions_.end(); ) { + logE << "send: " << (*it)->macAddress << ", " << !(*it)->active() << "\n"; if (!(*it)->active()) { logS(kLogErr) << "Session inactive. Removing\n"; + logE << "Session inactive. Removing\n"; // don't block: remove ServerSession in a thread onDisconnect(it->get()); auto func = [](shared_ptr s)->void{s->stop();}; @@ -62,24 +65,25 @@ void StreamServer::send(const msg::BaseMessage* message) } -/* for (auto it = sessions_.begin(); it != sessions_.end(); ) - { - if (!(*it)->active()) - onDisconnect(it->get()); - } -*/ +// for (auto it = sessions_.begin(); it != sessions_.end(); ) +// { +// if (!(*it)->active()) +// onDisconnect(it->get()); +// } + std::shared_ptr shared_message(message); for (auto s : sessions_) s->add(shared_message); } - +*/ void StreamServer::onChunkRead(const PcmReader* pcmReader, const msg::PcmChunk* chunk, double duration) { - logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n"; +// logO << "onChunkRead (" << pcmReader->getName() << "): " << duration << "ms\n"; bool isDefaultStream(pcmReader == streamManager_->getDefaultStream().get()); std::shared_ptr shared_message(chunk); + std::lock_guard mlock(sessionsMutex_); for (auto s : sessions_) { if (isDefaultStream)//->getName() == "default") @@ -96,28 +100,36 @@ void StreamServer::onResync(const PcmReader* pcmReader, double ms) void StreamServer::onDisconnect(StreamSession* streamSession) { - logO << "onDisconnect: " << streamSession->macAddress << "\n"; - if (streamSession->macAddress.empty()) - return; -/* auto func = [](StreamSession* s)->void{s->stop();}; - std::thread t(func, streamSession); - t.detach(); -*/ - ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress); -/* // don't block: remove StreamSession in a thread - for (auto it = sessions_.begin(); it != sessions_.end(); ) + std::lock_guard mlock(sessionsMutex_); + std::shared_ptr session = nullptr; + + for (auto s: sessions_) { - if (it->get() == streamSession) + if (s.get() == streamSession) { - logO << "erase: " << (*it)->macAddress << "\n"; - sessions_.erase(it); + session = s; break; } } -*/ - // notify controllers if not yet done - if (!clientInfo->connected) + + if (session == nullptr) return; + + logO << "onDisconnect: " << session->macAddress << "\n"; + ClientInfoPtr clientInfo = Config::instance().getClientInfo(streamSession->macAddress); + logE << "sessions: " << sessions_.size() << "\n"; + // don't block: remove StreamSession in a thread + auto func = [](shared_ptr s)->void{s->stop();}; + std::thread t(func, session); + t.detach(); + sessions_.erase(session); + + logE << "sessions: " << sessions_.size() << "\n"; + + // notify controllers if not yet done + if (!clientInfo || !clientInfo->connected) + return; + clientInfo->connected = false; gettimeofday(&clientInfo->lastSeen, NULL); Config::instance().save(); @@ -252,7 +264,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM } else if (requestMsg.request == kHeader) { - std::unique_lock mlock(mutex_); +// std::lock_guard mlock(mutex_); //TODO: use the correct stream msg::Header* headerChunk = streamManager_->getDefaultStream()->getHeader(); headerChunk->refersTo = requestMsg.id; @@ -267,7 +279,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM logO << "Hello from " << connection->macAddress << ", host: " << helloMsg.getHostName() << ", v" << helloMsg.getVersion() << "\n"; logD << "request kServerSettings: " << connection->macAddress << "\n"; - std::unique_lock mlock(mutex_); +// std::lock_guard mlock(mutex_); ClientInfoPtr clientInfo = Config::instance().getClientInfo(connection->macAddress, true); if (clientInfo == nullptr) { @@ -285,6 +297,11 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM connection->send(&serverSettings); } +//TODO: use the correct stream + msg::Header* headerChunk = streamManager_->getDefaultStream()->getHeader(); + connection->send(headerChunk); + + ClientInfoPtr client = Config::instance().getClientInfo(connection->macAddress); client->ipAddress = connection->getIP(); client->hostName = helloMsg.getHostName(); @@ -294,6 +311,7 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM Config::instance().save(); json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); + logO << notification.dump(4) << "\n"; controlServer_->send(notification.dump()); } } @@ -301,8 +319,10 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM StreamSession* StreamServer::getStreamSession(const std::string& mac) { +// logO << "getStreamSession: " << mac << "\n"; for (auto session: sessions_) { +// logO << "getStreamSession, checking: " << session->macAddress << "\n"; if (session->macAddress == mac) return session.get(); } @@ -327,7 +347,7 @@ void StreamServer::handleAccept(socket_ptr socket) logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; shared_ptr session = make_shared(this, socket); { - std::unique_lock mlock(mutex_); + std::lock_guard mlock(sessionsMutex_); session->setBufferMs(settings_.bufferMs); session->start(); sessions_.insert(session); @@ -338,12 +358,14 @@ void StreamServer::handleAccept(socket_ptr socket) void StreamServer::start() { +// throw SnapException("good"); controlServer_.reset(new ControlServer(io_service_, settings_.controlPort, this)); controlServer_->start(); streamManager_.reset(new StreamManager(this, settings_.sampleFormat, settings_.codec, settings_.streamReadMs)); for (auto& streamUri: settings_.pcmStreams) logE << "Stream: " << streamManager_->addStream(streamUri)->getUri().toJson() << "\n"; +// throw SnapException("bad"); streamManager_->start(); @@ -359,7 +381,7 @@ void StreamServer::stop() streamManager_->stop(); - std::unique_lock mlock(mutex_); +// std::lock_guard mlock(sessionsMutex_); for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it) session->stop(); } diff --git a/server/streamServer.h b/server/streamServer.h index 442c6024..26acc276 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -79,7 +79,7 @@ public: void stop(); /// Send a message to all connceted clients - void send(const msg::BaseMessage* message); +// void send(const msg::BaseMessage* message); /// Clients call this when they receive a message. Implementation of MessageReceiver::onMessageReceived virtual void onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer); @@ -96,7 +96,7 @@ private: void startAccept(); void handleAccept(socket_ptr socket); StreamSession* getStreamSession(const std::string& mac); - mutable std::mutex mutex_; + mutable std::mutex sessionsMutex_; std::set> sessions_; asio::io_service* io_service_; std::shared_ptr acceptor_; diff --git a/server/streamSession.cpp b/server/streamSession.cpp index a7bb18cd..5280e044 100644 --- a/server/streamSession.cpp +++ b/server/streamSession.cpp @@ -49,8 +49,8 @@ void StreamSession::start() void StreamSession::stop() { - std::unique_lock mlock(mutex_); setActive(false); + std::unique_lock mlock(mutex_); try { std::error_code ec; @@ -121,7 +121,8 @@ void StreamSession::setBufferMs(size_t bufferMs) bool StreamSession::send(const msg::BaseMessage* message) const { -// logO << "send: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; + //TODO on exception: set active = false +// logO << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; std::unique_lock mlock(mutex_); if (!socket_ || !active_) return false; @@ -211,6 +212,7 @@ void StreamSession::writer() void StreamSession::setActive(bool active) { + std::lock_guard mlock(activeMutex_); if (active_ && !active && (messageReceiver_ != NULL)) messageReceiver_->onDisconnect(this); active_ = active; diff --git a/server/streamSession.h b/server/streamSession.h index 37d81972..639d64fd 100644 --- a/server/streamSession.h +++ b/server/streamSession.h @@ -86,7 +86,9 @@ protected: void writer(); void setActive(bool active); + mutable std::mutex activeMutex_; std::atomic active_; + mutable std::mutex mutex_; std::thread* readerThread_; std::thread* writerThread_;