fixed crash during server shutdown

This commit is contained in:
badaix 2016-11-08 19:23:36 +01:00
parent d98db9177a
commit 33f9557904
5 changed files with 78 additions and 64 deletions

View file

@ -33,9 +33,8 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty()) while (queue_.empty())
{
cond_.wait(mlock); cond_.wait(mlock);
}
auto val = queue_.front(); auto val = queue_.front();
queue_.pop(); queue_.pop();
return val; return val;
@ -54,7 +53,10 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); })) if (!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
return false;
if (queue_.empty())
return false; return false;
item = std::move(queue_.front()); item = std::move(queue_.front());
@ -68,6 +70,11 @@ public:
return try_pop(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout)); return try_pop(item, std::chrono::duration_cast<std::chrono::microseconds>(timeout));
} }
void abort_wait()
{
cond_.notify_one();
}
void pop(T& item) void pop(T& item)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);

View file

@ -54,7 +54,7 @@ void StreamServer::onChunkRead(const PcmStream* pcmStream, const msg::PcmChunk*
bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get()); bool isDefaultStream(pcmStream == streamManager_->getDefaultStream().get());
std::shared_ptr<const msg::BaseMessage> shared_message(chunk); std::shared_ptr<const msg::BaseMessage> shared_message(chunk);
std::lock_guard<std::mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto s : sessions_) for (auto s : sessions_)
{ {
if (!s->pcmStream() && isDefaultStream)//->getName() == "default") if (!s->pcmStream() && isDefaultStream)//->getName() == "default")
@ -73,17 +73,8 @@ void StreamServer::onResync(const PcmStream* pcmStream, double ms)
void StreamServer::onDisconnect(StreamSession* streamSession) void StreamServer::onDisconnect(StreamSession* streamSession)
{ {
std::lock_guard<std::mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
std::shared_ptr<StreamSession> session = nullptr; session_ptr session = getStreamSession(streamSession);
for (auto s: sessions_)
{
if (s.get() == streamSession)
{
session = s;
break;
}
}
if (session == nullptr) if (session == nullptr)
return; return;
@ -192,8 +183,8 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
clientInfo->config.streamId = streamId; clientInfo->config.streamId = streamId;
response = clientInfo->config.streamId; response = clientInfo->config.streamId;
StreamSession* session = getStreamSession(request.getParam("client").get<string>()); session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL) if (session != nullptr)
{ {
session->add(stream->getHeader()); session->add(stream->getHeader());
session->setPcmStream(stream); session->setPcmStream(stream);
@ -218,8 +209,8 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
serverSettings.setMuted(clientInfo->config.volume.muted); serverSettings.setMuted(clientInfo->config.volume.muted);
serverSettings.setLatency(clientInfo->config.latency); serverSettings.setLatency(clientInfo->config.latency);
StreamSession* session = getStreamSession(request.getParam("client").get<string>()); session_ptr session = getStreamSession(request.getParam("client").get<string>());
if (session != NULL) if (session != nullptr)
session->send(&serverSettings); session->send(&serverSettings);
Config::instance().save(); Config::instance().save();
@ -244,7 +235,7 @@ void StreamServer::onMessageReceived(ControlSession* controlSession, const std::
void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer) void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseMessage& baseMessage, char* buffer)
{ {
logD << "getNextMessage: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n"; // logD << "onMessageReceived: " << baseMessage.type << ", size: " << baseMessage.size << ", id: " << baseMessage.id << ", refers: " << baseMessage.refersTo << ", sent: " << baseMessage.sent.sec << "," << baseMessage.sent.usec << ", recv: " << baseMessage.received.sec << "," << baseMessage.received.usec << "\n";
if (baseMessage.type == message_type::kTime) if (baseMessage.type == message_type::kTime)
{ {
msg::Time timeMsg; msg::Time timeMsg;
@ -321,16 +312,29 @@ void StreamServer::onMessageReceived(StreamSession* connection, const msg::BaseM
} }
StreamSession* StreamServer::getStreamSession(const std::string& mac) session_ptr StreamServer::getStreamSession(StreamSession* streamSession) const
{
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)
{
if (session.get() == streamSession)
return session;
}
return nullptr;
}
session_ptr StreamServer::getStreamSession(const std::string& mac) const
{ {
// logO << "getStreamSession: " << mac << "\n"; // logO << "getStreamSession: " << mac << "\n";
std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_) for (auto session: sessions_)
{ {
// logO << "getStreamSession, checking: " << session->macAddress << "\n"; // logO << "getStreamSession, checking: " << session->macAddress << "\n";
if (session->macAddress == mac) if (session->macAddress == mac)
return session.get(); return session;
} }
return NULL; return nullptr;
} }
@ -357,12 +361,13 @@ void StreamServer::handleAccept(socket_ptr socket)
logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; logS(kLogNotice) << "StreamServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<StreamSession> session = make_shared<StreamSession>(this, socket); shared_ptr<StreamSession> session = make_shared<StreamSession>(this, socket);
{
std::lock_guard<std::mutex> mlock(sessionsMutex_); session->setBufferMs(settings_.bufferMs);
session->setBufferMs(settings_.bufferMs); session->start();
session->start();
sessions_.insert(session); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
} sessions_.insert(session);
startAccept(); startAccept();
} }
@ -399,12 +404,16 @@ void StreamServer::start()
void StreamServer::stop() void StreamServer::stop()
{ {
// std::lock_guard<std::mutex> mlock(sessionsMutex_); std::lock_guard<std::recursive_mutex> mlock(sessionsMutex_);
for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it) for (auto session: sessions_)//it = sessions_.begin(); it != sessions_.end(); ++it)
{ {
if (session) if (session)
{
session->stop(); session->stop();
session = nullptr;
}
} }
sessions_.clear();
if (controlServer_) if (controlServer_)
{ {

View file

@ -39,7 +39,7 @@
using asio::ip::tcp; using asio::ip::tcp;
typedef std::shared_ptr<tcp::socket> socket_ptr; typedef std::shared_ptr<tcp::socket> socket_ptr;
typedef std::shared_ptr<StreamSession> session_ptr;
struct StreamServerSettings struct StreamServerSettings
{ {
@ -96,9 +96,10 @@ public:
private: private:
void startAccept(); void startAccept();
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
StreamSession* getStreamSession(const std::string& mac); session_ptr getStreamSession(const std::string& mac) const;
mutable std::mutex sessionsMutex_; session_ptr getStreamSession(StreamSession* session) const;
std::set<std::shared_ptr<StreamSession>> sessions_; mutable std::recursive_mutex sessionsMutex_;
std::set<session_ptr> sessions_;
asio::io_service* io_service_; asio::io_service* io_service_;
std::shared_ptr<tcp::acceptor> acceptor_; std::shared_ptr<tcp::acceptor> acceptor_;

View file

@ -28,7 +28,7 @@ using namespace std;
StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : StreamSession::StreamSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) :
active_(true), messageReceiver_(receiver), pcmStream_(nullptr) active_(false), readerThread_(nullptr), writerThread_(nullptr), messageReceiver_(receiver), pcmStream_(nullptr)
{ {
socket_ = socket; socket_ = socket;
} }
@ -54,16 +54,20 @@ const PcmStreamPtr StreamSession::pcmStream() const
void StreamSession::start() void StreamSession::start()
{ {
setActive(true); std::lock_guard<std::recursive_mutex> mlock(mutex_);
readerThread_ = new thread(&StreamSession::reader, this); active_ = true;
writerThread_ = new thread(&StreamSession::writer, this); readerThread_.reset(new thread(&StreamSession::reader, this));
writerThread_.reset(new thread(&StreamSession::writer, this));
} }
void StreamSession::stop() void StreamSession::stop()
{ {
setActive(false); std::lock_guard<std::recursive_mutex> mlock(mutex_);
std::unique_lock<std::mutex> mlock(mutex_); if (!active_)
return;
active_ = false;
try try
{ {
std::error_code ec; std::error_code ec;
@ -74,25 +78,25 @@ void StreamSession::stop()
socket_->close(ec); socket_->close(ec);
if (ec) logE << "Error in socket close: " << ec.message() << "\n"; if (ec) logE << "Error in socket close: " << ec.message() << "\n";
} }
if (readerThread_) if (readerThread_ && readerThread_->joinable())
{ {
logD << "joining readerThread\n"; logD << "joining readerThread\n";
messages_.abort_wait();
readerThread_->join(); readerThread_->join();
delete readerThread_;
} }
if (writerThread_) if (writerThread_ && writerThread_->joinable())
{ {
logD << "joining writerThread\n"; logD << "joining writerThread\n";
writerThread_->join(); writerThread_->join();
delete writerThread_;
} }
} }
catch(...) catch(...)
{ {
} }
readerThread_ = NULL;
writerThread_ = NULL; readerThread_ = nullptr;
socket_ = NULL; writerThread_ = nullptr;
socket_ = nullptr;
logD << "StreamSession stopped\n"; logD << "StreamSession stopped\n";
} }
@ -135,7 +139,7 @@ bool StreamSession::send(const msg::BaseMessage* message) const
{ {
//TODO on exception: set active = false //TODO on exception: set active = false
// logO << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; // logO << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
std::unique_lock<std::mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(mutex_);
if (!socket_ || !active_) if (!socket_ || !active_)
return false; return false;
asio::streambuf streambuf; asio::streambuf streambuf;
@ -169,7 +173,7 @@ void StreamSession::getNextMessage()
tv t; tv t;
baseMessage.received = t; baseMessage.received = t;
if (messageReceiver_ != NULL) if (active_ && (messageReceiver_ != NULL))
messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]); messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]);
} }
@ -187,7 +191,9 @@ void StreamSession::reader()
{ {
logS(kLogErr) << "Exception in StreamSession::reader(): " << e.what() << endl; logS(kLogErr) << "Exception in StreamSession::reader(): " << e.what() << endl;
} }
setActive(false);
if (active_ && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this);
} }
@ -224,16 +230,9 @@ void StreamSession::writer()
{ {
logS(kLogErr) << "Exception in StreamSession::writer(): " << e.what() << endl; logS(kLogErr) << "Exception in StreamSession::writer(): " << e.what() << endl;
} }
setActive(false);
}
if (active_ && (messageReceiver_ != NULL))
void StreamSession::setActive(bool active)
{
std::lock_guard<std::mutex> mlock(activeMutex_);
if (active_ && !active && (messageReceiver_ != NULL))
messageReceiver_->onDisconnect(this); messageReceiver_->onDisconnect(this);
active_ = active;
} }

