code cleanup

git-svn-id: svn://elaine/murooma/trunk@337 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2015-01-01 18:15:20 +00:00
parent 2ea718dee0
commit 0a20924e66
15 changed files with 332 additions and 325 deletions

View file

@ -18,4 +18,10 @@ installclient:
installserver:
$(MAKE) install -C server
uninstallclient:
$(MAKE) uninstall -C client
uninstallserver:
$(MAKE) uninstall -C server

View file

@ -7,7 +7,7 @@
using namespace std;
Player::Player(const PcmDevice& pcmDevice, Stream* stream) : pcm_handle(NULL), buff(NULL), active_(false), stream_(stream), pcmDevice_(pcmDevice)
Player::Player(const PcmDevice& pcmDevice, Stream* stream) : pcm_handle_(NULL), buff_(NULL), active_(false), stream_(stream), pcmDevice_(pcmDevice)
{
}
@ -19,11 +19,12 @@ void Player::start()
snd_pcm_hw_params_t *params;
int buff_size;
rate = stream_->format.rate;
channels = stream_->format.channels;
const msg::SampleFormat& format = stream_->getFormat();
rate = format.rate;
channels = format.channels;
/* Open the PCM device in playback mode */
if ((pcm = snd_pcm_open(&pcm_handle, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0)
if ((pcm = snd_pcm_open(&pcm_handle_, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0)
logE << "ERROR: Can't open " << pcmDevice_.name << " PCM device. " << snd_strerror(pcm) << "\n";
/* struct snd_pcm_playback_info_t pinfo;
@ -34,19 +35,19 @@ void Player::start()
/* Allocate parameters object and fill it with default values*/
snd_pcm_hw_params_alloca(&params);
snd_pcm_hw_params_any(pcm_handle, params);
snd_pcm_hw_params_any(pcm_handle_, params);
/* Set parameters */
if ((pcm = snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
if ((pcm = snd_pcm_hw_params_set_access(pcm_handle_, params, SND_PCM_ACCESS_RW_INTERLEAVED)) < 0)
logE << "ERROR: Can't set interleaved mode. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE)) < 0)
if ((pcm = snd_pcm_hw_params_set_format(pcm_handle_, params, SND_PCM_FORMAT_S16_LE)) < 0)
logE << "ERROR: Can't set format. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle, params, channels)) < 0)
if ((pcm = snd_pcm_hw_params_set_channels(pcm_handle_, params, channels)) < 0)
logE << "ERROR: Can't set channels number. " << snd_strerror(pcm) << "\n";
if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle, params, &rate, 0)) < 0)
if ((pcm = snd_pcm_hw_params_set_rate_near(pcm_handle_, params, &rate, 0)) < 0)
logE << "ERROR: Can't set rate. " << snd_strerror(pcm) << "\n";
unsigned int buffer_time;
@ -56,20 +57,20 @@ void Player::start()
unsigned int period_time = buffer_time / 4;
snd_pcm_hw_params_set_period_time_near(pcm_handle, params, &period_time, 0);
snd_pcm_hw_params_set_buffer_time_near(pcm_handle, params, &buffer_time, 0);
snd_pcm_hw_params_set_period_time_near(pcm_handle_, params, &period_time, 0);
snd_pcm_hw_params_set_buffer_time_near(pcm_handle_, params, &buffer_time, 0);
// long unsigned int periodsize = stream_->format.msRate() * 50;//2*rate/50;
// if ((pcm = snd_pcm_hw_params_set_buffer_size_near(pcm_handle, params, &periodsize)) < 0)
// logE << "Unable to set buffer size " << (long int)periodsize << ": " << snd_strerror(pcm) << "\n";
/* Write parameters */
if ((pcm = snd_pcm_hw_params(pcm_handle, params)) < 0)
if ((pcm = snd_pcm_hw_params(pcm_handle_, params)) < 0)
logE << "ERROR: Can't set harware parameters. " << snd_strerror(pcm) << "\n";
/* Resume information */
logD << "PCM name: " << snd_pcm_name(pcm_handle) << "\n";
logD << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle)) << "\n";
logD << "PCM name: " << snd_pcm_name(pcm_handle_) << "\n";
logD << "PCM state: " << snd_pcm_state_name(snd_pcm_state(pcm_handle_)) << "\n";
snd_pcm_hw_params_get_channels(params, &tmp);
logD << "channels: " << tmp << "\n";
@ -77,26 +78,26 @@ void Player::start()
logD << "rate: " << tmp << " bps\n";
/* Allocate buffer to hold single period */
snd_pcm_hw_params_get_period_size(params, &frames, 0);
logD << "frames: " << frames << "\n";
snd_pcm_hw_params_get_period_size(params, &frames_, 0);
logD << "frames: " << frames_ << "\n";
buff_size = frames * channels * 2 /* 2 -> sample size */;
buff = (char *) malloc(buff_size);
buff_size = frames_ * channels * 2 /* 2 -> sample size */;
buff_ = (char *) malloc(buff_size);
snd_pcm_hw_params_get_period_time(params, &tmp, NULL);
logD << "period time: " << tmp << "\n";
snd_pcm_sw_params_t *swparams;
snd_pcm_sw_params_alloca(&swparams);
snd_pcm_sw_params_current(pcm_handle, swparams);
snd_pcm_sw_params_current(pcm_handle_, swparams);
snd_pcm_sw_params_set_avail_min(pcm_handle, swparams, frames);
snd_pcm_sw_params_set_start_threshold(pcm_handle, swparams, frames);
// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames);
snd_pcm_sw_params(pcm_handle, swparams);
snd_pcm_sw_params_set_avail_min(pcm_handle_, swparams, frames_);
snd_pcm_sw_params_set_start_threshold(pcm_handle_, swparams, frames_);
// snd_pcm_sw_params_set_stop_threshold(pcm_handle, swparams, frames_);
snd_pcm_sw_params(pcm_handle_, swparams);
active_ = true;
playerThread = new thread(&Player::worker, this);
playerThread_ = new thread(&Player::worker, this);
}
@ -108,24 +109,24 @@ Player::~Player()
void Player::stop() {
active_ = false;
if (playerThread != NULL)
if (playerThread_ != NULL)
{
playerThread->join();
delete playerThread;
playerThread = NULL;
playerThread_->join();
delete playerThread_;
playerThread_ = NULL;
}
if (pcm_handle != NULL)
if (pcm_handle_ != NULL)
{
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);
pcm_handle = NULL;
snd_pcm_drain(pcm_handle_);
snd_pcm_close(pcm_handle_);
pcm_handle_ = NULL;
}
if (buff != NULL)
if (buff_ != NULL)
{
free(buff);
buff = NULL;
free(buff_);
buff_ = NULL;
}
}
@ -137,16 +138,16 @@ void Player::worker()
snd_pcm_sframes_t framesDelay;
while (active_)
{
snd_pcm_avail_delay(pcm_handle, &framesAvail, &framesDelay);
chronos::usec delay((chronos::usec::rep) (1000 * (double) framesDelay / stream_->format.msRate()));
snd_pcm_avail_delay(pcm_handle_, &framesAvail, &framesDelay);
chronos::usec delay((chronos::usec::rep) (1000 * (double) framesDelay / stream_->getFormat().msRate()));
logD << "Avail: " << framesAvail << ", delay: " << framesDelay << ", delay[ms]: " << delay.count() / 1000 << "\n";
if (stream_->getPlayerChunk(buff, delay, frames))
if (stream_->getPlayerChunk(buff_, delay, frames_))
{
if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE)
if ((pcm = snd_pcm_writei(pcm_handle_, buff_, frames_)) == -EPIPE)
{
logE << "XRUN\n";
snd_pcm_prepare(pcm_handle);
snd_pcm_prepare(pcm_handle_);
}
else if (pcm < 0)
{

View file

@ -21,12 +21,12 @@ public:
private:
void worker();
snd_pcm_t* pcm_handle;
snd_pcm_uframes_t frames;
char *buff;
snd_pcm_t* pcm_handle_;
snd_pcm_uframes_t frames_;
char *buff_;
std::atomic<bool> active_;
Stream* stream_;
std::thread* playerThread;
std::thread* playerThread_;
PcmDevice pcmDevice_;
};

View file

@ -10,7 +10,7 @@
using namespace std;
ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(1), ip(_ip), port(_port), readerThread(NULL), sumTimeout(chronos::msec(0))
ClientConnection::ClientConnection(MessageReceiver* receiver, const std::string& ip, size_t port) : active_(false), connected_(false), messageReceiver_(receiver), reqId_(1), ip_(ip), port_(port), readerThread_(NULL), sumTimeout_(chronos::msec(0))
{
}
@ -31,7 +31,7 @@ void ClientConnection::socketRead(void* _to, size_t _bytes)
// cout << "/";
// cout.flush();
// boost::system::error_code error;
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead));
len += socket_->read_some(boost::asio::buffer((char*)_to + len, toRead));
//cout << "len: " << len << ", error: " << error << endl;
toRead = _bytes - len;
// cout << "\\";
@ -43,23 +43,24 @@ void ClientConnection::socketRead(void* _to, size_t _bytes)
void ClientConnection::start()
{
boost::asio::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
iterator = resolver.resolve(query);
tcp::resolver::query query(tcp::v4(), ip_, boost::lexical_cast<string>(port_));
auto iterator = resolver.resolve(query);
logO << "connecting\n";
socket.reset(new tcp::socket(io_service));
socket_.reset(new tcp::socket(io_service));
// struct timeval tv;
// tv.tv_sec = 5;
// tv.tv_usec = 0;
// cout << "socket: " << socket->native() << "\n";
// setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
// setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
socket->connect(*iterator);
logO << "MAC: \"" << getMacAddress(socket->native()) << "\"\n";
socket_->connect(*iterator);
logO << "MAC: \"" << getMacAddress(socket_->native()) << "\"\n";
connected_ = true;
logS(kLogNotice) << "connected" << endl;
active_ = true;
readerThread = new thread(&ClientConnection::reader, this);
readerThread_ = new thread(&ClientConnection::reader, this);
}
@ -70,24 +71,24 @@ void ClientConnection::stop()
try
{
boost::system::error_code ec;
if (socket)
if (socket_)
{
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec << "\n";
socket->close(ec);
socket_->close(ec);
if (ec) logE << "Error in socket close: " << ec << "\n";
}
if (readerThread)
if (readerThread_)
{
logD << "joining readerThread\n";
readerThread->join();
delete readerThread;
readerThread_->join();
delete readerThread_;
}
}
catch(...)
{
}
readerThread = NULL;
readerThread_ = NULL;
logD << "readerThread terminated\n";
}
@ -104,7 +105,7 @@ bool ClientConnection::send(msg::BaseMessage* message)
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
boost::asio::write(*socket_.get(), streambuf);
return true;
}
@ -112,34 +113,35 @@ bool ClientConnection::send(msg::BaseMessage* message)
shared_ptr<msg::SerializedMessage> ClientConnection::sendRequest(msg::BaseMessage* message, const chronos::msec& timeout)
{
shared_ptr<msg::SerializedMessage> response(NULL);
if (++reqId == 10000)
reqId = 1;
message->id = reqId;
if (++reqId_ == 10000)
reqId_ = 1;
message->id = reqId_;
//logD << "Req: " << reqId << "\n";
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId));
shared_ptr<PendingRequest> pendingRequest(new PendingRequest(reqId_));
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.insert(pendingRequest);
pendingRequests_.insert(pendingRequest);
}
std::mutex m;
std::unique_lock<std::mutex> lck(m);
send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
{
response = pendingRequest->response;
sumTimeout = chronos::msec(0);
sumTimeout_ = chronos::msec(0);
//logD << "Resp: " << pendingRequest->id << "\n";
}
else
{
sumTimeout += timeout;
logO << "timeout while waiting for response to: " << reqId << ", timeout " << sumTimeout.count() << "\n";
if (sumTimeout > chronos::sec(10))
sumTimeout_ += timeout;
logO << "timeout while waiting for response to: " << reqId_ << ", timeout " << sumTimeout_.count() << "\n";
if (sumTimeout_ > chronos::sec(10))
throw SnapException("sum timeout exceeded 10s");
}
{
std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.erase(pendingRequest);
pendingRequests_.erase(pendingRequest);
}
return response;
}
@ -162,7 +164,7 @@ void ClientConnection::getNextMessage()
{
std::unique_lock<std::mutex> mlock(mutex_);
{
for (auto req: pendingRequests)
for (auto req: pendingRequests_)
{
if (req->id == baseMessage.refersTo)
{
@ -177,8 +179,8 @@ void ClientConnection::getNextMessage()
}
}
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);
if (messageReceiver_ != NULL)
messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]);
}
@ -194,8 +196,8 @@ void ClientConnection::reader()
}
catch (const std::exception& e)
{
if (messageReceiver != NULL)
messageReceiver->onException(this, e);
if (messageReceiver_ != NULL)
messageReceiver_->onException(this, e);
}
catch (...)
{

View file

@ -40,11 +40,11 @@ public:
class ClientConnection
{
public:
ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port);
ClientConnection(MessageReceiver* receiver, const std::string& ip, size_t port);
virtual ~ClientConnection();
virtual void start();
virtual void stop();
virtual bool send(msg::BaseMessage* _message);
virtual bool send(msg::BaseMessage* message);
virtual std::shared_ptr<msg::SerializedMessage> sendRequest(msg::BaseMessage* message, const chronos::msec& timeout = chronos::msec(1000));
template <typename T>
@ -65,31 +65,27 @@ public:
virtual bool connected()
{
return (socket != 0);
return (socket_ != 0);
// return (connected_ && socket);
}
protected:
virtual void reader();
void socketRead(void* _to, size_t _bytes);
std::shared_ptr<tcp::socket> socket;
void socketRead(void* to, size_t bytes);
void getNextMessage();
// boost::asio::ip::tcp::endpoint endpt;
std::shared_ptr<tcp::socket> socket_;
std::atomic<bool> active_;
std::atomic<bool> connected_;
MessageReceiver* messageReceiver;
void getNextMessage();
boost::asio::io_service io_service;
tcp::resolver::iterator iterator;
MessageReceiver* messageReceiver_;
mutable std::mutex mutex_;
std::mutex m;
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
uint16_t reqId;
std::string ip;
size_t port;
std::thread* readerThread;
chronos::msec sumTimeout;
std::set<std::shared_ptr<PendingRequest>> pendingRequests_;
uint16_t reqId_;
std::string ip_;
size_t port_;
std::thread* readerThread_;
chronos::msec sumTimeout_;
};

