git-svn-id: svn://elaine/murooma/trunk@269 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-16 18:21:44 +00:00
parent 2fedce489c
commit d85858ac62
8 changed files with 134 additions and 127 deletions

View file

@ -14,9 +14,8 @@
using namespace std;
Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL), sampleFormat(NULL)
Controller::Controller() : MessageReceiver(), active_(false), streamClient(NULL), sampleFormat(NULL), decoder(NULL)
{
decoder = new OggDecoder();
}
@ -28,39 +27,16 @@ void Controller::onMessageReceived(SocketConnection* connection, const BaseMessa
{
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
pcmChunk->deserialize(baseMessage, buffer);
//cout << "chunk: " << pcmChunk->payloadSize;
cout << "chunk: " << pcmChunk->payloadSize;
if (decoder->decode(pcmChunk))
{
stream->addChunk(pcmChunk);
//cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->tv_sec << ", usec: " << pcmChunk->tv_usec/1000 << ", type: " << pcmChunk->type << "\n";
cout << ", decoded: " << pcmChunk->payloadSize << ", Duration: " << pcmChunk->getDuration() << ", sec: " << pcmChunk->timestamp.sec << ", usec: " << pcmChunk->timestamp.usec/1000 << ", type: " << pcmChunk->type << "\n";
}
else
delete pcmChunk;
}
}
/* else if (baseMessage.type == message_type::header)
{
if (decoder != NULL)
{
HeaderMessage* headerMessage = new HeaderMessage();
headerMessage->deserialize(baseMessage, buffer);
decoder->setHeader(headerMessage);
}
}
else if (baseMessage.type == message_type::sampleformat)
{
sampleFormat = new SampleFormat();
sampleFormat->deserialize(baseMessage, buffer);
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
}
else if (baseMessage.type == message_type::serversettings)
{
ServerSettings* serverSettings = new ServerSettings();
serverSettings->deserialize(baseMessage, buffer);
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
}
*/
}
@ -86,22 +62,28 @@ void Controller::worker()
{
// Decoder* decoder;
active_ = true;
decoder = NULL;
while (active_)
{
try
{
RequestMsg requestMsg("serverSettings");
shared_ptr<ServerSettings> serverSettings(NULL);
while (!(serverSettings = controlConnection->sendReq<ServerSettings>(&requestMsg, 2000)));
while (!(serverSettings = controlConnection->sendReq<ServerSettings>(&requestMsg, 1000)));
cout << "ServerSettings port: " << serverSettings->port << "\n";
streamClient = new StreamClient(this, ip, serverSettings->port);
requestMsg.request = "sampleFormat";
while (!(sampleFormat = controlConnection->sendReq<SampleFormat>(&requestMsg, 2000)));
while (!(sampleFormat = controlConnection->sendReq<SampleFormat>(&requestMsg, 1000)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
decoder = new OggDecoder();
if (decoder != NULL)
{
requestMsg.request = "headerChunk";
shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = controlConnection->sendReq<HeaderMessage>(&requestMsg, 2000)));
while (!(headerChunk = controlConnection->sendReq<HeaderMessage>(&requestMsg, 1000)));
decoder->setHeader(headerChunk.get());
}
@ -124,22 +106,42 @@ void Controller::worker()
Player player(stream);
player.start();
try
{
while (active_)
{
usleep(1000000);
try
{
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&timeReq, 2000);
shared_ptr<TimeMsg> reply = controlConnection->sendReq<TimeMsg>(&timeReq, 1000);
if (reply)
{
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
// cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl;
// cout << "C2S: " << timeMsg.latency << ", S2C: " << latency << ", diff: " << (timeMsg.latency - latency) / 2 << endl;
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
cout << TimeProvider::getInstance().getDiffToServer() << "\n";
}
}
}
catch (const std::exception& e)
{
cout << "Stopping player\n";
player.stop();
cout << "Stopping streamClient\n";
streamClient->stop();
delete streamClient;
streamClient = NULL;
delete stream;
stream = NULL;
cout << "done\n";
throw e;
}
}
catch (const std::exception& e)
{
cout << "Exception in Controller::worker(): " << e.what() << "\n";
if (decoder != NULL)
delete decoder;
decoder = NULL;
usleep(1000000);
}
}
}

