From 0a20924e668769f28c6aed2290bb0fb80733b868 Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Thu, 1 Jan 2015 18:15:20 +0000 Subject: [PATCH] code cleanup git-svn-id: svn://elaine/murooma/trunk@337 d8a302eb-03bc-478d-80e4-98257eca68ef --- Makefile | 6 + client/alsaPlayer.cpp | 81 ++++++------- client/alsaPlayer.h | 8 +- client/clientConnection.cpp | 64 ++++++----- client/clientConnection.h | 30 +++-- client/controller.cpp | 73 ++++++------ client/controller.h | 14 +-- client/stream.cpp | 224 ++++++++++++++++++------------------ client/stream.h | 31 ++--- client/timeProvider.cpp | 8 +- client/timeProvider.h | 6 +- server/controlServer.cpp | 30 ++--- server/controlServer.h | 14 +-- server/serverSession.cpp | 52 ++++----- server/serverSession.h | 16 +-- 15 files changed, 332 insertions(+), 325 deletions(-) diff --git a/Makefile b/Makefile index 2eadaa0b..45248814 100644 --- a/Makefile +++ b/Makefile @@ -18,4 +18,10 @@ installclient: installserver: $(MAKE) install -C server + +uninstallclient: + $(MAKE) uninstall -C client + +uninstallserver: + $(MAKE) uninstall -C server diff --git a/client/alsaPlayer.cpp b/client/alsaPlayer.cpp index 2f238440..da34f855 100644 --- a/client/alsaPlayer.cpp +++ b/client/alsaPlayer.cpp @@ -7,7 +7,7 @@ using namespace std; -Player::Player(const PcmDevice& pcmDevice, Stream* stream) : pcm_handle(NULL), buff(NULL), active_(false), stream_(stream), pcmDevice_(pcmDevice) +Player::Player(const PcmDevice& pcmDevice, Stream* stream) : pcm_handle_(NULL), buff_(NULL), active_(false), stream_(stream), pcmDevice_(pcmDevice) { } @@ -19,11 +19,12 @@ void Player::start() snd_pcm_hw_params_t *params; int buff_size; - rate = stream_->format.rate; - channels = stream_->format.channels; + const msg::SampleFormat& format = stream_->getFormat(); + rate = format.rate; + channels = format.channels; /* Open the PCM device in playback mode */ - if ((pcm = snd_pcm_open(&pcm_handle, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0) + if ((pcm = snd_pcm_open(&pcm_handle_, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0) logE << "ERROR: Can't open " << pcmDevice_.name << " PCM device. " << snd_strerror(pcm) << "\n"; /* struct snd_pcm_playback_info_t pinfo; @@ -34,19 +35,19 @@ void Player::start() /* Allocate parameters object and fill it with default values*/ snd_pcm_hw_params_alloca(¶ms); - snd_pcm_hw_params_any(pcm_handle, params); + snd_pcm_hw_params_any(pcm_handle_, params); /* Set parameters */ - if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) + if ((pcm = snd_pcm_hw_params_set_access(pcm_handle_, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) logE << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0) + if ((pcm = snd_pcm_hw_params_set_format(pcm_handle_, params, SND_PCM_FORMAT_S16_LE)) < 0) logE << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0) + if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle_, params, channels)) < 0) logE << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n"; - if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0) + if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle_, params, &rate, 0)) < 0) logE << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n"; unsigned int buffer_time; @@ -56,20 +57,20 @@ void Player::start() unsigned int period_time = buffer_time / 4; - snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0); - snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0); + snd_pcm_hw_params_set_period_time_near(pcm_handle_, params, &period_time, 0); + snd_pcm_hw_params_set_buffer_time_near(pcm_handle_, params, &buffer_time, 0); // long unsigned int periodsize = stream_->format.msRate() * 50;//2*rate/50; // if ((pcm = snd_pcm_hw_params_set_buffer_size_near(pcm_handle, params, &periodsize)) < 0) // logE << "Unable to set buffer size " << (long int)periodsize << ": " << snd_strerror(pcm) << "\n"; /* Write parameters */ - if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0) + if ((pcm = snd_pcm_hw_params(pcm_handle_, params)) < 0) logE << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n"; /* Resume information */ - logD << "PCM name: " << snd_pcm_name(pcm_handle) << "\n"; - logD << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n"; + logD << "PCM name: " << snd_pcm_name(pcm_handle_) << "\n"; + logD << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle_)) << "\n"; snd_pcm_hw_params_get_channels(params, &tmp); logD << "channels: " << tmp << "\n"; @@ -77,26 +78,26 @@ void Player::start() logD << "rate: " << tmp << " bps\n"; /* Allocate buffer to hold single period */ - snd_pcm_hw_params_get_period_size(params, &frames, 0); - logD << "frames: " << frames << "\n"; + snd_pcm_hw_params_get_period_size(params, &frames_, 0); + logD << "frames: " << frames_ << "\n"; - buff_size = frames * channels * 2 /* 2 -> sample size */; - buff = (char *) malloc(buff_size); + buff_size = frames_ * channels * 2 /* 2 -> sample size */; + buff_ = (char *) malloc(buff_size); snd_pcm_hw_params_get_period_time(params, &tmp, NULL); logD << "period time: " << tmp << "\n"; snd_pcm_sw_params_t *swparams; snd_pcm_sw_params_alloca(&swparams); - snd_pcm_sw_params_current(pcm_handle, swparams); + snd_pcm_sw_params_current(pcm_handle_, swparams); - snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames); - snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames); -// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames); - snd_pcm_sw_params(pcm_handle, swparams); + snd_pcm_sw_params_set_avail_min(pcm_handle_, swparams, frames_); + snd_pcm_sw_params_set_start_threshold(pcm_handle_, swparams, frames_); +// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames_); + snd_pcm_sw_params(pcm_handle_, swparams); active_ = true; - playerThread = new thread(&Player::worker, this); + playerThread_ = new thread(&Player::worker, this); } @@ -108,24 +109,24 @@ Player::~Player() void Player::stop() { active_ = false; - if (playerThread != NULL) + if (playerThread_ != NULL) { - playerThread->join(); - delete playerThread; - playerThread = NULL; + playerThread_->join(); + delete playerThread_; + playerThread_ = NULL; } - if (pcm_handle != NULL) + if (pcm_handle_ != NULL) { - snd_pcm_drain(pcm_handle); - snd_pcm_close(pcm_handle); - pcm_handle = NULL; + snd_pcm_drain(pcm_handle_); + snd_pcm_close(pcm_handle_); + pcm_handle_ = NULL; } - if (buff != NULL) + if (buff_ != NULL) { - free(buff); - buff = NULL; + free(buff_); + buff_ = NULL; } } @@ -137,16 +138,16 @@ void Player::worker() snd_pcm_sframes_t framesDelay; while (active_) { - snd_pcm_avail_delay(pcm_handle, &framesAvail, &framesDelay); - chronos::usec delay((chronos::usec::rep) (1000 * (double) framesDelay / stream_->format.msRate())); + snd_pcm_avail_delay(pcm_handle_, &framesAvail, &framesDelay); + chronos::usec delay((chronos::usec::rep) (1000 * (double) framesDelay / stream_->getFormat().msRate())); logD << "Avail: " << framesAvail << ", delay: " << framesDelay << ", delay[ms]: " << delay.count() / 1000 << "\n"; - if (stream_->getPlayerChunk(buff, delay, frames)) + if (stream_->getPlayerChunk(buff_, delay, frames_)) { - if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) + if ((pcm = snd_pcm_writei(pcm_handle_, buff_, frames_)) == -EPIPE) { logE << "XRUN\n"; - snd_pcm_prepare(pcm_handle); + snd_pcm_prepare(pcm_handle_); } else if (pcm < 0) { diff --git a/client/alsaPlayer.h b/client/alsaPlayer.h index 3a60d23d..f09f4a1d 100644 --- a/client/alsaPlayer.h +++ b/client/alsaPlayer.h @@ -21,12 +21,12 @@ public: private: void worker(); - snd_pcm_t* pcm_handle; - snd_pcm_uframes_t frames; - char *buff; + snd_pcm_t* pcm_handle_; + snd_pcm_uframes_t frames_; + char *buff_; std::atomic active_; Stream* stream_; - std::thread* playerThread; + std::thread* playerThread_; PcmDevice pcmDevice_; }; diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index 18ef6bdf..fc28c63b 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -10,7 +10,7 @@ using namespace std; -ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(1), ip(_ip), port(_port), readerThread(NULL), sumTimeout(chronos::msec(0)) +ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string& ip, size_t port) : active_(false), connected_(false), messageReceiver_(receiver), reqId_(1), ip_(ip), port_(port), readerThread_(NULL), sumTimeout_(chronos::msec(0)) { } @@ -31,7 +31,7 @@ void ClientConnection::socketRead(void* _to, size_t _bytes) // cout << "/"; // cout.flush(); // boost::system::error_code error; - len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead)); + len += socket_->read_some(boost::asio::buffer((char*)_to + len, toRead)); //cout << "len: " << len << ", error: " << error << endl; toRead = _bytes - len; // cout << "\\"; @@ -43,23 +43,24 @@ void ClientConnection::socketRead(void* _to, size_t _bytes) void ClientConnection::start() { + boost::asio::io_service io_service; tcp::resolver resolver(io_service); - tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); - iterator = resolver.resolve(query); + tcp::resolver::query query(tcp::v4(), ip_, boost::lexical_cast(port_)); + auto iterator = resolver.resolve(query); logO << "connecting\n"; - socket.reset(new tcp::socket(io_service)); + socket_.reset(new tcp::socket(io_service)); // struct timeval tv; // tv.tv_sec = 5; // tv.tv_usec = 0; // cout << "socket: " << socket->native() << "\n"; // setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); // setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - socket->connect(*iterator); - logO << "MAC: \"" << getMacAddress(socket->native()) << "\"\n"; + socket_->connect(*iterator); + logO << "MAC: \"" << getMacAddress(socket_->native()) << "\"\n"; connected_ = true; logS(kLogNotice) << "connected" << endl; active_ = true; - readerThread = new thread(&ClientConnection::reader, this); + readerThread_ = new thread(&ClientConnection::reader, this); } @@ -70,24 +71,24 @@ void ClientConnection::stop() try { boost::system::error_code ec; - if (socket) + if (socket_) { - socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (ec) logE << "Error in socket shutdown: " << ec << "\n"; - socket->close(ec); + socket_->close(ec); if (ec) logE << "Error in socket close: " << ec << "\n"; } - if (readerThread) + if (readerThread_) { logD << "joining readerThread\n"; - readerThread->join(); - delete readerThread; + readerThread_->join(); + delete readerThread_; } } catch(...) { } - readerThread = NULL; + readerThread_ = NULL; logD << "readerThread terminated\n"; } @@ -104,7 +105,7 @@ bool ClientConnection::send(msg::BaseMessage* message) tv t; message->sent = t; message->serialize(stream); - boost::asio::write(*socket.get(), streambuf); + boost::asio::write(*socket_.get(), streambuf); return true; } @@ -112,34 +113,35 @@ bool ClientConnection::send(msg::BaseMessage* message) shared_ptr ClientConnection::sendRequest(msg::BaseMessage* message, const chronos::msec& timeout) { shared_ptr response(NULL); - if (++reqId == 10000) - reqId = 1; - message->id = reqId; + if (++reqId_ == 10000) + reqId_ = 1; + message->id = reqId_; //logD << "Req: " << reqId << "\n"; - shared_ptr pendingRequest(new PendingRequest(reqId)); + shared_ptr pendingRequest(new PendingRequest(reqId_)); { std::unique_lock mlock(mutex_); - pendingRequests.insert(pendingRequest); + pendingRequests_.insert(pendingRequest); } + std::mutex m; std::unique_lock lck(m); send(message); if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) { response = pendingRequest->response; - sumTimeout = chronos::msec(0); + sumTimeout_ = chronos::msec(0); //logD << "Resp: " << pendingRequest->id << "\n"; } else { - sumTimeout += timeout; - logO << "timeout while waiting for response to: " << reqId << ", timeout " << sumTimeout.count() << "\n"; - if (sumTimeout > chronos::sec(10)) + sumTimeout_ += timeout; + logO << "timeout while waiting for response to: " << reqId_ << ", timeout " << sumTimeout_.count() << "\n"; + if (sumTimeout_ > chronos::sec(10)) throw SnapException("sum timeout exceeded 10s"); } { std::unique_lock mlock(mutex_); - pendingRequests.erase(pendingRequest); + pendingRequests_.erase(pendingRequest); } return response; } @@ -162,7 +164,7 @@ void ClientConnection::getNextMessage() { std::unique_lock mlock(mutex_); { - for (auto req: pendingRequests) + for (auto req: pendingRequests_) { if (req->id == baseMessage.refersTo) { @@ -177,8 +179,8 @@ void ClientConnection::getNextMessage() } } - if (messageReceiver != NULL) - messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); + if (messageReceiver_ != NULL) + messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]); } @@ -194,8 +196,8 @@ void ClientConnection::reader() } catch (const std::exception& e) { - if (messageReceiver != NULL) - messageReceiver->onException(this, e); + if (messageReceiver_ != NULL) + messageReceiver_->onException(this, e); } catch (...) { diff --git a/client/clientConnection.h b/client/clientConnection.h index cf9d127a..d13f78a2 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -40,11 +40,11 @@ public: class ClientConnection { public: - ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port); + ClientConnection(MessageReceiver* receiver, const std::string& ip, size_t port); virtual ~ClientConnection(); virtual void start(); virtual void stop(); - virtual bool send(msg::BaseMessage* _message); + virtual bool send(msg::BaseMessage* message); virtual std::shared_ptr sendRequest(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000)); template @@ -65,31 +65,27 @@ public: virtual bool connected() { - return (socket != 0); + return (socket_ != 0); // return (connected_ && socket); } protected: virtual void reader(); - void socketRead(void* _to, size_t _bytes); - std::shared_ptr socket; + void socketRead(void* to, size_t bytes); + void getNextMessage(); -// boost::asio::ip::tcp::endpoint endpt; + std::shared_ptr socket_; std::atomic active_; std::atomic connected_; - MessageReceiver* messageReceiver; - void getNextMessage(); - boost::asio::io_service io_service; - tcp::resolver::iterator iterator; + MessageReceiver* messageReceiver_; mutable std::mutex mutex_; - std::mutex m; - std::set> pendingRequests; - uint16_t reqId; - std::string ip; - size_t port; - std::thread* readerThread; - chronos::msec sumTimeout; + std::set> pendingRequests_; + uint16_t reqId_; + std::string ip_; + size_t port_; + std::thread* readerThread_; + chronos::msec sumTimeout_; }; diff --git a/client/controller.cpp b/client/controller.cpp index 1862f89c..f80179a8 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -18,7 +18,7 @@ using namespace std; -Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL), decoder(NULL) +Controller::Controller() : MessageReceiver(), active_(false), sampleFormat_(NULL), decoder_(NULL) { } @@ -33,14 +33,14 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base { if (baseMessage.type == message_type::kPayload) { - if ((stream != NULL) && (decoder != NULL)) + if ((stream_ != NULL) && (decoder_ != NULL)) { - msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat, 0); + msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat_, 0); pcmChunk->deserialize(baseMessage, buffer); //logD << "chunk: " << pcmChunk->payloadSize; - if (decoder->decode(pcmChunk)) + if (decoder_->decode(pcmChunk)) { - stream->addChunk(pcmChunk); + stream_->addChunk(pcmChunk); //logD << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n"; } else @@ -50,13 +50,13 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base } -void Controller::start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency) +void Controller::start(const PcmDevice& pcmDevice, const std::string& ip, size_t port, size_t latency) { - ip = _ip; + ip_ = ip; pcmDevice_ = pcmDevice; latency_ = latency; - clientConnection = new ClientConnection(this, ip, _port); - controllerThread = new thread(&Controller::worker, this); + clientConnection_ = new ClientConnection(this, ip, port); + controllerThread_ = new thread(&Controller::worker, this); } @@ -64,48 +64,47 @@ void Controller::stop() { logD << "Stopping\n"; active_ = false; - controllerThread->join(); - clientConnection->stop(); - delete controllerThread; - delete clientConnection; + controllerThread_->join(); + clientConnection_->stop(); + delete controllerThread_; + delete clientConnection_; } void Controller::worker() { -// Decoder* decoder; active_ = true; - decoder = NULL; - stream = NULL; + decoder_ = NULL; + stream_ = NULL; while (active_) { try { - clientConnection->start(); + clientConnection_->start(); msg::Request requestMsg(kServerSettings); shared_ptr serverSettings(NULL); - while (active_ && !(serverSettings = clientConnection->sendReq(&requestMsg))); + while (active_ && !(serverSettings = clientConnection_->sendReq(&requestMsg))); logO << "ServerSettings buffer: " << serverSettings->bufferMs << "\n"; requestMsg.request = kSampleFormat; - while (active_ && !(sampleFormat = clientConnection->sendReq(&requestMsg))); - logO << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; + while (active_ && !(sampleFormat_ = clientConnection_->sendReq(&requestMsg))); + logO << "SampleFormat rate: " << sampleFormat_->rate << ", bits: " << sampleFormat_->bits << ", channels: " << sampleFormat_->channels << "\n"; requestMsg.request = kHeader; shared_ptr headerChunk(NULL); - while (active_ && !(headerChunk = clientConnection->sendReq(&requestMsg))); + while (active_ && !(headerChunk = clientConnection_->sendReq(&requestMsg))); logO << "Codec: " << headerChunk->codec << "\n"; if (headerChunk->codec == "ogg") - decoder = new OggDecoder(); + decoder_ = new OggDecoder(); else if (headerChunk->codec == "pcm") - decoder = new PcmDecoder(); - decoder->setHeader(headerChunk.get()); + decoder_ = new PcmDecoder(); + decoder_->setHeader(headerChunk.get()); msg::Request timeReq(kTime); for (size_t n=0; n<50 && active_; ++n) { - shared_ptr reply = clientConnection->sendReq(&timeReq, chronos::msec(2000)); + shared_ptr reply = clientConnection_->sendReq(&timeReq, chronos::msec(2000)); if (reply) { double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; @@ -115,21 +114,21 @@ void Controller::worker() } logO << "diff to server [ms]: " << TimeProvider::getInstance().getDiffToServer().count() << "\n"; - stream = new Stream(*sampleFormat); - stream->setBufferLen(serverSettings->bufferMs - latency_); + stream_ = new Stream(*sampleFormat_); + stream_->setBufferLen(serverSettings->bufferMs - latency_); - Player player(pcmDevice_, stream); + Player player(pcmDevice_, stream_); player.start(); msg::Command startStream("startStream"); shared_ptr ackMsg(NULL); - while (active_ && !(ackMsg = clientConnection->sendReq(&startStream))); + while (active_ && !(ackMsg = clientConnection_->sendReq(&startStream))); while (active_) { usleep(500*1000); //throw SnapException("timeout"); - shared_ptr reply = clientConnection->sendReq(&timeReq); + shared_ptr reply = clientConnection_->sendReq(&timeReq); if (reply) { double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; @@ -141,14 +140,14 @@ void Controller::worker() { logS(kLogErr) << "Exception in Controller::worker(): " << e.what() << endl; logO << "Stopping clientConnection" << endl; - clientConnection->stop(); + clientConnection_->stop(); logO << "Deleting stream" << endl; - if (stream != NULL) - delete stream; - stream = NULL; - if (decoder != NULL) - delete decoder; - decoder = NULL; + if (stream_ != NULL) + delete stream_; + stream_ = NULL; + if (decoder_ != NULL) + delete decoder_; + decoder_ = NULL; logO << "done" << endl; if (active_) usleep(500*1000); diff --git a/client/controller.h b/client/controller.h index 89254620..8d0f5e24 100644 --- a/client/controller.h +++ b/client/controller.h @@ -14,7 +14,7 @@ class Controller : public MessageReceiver { public: Controller(); - void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency); + void start(const PcmDevice& pcmDevice, const std::string& ip, size_t port, size_t latency); void stop(); virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer); virtual void onException(ClientConnection* connection, const std::exception& exception); @@ -22,12 +22,12 @@ public: private: void worker(); std::atomic active_; - std::thread* controllerThread; - ClientConnection* clientConnection; - Stream* stream; - std::string ip; - std::shared_ptr sampleFormat; - Decoder* decoder; + std::thread* controllerThread_; + ClientConnection* clientConnection_; + Stream* stream_; + std::string ip_; + std::shared_ptr sampleFormat_; + Decoder* decoder_; PcmDevice pcmDevice_; size_t latency_; }; diff --git a/client/stream.cpp b/client/stream.cpp index 1d5336b5..dfe9c3c2 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -8,13 +8,13 @@ using namespace std; using namespace chronos; -Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0) +Stream::Stream(const msg::SampleFormat& sampleFormat) : format_(sampleFormat), sleep_(0), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0) { - buffer.setSize(500); - shortBuffer.setSize(100); - miniBuffer.setSize(20); -// cardBuffer.setSize(50); - bufferMs = msec(500); + buffer_.setSize(500); + shortBuffer_.setSize(100); + miniBuffer_.setSize(20); +// cardBuffer_.setSize(50); + bufferMs_ = msec(500); /* 48000 x @@ -23,50 +23,50 @@ Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_ x = 1,000016667 / (1,000016667 - 1) */ - setRealSampleRate(format.rate); + setRealSampleRate(format_.rate); } void Stream::setRealSampleRate(double sampleRate) { - if (sampleRate == format.rate) - correctAfterXFrames = 0; + if (sampleRate == format_.rate) + correctAfterXFrames_ = 0; else - correctAfterXFrames = round((format.rate / sampleRate) / (format.rate / sampleRate - 1.)); -// logD << "Correct after X: " << correctAfterXFrames << " (Real rate: " << sampleRate << ", rate: " << format.rate << ")\n"; + correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.)); +// logD << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n"; } void Stream::setBufferLen(size_t bufferLenMs) { - bufferMs = msec(bufferLenMs); + bufferMs_ = msec(bufferLenMs); } void Stream::clearChunks() { - while (chunks.size() > 0) - chunks.pop(); + while (chunks_.size() > 0) + chunks_.pop(); } void Stream::addChunk(msg::PcmChunk* chunk) { - while (chunks.size() * chunk->duration().count() > 10000) - chunks.pop(); - chunks.push(shared_ptr(chunk)); -// logD << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n"; + while (chunks_.size() * chunk_->duration().count() > 10000) + chunks_.pop(); + chunks_.push(shared_ptr(chunk)); +// logD << "new chunk: " << chunk_->getDuration() << ", Chunks: " << chunks_.size() << "\n"; } time_point_hrc Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer) { - if (!chunk) - chunk = chunks.pop(); - time_point_hrc tp = chunk->start(); - memset(outputBuffer, 0, framesPerBuffer * format.frameSize); + if (!chunk_) + chunk_ = chunks_.pop(); + time_point_hrc tp = chunk_->start(); + memset(outputBuffer, 0, framesPerBuffer * format_.frameSize); return tp; } @@ -75,16 +75,16 @@ time_point_hrc Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long fr time_point_ms Stream::seekTo(const time_point_ms& to) { if (!chunk) - chunk = chunks.pop(); -// time_point_ms tp = chunk->timePoint(); - while (to > chunk->timePoint())// + std::chrono::milliseconds((long int)chunk->getDuration()))// + chunk_ = chunks_.pop(); +// time_point_ms tp = chunk_->timePoint(); + while (to > chunk_->timePoint())// + std::chrono::milliseconds((long int)chunk_->getDuration()))// { - chunk = chunks.pop(); - logD << "\nto: " << Chunk::getAge(to) << "\t chunk: " << Chunk::getAge(chunk->timePoint()) << "\n"; - logD << "diff: " << std::chrono::duration_cast((to - chunk->timePoint())).count() << "\n"; + chunk_ = chunks_.pop(); + logD << "\nto: " << Chunk::getAge(to) << "\t chunk: " << Chunk::getAge(chunk_->timePoint()) << "\n"; + logD << "diff: " << std::chrono::duration_cast((to - chunk_->timePoint())).count() << "\n"; } - chunk->seek(std::chrono::duration_cast(to - chunk->timePoint()).count() * format.msRate()); - return chunk->timePoint(); + chunk_->seek(std::chrono::duration_cast(to - chunk_->timePoint()).count() * format_.msRate()); + return chunk_->timePoint(); } */ @@ -92,35 +92,35 @@ time_point_ms Stream::seekTo(const time_point_ms& to) time_point_hrc Stream::seek(long ms) { if (!chunk) - chunk = chunks.pop(); + chunk_ = chunks_.pop(); if (ms <= 0) - return chunk->start(); + return chunk_->start(); -// time_point_ms tp = chunk->timePoint(); - while (ms > chunk->duration().count()) +// time_point_ms tp = chunk_->timePoint(); + while (ms > chunk_->duration().count()) { - chunk = chunks.pop(); - ms -= min(ms, (long)chunk->durationLeft().count()); + chunk_ = chunks_.pop(); + ms -= min(ms, (long)chunk_->durationLeft().count()); } - chunk->seek(ms * format.msRate()); - return chunk->start(); + chunk_->seek(ms * format_.msRate()); + return chunk_->start(); } */ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer) { - if (!chunk && !chunks.try_pop(chunk, timeout)) + if (!chunk_ && !chunks_.try_pop(chunk_, timeout)) throw 0; - time_point_hrc tp = chunk->start(); + time_point_hrc tp = chunk_->start(); char* buffer = (char*)outputBuffer; unsigned long read = 0; while (read < framesPerBuffer) { - read += chunk->readFrames(buffer + read*format.frameSize, framesPerBuffer - read); - if (chunk->isEndOfChunk() && !chunks.try_pop(chunk, timeout)) + read += chunk_->readFrames(buffer + read*format_.frameSize, framesPerBuffer - read); + if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_, timeout)) throw 0; } return tp; @@ -133,7 +133,7 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::use return getNextPlayerChunk(outputBuffer, timeout, framesPerBuffer); long toRead = framesPerBuffer + framesCorrection; - char* buffer = (char*)malloc(toRead * format.frameSize); + char* buffer = (char*)malloc(toRead * format_.frameSize); time_point_hrc tp = getNextPlayerChunk(buffer, timeout, toRead); float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_); @@ -143,7 +143,7 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::use for (size_t n=0; n bufferMs) + if (outputBufferDacTime > bufferMs_) return false; - if (!chunk && !chunks.try_pop(chunk, outputBufferDacTime)) + if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime)) return false; - playedFrames += framesPerBuffer; + playedFrames_ += framesPerBuffer; - chronos::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk->start() - bufferMs + outputBufferDacTime); - if ((sleep.count() == 0) && (chronos::abs(age) > chronos::msec(200))) + chronos::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start() - bufferMs_ + outputBufferDacTime); + if ((sleep_.count() == 0) && (chronos::abs(age) > chronos::msec(200))) { logO << "age > 200: " << age.count() << "\n"; - sleep = age; + sleep_ = age; } try @@ -195,126 +195,126 @@ bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBuffe // logD << "buffer duration: " << bufferDuration.count() << "\n"; chronos::usec correction = chronos::usec(0); - if (sleep.count() != 0) + if (sleep_.count() != 0) { resetBuffers(); - if (sleep < -bufferDuration/2) + if (sleep_ < -bufferDuration/2) { - // We're early: not enough chunks. play silence. Reference chunk is the oldest (front) one - sleep = chrono::duration_cast(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs + outputBufferDacTime); -//logD << "-sleep: " << sleep.count() << " " << -bufferDuration.count() / 2000 << "\n"; - if (sleep < -bufferDuration/2) + // We're early: not enough chunks_. play silence. Reference chunk_ is the oldest (front) one + sleep_ = chrono::duration_cast(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs_ + outputBufferDacTime); +//logD << "-sleep: " << sleep_.count() << " " << -bufferDuration.count() / 2000 << "\n"; + if (sleep_ < -bufferDuration/2) return true; } - else if (sleep > bufferDuration/2) + else if (sleep_ > bufferDuration/2) { // We're late: discard oldest chunks - while (sleep > chunk->duration()) + while (sleep_ > chunk_->duration()) { - logO << "sleep > chunk->getDuration(): " << sleep.count() << " > " << chunk->duration().count() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << bufferDuration.count() << "\n"; - sleep = std::chrono::duration_cast(TimeProvider::serverNow() - chunk->start() - bufferMs + outputBufferDacTime); - if (!chunks.try_pop(chunk, outputBufferDacTime)) + logO << "sleep > chunk_->getDuration(): " << sleep_.count() << " > " << chunk_->duration().count() << ", chunks: " << chunks_.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << bufferDuration.count() << "\n"; + sleep_ = std::chrono::duration_cast(TimeProvider::serverNow() - chunk_->start() - bufferMs_ + outputBufferDacTime); + if (!chunks_.try_pop(chunk_, outputBufferDacTime)) { logO << "no chunks available\n"; - chunk = NULL; - sleep = chronos::usec(0); + chunk_ = NULL; + sleep_ = chronos::usec(0); return false; } } } // out of sync, can be corrected by playing faster/slower - if (sleep < -chronos::usec(100)) + if (sleep_ < -chronos::usec(100)) { - sleep += chronos::usec(100); + sleep_ += chronos::usec(100); correction = -chronos::usec(100); } - else if (sleep > chronos::usec(100)) + else if (sleep_ > chronos::usec(100)) { - sleep -= chronos::usec(100); + sleep_ -= chronos::usec(100); correction = chronos::usec(100); } else { - logO << "Sleep " << sleep.count() << "\n"; - correction = sleep; - sleep = chronos::usec(0); + logO << "Sleep " << sleep_.count() << "\n"; + correction = sleep_; + sleep_ = chronos::usec(0); } } - long framesCorrection = correction.count()*format.usRate(); - if ((correctAfterXFrames != 0) && (playedFrames >= (unsigned long)abs(correctAfterXFrames))) + long framesCorrection = correction.count()*format_.usRate(); + if ((correctAfterXFrames_ != 0) && (playedFrames_ >= (unsigned long)abs(correctAfterXFrames_))) { - framesCorrection += (correctAfterXFrames > 0)?1:-1; - playedFrames -= abs(correctAfterXFrames); + framesCorrection += (correctAfterXFrames_ > 0)?1:-1; + playedFrames_ -= abs(correctAfterXFrames_); } - age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, outputBufferDacTime, framesPerBuffer, framesCorrection) - bufferMs + outputBufferDacTime); + age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, outputBufferDacTime, framesPerBuffer, framesCorrection) - bufferMs_ + outputBufferDacTime); - setRealSampleRate(format.rate); - if (sleep.count() == 0) + setRealSampleRate(format_.rate); + if (sleep_.count() == 0) { - if (buffer.full()) + if (buffer_.full()) { - if (chronos::usec(abs(median)) > chronos::msec(1)) + if (chronos::usec(abs(median_)) > chronos::msec(1)) { - logO << "pBuffer->full() && (abs(median) > 1): " << median << "\n"; - sleep = chronos::usec(shortMedian); + logO << "pBuffer->full() && (abs(median_) > 1): " << median_ << "\n"; + sleep_ = chronos::usec(shortMedian_); } -/* else if (chronos::usec(median) > chronos::usec(300)) +/* else if (chronos::usec(median_) > chronos::usec(300)) { - setRealSampleRate(format.rate - format.rate / 1000); + setRealSampleRate(format_.rate - format_.rate / 1000); } - else if (chronos::usec(median) < -chronos::usec(300)) + else if (chronos::usec(median_) < -chronos::usec(300)) { - setRealSampleRate(format.rate + format.rate / 1000); + setRealSampleRate(format_.rate + format_.rate / 1000); } */ } - else if (shortBuffer.full()) + else if (shortBuffer_.full()) { - if (chronos::usec(abs(shortMedian)) > chronos::msec(5)) + if (chronos::usec(abs(shortMedian_)) > chronos::msec(5)) { - logO << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n"; - sleep = chronos::usec(shortMedian); + logO << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n"; + sleep_ = chronos::usec(shortMedian_); } /* else { - setRealSampleRate(format.rate + -shortMedian / 100); + setRealSampleRate(format_.rate + -shortMedian_ / 100); } */ } - else if (miniBuffer.full() && (chronos::usec(abs(miniBuffer.median())) > chronos::msec(50))) + else if (miniBuffer_.full() && (chronos::usec(abs(miniBuffer_.median())) > chronos::msec(50))) { - logO << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n"; - sleep = chronos::usec((chronos::msec::rep)miniBuffer.mean()); + logO << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n"; + sleep_ = chronos::usec((chronos::msec::rep)miniBuffer_.mean()); } } - if (sleep.count() != 0) - logO << "Sleep: " << sleep.count() << "\n"; - else if (shortBuffer.full()) + if (sleep_.count() != 0) + logO << "Sleep: " << sleep_.count() << "\n"; + else if (shortBuffer_.full()) { - if (chronos::usec(shortMedian) > chronos::usec(100)) - setRealSampleRate(format.rate * 0.9999); - else if (chronos::usec(shortMedian) < -chronos::usec(100)) - setRealSampleRate(format.rate * 1.0001); + if (chronos::usec(shortMedian_) > chronos::usec(100)) + setRealSampleRate(format_.rate * 0.9999); + else if (chronos::usec(shortMedian_) < -chronos::usec(100)) + setRealSampleRate(format_.rate * 1.0001); } updateBuffers(age.count()); // print sync stats time_t now = time(NULL); - if (now != lastUpdate) + if (now != lastUpdate_) { - lastUpdate = now; - median = buffer.median(); - shortMedian = shortBuffer.median(); - logO << "Chunk: " << age.count()/100 << "\t" << miniBuffer.median()/100 << "\t" << shortMedian/100 << "\t" << median/100 << "\t" << buffer.size() << "\t" << outputBufferDacTime.count() << "\n"; + lastUpdate_ = now; + median_ = buffer_.median(); + shortMedian_ = shortBuffer_.median(); + logO << "Chunk: " << age.count()/100 << "\t" << miniBuffer_.median()/100 << "\t" << shortMedian_/100 << "\t" << median_/100 << "\t" << buffer_.size() << "\t" << outputBufferDacTime.count() << "\n"; } return true; } catch(int e) { - sleep = chronos::usec(0); + sleep_ = chronos::usec(0); return false; } } diff --git a/client/stream.h b/client/stream.h index 33dc240c..f0bce3bb 100644 --- a/client/stream.h +++ b/client/stream.h @@ -24,7 +24,10 @@ public: void clearChunks(); bool getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer); void setBufferLen(size_t bufferLenMs); - const msg::SampleFormat& format; + const msg::SampleFormat& getFormat() const + { + return format_; + } private: chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer); @@ -38,22 +41,22 @@ private: msg::SampleFormat format_; - long lastTick; - chronos::usec sleep; + long lastTick_; + chronos::usec sleep_; - Queue> chunks; + Queue> chunks_; // DoubleBuffer cardBuffer; - DoubleBuffer miniBuffer; - DoubleBuffer buffer; - DoubleBuffer shortBuffer; - std::shared_ptr chunk; + DoubleBuffer miniBuffer_; + DoubleBuffer buffer_; + DoubleBuffer shortBuffer_; + std::shared_ptr chunk_; - int median; - int shortMedian; - time_t lastUpdate; - chronos::msec bufferMs; - unsigned long playedFrames; - long correctAfterXFrames; + int median_; + int shortMedian_; + time_t lastUpdate_; + chronos::msec bufferMs_; + unsigned long playedFrames_; + long correctAfterXFrames_; }; diff --git a/client/timeProvider.cpp b/client/timeProvider.cpp index 4ad121f1..5cdf0038 100644 --- a/client/timeProvider.cpp +++ b/client/timeProvider.cpp @@ -1,16 +1,16 @@ #include "timeProvider.h" -TimeProvider::TimeProvider() : diffToServer(0) +TimeProvider::TimeProvider() : diffToServer_(0) { - diffBuffer.setSize(200); + diffBuffer_.setSize(200); } void TimeProvider::setDiffToServer(double ms) { - diffBuffer.add(ms * 1000); - diffToServer = diffBuffer.median(); + diffBuffer_.add(ms * 1000); + diffToServer_ = diffBuffer_.median(); } /* diff --git a/client/timeProvider.h b/client/timeProvider.h index 7536d7e3..2188646b 100644 --- a/client/timeProvider.h +++ b/client/timeProvider.h @@ -22,7 +22,7 @@ public: template inline T getDiffToServer() const { - return std::chrono::duration_cast(chronos::usec(diffToServer)); + return std::chrono::duration_cast(chronos::usec(diffToServer_)); } /* chronos::usec::rep getDiffToServer(); @@ -56,8 +56,8 @@ private: TimeProvider(TimeProvider const&); // Don't Implement void operator=(TimeProvider const&); // Don't implement - DoubleBuffer diffBuffer; - std::atomic diffToServer; + DoubleBuffer diffBuffer_; + std::atomic diffToServer_; }; diff --git a/server/controlServer.cpp b/server/controlServer.cpp index f6089b6f..42cf74dc 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -7,7 +7,7 @@ #include -ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL), sampleFormat(NULL) +ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk_(NULL), sampleFormat_(NULL) { } @@ -17,7 +17,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL void ControlServer::send(shared_ptr message) { std::unique_lock mlock(mutex); - for (auto it = sessions.begin(); it != sessions.end(); ) + for (auto it = sessions_.begin(); it != sessions_.end(); ) { if (!(*it)->active()) { @@ -25,13 +25,13 @@ void ControlServer::send(shared_ptr message) auto func = [](shared_ptr s)->void{s->stop();}; std::thread t(func, *it); t.detach(); - sessions.erase(it++); + sessions_.erase(it++); } else ++it; } - for (auto s : sessions) + for (auto s : sessions_) s->add(message); } @@ -56,18 +56,18 @@ void ControlServer::onMessageReceived(ServerSession* connection, const msg::Base } else if (requestMsg.request == kServerSettings) { - serverSettings->refersTo = requestMsg.id; - connection->send(serverSettings); + serverSettings_->refersTo = requestMsg.id; + connection->send(serverSettings_); } else if (requestMsg.request == kSampleFormat) { - sampleFormat->refersTo = requestMsg.id; - connection->send(sampleFormat); + sampleFormat_->refersTo = requestMsg.id; + connection->send(sampleFormat_); } else if (requestMsg.request == kHeader) { - headerChunk->refersTo = requestMsg.id; - connection->send(headerChunk); + headerChunk_->refersTo = requestMsg.id; + connection->send(headerChunk_); } } else if (baseMessage.type == message_type::kCommand) @@ -102,7 +102,7 @@ void ControlServer::acceptor() { std::unique_lock mlock(mutex); session->start(); - sessions.insert(shared_ptr(session)); + sessions_.insert(shared_ptr(session)); } } } @@ -110,7 +110,7 @@ void ControlServer::acceptor() void ControlServer::start() { - acceptThread = new thread(&ControlServer::acceptor, this); + acceptThread_ = new thread(&ControlServer::acceptor, this); } @@ -123,14 +123,14 @@ void ControlServer::stop() void ControlServer::setHeader(msg::Header* header) { if (header) - headerChunk = header; + headerChunk_ = header; } void ControlServer::setFormat(msg::SampleFormat* format) { if (format) - sampleFormat = format; + sampleFormat_ = format; } @@ -138,7 +138,7 @@ void ControlServer::setFormat(msg::SampleFormat* format) void ControlServer::setServerSettings(msg::ServerSettings* settings) { if (settings) - serverSettings = settings; + serverSettings_ = settings; } diff --git a/server/controlServer.h b/server/controlServer.h index 2a86292c..246e7607 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -37,15 +37,15 @@ public: private: void acceptor(); - mutable std::mutex mutex; - set> sessions; + mutable std::mutex mutex_; + set> sessions_; boost::asio::io_service io_service_; unsigned short port_; - msg::Header* headerChunk; - msg::SampleFormat* sampleFormat; - msg::ServerSettings* serverSettings; - thread* acceptThread; - Queue> messages; + msg::Header* headerChunk_; + msg::SampleFormat* sampleFormat_; + msg::ServerSettings* serverSettings_; + thread* acceptThread_; + Queue> messages_; }; diff --git a/server/serverSession.cpp b/server/serverSession.cpp index a9c52a4c..2dbb4eff 100644 --- a/server/serverSession.cpp +++ b/server/serverSession.cpp @@ -9,9 +9,9 @@ using namespace std; -ServerSession::ServerSession(MessageReceiver* _receiver, std::shared_ptr _socket) : messageReceiver(_receiver) +ServerSession::ServerSession(MessageReceiver* receiver, std::shared_ptr socket) : messageReceiver_(receiver) { - socket = _socket; + socket_ = socket; } @@ -24,9 +24,9 @@ ServerSession::~ServerSession() void ServerSession::start() { active_ = true; - streamActive = false; - readerThread = new thread(&ServerSession::reader, this); - writerThread = new thread(&ServerSession::writer, this); + streamActive_ = false; + readerThread_ = new thread(&ServerSession::reader, this); + writerThread_ = new thread(&ServerSession::writer, this); } @@ -36,31 +36,31 @@ void ServerSession::stop() try { boost::system::error_code ec; - if (socket) + if (socket_) { - socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); if (ec) logE << "Error in socket shutdown: " << ec << "\n"; - socket->close(ec); + socket_->close(ec); if (ec) logE << "Error in socket close: " << ec << "\n"; } - if (readerThread) + if (readerThread_) { logD << "joining readerThread\n"; - readerThread->join(); - delete readerThread; + readerThread_->join(); + delete readerThread_; } - if (writerThread) + if (writerThread_) { logD << "joining readerThread\n"; - writerThread->join(); - delete writerThread; + writerThread_->join(); + delete writerThread_; } } catch(...) { } - readerThread = NULL; - writerThread = NULL; + readerThread_ = NULL; + writerThread_ = NULL; logD << "ServerSession stopped\n"; } @@ -72,7 +72,7 @@ void ServerSession::socketRead(void* _to, size_t _bytes) do { boost::system::error_code error; - read += socket->read_some(boost::asio::buffer((char*)_to + read, _bytes - read)); + read += socket_->read_some(boost::asio::buffer((char*)_to + read, _bytes - read)); } while (read < _bytes); } @@ -80,26 +80,26 @@ void ServerSession::socketRead(void* _to, size_t _bytes) void ServerSession::add(shared_ptr message) { - if (!message || !streamActive) + if (!message || !streamActive_) return; - while (messages.size() > 100)// chunk->getDuration() > 10000) - messages.pop(); - messages.push(message); + while (messages_.size() > 100)// chunk->getDuration() > 10000) + messages_.pop(); + messages_.push(message); } bool ServerSession::send(msg::BaseMessage* message) { std::unique_lock mlock(mutex_); - if (!socket) + if (!socket_) return false; boost::asio::streambuf streambuf; std::ostream stream(&streambuf); tv t; message->sent = t; message->serialize(stream); - boost::asio::write(*socket.get(), streambuf); + boost::asio::write(*socket_.get(), streambuf); return true; } @@ -119,8 +119,8 @@ void ServerSession::getNextMessage() tv t; baseMessage.received = t; - if (messageReceiver != NULL) - messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]); + if (messageReceiver_ != NULL) + messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]); } @@ -154,7 +154,7 @@ void ServerSession::writer() shared_ptr message; while (active_) { - if (messages.try_pop(message, std::chrono::milliseconds(500))) + if (messages_.try_pop(message, std::chrono::milliseconds(500))) send(message.get()); } } diff --git a/server/serverSession.h b/server/serverSession.h index 0c41b355..e663f20e 100644 --- a/server/serverSession.h +++ b/server/serverSession.h @@ -28,7 +28,7 @@ public: class ServerSession { public: - ServerSession(MessageReceiver* _receiver, std::shared_ptr _socket); + ServerSession(MessageReceiver* receiver, std::shared_ptr socket); ~ServerSession(); void start(); void stop(); @@ -42,7 +42,7 @@ public: virtual void setStreamActive(bool active) { - streamActive = active; + streamActive_ = active; } @@ -53,13 +53,13 @@ protected: void writer(); std::atomic active_; - std::atomic streamActive; + std::atomic streamActive_; mutable std::mutex mutex_; - std::thread* readerThread; - std::thread* writerThread; - std::shared_ptr socket; - MessageReceiver* messageReceiver; - Queue> messages; + std::thread* readerThread_; + std::thread* writerThread_; + std::shared_ptr socket_; + MessageReceiver* messageReceiver_; + Queue> messages_; };