View file

@ -18,7 +18,7 @@
using namespace std;
Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL), decoder(NULL)
Controller::Controller() : MessageReceiver(), active_(false), sampleFormat_(NULL), decoder_(NULL)
{
}
@ -33,14 +33,14 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
{
if (baseMessage.type == message_type::kPayload)
{
if ((stream != NULL) && (decoder != NULL))
if ((stream_ != NULL) && (decoder_ != NULL))
{
msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat, 0);
msg::PcmChunk* pcmChunk = new msg::PcmChunk(*sampleFormat_, 0);
pcmChunk->deserialize(baseMessage, buffer);
//logD << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk))
if (decoder_->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
stream_->addChunk(pcmChunk);
//logD << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
}
else
@ -50,13 +50,13 @@ void Controller::onMessageReceived(ClientConnection* connection, const msg::Base
}
void Controller::start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency)
void Controller::start(const PcmDevice& pcmDevice, const std::string& ip, size_t port, size_t latency)
{
ip = _ip;
ip_ = ip;
pcmDevice_ = pcmDevice;
latency_ = latency;
clientConnection = new ClientConnection(this, ip, _port);
controllerThread = new thread(&Controller::worker, this);
clientConnection_ = new ClientConnection(this, ip, port);
controllerThread_ = new thread(&Controller::worker, this);
}
@ -64,48 +64,47 @@ void Controller::stop()
{
logD << "Stopping\n";
active_ = false;
controllerThread->join();
clientConnection->stop();
delete controllerThread;
delete clientConnection;
controllerThread_->join();
clientConnection_->stop();
delete controllerThread_;
delete clientConnection_;
}
void Controller::worker()
{
// Decoder* decoder;
active_ = true;
decoder = NULL;
stream = NULL;
decoder_ = NULL;
stream_ = NULL;
while (active_)
{
try
{
clientConnection->start();
clientConnection_->start();
msg::Request requestMsg(kServerSettings);
shared_ptr<msg::ServerSettings> serverSettings(NULL);
while (active_ && !(serverSettings = clientConnection->sendReq<msg::ServerSettings>(&requestMsg)));
while (active_ && !(serverSettings = clientConnection_->sendReq<msg::ServerSettings>(&requestMsg)));
logO << "ServerSettings buffer: " << serverSettings->bufferMs << "\n";
requestMsg.request = kSampleFormat;
while (active_ && !(sampleFormat = clientConnection->sendReq<msg::SampleFormat>(&requestMsg)));
logO << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
while (active_ && !(sampleFormat_ = clientConnection_->sendReq<msg::SampleFormat>(&requestMsg)));
logO << "SampleFormat rate: " << sampleFormat_->rate << ", bits: " << sampleFormat_->bits << ", channels: " << sampleFormat_->channels << "\n";
requestMsg.request = kHeader;
shared_ptr<msg::Header> headerChunk(NULL);
while (active_ && !(headerChunk = clientConnection->sendReq<msg::Header>(&requestMsg)));
while (active_ && !(headerChunk = clientConnection_->sendReq<msg::Header>(&requestMsg)));
logO << "Codec: " << headerChunk->codec << "\n";
if (headerChunk->codec == "ogg")
decoder = new OggDecoder();
decoder_ = new OggDecoder();
else if (headerChunk->codec == "pcm")
decoder = new PcmDecoder();
decoder->setHeader(headerChunk.get());
decoder_ = new PcmDecoder();
decoder_->setHeader(headerChunk.get());
msg::Request timeReq(kTime);
for (size_t n=0; n<50 && active_; ++n)
{
shared_ptr<msg::Time> reply = clientConnection->sendReq<msg::Time>(&timeReq, chronos::msec(2000));
shared_ptr<msg::Time> reply = clientConnection_->sendReq<msg::Time>(&timeReq, chronos::msec(2000));
if (reply)
{
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
@ -115,21 +114,21 @@ void Controller::worker()
}
logO << "diff to server [ms]: " << TimeProvider::getInstance().getDiffToServer<chronos::msec>().count() << "\n";
stream = new Stream(*sampleFormat);
stream->setBufferLen(serverSettings->bufferMs - latency_);
stream_ = new Stream(*sampleFormat_);
stream_->setBufferLen(serverSettings->bufferMs - latency_);
Player player(pcmDevice_, stream);
Player player(pcmDevice_, stream_);
player.start();
msg::Command startStream("startStream");
shared_ptr<msg::Ack> ackMsg(NULL);
while (active_ && !(ackMsg = clientConnection->sendReq<msg::Ack>(&startStream)));
while (active_ && !(ackMsg = clientConnection_->sendReq<msg::Ack>(&startStream)));
while (active_)
{
usleep(500*1000);
//throw SnapException("timeout");
shared_ptr<msg::Time> reply = clientConnection->sendReq<msg::Time>(&timeReq);
shared_ptr<msg::Time> reply = clientConnection_->sendReq<msg::Time>(&timeReq);
if (reply)
{
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
@ -141,14 +140,14 @@ void Controller::worker()
{
logS(kLogErr) << "Exception in Controller::worker(): " << e.what() << endl;
logO << "Stopping clientConnection" << endl;
clientConnection->stop();
clientConnection_->stop();
logO << "Deleting stream" << endl;
if (stream != NULL)
delete stream;
stream = NULL;
if (decoder != NULL)
delete decoder;
decoder = NULL;
if (stream_ != NULL)
delete stream_;
stream_ = NULL;
if (decoder_ != NULL)
delete decoder_;
decoder_ = NULL;
logO << "done" << endl;
if (active_)
usleep(500*1000);

View file

@ -14,7 +14,7 @@ class Controller : public MessageReceiver
{
public:
Controller();
void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port, size_t latency);
void start(const PcmDevice& pcmDevice, const std::string& ip, size_t port, size_t latency);
void stop();
virtual void onMessageReceived(ClientConnection* connection, const msg::BaseMessage& baseMessage, char* buffer);
virtual void onException(ClientConnection* connection, const std::exception& exception);
@ -22,12 +22,12 @@ public:
private:
void worker();
std::atomic<bool> active_;
std::thread* controllerThread;
ClientConnection* clientConnection;
Stream* stream;
std::string ip;
std::shared_ptr<msg::SampleFormat> sampleFormat;
Decoder* decoder;
std::thread* controllerThread_;
ClientConnection* clientConnection_;
Stream* stream_;
std::string ip_;
std::shared_ptr<msg::SampleFormat> sampleFormat_;
Decoder* decoder_;
PcmDevice pcmDevice_;
size_t latency_;
};

View file

@ -8,13 +8,13 @@ using namespace std;
using namespace chronos;
Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0), playedFrames(0)
Stream::Stream(const msg::SampleFormat& sampleFormat) : format_(sampleFormat), sleep_(0), median_(0), shortMedian_(0), lastUpdate_(0), playedFrames_(0)
{
buffer.setSize(500);
shortBuffer.setSize(100);
miniBuffer.setSize(20);
// cardBuffer.setSize(50);
bufferMs = msec(500);
buffer_.setSize(500);
shortBuffer_.setSize(100);
miniBuffer_.setSize(20);
// cardBuffer_.setSize(50);
bufferMs_ = msec(500);
/*
48000 x
@ -23,50 +23,50 @@ Stream::Stream(const msg::SampleFormat& sampleFormat) : format(format_), format_
x = 1,000016667 / (1,000016667 - 1)
*/
setRealSampleRate(format.rate);
setRealSampleRate(format_.rate);
}
void Stream::setRealSampleRate(double sampleRate)
{
if (sampleRate == format.rate)
correctAfterXFrames = 0;
if (sampleRate == format_.rate)
correctAfterXFrames_ = 0;
else
correctAfterXFrames = round((format.rate / sampleRate) / (format.rate / sampleRate - 1.));
// logD << "Correct after X: " << correctAfterXFrames << " (Real rate: " << sampleRate << ", rate: " << format.rate << ")\n";
correctAfterXFrames_ = round((format_.rate / sampleRate) / (format_.rate / sampleRate - 1.));
// logD << "Correct after X: " << correctAfterXFrames_ << " (Real rate: " << sampleRate << ", rate: " << format_.rate << ")\n";
}
void Stream::setBufferLen(size_t bufferLenMs)
{
bufferMs = msec(bufferLenMs);
bufferMs_ = msec(bufferLenMs);
}
void Stream::clearChunks()
{
while (chunks.size() > 0)
chunks.pop();
while (chunks_.size() > 0)
chunks_.pop();
}
void Stream::addChunk(msg::PcmChunk* chunk)
{
while (chunks.size() * chunk->duration<chronos::msec>().count() > 10000)
chunks.pop();
chunks.push(shared_ptr<msg::PcmChunk>(chunk));
// logD << "new chunk: " << chunk->getDuration() << ", Chunks: " << chunks.size() << "\n";
while (chunks_.size() * chunk_->duration<chronos::msec>().count() > 10000)
chunks_.pop();
chunks_.push(shared_ptr<msg::PcmChunk>(chunk));
// logD << "new chunk: " << chunk_->getDuration() << ", Chunks: " << chunks_.size() << "\n";
}
time_point_hrc Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer)
{
if (!chunk)
chunk = chunks.pop();
time_point_hrc tp = chunk->start();
memset(outputBuffer, 0, framesPerBuffer * format.frameSize);
if (!chunk_)
chunk_ = chunks_.pop();
time_point_hrc tp = chunk_->start();
memset(outputBuffer, 0, framesPerBuffer * format_.frameSize);
return tp;
}
@ -75,16 +75,16 @@ time_point_hrc Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long fr
time_point_ms Stream::seekTo(const time_point_ms& to)
{
if (!chunk)
chunk = chunks.pop();
// time_point_ms tp = chunk->timePoint();
while (to > chunk->timePoint())// + std::chrono::milliseconds((long int)chunk->getDuration()))//
chunk_ = chunks_.pop();
// time_point_ms tp = chunk_->timePoint();
while (to > chunk_->timePoint())// + std::chrono::milliseconds((long int)chunk_->getDuration()))//
{
chunk = chunks.pop();
logD << "\nto: " << Chunk::getAge(to) << "\t chunk: " << Chunk::getAge(chunk->timePoint()) << "\n";
logD << "diff: " << std::chrono::duration_cast<std::chrono::milliseconds>((to - chunk->timePoint())).count() << "\n";
chunk_ = chunks_.pop();
logD << "\nto: " << Chunk::getAge(to) << "\t chunk: " << Chunk::getAge(chunk_->timePoint()) << "\n";
logD << "diff: " << std::chrono::duration_cast<std::chrono::milliseconds>((to - chunk_->timePoint())).count() << "\n";
}
chunk->seek(std::chrono::duration_cast<std::chrono::milliseconds>(to - chunk->timePoint()).count() * format.msRate());
return chunk->timePoint();
chunk_->seek(std::chrono::duration_cast<std::chrono::milliseconds>(to - chunk_->timePoint()).count() * format_.msRate());
return chunk_->timePoint();
}
*/
@ -92,35 +92,35 @@ time_point_ms Stream::seekTo(const time_point_ms& to)
time_point_hrc Stream::seek(long ms)
{
if (!chunk)
chunk = chunks.pop();
chunk_ = chunks_.pop();
if (ms <= 0)
return chunk->start();
return chunk_->start();
// time_point_ms tp = chunk->timePoint();
while (ms > chunk->duration<chronos::msec>().count())
// time_point_ms tp = chunk_->timePoint();
while (ms > chunk_->duration<chronos::msec>().count())
{
chunk = chunks.pop();
ms -= min(ms, (long)chunk->durationLeft<chronos::msec>().count());
chunk_ = chunks_.pop();
ms -= min(ms, (long)chunk_->durationLeft<chronos::msec>().count());
}
chunk->seek(ms * format.msRate());
return chunk->start();
chunk_->seek(ms * format_.msRate());
return chunk_->start();
}
*/
time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer)
{
if (!chunk && !chunks.try_pop(chunk, timeout))
if (!chunk_ && !chunks_.try_pop(chunk_, timeout))
throw 0;
time_point_hrc tp = chunk->start();
time_point_hrc tp = chunk_->start();
char* buffer = (char*)outputBuffer;
unsigned long read = 0;
while (read < framesPerBuffer)
{
read += chunk->readFrames(buffer + read*format.frameSize, framesPerBuffer - read);
if (chunk->isEndOfChunk() && !chunks.try_pop(chunk, timeout))
read += chunk_->readFrames(buffer + read*format_.frameSize, framesPerBuffer - read);
if (chunk_->isEndOfChunk() && !chunks_.try_pop(chunk_, timeout))
throw 0;
}
return tp;
@ -133,7 +133,7 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::use
return getNextPlayerChunk(outputBuffer, timeout, framesPerBuffer);
long toRead = framesPerBuffer + framesCorrection;
char* buffer = (char*)malloc(toRead * format.frameSize);
char* buffer = (char*)malloc(toRead * format_.frameSize);
time_point_hrc tp = getNextPlayerChunk(buffer, timeout, toRead);
float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_);
@ -143,7 +143,7 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::use
for (size_t n=0; n<framesPerBuffer; ++n)
{
size_t index(floor(idx));// = (int)(ceil(n*factor));
memcpy((char*)outputBuffer + n*format.frameSize, buffer + index*format.frameSize, format.frameSize);
memcpy((char*)outputBuffer + n*format_.frameSize, buffer + index*format_.frameSize, format_.frameSize);
idx += factor;
}
free(buffer);
@ -155,36 +155,36 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, const chronos::use
void Stream::updateBuffers(int age)
{
buffer.add(age);
miniBuffer.add(age);
shortBuffer.add(age);
buffer_.add(age);
miniBuffer_.add(age);
shortBuffer_.add(age);
}
void Stream::resetBuffers()
{
buffer.clear();
miniBuffer.clear();
shortBuffer.clear();
buffer_.clear();
miniBuffer_.clear();
shortBuffer_.clear();
}
bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer)
{
if (outputBufferDacTime > bufferMs)
if (outputBufferDacTime > bufferMs_)
return false;
if (!chunk && !chunks.try_pop(chunk, outputBufferDacTime))
if (!chunk_ && !chunks_.try_pop(chunk_, outputBufferDacTime))
return false;
playedFrames += framesPerBuffer;
playedFrames_ += framesPerBuffer;
chronos::usec age = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - chunk->start() - bufferMs + outputBufferDacTime);
if ((sleep.count() == 0) && (chronos::abs(age) > chronos::msec(200)))
chronos::usec age = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - chunk_->start() - bufferMs_ + outputBufferDacTime);
if ((sleep_.count() == 0) && (chronos::abs(age) > chronos::msec(200)))
{
logO << "age > 200: " << age.count() << "\n";
sleep = age;
sleep_ = age;
}
try
@ -195,126 +195,126 @@ bool Stream::getPlayerChunk(void* outputBuffer, const chronos::usec& outputBuffe
// logD << "buffer duration: " << bufferDuration.count() << "\n";
chronos::usec correction = chronos::usec(0);
if (sleep.count() != 0)
if (sleep_.count() != 0)
{
resetBuffers();
if (sleep < -bufferDuration/2)
if (sleep_ < -bufferDuration/2)
{
// We're early: not enough chunks. play silence. Reference chunk is the oldest (front) one
sleep = chrono::duration_cast<usec>(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs + outputBufferDacTime);
//logD << "-sleep: " << sleep.count() << " " << -bufferDuration.count() / 2000 << "\n";
if (sleep < -bufferDuration/2)
// We're early: not enough chunks_. play silence. Reference chunk_ is the oldest (front) one
sleep_ = chrono::duration_cast<usec>(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs_ + outputBufferDacTime);
//logD << "-sleep: " << sleep_.count() << " " << -bufferDuration.count() / 2000 << "\n";
if (sleep_ < -bufferDuration/2)
return true;
}
else if (sleep > bufferDuration/2)
else if (sleep_ > bufferDuration/2)
{
// We're late: discard oldest chunks
while (sleep > chunk->duration<chronos::usec>())
while (sleep_ > chunk_->duration<chronos::usec>())
{
logO << "sleep > chunk->getDuration(): " << sleep.count() << " > " << chunk->duration<chronos::msec>().count() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << bufferDuration.count() << "\n";
sleep = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - chunk->start() - bufferMs + outputBufferDacTime);
if (!chunks.try_pop(chunk, outputBufferDacTime))
logO << "sleep > chunk_->getDuration(): " << sleep_.count() << " > " << chunk_->duration<chronos::msec>().count() << ", chunks: " << chunks_.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << bufferDuration.count() << "\n";
sleep_ = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - chunk_->start() - bufferMs_ + outputBufferDacTime);
if (!chunks_.try_pop(chunk_, outputBufferDacTime))
{
logO << "no chunks available\n";
chunk = NULL;
sleep = chronos::usec(0);
chunk_ = NULL;
sleep_ = chronos::usec(0);
return false;
}
}
}
// out of sync, can be corrected by playing faster/slower
if (sleep < -chronos::usec(100))
if (sleep_ < -chronos::usec(100))
{
sleep += chronos::usec(100);
sleep_ += chronos::usec(100);
correction = -chronos::usec(100);
}
else if (sleep > chronos::usec(100))
else if (sleep_ > chronos::usec(100))
{
sleep -= chronos::usec(100);
sleep_ -= chronos::usec(100);
correction = chronos::usec(100);
}
else
{
logO << "Sleep " << sleep.count() << "\n";
correction = sleep;
sleep = chronos::usec(0);
logO << "Sleep " << sleep_.count() << "\n";
correction = sleep_;
sleep_ = chronos::usec(0);
}
}
long framesCorrection = correction.count()*format.usRate();
if ((correctAfterXFrames != 0) && (playedFrames >= (unsigned long)abs(correctAfterXFrames)))
long framesCorrection = correction.count()*format_.usRate();
if ((correctAfterXFrames_ != 0) && (playedFrames_ >= (unsigned long)abs(correctAfterXFrames_)))
{
framesCorrection += (correctAfterXFrames > 0)?1:-1;
playedFrames -= abs(correctAfterXFrames);
framesCorrection += (correctAfterXFrames_ > 0)?1:-1;
playedFrames_ -= abs(correctAfterXFrames_);
}
age = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, outputBufferDacTime, framesPerBuffer, framesCorrection) - bufferMs + outputBufferDacTime);
age = std::chrono::duration_cast<usec>(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, outputBufferDacTime, framesPerBuffer, framesCorrection) - bufferMs_ + outputBufferDacTime);
setRealSampleRate(format.rate);
if (sleep.count() == 0)
setRealSampleRate(format_.rate);
if (sleep_.count() == 0)
{
if (buffer.full())
if (buffer_.full())
{
if (chronos::usec(abs(median)) > chronos::msec(1))
if (chronos::usec(abs(median_)) > chronos::msec(1))
{
logO << "pBuffer->full() && (abs(median) > 1): " << median << "\n";
sleep = chronos::usec(shortMedian);
logO << "pBuffer->full() && (abs(median_) > 1): " << median_ << "\n";
sleep_ = chronos::usec(shortMedian_);
}
/* else if (chronos::usec(median) > chronos::usec(300))
/* else if (chronos::usec(median_) > chronos::usec(300))
{
setRealSampleRate(format.rate - format.rate / 1000);
setRealSampleRate(format_.rate - format_.rate / 1000);
}
else if (chronos::usec(median) < -chronos::usec(300))
else if (chronos::usec(median_) < -chronos::usec(300))
{
setRealSampleRate(format.rate + format.rate / 1000);
setRealSampleRate(format_.rate + format_.rate / 1000);
}
*/ }
else if (shortBuffer.full())
else if (shortBuffer_.full())
{
if (chronos::usec(abs(shortMedian)) > chronos::msec(5))
if (chronos::usec(abs(shortMedian_)) > chronos::msec(5))
{
logO << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n";
sleep = chronos::usec(shortMedian);
logO << "pShortBuffer->full() && (abs(shortMedian_) > 5): " << shortMedian_ << "\n";
sleep_ = chronos::usec(shortMedian_);
}
/* else
{
setRealSampleRate(format.rate + -shortMedian / 100);
setRealSampleRate(format_.rate + -shortMedian_ / 100);
}
*/ }
else if (miniBuffer.full() && (chronos::usec(abs(miniBuffer.median())) > chronos::msec(50)))
else if (miniBuffer_.full() && (chronos::usec(abs(miniBuffer_.median())) > chronos::msec(50)))
{
logO << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n";
sleep = chronos::usec((chronos::msec::rep)miniBuffer.mean());
logO << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer_.median() << "\n";
sleep_ = chronos::usec((chronos::msec::rep)miniBuffer_.mean());
}
}
if (sleep.count() != 0)
logO << "Sleep: " << sleep.count() << "\n";
else if (shortBuffer.full())
if (sleep_.count() != 0)
logO << "Sleep: " << sleep_.count() << "\n";
else if (shortBuffer_.full())
{
if (chronos::usec(shortMedian) > chronos::usec(100))
setRealSampleRate(format.rate * 0.9999);
else if (chronos::usec(shortMedian) < -chronos::usec(100))
setRealSampleRate(format.rate * 1.0001);
if (chronos::usec(shortMedian_) > chronos::usec(100))
setRealSampleRate(format_.rate * 0.9999);
else if (chronos::usec(shortMedian_) < -chronos::usec(100))
setRealSampleRate(format_.rate * 1.0001);
}
updateBuffers(age.count());
// print sync stats
time_t now = time(NULL);
if (now != lastUpdate)
if (now != lastUpdate_)
{
lastUpdate = now;
median = buffer.median();
shortMedian = shortBuffer.median();
logO << "Chunk: " << age.count()/100 << "\t" << miniBuffer.median()/100 << "\t" << shortMedian/100 << "\t" << median/100 << "\t" << buffer.size() << "\t" << outputBufferDacTime.count() << "\n";
lastUpdate_ = now;
median_ = buffer_.median();
shortMedian_ = shortBuffer_.median();
logO << "Chunk: " << age.count()/100 << "\t" << miniBuffer_.median()/100 << "\t" << shortMedian_/100 << "\t" << median_/100 << "\t" << buffer_.size() << "\t" << outputBufferDacTime.count() << "\n";
}
return true;
}
catch(int e)
{
sleep = chronos::usec(0);
sleep_ = chronos::usec(0);
return false;
}
}

View file

@ -24,7 +24,10 @@ public:
void clearChunks();
bool getPlayerChunk(void* outputBuffer, const chronos::usec& outputBufferDacTime, unsigned long framesPerBuffer);
void setBufferLen(size_t bufferLenMs);
const msg::SampleFormat& format;
const msg::SampleFormat& getFormat() const
{
return format_;
}
private:
chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, const chronos::usec& timeout, unsigned long framesPerBuffer);
@ -38,22 +41,22 @@ private:
msg::SampleFormat format_;
long lastTick;
chronos::usec sleep;
long lastTick_;
chronos::usec sleep_;
Queue<std::shared_ptr<msg::PcmChunk>> chunks;
Queue<std::shared_ptr<msg::PcmChunk>> chunks_;
// DoubleBuffer<chronos::usec::rep> cardBuffer;
DoubleBuffer<chronos::usec::rep> miniBuffer;
DoubleBuffer<chronos::usec::rep> buffer;
DoubleBuffer<chronos::usec::rep> shortBuffer;
std::shared_ptr<msg::PcmChunk> chunk;
DoubleBuffer<chronos::usec::rep> miniBuffer_;
DoubleBuffer<chronos::usec::rep> buffer_;
DoubleBuffer<chronos::usec::rep> shortBuffer_;
std::shared_ptr<msg::PcmChunk> chunk_;
int median;
int shortMedian;
time_t lastUpdate;
chronos::msec bufferMs;
unsigned long playedFrames;
long correctAfterXFrames;
int median_;
int shortMedian_;
time_t lastUpdate_;
chronos::msec bufferMs_;
unsigned long playedFrames_;
long correctAfterXFrames_;
};