View file

@ -7,6 +7,7 @@ class Decoder
{
public:
Decoder() {};
virtual ~Decoder() {};
virtual bool decode(PcmChunk* chunk) = 0;
virtual bool setHeader(HeaderMessage* chunk) = 0;
};

View file

@ -18,7 +18,7 @@ OggDecoder::OggDecoder() : Decoder()
OggDecoder::~OggDecoder()
{
ogg_sync_init(&oy); /* Now we can read pages */
// ogg_sync_init(&oy); /* Now we can read pages */
delete convbuffer;
}

View file

@ -114,21 +114,23 @@ void Player::start()
void Player::stop()
{
active_ = false;
playerThread->join();
delete playerThread;
}
void Player::worker()
{
unsigned int pcm;
snd_pcm_sframes_t avail;
snd_pcm_sframes_t delay;
active_ = true;
while (active_)
{
snd_pcm_sframes_t avail;
snd_pcm_sframes_t delay;
snd_pcm_avail_delay(pcm_handle, &avail, &delay);
stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames);
if (stream_->getPlayerChunk(buff, (float)delay / stream_->format.msRate(), frames, 500))
{
if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) {
printf("XRUN.\n");
snd_pcm_prepare(pcm_handle);
@ -136,6 +138,7 @@ void Player::worker()
printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm));
}
}
}
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);

View file

