async control server

This commit is contained in:
badaix 2019-09-28 10:43:11 +02:00
parent 50f10dc952
commit c4a61dff47
4 changed files with 92 additions and 159 deletions

View file

@ -44,17 +44,12 @@ ControlServer::~ControlServer()
void ControlServer::cleanup() void ControlServer::cleanup()
{ {
std::lock_guard<std::recursive_mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
for (auto it = sessions_.begin(); it != sessions_.end();) for (auto it = sessions_.begin(); it != sessions_.end();)
{ {
if (!(*it)->active()) if (it->expired())
{ {
SLOG(ERROR) << "Session inactive. Removing\n"; SLOG(ERROR) << "Session inactive. Removing\n";
// don't block: remove ClientSession in a thread
auto func = [](shared_ptr<ControlSession> s) -> void { s->stop(); };
std::thread t(func, *it);
t.detach();
//(*it)->stop();
sessions_.erase(it++); sessions_.erase(it++);
} }
else else
@ -68,35 +63,41 @@ void ControlServer::send(const std::string& message, const ControlSession* exclu
cleanup(); cleanup();
for (auto s : sessions_) for (auto s : sessions_)
{ {
if (s.get() != excludeSession) if (auto session = s.lock())
s->sendAsync(message); {
if (session.get() != excludeSession)
session->sendAsync(message);
}
} }
} }
void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message) void ControlServer::onMessageReceived(ControlSession* connection, const std::string& message)
{ {
std::lock_guard<std::recursive_mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
LOG(DEBUG) << "received: \"" << message << "\"\n"; LOG(DEBUG) << "received: \"" << message << "\"\n";
if ((message == "quit") || (message == "exit") || (message == "bye")) // if ((message == "quit") || (message == "exit") || (message == "bye"))
{ // {
for (auto it = sessions_.begin(); it != sessions_.end(); ++it) // for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
{ // {
if (it->get() == connection) // auto session = it->lock();
{ // if (!session)
/// delete in a thread to avoid deadlock // continue;
auto func = [&](std::shared_ptr<ControlSession> s) -> void { sessions_.erase(s); }; // if (session.get() == connection)
std::thread t(func, *it); // {
t.detach(); // /// delete in a thread to avoid deadlock
break; // auto func = [&](std::shared_ptr<ControlSession> s) -> void { sessions_.erase(s); };
} // std::thread t(func, *it);
} // t.detach();
} // break;
else // }
{ // }
// }
// else
// {
if (controlMessageReceiver_ != nullptr) if (controlMessageReceiver_ != nullptr)
controlMessageReceiver_->onMessageReceived(connection, message); controlMessageReceiver_->onMessageReceived(connection, message);
} // }
} }
@ -129,9 +130,9 @@ void ControlServer::handleAccept(socket_ptr socket)
SLOG(NOTICE) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl; SLOG(NOTICE) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
shared_ptr<ControlSession> session = make_shared<ControlSession>(this, socket); shared_ptr<ControlSession> session = make_shared<ControlSession>(this, socket);
{ {
std::lock_guard<std::recursive_mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
session->start(); session->start();
sessions_.insert(session); sessions_.emplace_back(session);
cleanup(); cleanup();
} }
} }
@ -191,7 +192,10 @@ void ControlServer::stop()
acceptor_v6_->cancel(); acceptor_v6_->cancel();
acceptor_v6_ = nullptr; acceptor_v6_ = nullptr;
} }
std::lock_guard<std::recursive_mutex> mlock(mutex_); std::lock_guard<std::recursive_mutex> mlock(session_mutex_);
for (auto s : sessions_) for (auto s : sessions_)
s->stop(); {
if (auto session = s.lock())
session->stop();
}
} }

View file

@ -62,13 +62,12 @@ private:
void startAccept(); void startAccept();
void handleAccept(socket_ptr socket); void handleAccept(socket_ptr socket);
void cleanup(); void cleanup();
// void acceptor();
mutable std::recursive_mutex mutex_; mutable std::recursive_mutex session_mutex_;
std::set<std::shared_ptr<ControlSession>> sessions_; std::vector<std::weak_ptr<ControlSession>> sessions_;
std::shared_ptr<tcp::acceptor> acceptor_v4_; std::shared_ptr<tcp::acceptor> acceptor_v4_;
std::shared_ptr<tcp::acceptor> acceptor_v6_; std::shared_ptr<tcp::acceptor> acceptor_v6_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
asio::io_service* io_service_; asio::io_service* io_service_;
size_t port_; size_t port_;
ControlMessageReceiver* controlMessageReceiver_; ControlMessageReceiver* controlMessageReceiver_;

View file

