diff --git a/server/controlServer.cpp b/server/controlServer.cpp index 0b23092e..622d7699 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -34,14 +34,14 @@ using namespace std; using json = nlohmann::json; -ControlServer::ControlServer(size_t port, ControlMessageReceiver* controlMessageReceiver) : port_(port), controlMessageReceiver_(controlMessageReceiver) +ControlServer::ControlServer(boost::asio::io_service* io_service, size_t port, ControlMessageReceiver* controlMessageReceiver) : io_service_(io_service), port_(port), controlMessageReceiver_(controlMessageReceiver) { } ControlServer::~ControlServer() { - stop(); +// stop(); } @@ -93,7 +93,7 @@ void ControlServer::onMessageReceived(ControlSession* connection, const std::str void ControlServer::startAccept() { - socket_ptr socket = make_shared(io_service_); + socket_ptr socket = make_shared(*io_service_); acceptor_->async_accept(*socket, bind(&ControlServer::handleAccept, this, socket)); } @@ -118,25 +118,32 @@ void ControlServer::handleAccept(socket_ptr socket) void ControlServer::start() { - acceptor_ = make_shared(io_service_, tcp::endpoint(tcp::v4(), port_)); + acceptor_ = make_shared(*io_service_, tcp::endpoint(tcp::v4(), port_)); startAccept(); - acceptThread_ = thread(&ControlServer::acceptor, this); +// acceptThread_ = thread(&ControlServer::acceptor, this); } void ControlServer::stop() { acceptor_->cancel(); - io_service_.stop(); - acceptThread_.join(); - std::unique_lock mlock(mutex_); +// io_service_.stop(); +/* try + { + acceptThread_.join(); + } + catch(const exception& e) + { + logO << "ControlServer::stop exception: " << e.what() << "\n"; + } +*/ std::unique_lock mlock(mutex_); for (auto it = sessions_.begin(); it != sessions_.end(); ++it) (*it)->stop(); } -void ControlServer::acceptor() -{ - io_service_.run(); -} +//void ControlServer::acceptor() +//{ +// io_service_.run(); +//} diff --git a/server/controlServer.h b/server/controlServer.h index 9d8b5fc3..c7f35d1c 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -47,7 +47,7 @@ typedef std::shared_ptr socket_ptr; class ControlServer : public ControlMessageReceiver { public: - ControlServer(size_t port, ControlMessageReceiver* controlMessageReceiver = NULL); + ControlServer(boost::asio::io_service* io_service, size_t port, ControlMessageReceiver* controlMessageReceiver = NULL); virtual ~ControlServer(); void start(); @@ -62,14 +62,14 @@ public: private: void startAccept(); void handleAccept(socket_ptr socket); - void acceptor(); +// void acceptor(); mutable std::mutex mutex_; size_t port_; std::set> sessions_; - boost::asio::io_service io_service_; + boost::asio::io_service* io_service_; std::shared_ptr acceptor_; - std::thread acceptThread_; +// std::thread acceptThread_; Queue> messages_; ControlMessageReceiver* controlMessageReceiver_; diff --git a/server/snapServer.cpp b/server/snapServer.cpp index e6980feb..88cc4093 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -131,12 +131,21 @@ int main(int argc, char* argv[]) if (settings.bufferMs < 400) settings.bufferMs = 400; settings.sampleFormat = sampleFormat; - std::unique_ptr streamServer(new StreamServer(settings)); + + boost::asio::io_service io_service; + auto func = [](boost::asio::io_service* ioservice)->void{ioservice->run();}; + std::thread t(func, &io_service); + + std::unique_ptr streamServer(new StreamServer(&io_service, settings)); streamServer->start(); while (!g_terminated) usleep(100*1000); + io_service.stop(); + t.join(); + + logO << "Stopping streamServer" << endl; streamServer->stop(); logO << "done" << endl; diff --git a/server/streamServer.cpp b/server/streamServer.cpp index 0f3f1fa3..2cb3ec22 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -32,7 +32,7 @@ using namespace std; using json = nlohmann::json; -StreamServer::StreamServer(const StreamServerSettings& streamServerSettings) : settings_(streamServerSettings), sampleFormat_(streamServerSettings.sampleFormat) +StreamServer::StreamServer(boost::asio::io_service* io_service, const StreamServerSettings& streamServerSettings) : io_service_(io_service), settings_(streamServerSettings), sampleFormat_(streamServerSettings.sampleFormat) { } @@ -54,7 +54,7 @@ void StreamServer::send(const msg::BaseMessage* message) auto func = [](shared_ptr s)->void{s->stop();}; std::thread t(func, *it); t.detach(); - controlServer->send("Client gone: " + (*it)->macAddress); + controlServer_->send("Client gone: " + (*it)->macAddress); sessions_.erase(it++); } else @@ -87,7 +87,7 @@ void StreamServer::onDisconnect(ClientSession* connection) gettimeofday(&client->lastSeen, NULL); Config::instance().save(); json notification = JsonNotification::getJson("Client.OnDisconnect", client->toJson()); - controlServer->send(notification.dump(4)); + controlServer_->send(notification.dump(4)); } @@ -182,7 +182,7 @@ void StreamServer::onMessageReceived(ControlSession* connection, const std::stri Config::instance().save(); json notification = JsonNotification::getJson("Client.OnUpdate", clientInfo->toJson()); - controlServer->send(notification.dump()); + controlServer_->send(notification.dump()); } connection->send(request.getResponse(response).dump()); @@ -269,7 +269,7 @@ void StreamServer::onMessageReceived(ClientSession* connection, const msg::BaseM Config::instance().save(); json notification = JsonNotification::getJson("Client.OnConnect", client->toJson()); - controlServer->send(notification.dump(4)); + controlServer_->send(notification.dump(4)); } } @@ -287,7 +287,7 @@ ClientSession* StreamServer::getClientSession(const std::string& mac) void StreamServer::startAccept() { - socket_ptr socket = make_shared(io_service_); + socket_ptr socket = make_shared(*io_service_); acceptor_->async_accept(*socket, bind(&StreamServer::handleAccept, this, socket)); } @@ -313,23 +313,23 @@ void StreamServer::handleAccept(socket_ptr socket) void StreamServer::start() { - controlServer.reset(new ControlServer(settings_.port + 1, this)); - controlServer->start(); + controlServer_.reset(new ControlServer(io_service_, settings_.port + 1, this)); + controlServer_->start(); pipeReader_ = new PipeReader(this, settings_.sampleFormat, settings_.codec, settings_.fifoName, settings_.pipeReadMs); pipeReader_->start(); - acceptor_ = make_shared(io_service_, tcp::endpoint(tcp::v4(), settings_.port)); + acceptor_ = make_shared(*io_service_, tcp::endpoint(tcp::v4(), settings_.port)); startAccept(); - acceptThread_ = thread(&StreamServer::acceptor, this); +// acceptThread_ = thread(&StreamServer::acceptor, this); } void StreamServer::stop() { - controlServer->stop(); + controlServer_->stop(); acceptor_->cancel(); - io_service_.stop(); - acceptThread_.join(); +// io_service_.stop(); +// acceptThread_.join(); pipeReader_->stop(); std::unique_lock mlock(mutex_); for (auto it = sessions_.begin(); it != sessions_.end(); ++it) @@ -337,8 +337,8 @@ void StreamServer::stop() } -void StreamServer::acceptor() -{ - io_service_.run(); -} +//void StreamServer::acceptor() +//{ +// io_service_.run(); +//} diff --git a/server/streamServer.h b/server/streamServer.h index badfa24d..fc40288d 100644 --- a/server/streamServer.h +++ b/server/streamServer.h @@ -71,7 +71,7 @@ struct StreamServerSettings class StreamServer : public MessageReceiver, ControlMessageReceiver, PipeListener { public: - StreamServer(const StreamServerSettings& streamServerSettings); + StreamServer(boost::asio::io_service* io_service, const StreamServerSettings& streamServerSettings); virtual ~StreamServer(); void start(); @@ -93,19 +93,19 @@ public: private: void startAccept(); void handleAccept(socket_ptr socket); - void acceptor(); +// void acceptor(); ClientSession* getClientSession(const std::string& mac); mutable std::mutex mutex_; PipeReader* pipeReader_; std::set> sessions_; - boost::asio::io_service io_service_; + boost::asio::io_service* io_service_; std::shared_ptr acceptor_; StreamServerSettings settings_; msg::SampleFormat sampleFormat_; - std::thread acceptThread_; +// std::thread acceptThread_; Queue> messages_; - std::unique_ptr controlServer; + std::unique_ptr controlServer_; };