From d0c07e3b6702d40bed5f4f6913888d77df4569ce Mon Sep 17 00:00:00 2001 From: "(no author)" <(no author)@d8a302eb-03bc-478d-80e4-98257eca68ef> Date: Sat, 27 Sep 2014 12:13:47 +0000 Subject: [PATCH] pcmDevice and chronos in stream git-svn-id: svn://elaine/murooma/trunk@298 d8a302eb-03bc-478d-80e4-98257eca68ef --- client/alsaPlayer.cpp | 59 ++++++++++++++++-- client/alsaPlayer.h | 6 +- client/controller.cpp | 14 ++--- client/controller.h | 4 +- client/pcmDevice.h | 18 ++++++ client/snapClient.cpp | 55 +++++++++++++++-- client/stream.cpp | 131 ++++++++++++++++++---------------------- client/stream.h | 4 +- client/timeProvider.cpp | 18 +----- client/timeProvider.h | 19 ++++-- common/timeDefs.h | 7 +++ message/pcmChunk.h | 8 ++- message/sampleFormat.h | 14 ++++- 13 files changed, 239 insertions(+), 118 deletions(-) create mode 100644 client/pcmDevice.h diff --git a/client/alsaPlayer.cpp b/client/alsaPlayer.cpp index 0d81ea02..9b938f72 100644 --- a/client/alsaPlayer.cpp +++ b/client/alsaPlayer.cpp @@ -2,13 +2,12 @@ #include #include -#define PCM_DEVICE "default" #define BUFFER_TIME 100000 using namespace std; -Player::Player(Stream* stream) : active_(false), stream_(stream) +Player::Player(const PcmDevice& pcmDevice, Stream* stream) : active_(false), stream_(stream), pcmDevice_(pcmDevice) { } @@ -25,8 +24,8 @@ void Player::start() /* Open the PCM device in playback mode */ - if ((pcm = snd_pcm_open(&pcm_handle, PCM_DEVICE, SND_PCM_STREAM_PLAYBACK, 0)) < 0) - cout << "ERROR: Can't open " << PCM_DEVICE << " PCM device. " << snd_strerror(pcm) << "\n"; + if ((pcm = snd_pcm_open(&pcm_handle, pcmDevice_.name.c_str(), SND_PCM_STREAM_PLAYBACK, 0)) < 0) + cout << "ERROR: Can't open " << pcmDevice_.name << " PCM device. " << snd_strerror(pcm) << "\n"; /* struct snd_pcm_playback_info_t pinfo; if ( (pcm = snd_pcm_playback_info( pcm_handle, &pinfo )) < 0 ) @@ -149,3 +148,55 @@ void Player::worker() } + +vector Player::pcm_list(void) +{ + void **hints, **n; + char *name, *descr, *io; + vector result; + PcmDevice pcmDevice; + + if (snd_device_name_hint(-1, "pcm", &hints) < 0) + return result; + n = hints; + size_t idx(0); + while (*n != NULL) { + name = snd_device_name_get_hint(*n, "NAME"); + descr = snd_device_name_get_hint(*n, "DESC"); + io = snd_device_name_get_hint(*n, "IOID"); + if (io != NULL && strcmp(io, "Output") != 0) + goto __end; + pcmDevice.name = name; + pcmDevice.description = descr; + pcmDevice.idx = idx++; + result.push_back(pcmDevice); +// printf("%s\n", name); +//cout << "Name: " << name << "\n"; +//cout << "Desc: " << descr << "\n"; +/* + if ((descr1 = descr) != NULL) { + printf(" "); + while (*descr1) { + if (*descr1 == '\n') + printf("\n "); + else + putchar(*descr1); + descr1++; + } + putchar('\n'); + } +*/ + __end: + if (name != NULL) + free(name); + if (descr != NULL) + free(descr); + if (io != NULL) + free(io); + n++; + } + snd_device_name_free_hint(hints); + return result; +} + + diff --git a/client/alsaPlayer.h b/client/alsaPlayer.h index 3db37f51..a2f6f40e 100644 --- a/client/alsaPlayer.h +++ b/client/alsaPlayer.h @@ -4,16 +4,19 @@ #include #include #include +#include #include #include "stream.h" +#include "pcmDevice.h" class Player { public: - Player(Stream* stream); + Player(const PcmDevice& pcmDevice, Stream* stream); void start(); void stop(); + static std::vector pcm_list(void); private: void worker(); @@ -23,6 +26,7 @@ private: std::atomic active_; Stream* stream_; std::thread* playerThread; + PcmDevice pcmDevice_; }; diff --git a/client/controller.cpp b/client/controller.cpp index 131f0772..54a30a1e 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -47,9 +47,10 @@ void Controller::onMessageReceived(ClientConnection* connection, const BaseMessa } -void Controller::start(const std::string& _ip, size_t _port) +void Controller::start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port) { ip = _ip; + pcmDevice_ = pcmDevice; clientConnection = new ClientConnection(this, ip, _port); controllerThread = new thread(&Controller::worker, this); } @@ -92,25 +93,22 @@ void Controller::worker() decoder->setHeader(headerChunk.get()); RequestMsg timeReq("time"); - for (size_t n=0; n<30; ++n) + for (size_t n=0; n<50; ++n) { shared_ptr reply = clientConnection->sendReq(&timeReq, 2000); if (reply) { double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); -/*cout << TimeProvider::sinceEpoche(chronos::hrc::now()).count() << "\n"; -cout << TimeProvider::sinceEpoche(TimeProvider::now()).count() << "\n"; -cout << TimeProvider::sinceEpoche(TimeProvider::serverNow()).count() << "\n"; -cout << "Received: " << TimeProvider::sinceEpoche(TimeProvider::toTimePoint(reply->received)).count() << "\n\n"; -*/ usleep(1000); + usleep(1000); } } + cout << "diff to server [ms]: " << TimeProvider::getInstance().getDiffToServer().count() << "\n"; stream = new Stream(*sampleFormat); stream->setBufferLen(serverSettings->bufferMs); - Player player(stream); + Player player(pcmDevice_, stream); player.start(); RequestMsg startStream("startStream"); diff --git a/client/controller.h b/client/controller.h index bff8684d..1f63d577 100644 --- a/client/controller.h +++ b/client/controller.h @@ -7,13 +7,14 @@ #include "clientConnection.h" #include "decoder.h" #include "stream.h" +#include "pcmDevice.h" class Controller : public MessageReceiver { public: Controller(); - void start(const std::string& _ip, size_t _port); + void start(const PcmDevice& pcmDevice, const std::string& _ip, size_t _port); void stop(); virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer); virtual void onException(ClientConnection* connection, const std::exception& exception); @@ -27,6 +28,7 @@ private: std::string ip; std::shared_ptr sampleFormat; Decoder* decoder; + PcmDevice pcmDevice_; }; diff --git a/client/pcmDevice.h b/client/pcmDevice.h new file mode 100644 index 00000000..9140619c --- /dev/null +++ b/client/pcmDevice.h @@ -0,0 +1,18 @@ +#ifndef PCM_DEVICE_H +#define PCM_DEVICE_H + +#include + + +class PcmDevice +{ +public: + PcmDevice() : idx(-1){}; + int idx; + std::string name; + std::string description; +}; + + +#endif + diff --git a/client/snapClient.cpp b/client/snapClient.cpp index 9f262d0c..88c4bd3b 100644 --- a/client/snapClient.cpp +++ b/client/snapClient.cpp @@ -12,31 +12,57 @@ #include "common/utils.h" #include "common/log.h" #include "controller.h" - +#include "alsaPlayer.h" using namespace std; namespace po = boost::program_options; +PcmDevice getPcmDevice(const std::string& soundcard) +{ + vector pcmDevices = Player::pcm_list(); + int soundcardIdx = -1; + + try + { + soundcardIdx = boost::lexical_cast(soundcard); + for (auto dev: pcmDevices) + if (dev.idx == soundcardIdx) + return dev; + } + catch(...) + { + } + + for (auto dev: pcmDevices) + if (dev.name.find(soundcard) != string::npos) + return dev; + + PcmDevice pcmDevice; + return pcmDevice; +} + int main (int argc, char *argv[]) { - int deviceIdx; + string soundcard; string ip; // int bufferMs; size_t port; bool runAsDaemon; + bool listPcmDevices; // string sampleFormat; po::options_description desc("Allowed options"); desc.add_options() ("help,h", "produce help message") ("port,p", po::value(&port)->default_value(98765), "port where the server listens on") ("ip,i", po::value(&ip)->default_value("192.168.0.2"), "server IP") - ("soundcard,s", po::value(&deviceIdx)->default_value(-1), "index of the soundcard") + ("soundcard,s", po::value(&soundcard)->default_value("default"), "index or name of the soundcard") // ("sampleformat,f", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") // ("buffer,b", po::value(&bufferMs)->default_value(300), "buffer size [ms]") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") + ("list,l", po::bool_switch(&listPcmDevices)->default_value(false), "list pcm devices") ; po::variables_map vm; @@ -49,6 +75,26 @@ int main (int argc, char *argv[]) return 1; } + if (listPcmDevices) + { + vector pcmDevices = Player::pcm_list(); + for (auto dev: pcmDevices) + { + cout << dev.idx << ": " << dev.name << "\n"; + cout << dev.description << "\n\n"; + } + return 1; + } + + PcmDevice pcmDevice = getPcmDevice(soundcard); + if (pcmDevice.idx != -1) + cout << pcmDevice.idx << ": " << pcmDevice.name << "\n"; + else + { + cout << "soundcard \"" << soundcard << "\" not found\n"; + return 1; + } + std::clog.rdbuf(new Log("snapclient", LOG_DAEMON)); if (runAsDaemon) { @@ -56,8 +102,9 @@ int main (int argc, char *argv[]) std::clog << kLogNotice << "daemon started" << std::endl; } + Controller controller; - controller.start(ip, port); + controller.start(pcmDevice, ip, port); while(true) usleep(10000); diff --git a/client/stream.cpp b/client/stream.cpp index 6fe324df..0ace3915 100644 --- a/client/stream.cpp +++ b/client/stream.cpp @@ -48,7 +48,7 @@ time_point_hrc Stream::getSilentPlayerChunk(void* outputBuffer, unsigned long fr { if (!chunk) chunk = chunks.pop(); - time_point_hrc tp = chunk->timePoint(); + time_point_hrc tp = chunk->start(); memset(outputBuffer, 0, framesPerBuffer * format.frameSize); return tp; } @@ -78,7 +78,7 @@ time_point_hrc Stream::seek(long ms) chunk = chunks.pop(); if (ms <= 0) - return chunk->timePoint(); + return chunk->start(); // time_point_ms tp = chunk->timePoint(); while (ms > chunk->duration().count()) @@ -87,27 +87,27 @@ time_point_hrc Stream::seek(long ms) ms -= min(ms, (long)chunk->durationLeft().count()); } chunk->seek(ms * format.msRate()); - return chunk->timePoint(); + return chunk->start(); } -time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction) +time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, const chronos::usec& correction) { if (!chunk) if (!chunks.try_pop(chunk, chronos::msec(timeout))) throw 0; //cout << "duration: " << chunk->duration().count() << ", " << chunk->duration().count() << ", " << chunk->duration().count() << "\n"; - time_point_hrc tp = chunk->timePoint(); + time_point_hrc tp = chunk->start(); int read = 0; - int toRead = framesPerBuffer + correction*format.msRate(); + int toRead = framesPerBuffer + correction.count()*format.usRate(); char* buffer; - if (correction != 0) + if (correction.count() != 0) { - int msBuffer = floor(framesPerBuffer / format.msRate()); - if (abs(correction) > msBuffer / 2) - correction = copysign(msBuffer / 2, correction); +// chronos::usec usBuffer = (chronos::usec::rep)(framesPerBuffer / format.usRate()); +// if (abs(correction) > usBuffer / 2) +// correction = copysign(usBuffer / 2, correction); buffer = (char*)malloc(toRead * format.frameSize); } else @@ -121,10 +121,10 @@ time_point_hrc Stream::getNextPlayerChunk(void* outputBuffer, unsigned long fram throw 0; } - if (correction != 0) + if (correction.count() != 0) { float factor = (float)toRead / framesPerBuffer;//(float)(framesPerBuffer*channels_); - std::cout << "correction: " << correction << ", factor: " << factor << "\n"; + std::cout << "correction: " << correction.count() << ", factor: " << factor << "\n"; float idx = 0; for (size_t n=0; nfull() && (abs(shortMedian) > 5): 25 -Sleep: 25 -Chunk: 25 25 25 25 23 105941 -Chunk: 25 25 25 25 55 107210 -Chunk: 25 25 25 25 88 104671 -pShortBuffer->full() && (abs(shortMedian) > 5): 25 -Sleep: 25 -Chunk: 25 25 25 25 20 99614 -Chunk: 25 25 25 25 53 96802 -Chunk: 25 25 25 25 86 117732 -*/ - int correction = 0; - if (sleep != 0) + chronos::nsec bufferDuration = chronos::nsec(chronos::usec::rep(framesPerBuffer / format_.nsRate())); +// cout << "buffer duration: " << bufferDuration.count() << "\n"; + + chronos::usec correction = chronos::usec(0); + if (sleep.count() != 0) { +// cout << "Sleep " << sleep.count() << "\n"; resetBuffers(); - if (sleep < -msBuffer/2) + if (sleep < -bufferDuration/2) { - cout << "Sleep " << sleep; - msec age = chrono::duration_cast(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs + outputBufferDacTime); - sleep = age.count(); + usec age = chrono::duration_cast(TimeProvider::serverNow() - getSilentPlayerChunk(outputBuffer, framesPerBuffer) - bufferMs + outputBufferDacTime); + sleep = age; // sleep = PcmChunk::getAge(getSilentPlayerChunk(outputBuffer, framesPerBuffer)) - bufferMs + outputBufferDacTime + TimeProvider::getInstance().getDiffToServerMs(); - std::cerr << " after: " << sleep << ", chunks: " << chunks.size() << "\n"; + std::cerr << " after: " << sleep.count() << ", chunks: " << chunks.size() << "\n"; // std::clog << kLogNotice << "sleep: " << sleep << std::endl; // if (sleep > -msBuffer/2) // sleep = 0; - if (sleep < -msBuffer/2) + if (sleep < -bufferDuration/2) return true; } - else if (sleep > msBuffer/2) + else if (sleep > bufferDuration/2) { if (!chunk) if (!chunks.try_pop(chunk, chronos::msec(timeout))) throw 0; - while (sleep > chunk->duration().count()) + while (sleep > chunk->duration()) { - cout << "sleep > chunk->getDuration(): " << sleep << " > " << chunk->duration().count() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << msBuffer << "\n"; + cout << "sleep > chunk->getDuration(): " << sleep.count() << " > " << chunk->duration().count() << ", chunks: " << chunks.size() << ", out: " << outputBufferDacTime.count() << ", needed: " << bufferDuration.count() << "\n"; if (!chunks.try_pop(chunk, chronos::msec(timeout))) throw 0; - msec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk->timePoint() - bufferMs + outputBufferDacTime); - sleep = age.count(); + msec age = std::chrono::duration_cast(TimeProvider::serverNow() - chunk->start() - bufferMs + outputBufferDacTime); + sleep = age; usleep(1000); } // cout << "seek: " << PcmChunk::getAge(seek(sleep)) - bufferMs + outputBufferDacTime << "\n"; - sleep = 0; + sleep = chronos::usec(0); } - else if (sleep < 0) + else if (sleep < -chronos::usec(100)) { - ++sleep; - correction = -1; + sleep += chronos::usec(100); + correction = -chronos::usec(100); } - else if (sleep > 0) + else if (sleep > chronos::usec(100)) { - --sleep; - correction = 1; + sleep -= chronos::usec(100); + correction = chronos::usec(100); + } + else + { + cout << "Sleep " << sleep.count() << "\n"; + correction = sleep; + sleep = chronos::usec(0); } } - long age(0); - age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction) - bufferMs + outputBufferDacTime).count(); + chronos::usec age = std::chrono::duration_cast(TimeProvider::serverNow() - getNextPlayerChunk(outputBuffer, framesPerBuffer, timeout, correction) - bufferMs + outputBufferDacTime); - - if (sleep == 0) + if (sleep.count() == 0) { - if (buffer.full() && (abs(median) > 1)) + if (buffer.full() && (chronos::usec(abs(median)) > chronos::msec(1))) { cout << "pBuffer->full() && (abs(median) > 1): " << median << "\n"; - sleep = median; + sleep = chronos::usec(median); } - else if (shortBuffer.full() && (abs(shortMedian) > 5)) + else if (shortBuffer.full() && (chronos::usec(abs(shortMedian)) > chronos::msec(5))) { cout << "pShortBuffer->full() && (abs(shortMedian) > 5): " << shortMedian << "\n"; - sleep = shortMedian; + sleep = chronos::usec(shortMedian); } - else if (miniBuffer.full() && (abs(miniBuffer.median()) > 50)) + else if (miniBuffer.full() && (chronos::usec(abs(miniBuffer.median())) > chronos::msec(50))) { cout << "pMiniBuffer->full() && (abs(pMiniBuffer->mean()) > 50): " << miniBuffer.median() << "\n"; - sleep = miniBuffer.mean(); + sleep = chronos::usec((chronos::msec::rep)miniBuffer.mean()); } - else if (abs(age) > 200) + else if (chronos::abs(age) > chronos::msec(200)) { - cout << "age > 50: " << age << "\n"; + cout << "age > 50: " << age.count() << "\n"; sleep = age; } } - if (sleep != 0) - std::cerr << "Sleep: " << sleep << "\n"; + if (sleep.count() != 0) + std::cerr << "Sleep: " << sleep.count() << "\n"; + updateBuffers(age.count()); + // std::cerr << "Chunk: " << age << "\t" << outputBufferDacTime*1000 << "\n"; - if (ticks > 2) - { - // cout << age << "\n"; - updateBuffers(age); - } time_t now = time(NULL); if (now != lastUpdate) { lastUpdate = now; median = buffer.median(); shortMedian = shortBuffer.median(); - std::cerr << "Chunk: " << age << "\t" << miniBuffer.median() << "\t" << shortMedian << "\t" << median << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime.count() << "\n"; + std::cerr << "Chunk: " << age.count()/100 << "\t" << miniBuffer.median()/100 << "\t" << shortMedian/100 << "\t" << median/100 << "\t" << buffer.size() << "\t" << /*cardBuffer << "\t" <<*/ outputBufferDacTime.count() << "\n"; } return true; } catch(int e) { - sleep = 0; + sleep = chronos::usec(0); return false; } } diff --git a/client/stream.h b/client/stream.h index 13adb6de..ebce6b2e 100644 --- a/client/stream.h +++ b/client/stream.h @@ -27,7 +27,7 @@ public: const SampleFormat& format; private: - chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, int correction = 0); + chronos::time_point_hrc getNextPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer, size_t timeout, const chronos::usec& correction = chronos::usec(0)); chronos::time_point_hrc getSilentPlayerChunk(void* outputBuffer, unsigned long framesPerBuffer); chronos::time_point_hrc seek(long ms); // time_point_ms seekTo(const time_point_ms& to); @@ -37,7 +37,7 @@ private: SampleFormat format_; long lastTick; - long sleep; + chronos::usec sleep; Queue> chunks; DoubleBuffer cardBuffer; diff --git a/client/timeProvider.cpp b/client/timeProvider.cpp index d79cdcd9..53b2dd07 100644 --- a/client/timeProvider.cpp +++ b/client/timeProvider.cpp @@ -3,7 +3,7 @@ TimeProvider::TimeProvider() : diffToServer(0) { - diffBuffer.setSize(120); + diffBuffer.setSize(100); } @@ -13,22 +13,10 @@ void TimeProvider::setDiffToServer(double ms) diffToServer = diffBuffer.median(); } - -long TimeProvider::getDiffToServer() -{ - return diffToServer; -} - - -long TimeProvider::getDiffToServerMs() -{ - return diffToServer / 1000; -} - - +/* long TimeProvider::getPercentileDiffToServer(size_t percentile) { return diffBuffer.percentile(percentile); } - +*/ diff --git a/client/timeProvider.h b/client/timeProvider.h index 4fb27f62..7536d7e3 100644 --- a/client/timeProvider.h +++ b/client/timeProvider.h @@ -18,10 +18,17 @@ public: } void setDiffToServer(double ms); - long getDiffToServer(); - long getPercentileDiffToServer(size_t percentile); - long getDiffToServerMs(); + template + inline T getDiffToServer() const + { + return std::chrono::duration_cast(chronos::usec(diffToServer)); + } + +/* chronos::usec::rep getDiffToServer(); + chronos::usec::rep getPercentileDiffToServer(size_t percentile); + long getDiffToServerMs(); +*/ template static T sinceEpoche(const chronos::time_point_hrc& point) @@ -41,7 +48,7 @@ public: inline static chronos::time_point_hrc serverNow() { - return chronos::hrc::now() + chronos::usec(TimeProvider::getInstance().getDiffToServer()); + return chronos::hrc::now() + TimeProvider::getInstance().getDiffToServer(); } private: @@ -49,8 +56,8 @@ private: TimeProvider(TimeProvider const&); // Don't Implement void operator=(TimeProvider const&); // Don't implement - DoubleBuffer diffBuffer; - std::atomic diffToServer; + DoubleBuffer diffBuffer; + std::atomic diffToServer; }; diff --git a/common/timeDefs.h b/common/timeDefs.h index 5f8978cf..691da288 100644 --- a/common/timeDefs.h +++ b/common/timeDefs.h @@ -5,6 +5,13 @@ namespace chronos { + template + std::chrono::duration abs(std::chrono::duration d) + { + Rep x = d.count(); + return std::chrono::duration(x >= 0 ? x : -x); + } + typedef std::chrono::high_resolution_clock hrc; typedef std::chrono::time_point time_point_hrc; typedef std::chrono::seconds sec; diff --git a/message/pcmChunk.h b/message/pcmChunk.h index 40e57192..d70c0580 100644 --- a/message/pcmChunk.h +++ b/message/pcmChunk.h @@ -17,8 +17,9 @@ public: ~PcmChunk(); int readFrames(void* outputBuffer, size_t frameCount); + int seek(int frames); - inline chronos::time_point_hrc timePoint() const + inline chronos::time_point_hrc start() const { return chronos::time_point_hrc( chronos::sec(timestamp.sec) + @@ -27,7 +28,10 @@ public: ); } - int seek(int frames); + inline chronos::time_point_hrc end() const + { + return start() + durationLeft(); + } template inline T duration() const diff --git a/message/sampleFormat.h b/message/sampleFormat.h index 5370f0b5..a00e8746 100644 --- a/message/sampleFormat.h +++ b/message/sampleFormat.h @@ -22,9 +22,19 @@ public: uint16_t sampleSize; uint16_t frameSize; - double msRate() const + inline double msRate() const { - return (double)rate/1000.f; + return (double)rate/1000.; + } + + inline double usRate() const + { + return (double)rate/1000000.; + } + + inline double nsRate() const + { + return (double)rate/1000000000.; } virtual void read(std::istream& stream)