clean server shutdown

This commit is contained in:
badaix 2015-08-03 19:00:27 +02:00
parent 83d7ca8856
commit 16f81e9c8e
4 changed files with 43 additions and 33 deletions

View file

@ -119,26 +119,29 @@ void ControlServer::onMessageReceived(ServerSession* connection, const msg::Base
} }
void ControlServer::acceptor()
void ControlServer::startAccept()
{
socket_ptr socket(new tcp::socket(io_service_));
acceptor_->async_accept(*socket, bind(&ControlServer::handleAccept, this, socket));
}
void ControlServer::handleAccept(socket_ptr socket)
{ {
tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), settings_.port));
for (;;)
{
socket_ptr sock(new tcp::socket(io_service_));
struct timeval tv; struct timeval tv;
tv.tv_sec = 5; tv.tv_sec = 5;
tv.tv_usec = 0; tv.tv_usec = 0;
a.accept(*sock); setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); logS(kLogNotice) << "ControlServer::NewConnection: " << socket->remote_endpoint().address().to_string() << endl;
logS(kLogNotice) << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << endl; ServerSession* session = new ServerSession(this, socket);
ServerSession* session = new ServerSession(this, sock);
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
session->start(); session->start();
sessions_.insert(shared_ptr<ServerSession>(session)); sessions_.insert(shared_ptr<ServerSession>(session));
} }
} startAccept();
} }
@ -146,12 +149,25 @@ void ControlServer::start()
{ {
pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName); pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName);
pipeReader_->start(); pipeReader_->start();
acceptThread_ = new thread(&ControlServer::acceptor, this); acceptor_ = shared_ptr<tcp::acceptor>(new tcp::acceptor(io_service_, tcp::endpoint(tcp::v4(), settings_.port)));
startAccept();
acceptThread_ = thread(&ControlServer::acceptor, this);
} }
void ControlServer::stop() void ControlServer::stop()
{ {
// acceptThread->join(); io_service_.stop();
acceptor_->cancel();
acceptThread_.join();
std::unique_lock<std::mutex> mlock(mutex_);
for (auto it = sessions_.begin(); it != sessions_.end(); ++it)
(*it)->stop();
}
void ControlServer::acceptor()
{
io_service_.run();
} }

View file

@ -64,16 +64,19 @@ public:
virtual void onResync(const PipeReader* pipeReader, double ms); virtual void onResync(const PipeReader* pipeReader, double ms);
private: private:
void startAccept();
void handleAccept(socket_ptr socket);
void acceptor(); void acceptor();
mutable std::mutex mutex_; mutable std::mutex mutex_;
PipeReader* pipeReader_; PipeReader* pipeReader_;
set<shared_ptr<ServerSession>> sessions_; set<shared_ptr<ServerSession>> sessions_;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
shared_ptr<tcp::acceptor> acceptor_;
ControlServerSettings settings_; ControlServerSettings settings_;
msg::SampleFormat sampleFormat_; msg::SampleFormat sampleFormat_;
msg::ServerSettings serverSettings_; msg::ServerSettings serverSettings_;
thread* acceptThread_; thread acceptThread_;
Queue<shared_ptr<msg::BaseMessage>> messages_; Queue<shared_ptr<msg::BaseMessage>> messages_;
}; };

View file

@ -26,7 +26,6 @@ using namespace std;
ServerSession::ServerSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver) ServerSession::ServerSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
{ {
socket_ = socket; socket_ = socket;
@ -83,7 +82,6 @@ void ServerSession::stop()
} }
void ServerSession::socketRead(void* _to, size_t _bytes) void ServerSession::socketRead(void* _to, size_t _bytes)
{ {
size_t read = 0; size_t read = 0;
@ -143,7 +141,6 @@ void ServerSession::getNextMessage()
} }
void ServerSession::reader() void ServerSession::reader()
{ {
active_ = true; active_ = true;
@ -162,8 +159,6 @@ void ServerSession::reader()
} }
void ServerSession::writer() void ServerSession::writer()
{ {
try try
@ -185,11 +180,3 @@ void ServerSession::writer()
} }

View file

@ -126,6 +126,10 @@ int main(int argc, char* argv[])
while (!g_terminated) while (!g_terminated)
usleep(100*1000); usleep(100*1000);
logO << "Stopping controlServer" << endl;
controlServer->stop();
logO << "done" << endl;
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {