diff --git a/client/controller.cpp b/client/controller.cpp index f03d599f..beb54853 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -14,9 +14,8 @@ using namespace std; -Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL), sampleFormat(NULL) +Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL), sampleFormat(NULL), decoder(NULL) { - decoder = new OggDecoder(); } @@ -28,39 +27,16 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa { PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); pcmChunk->deserialize(baseMessage, buffer); -//cout << "chunk: " << pcmChunk->payloadSize; +cout << "chunk: " << pcmChunk->payloadSize; if (decoder->decode(pcmChunk)) { stream->addChunk(pcmChunk); -//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->tv_sec << ", usec: " << pcmChunk->tv_usec/1000 << ", type: " << pcmChunk->type << "\n"; +cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n"; } else delete pcmChunk; } } -/* else if (baseMessage.type == message_type::header) - { - if (decoder != NULL) - { - HeaderMessage* headerMessage = new HeaderMessage(); - headerMessage->deserialize(baseMessage, buffer); - decoder->setHeader(headerMessage); - } - } - else if (baseMessage.type == message_type::sampleformat) - { - sampleFormat = new SampleFormat(); - sampleFormat->deserialize(baseMessage, buffer); - cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - } - else if (baseMessage.type == message_type::serversettings) - { - ServerSettings* serverSettings = new ServerSettings(); - serverSettings->deserialize(baseMessage, buffer); - cout << "ServerSettings port: " << serverSettings->port << "\n"; - streamClient = new StreamClient(this, ip, serverSettings->port); - } -*/ } @@ -86,60 +62,86 @@ void Controller::worker() { // Decoder* decoder; active_ = true; - - RequestMsg requestMsg("serverSettings"); - shared_ptr serverSettings(NULL); - while (!(serverSettings = controlConnection->sendReq(&requestMsg, 2000))); - cout << "ServerSettings port: " << serverSettings->port << "\n"; - streamClient = new StreamClient(this, ip, serverSettings->port); - - requestMsg.request = "sampleFormat"; - while (!(sampleFormat = controlConnection->sendReq(&requestMsg, 2000))); - cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; - - if (decoder != NULL) - { - requestMsg.request = "headerChunk"; - shared_ptr headerChunk(NULL); - while (!(headerChunk = controlConnection->sendReq(&requestMsg, 2000))); - decoder->setHeader(headerChunk.get()); - } - - RequestMsg timeReq("time"); - for (size_t n=0; n<10; ++n) - { - shared_ptr reply = controlConnection->sendReq(&timeReq, 2000); - if (reply) - { - double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; - TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); - usleep(1000); - } - } - - streamClient->start(); - stream = new Stream(*sampleFormat); - stream->setBufferLen(bufferMs); - - Player player(stream); - player.start(); + decoder = NULL; while (active_) { - usleep(1000000); try - { - shared_ptr reply = controlConnection->sendReq(&timeReq, 2000); - if (reply) + { + RequestMsg requestMsg("serverSettings"); + shared_ptr serverSettings(NULL); + while (!(serverSettings = controlConnection->sendReq(&requestMsg, 1000))); + cout << "ServerSettings port: " << serverSettings->port << "\n"; + streamClient = new StreamClient(this, ip, serverSettings->port); + + requestMsg.request = "sampleFormat"; + while (!(sampleFormat = controlConnection->sendReq(&requestMsg, 1000))); + cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; + + decoder = new OggDecoder(); + if (decoder != NULL) { - double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; -// cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl; - TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); - cout << TimeProvider::getInstance().getDiffToServer() << "\n"; + requestMsg.request = "headerChunk"; + shared_ptr headerChunk(NULL); + while (!(headerChunk = controlConnection->sendReq(&requestMsg, 1000))); + decoder->setHeader(headerChunk.get()); + } + + RequestMsg timeReq("time"); + for (size_t n=0; n<10; ++n) + { + shared_ptr reply = controlConnection->sendReq(&timeReq, 2000); + if (reply) + { + double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; + TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); + usleep(1000); + } + } + + streamClient->start(); + stream = new Stream(*sampleFormat); + stream->setBufferLen(bufferMs); + + Player player(stream); + player.start(); + + try + { + while (active_) + { + usleep(1000000); + shared_ptr reply = controlConnection->sendReq(&timeReq, 1000); + if (reply) + { + double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; + // cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl; + TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); + cout << TimeProvider::getInstance().getDiffToServer() << "\n"; + } + } + } + catch (const std::exception& e) + { + cout << "Stopping player\n"; + player.stop(); + cout << "Stopping streamClient\n"; + streamClient->stop(); + delete streamClient; + streamClient = NULL; + delete stream; + stream = NULL; + cout << "done\n"; + throw e; } } catch (const std::exception& e) { + cout << "Exception in Controller::worker(): " << e.what() << "\n"; + if (decoder != NULL) + delete decoder; + decoder = NULL; + usleep(1000000); } } } diff --git a/client/decoder.h b/client/decoder.h index 7935d23c..7ef85143 100644 --- a/client/decoder.h +++ b/client/decoder.h @@ -7,6 +7,7 @@ class Decoder { public: Decoder() {}; + virtual ~Decoder() {}; virtual bool decode(PcmChunk* chunk) = 0; virtual bool setHeader(HeaderMessage* chunk) = 0; }; diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index e9943c10..f0622947 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -18,7 +18,7 @@ OggDecoder::OggDecoder() : Decoder() OggDecoder::~OggDecoder() { - ogg_sync_init(&oy); /* Now we can read pages */ +// ogg_sync_init(&oy); /* Now we can read pages */ delete convbuffer; } diff --git a/client/player.cpp b/client/player.cpp index 30eb4fa5..44d3673d 100644 --- a/client/player.cpp +++ b/client/player.cpp @@ -114,26 +114,29 @@ void Player::start() void Player::stop() { active_ = false; + playerThread->join(); + delete playerThread; } void Player::worker() { unsigned int pcm; + snd_pcm_sframes_t avail; + snd_pcm_sframes_t delay; active_ = true; while (active_) { - snd_pcm_sframes_t avail; - snd_pcm_sframes_t delay; snd_pcm_avail_delay(pcm_handle, &avail, &delay); - stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames); - - if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) { - printf("XRUN.\n"); - snd_pcm_prepare(pcm_handle); - } else if (pcm < 0) { - printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm)); + if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500)) + { + if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) { + printf("XRUN.\n"); + snd_pcm_prepare(pcm_handle); + } else if (pcm < 0) { + printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm)); + } } } diff --git a/client/stream.cpp b/client/stream.cpp index ceb1a3c2..06832310 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -9,10 +9,10 @@ using namespace std; Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0) { - pBuffer = new DoubleBuffer(500); - pShortBuffer = new DoubleBuffer(100); - pMiniBuffer = new DoubleBuffer(20); - pCardBuffer = new DoubleBuffer(50); + buffer.setSize(500); + shortBuffer.setSize(100); + miniBuffer.setSize(20); + cardBuffer.setSize(50); bufferMs = 500; } @@ -89,10 +89,11 @@ time_point_ms Stream::seek(long ms) } -time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, int correction) +time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction) { if (!chunk) - chunk = chunks.pop(); + if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) + throw 0; time_point_ms tp = chunk->timePoint(); int read = 0; @@ -113,7 +114,8 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame { read += chunk->readFrames(buffer + read*format.frameSize, toRead - read); if (chunk->isEndOfChunk()) - chunk = chunks.pop(); + if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout))) + throw 0; } if (correction != 0) @@ -137,21 +139,21 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame void Stream::updateBuffers(int age) { - pBuffer->add(age); - pMiniBuffer->add(age); - pShortBuffer->add(age); + buffer.add(age); + miniBuffer.add(age); + shortBuffer.add(age); } void Stream::resetBuffers() { - pBuffer->clear(); - pMiniBuffer->clear(); - pShortBuffer->clear(); + buffer.clear(); + miniBuffer.clear(); + shortBuffer.clear(); } -void Stream::getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer) +bool Stream::getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout) { //cout << "framesPerBuffer: " << framesPerBuffer << "\tms: " << framesPerBuffer*2 / PLAYER_CHUNK_MS_SIZE << "\t" << PLAYER_CHUNK_SIZE << "\n"; int msBuffer = framesPerBuffer / format_.msRate(); @@ -163,12 +165,6 @@ int msBuffer = framesPerBuffer / format_.msRate(); ticks = currentTick - lastTick; lastTick = currentTick; - int cardBuffer = 0; - if (ticks > 1) - pCardBuffer->add(ticks); - if (pCardBuffer->full()) - cardBuffer = pCardBuffer->percentil(90); - int correction = 0; if (sleep != 0) { @@ -182,7 +178,7 @@ int msBuffer = framesPerBuffer / format_.msRate(); // if (sleep > -msBuffer/2) // sleep = 0; if (sleep < -msBuffer/2) - return; + return true; } else if (sleep > msBuffer/2) { @@ -219,31 +215,34 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n" - long age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); + long age(0); + try + { + age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); + } + catch(int e) + { + return false; + } -// if (pCardBuffer->full()) -// age += 4*cardBuffer; - -// cout << age << "\t" << outputBufferDacTime << "\t";// << framesPerBuffer << "\t" << msBuffer << "\t" << ticks << "\t" << cardBuffer << "\t" << outputBufferDacTime << "\n"; - if (sleep == 0) { - if (pBuffer->full() && (abs(median) > 1)) + if (buffer.full() && (abs(median) > 1)) { cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n"; sleep = median; } - else if (pShortBuffer->full() && (abs(shortMedian) > 5)) + else if (shortBuffer.full() && (abs(shortMedian) > 5)) { cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n"; sleep = shortMedian; } - else if (pMiniBuffer->full() && (abs(pMiniBuffer->median()) > 50)) + else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50)) { - cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << pMiniBuffer->median() << "\n"; - sleep = pMiniBuffer->mean(); + cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n"; + sleep = miniBuffer.mean(); } else if (abs(age) > 200) { @@ -266,10 +265,11 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n" if (now != lastUpdate) { lastUpdate = now; - median = pBuffer->median(); - shortMedian = pShortBuffer->median(); - std::cerr << "Chunk: " << age << "\t" << pMiniBuffer->median() << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << cardBuffer << "\t" << outputBufferDacTime << "\n"; + median = buffer.median(); + shortMedian = shortBuffer.median(); + std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n"; } + return true; } diff --git a/client/stream.h b/client/stream.h index 0a44b781..2344026b 100644 --- a/client/stream.h +++ b/client/stream.h @@ -21,12 +21,12 @@ public: Stream(const SampleFormat& format); void addChunk(PcmChunk* chunk); void clearChunks(); - void getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer); + bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout); void setBufferLen(size_t bufferLenMs); const SampleFormat& format; private: - time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, int correction = 0); + time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0); time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer); time_point_ms seek(long ms); // time_point_ms seekTo(const time_point_ms& to); @@ -39,10 +39,10 @@ private: long sleep; Queue> chunks; - DoubleBuffer* pCardBuffer; - DoubleBuffer* pMiniBuffer; - DoubleBuffer* pBuffer; - DoubleBuffer* pShortBuffer; + DoubleBuffer cardBuffer; + DoubleBuffer miniBuffer; + DoubleBuffer buffer; + DoubleBuffer shortBuffer; std::shared_ptr chunk; int median; diff --git a/common/socketConnection.cpp b/common/socketConnection.cpp index 845b19d5..bffa8d50 100644 --- a/common/socketConnection.cpp +++ b/common/socketConnection.cpp @@ -44,6 +44,7 @@ void SocketConnection::start() void SocketConnection::stop() { active_ = false; + receiverThread->join(); } @@ -161,7 +162,7 @@ void ClientConnection::worker() { { // std::unique_lock mlock(mutex_); -//cout << "connecting\n"; +cout << "connecting\n"; socket.reset(new tcp::socket(io_service)); struct timeval tv; tv.tv_sec = 5; @@ -169,7 +170,7 @@ void ClientConnection::worker() setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); socket->connect(*iterator); connected_ = true; -//cout << "connected\n"; +cout << "connected\n"; std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; } while(active_) @@ -181,7 +182,7 @@ void ClientConnection::worker() { connected_ = false; cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - usleep(500*1000); + usleep(1000*1000); } } } diff --git a/server/streamServer.cpp b/server/streamServer.cpp index ecadd368..995de2c8 100644 --- a/server/streamServer.cpp +++ b/server/streamServer.cpp @@ -55,7 +55,7 @@ void StreamServer::acceptor() { socket_ptr sock(new tcp::socket(io_service_)); a.accept(*sock); - cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n"; StreamSession* session = new StreamSession(sock); sessions.insert(shared_ptr(session)); session->start();