@ -9,10 +9,10 @@ using namespace std;
Stream::Stream(const SampleFormat& sampleFormat) : format(format_), format_(sampleFormat), sleep(0), median(0), shortMedian(0), lastUpdate(0)
{
pBuffer = new DoubleBuffer<long>(500);
pShortBuffer = new DoubleBuffer<long>(100);
pMiniBuffer = new DoubleBuffer<long>(20);
pCardBuffer = new DoubleBuffer<long>(50);
buffer.setSize(500);
shortBuffer.setSize(100);
miniBuffer.setSize(20);
cardBuffer.setSize(50);
bufferMs = 500;
}
@ -89,10 +89,11 @@ time_point_ms Stream::seek(long ms)
}
time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, int correction)
time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction)
{
if (!chunk)
chunk = chunks.pop();
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
time_point_ms tp = chunk->timePoint();
int read = 0;
@ -113,7 +114,8 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame
{
read += chunk->readFrames(buffer + read*format.frameSize, toRead - read);
if (chunk->isEndOfChunk())
chunk = chunks.pop();
if (!chunks.try_pop(chunk, std::chrono::milliseconds(timeout)))
throw 0;
}
if (correction != 0)
@ -137,21 +139,21 @@ time_point_ms Stream::getNextPlayerChunk(void* outputBuffer, unsigned long frame
void Stream::updateBuffers(int age)
{
pBuffer->add(age);
pMiniBuffer->add(age);
pShortBuffer->add(age);
buffer.add(age);
miniBuffer.add(age);
shortBuffer.add(age);
}
void Stream::resetBuffers()
{
pBuffer->clear();
pMiniBuffer->clear();
pShortBuffer->clear();
buffer.clear();
miniBuffer.clear();
shortBuffer.clear();
}
void Stream::getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer)
bool Stream::getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout)
{
//cout << "framesPerBuffer: " << framesPerBuffer << "\tms: " << framesPerBuffer*2 / PLAYER_CHUNK_MS_SIZE << "\t" << PLAYER_CHUNK_SIZE << "\n";
int msBuffer = framesPerBuffer / format_.msRate();
@ -163,12 +165,6 @@ int msBuffer = framesPerBuffer / format_.msRate();
ticks = currentTick - lastTick;
lastTick = currentTick;
int cardBuffer = 0;
if (ticks > 1)
pCardBuffer->add(ticks);
if (pCardBuffer->full())
cardBuffer = pCardBuffer->percentil(90);
int correction = 0;
if (sleep != 0)
{
@ -182,7 +178,7 @@ int msBuffer = framesPerBuffer / format_.msRate();
// if (sleep > -msBuffer/2)
// sleep = 0;
if (sleep < -msBuffer/2)
return;
return true;
}
else if (sleep > msBuffer/2)
{
@ -219,31 +215,34 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"
long age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
long age(0);
try
{
age = PcmChunk::getAge(getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs();
}
catch(int e)
{
return false;
}
// if (pCardBuffer->full())
// age += 4*cardBuffer;
// cout << age << "\t" << outputBufferDacTime << "\t";// << framesPerBuffer << "\t" << msBuffer << "\t" << ticks << "\t" << cardBuffer << "\t" << outputBufferDacTime << "\n";
if (sleep == 0)
{
if (pBuffer->full() && (abs(median) > 1))
if (buffer.full() && (abs(median) > 1))
{
cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n";
sleep = median;
}
else if (pShortBuffer->full() && (abs(shortMedian) > 5))
else if (shortBuffer.full() && (abs(shortMedian) > 5))
{
cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n";
sleep = shortMedian;
}
else if (pMiniBuffer->full() && (abs(pMiniBuffer->median()) > 50))
else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50))
{
cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << pMiniBuffer->median() << "\n";
sleep = pMiniBuffer->mean();
cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n";
sleep = miniBuffer.mean();
}
else if (abs(age) > 200)
{
@ -266,10 +265,11 @@ cout << "\nms: " << Chunk::getAge(ms) << "\t chunk: " << chunk->getAge() << "\n"
if (now != lastUpdate)
{
lastUpdate = now;
median = pBuffer->median();
shortMedian = pShortBuffer->median();
std::cerr << "Chunk: " << age << "\t" << pMiniBuffer->median() << "\t" << shortMedian << "\t" << median << "\t" << pBuffer->size() << "\t" << cardBuffer << "\t" << outputBufferDacTime << "\n";
median = buffer.median();
shortMedian = shortBuffer.median();
std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime << "\n";
}
return true;
}

View file

@ -21,12 +21,12 @@ public:
Stream(const SampleFormat& format);
void addChunk(PcmChunk* chunk);
void clearChunks();
void getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer);
bool getPlayerChunk(void* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer, size_t timeout);
void setBufferLen(size_t bufferLenMs);
const SampleFormat& format;
private:
time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, int correction = 0);
time_point_ms getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0);
time_point_ms getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer);
time_point_ms seek(long ms);
// time_point_ms seekTo(const time_point_ms& to);
@ -39,10 +39,10 @@ private:
long sleep;
Queue<std::shared_ptr<PcmChunk>> chunks;
DoubleBuffer<long>* pCardBuffer;
DoubleBuffer<long>* pMiniBuffer;
DoubleBuffer<long>* pBuffer;
DoubleBuffer<long>* pShortBuffer;
DoubleBuffer<long> cardBuffer;
DoubleBuffer<long> miniBuffer;
DoubleBuffer<long> buffer;
DoubleBuffer<long> shortBuffer;
std::shared_ptr<PcmChunk> chunk;
int median;

View file

@ -44,6 +44,7 @@ void SocketConnection::start()
void SocketConnection::stop()
{
active_ = false;
receiverThread->join();
}
@ -161,7 +162,7 @@ void ClientConnection::worker()
{
{
// std::unique_lock<std::mutex> mlock(mutex_);
//cout << "connecting\n";
cout << "connecting\n";
socket.reset(new tcp::socket(io_service));
struct timeval tv;
tv.tv_sec = 5;
@ -169,7 +170,7 @@ void ClientConnection::worker()
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
socket->connect(*iterator);
connected_ = true;
//cout << "connected\n";
cout << "connected\n";
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
}
while(active_)
@ -181,7 +182,7 @@ void ClientConnection::worker()
{
connected_ = false;
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
usleep(500*1000);
usleep(1000*1000);
}
}
}

View file

@ -55,7 +55,7 @@ void StreamServer::acceptor()
{
socket_ptr sock(new tcp::socket(io_service_));
a.accept(*sock);
cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n";
cout << "StreamServer::New connection: " << sock->remote_endpoint().address().to_string() << "\n";
StreamSession* session = new StreamSession(sock);
sessions.insert(shared_ptr<StreamSession>(session));
session->start();