@ -26,7 +26,7 @@ using namespace std;
ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : active_(false), messageReceiver_(receiver) ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
{ {
socket_ = socket; socket_ = socket;
} }
@ -34,53 +34,55 @@ ControlSession::ControlSession(ControlMessageReceiver* receiver, std::shared_ptr
ControlSession::~ControlSession() ControlSession::~ControlSession()
{ {
LOG(DEBUG) << "ControlSession::~ControlSession()\n";
stop(); stop();
} }
void ControlSession::do_read()
{
const std::string delimiter = "\n";
auto self(shared_from_this());
asio::async_read_until(*socket_, streambuf_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_.data()), buffers_begin(streambuf_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
LOG(INFO) << "received: " << line << "\n";
if ((messageReceiver_ != nullptr) && !line.empty())
messageReceiver_->onMessageReceived(this, line);
}
streambuf_.consume(bytes_transferred);
do_read();
});
}
void ControlSession::start() void ControlSession::start()
{ {
{ do_read();
std::lock_guard<std::recursive_mutex> activeLock(activeMutex_);
active_ = true;
}
readerThread_ = thread(&ControlSession::reader, this);
writerThread_ = thread(&ControlSession::writer, this);
} }
void ControlSession::stop() void ControlSession::stop()
{ {
LOG(DEBUG) << "ControlSession::stop\n"; LOG(DEBUG) << "ControlSession::stop\n";
std::lock_guard<std::recursive_mutex> activeLock(activeMutex_); std::error_code ec;
active_ = false; if (socket_)
try
{
std::error_code ec;
if (socket_)
{
std::lock_guard<std::recursive_mutex> socketLock(socketMutex_);
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
if (ec)
LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n";
socket_->close(ec);
if (ec)
LOG(ERROR) << "Error in socket close: " << ec.message() << "\n";
}
if (readerThread_.joinable())
{
LOG(DEBUG) << "ControlSession joining readerThread\n";
readerThread_.join();
}
if (writerThread_.joinable())
{
LOG(DEBUG) << "ControlSession joining writerThread\n";
messages_.abort_wait();
writerThread_.join();
}
}
catch (...)
{ {
socket_->shutdown(asio::ip::tcp::socket::shutdown_both, ec);
if (ec)
LOG(ERROR) << "Error in socket shutdown: " << ec.message() << "\n";
socket_->close(ec);
if (ec)
LOG(ERROR) << "Error in socket close: " << ec.message() << "\n";
} }
socket_ = nullptr; socket_ = nullptr;
LOG(DEBUG) << "ControlSession ControlSession stopped\n"; LOG(DEBUG) << "ControlSession ControlSession stopped\n";
@ -90,84 +92,23 @@ void ControlSession::stop()
void ControlSession::sendAsync(const std::string& message) void ControlSession::sendAsync(const std::string& message)
{ {
messages_.push(message); auto self(shared_from_this());
asio::async_write(*socket_, asio::buffer(message + "\r\n"), [this, self](std::error_code ec, std::size_t length) {
if (ec)
{
LOG(ERROR) << "Error while writing to control socket: " << ec.message() << "\n";
}
else
{
LOG(DEBUG) << "Wrote " << length << " bytes to control socket\n";
}
});
} }
bool ControlSession::send(const std::string& message) const bool ControlSession::send(const std::string& message) const
{ {
// LOG(INFO) << "send: " << message << ", size: " << message.length() << "\n"; error_code ec;
std::lock_guard<std::recursive_mutex> socketLock(socketMutex_); asio::write(*socket_, asio::buffer(message + "\r\n"), ec);
{ return !ec;
std::lock_guard<std::recursive_mutex> activeLock(activeMutex_);
if (!socket_ || !active_)
return false;
}
asio::streambuf streambuf;
std::ostream request_stream(&streambuf);
request_stream << message << "\r\n";
asio::write(*socket_.get(), streambuf);
// LOG(INFO) << "done\n";
return true;
}
void ControlSession::reader()
{
try
{
std::stringstream message;
while (active_)
{
asio::streambuf response;
asio::read_until(*socket_, response, "\n");
std::string s((istreambuf_iterator<char>(&response)), istreambuf_iterator<char>());
message << s;
if (s.empty() || (s.back() != '\n'))
continue;
string line;
while (std::getline(message, line, '\n'))
{
if (line.empty())
continue;
size_t len = line.length() - 1;
if ((len >= 2) && line[len - 2] == '\r')
--len;
line.resize(len);
if ((messageReceiver_ != nullptr) && !line.empty())
messageReceiver_->onMessageReceived(this, line);
}
message.str("");
message.clear();
}
}
catch (const std::exception& e)
{
SLOG(ERROR) << "Exception in ControlSession::reader(): " << e.what() << endl;
}
active_ = false;
}
void ControlSession::writer()
{
try
{
asio::streambuf streambuf;
std::ostream stream(&streambuf);
string message;
while (active_)
{
if (messages_.try_pop(message, std::chrono::milliseconds(500)))
send(message);
}
}
catch (const std::exception& e)
{
SLOG(ERROR) << "Exception in ControlSession::writer(): " << e.what() << endl;
}
active_ = false;
} }

View file

@ -51,7 +51,7 @@ public:
* Messages are sent to the client with the "send" method. * Messages are sent to the client with the "send" method.
* Received messages from the client are passed to the ControlMessageReceiver callback * Received messages from the client are passed to the ControlMessageReceiver callback
*/ */
class ControlSession class ControlSession : public std::enable_shared_from_this<ControlSession>
{ {
public: public:
/// ctor. Received message from the client are passed to MessageReceiver /// ctor. Received message from the client are passed to MessageReceiver
@ -66,23 +66,12 @@ public:
/// Sends a message to the client (asynchronous) /// Sends a message to the client (asynchronous)
void sendAsync(const std::string& message); void sendAsync(const std::string& message);
bool active() const
{
return active_;
}
protected: protected:
void reader(); void do_read();
void writer();
std::atomic<bool> active_;
mutable std::recursive_mutex activeMutex_;
mutable std::recursive_mutex socketMutex_;
std::thread readerThread_;
std::thread writerThread_;
std::shared_ptr<tcp::socket> socket_; std::shared_ptr<tcp::socket> socket_;
ControlMessageReceiver* messageReceiver_; ControlMessageReceiver* messageReceiver_;
Queue<std::string> messages_; asio::streambuf streambuf_;
}; };