View file

@ -1,16 +1,16 @@
#include "timeProvider.h"
TimeProvider::TimeProvider() : diffToServer(0)
TimeProvider::TimeProvider() : diffToServer_(0)
{
diffBuffer.setSize(200);
diffBuffer_.setSize(200);
}
void TimeProvider::setDiffToServer(double ms)
{
diffBuffer.add(ms * 1000);
diffToServer = diffBuffer.median();
diffBuffer_.add(ms * 1000);
diffToServer_ = diffBuffer_.median();
}
/*

View file

@ -22,7 +22,7 @@ public:
template<typename T>
inline T getDiffToServer() const
{
return std::chrono::duration_cast<T>(chronos::usec(diffToServer));
return std::chrono::duration_cast<T>(chronos::usec(diffToServer_));
}
/* chronos::usec::rep getDiffToServer();
@ -56,8 +56,8 @@ private:
TimeProvider(TimeProvider const&); // Don't Implement
void operator=(TimeProvider const&); // Don't implement
DoubleBuffer<chronos::usec::rep> diffBuffer;
std::atomic<chronos::usec::rep> diffToServer;
DoubleBuffer<chronos::usec::rep> diffBuffer_;
std::atomic<chronos::usec::rep> diffToServer_;
};

View file

@ -7,7 +7,7 @@
#include <iostream>
ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NULL), sampleFormat(NULL)
ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk_(NULL), sampleFormat_(NULL)
{
}
@ -17,7 +17,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
void ControlServer::send(shared_ptr<msg::BaseMessage> message)
{
std::unique_lock<std::mutex> mlock(mutex);
for (auto it = sessions.begin(); it != sessions.end(); )
for (auto it = sessions_.begin(); it != sessions_.end(); )
{
if (!(*it)->active())
{
@ -25,13 +25,13 @@ void ControlServer::send(shared_ptr<msg::BaseMessage> message)
auto func = [](shared_ptr<ServerSession> s)->void{s->stop();};
std::thread t(func, *it);
t.detach();
sessions.erase(it++);
sessions_.erase(it++);
}
else
++it;
}
for (auto s : sessions)
for (auto s : sessions_)
s->add(message);
}
@ -56,18 +56,18 @@ void ControlServer::onMessageReceived(ServerSession* connection, const msg::Base
}
else if (requestMsg.request == kServerSettings)
{
serverSettings->refersTo = requestMsg.id;
connection->send(serverSettings);
serverSettings_->refersTo = requestMsg.id;
connection->send(serverSettings_);
}
else if (requestMsg.request == kSampleFormat)
{
sampleFormat->refersTo = requestMsg.id;
connection->send(sampleFormat);
sampleFormat_->refersTo = requestMsg.id;
connection->send(sampleFormat_);
}
else if (requestMsg.request == kHeader)
{
headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk);
headerChunk_->refersTo = requestMsg.id;
connection->send(headerChunk_);
}
}
else if (baseMessage.type == message_type::kCommand)
@ -102,7 +102,7 @@ void ControlServer::acceptor()
{
std::unique_lock<std::mutex> mlock(mutex);
session->start();
sessions.insert(shared_ptr<ServerSession>(session));
sessions_.insert(shared_ptr<ServerSession>(session));
}
}
}
@ -110,7 +110,7 @@ void ControlServer::acceptor()
void ControlServer::start()
{
acceptThread = new thread(&ControlServer::acceptor, this);
acceptThread_ = new thread(&ControlServer::acceptor, this);
}
@ -123,14 +123,14 @@ void ControlServer::stop()
void ControlServer::setHeader(msg::Header* header)
{
if (header)
headerChunk = header;
headerChunk_ = header;
}
void ControlServer::setFormat(msg::SampleFormat* format)
{
if (format)
sampleFormat = format;
sampleFormat_ = format;
}
@ -138,7 +138,7 @@ void ControlServer::setFormat(msg::SampleFormat* format)
void ControlServer::setServerSettings(msg::ServerSettings* settings)
{
if (settings)
serverSettings = settings;
serverSettings_ = settings;
}

View file

@ -37,15 +37,15 @@ public:
private:
void acceptor();
mutable std::mutex mutex;
set<shared_ptr<ServerSession>> sessions;
mutable std::mutex mutex_;
set<shared_ptr<ServerSession>> sessions_;
boost::asio::io_service io_service_;
unsigned short port_;
msg::Header* headerChunk;
msg::SampleFormat* sampleFormat;
msg::ServerSettings* serverSettings;
thread* acceptThread;
Queue<shared_ptr<msg::BaseMessage>> messages;
msg::Header* headerChunk_;
msg::SampleFormat* sampleFormat_;
msg::ServerSettings* serverSettings_;
thread* acceptThread_;
Queue<shared_ptr<msg::BaseMessage>> messages_;
};

View file

@ -9,9 +9,9 @@ using namespace std;
ServerSession::ServerSession(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket) : messageReceiver(_receiver)
ServerSession::ServerSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket) : messageReceiver_(receiver)
{
socket = _socket;
socket_ = socket;
}
@ -24,9 +24,9 @@ ServerSession::~ServerSession()
void ServerSession::start()
{
active_ = true;
streamActive = false;
readerThread = new thread(&ServerSession::reader, this);
writerThread = new thread(&ServerSession::writer, this);
streamActive_ = false;
readerThread_ = new thread(&ServerSession::reader, this);
writerThread_ = new thread(&ServerSession::writer, this);
}
@ -36,31 +36,31 @@ void ServerSession::stop()
try
{
boost::system::error_code ec;
if (socket)
if (socket_)
{
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (ec) logE << "Error in socket shutdown: " << ec << "\n";
socket->close(ec);
socket_->close(ec);
if (ec) logE << "Error in socket close: " << ec << "\n";
}
if (readerThread)
if (readerThread_)
{
logD << "joining readerThread\n";
readerThread->join();
delete readerThread;
readerThread_->join();
delete readerThread_;
}
if (writerThread)
if (writerThread_)
{
logD << "joining readerThread\n";
writerThread->join();
delete writerThread;
writerThread_->join();
delete writerThread_;
}
}
catch(...)
{
}
readerThread = NULL;
writerThread = NULL;
readerThread_ = NULL;
writerThread_ = NULL;
logD << "ServerSession stopped\n";
}
@ -72,7 +72,7 @@ void ServerSession::socketRead(void* _to, size_t _bytes)
do
{
boost::system::error_code error;
read += socket->read_some(boost::asio::buffer((char*)_to + read, _bytes - read));
read += socket_->read_some(boost::asio::buffer((char*)_to + read, _bytes - read));
}
while (read < _bytes);
}
@ -80,26 +80,26 @@ void ServerSession::socketRead(void* _to, size_t _bytes)
void ServerSession::add(shared_ptr<msg::BaseMessage> message)
{
if (!message || !streamActive)
if (!message || !streamActive_)
return;
while (messages.size() > 100)// chunk->getDuration() > 10000)
messages.pop();
messages.push(message);
while (messages_.size() > 100)// chunk->getDuration() > 10000)
messages_.pop();
messages_.push(message);
}
bool ServerSession::send(msg::BaseMessage* message)
{
std::unique_lock<std::mutex> mlock(mutex_);
if (!socket)
if (!socket_)
return false;
boost::asio::streambuf streambuf;
std::ostream stream(&streambuf);
tv t;
message->sent = t;
message->serialize(stream);
boost::asio::write(*socket.get(), streambuf);
boost::asio::write(*socket_.get(), streambuf);
return true;
}
@ -119,8 +119,8 @@ void ServerSession::getNextMessage()
tv t;
baseMessage.received = t;
if (messageReceiver != NULL)
messageReceiver->onMessageReceived(this, baseMessage, &buffer[0]);
if (messageReceiver_ != NULL)
messageReceiver_->onMessageReceived(this, baseMessage, &buffer[0]);
}
@ -154,7 +154,7 @@ void ServerSession::writer()
shared_ptr<msg::BaseMessage> message;
while (active_)
{
if (messages.try_pop(message, std::chrono::milliseconds(500)))
if (messages_.try_pop(message, std::chrono::milliseconds(500)))
send(message.get());
}
}

View file

@ -28,7 +28,7 @@ public:
class ServerSession
{
public:
ServerSession(MessageReceiver* _receiver, std::shared_ptr<tcp::socket> _socket);
ServerSession(MessageReceiver* receiver, std::shared_ptr<tcp::socket> socket);
~ServerSession();
void start();
void stop();
@ -42,7 +42,7 @@ public:
virtual void setStreamActive(bool active)
{
streamActive = active;
streamActive_ = active;
}
@ -53,13 +53,13 @@ protected:
void writer();
std::atomic<bool> active_;
std::atomic<bool> streamActive;
std::atomic<bool> streamActive_;
mutable std::mutex mutex_;
std::thread* readerThread;
std::thread* writerThread;
std::shared_ptr<tcp::socket> socket;
MessageReceiver* messageReceiver;
Queue<std::shared_ptr<msg::BaseMessage>> messages;
std::thread* readerThread_;
std::thread* writerThread_;
std::shared_ptr<tcp::socket> socket_;
MessageReceiver* messageReceiver_;
Queue<std::shared_ptr<msg::BaseMessage>> messages_;
};