View file

@ -22,11 +22,11 @@
#include <string> #include <string>
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <mutex>
#include <memory> #include <memory>
#include <asio.hpp> #include <asio.hpp>
#include <condition_variable> #include <condition_variable>
#include <set> #include <set>
#include <mutex>
#include "message/message.h" #include "message/message.h"
#include "common/queue.h" #include "common/queue.h"
#include "streamreader/streamManager.h" #include "streamreader/streamManager.h"
@ -88,14 +88,12 @@ protected:
void getNextMessage(); void getNextMessage();
void reader(); void reader();
void writer(); void writer();
void setActive(bool active);
mutable std::mutex activeMutex_; mutable std::recursive_mutex mutex_;
std::atomic<bool> active_; std::atomic<bool> active_;
mutable std::mutex mutex_; std::unique_ptr<std::thread> readerThread_;
std::thread* readerThread_; std::unique_ptr<std::thread> writerThread_;
std::thread* writerThread_;
std::shared_ptr<tcp::socket> socket_; std::shared_ptr<tcp::socket> socket_;
MessageReceiver* messageReceiver_; MessageReceiver* messageReceiver_;
Queue<std::shared_ptr<const msg::BaseMessage>> messages_; Queue<std::shared_ptr<const msg::BaseMessage>> messages_;