git-svn-id: svn://elaine/murooma/trunk@322 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-12-27 12:19:13 +00:00
parent f3a2486560
commit deca098bb4
8 changed files with 99 additions and 93 deletions

View file

@ -6,11 +6,13 @@
using namespace std; using namespace std;
Player::Player(const PcmDevice& pcmDevice, Stream* stream) : Player::Player(const PcmDevice& pcmDevice, Stream* stream) : pcm_handle(NULL), buff(NULL), active_(false), stream_(stream), pcmDevice_(pcmDevice)
active_(false), stream_(stream), pcmDevice_(pcmDevice) { {
} }
void Player::start() {
void Player::start()
{
unsigned int pcm, tmp, rate; unsigned int pcm, tmp, rate;
int channels; int channels;
snd_pcm_hw_params_t *params; snd_pcm_hw_params_t *params;
@ -110,39 +112,66 @@ void Player::start() {
playerThread = new thread(&Player::worker, this); playerThread = new thread(&Player::worker, this);
} }
Player::~Player()
{
stop();
}
void Player::stop() { void Player::stop() {
active_ = false; active_ = false;
playerThread->join(); if (playerThread != NULL)
delete playerThread; {
playerThread->join();
delete playerThread;
playerThread = NULL;
}
if (pcm_handle != NULL)
{
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);
pcm_handle = NULL;
}
if (buff != NULL)
{
free(buff);
buff = NULL;
}
} }
void Player::worker() { void Player::worker() {
unsigned int pcm; unsigned int pcm;
snd_pcm_sframes_t framesAvail; snd_pcm_sframes_t framesAvail;
snd_pcm_sframes_t framesDelay; snd_pcm_sframes_t framesDelay;
active_ = true; active_ = true;
while (active_) { while (active_)
{
snd_pcm_avail_delay(pcm_handle, &framesAvail, &framesDelay); snd_pcm_avail_delay(pcm_handle, &framesAvail, &framesDelay);
chronos::usec delay((chronos::usec::rep) (1000 * (double) framesDelay / stream_->format.msRate()));
// cout << "Avail: " << framesAvail << ", delay: " << framesDelay << ", delay[ms]: " << delay.count() / 1000 << "\n";
chronos::usec delay( if (stream_->getPlayerChunk(buff, delay, frames))
(chronos::usec::rep) (1000 * (double) framesDelay {
/ stream_->format.msRate())); if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE)
if (stream_->getPlayerChunk(buff, delay, frames)) { {
if ((pcm = snd_pcm_writei(pcm_handle, buff, frames)) == -EPIPE) {
printf("XRUN.\n"); printf("XRUN.\n");
snd_pcm_prepare(pcm_handle); snd_pcm_prepare(pcm_handle);
} else if (pcm < 0) { }
printf("ERROR. Can't write to PCM device. %s\n", else if (pcm < 0)
snd_strerror(pcm)); {
printf("ERROR. Can't write to PCM device. %s\n", snd_strerror(pcm));
} }
} }
else
usleep(100*1000);
} }
snd_pcm_drain(pcm_handle);
snd_pcm_close(pcm_handle);
free(buff);
} }
vector<PcmDevice> Player::pcm_list(void) { vector<PcmDevice> Player::pcm_list(void) {
void **hints, **n; void **hints, **n;
char *name, *descr, *io; char *name, *descr, *io;
@ -163,23 +192,9 @@ vector<PcmDevice> Player::pcm_list(void) {
pcmDevice.description = descr; pcmDevice.description = descr;
pcmDevice.idx = idx++; pcmDevice.idx = idx++;
result.push_back(pcmDevice); result.push_back(pcmDevice);
// printf("%s\n", name);
//cout << "Name: " << name << "\n"; __end:
//cout << "Desc: " << descr << "\n"; if (name != NULL)
/*
if ((descr1 = descr) != NULL) {
printf(" ");
while (*descr1) {
if (*descr1 == '\n')
printf("\n ");
else
putchar(*descr1);
descr1++;
}
putchar('\n');
}
*/
__end: if (name != NULL)
free(name); free(name);
if (descr != NULL) if (descr != NULL)
free(descr); free(descr);

View file

@ -14,6 +14,7 @@ class Player
{ {
public: public:
Player(const PcmDevice& pcmDevice, Stream* stream); Player(const PcmDevice& pcmDevice, Stream* stream);
virtual ~Player();
void start(); void start();
void stop(); void stop();
static std::vector<PcmDevice> pcm_list(void); static std::vector<PcmDevice> pcm_list(void);

View file

@ -123,7 +123,6 @@ shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
pendingRequests.insert(pendingRequest); pendingRequests.insert(pendingRequest);
} }
// std::mutex mtx;
std::unique_lock<std::mutex> lck(m); std::unique_lock<std::mutex> lck(m);
send(message); send(message);
if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout) if (pendingRequest->cv.wait_for(lck,std::chrono::milliseconds(timeout)) == std::cv_status::no_timeout)
@ -136,8 +135,8 @@ shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message
{ {
sumTimeout += timeout; sumTimeout += timeout;
cout << "timeout while waiting for response to: " << reqId << ", timeout " << sumTimeout.count() << "\n"; cout << "timeout while waiting for response to: " << reqId << ", timeout " << sumTimeout.count() << "\n";
if (sumTimeout > chronos::sec(10)) if (sumTimeout > chronos::sec(5))
throw snapException("sum timeout exceeded 10s"); throw SnapException("sum timeout exceeded 10s");
} }
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
@ -149,7 +148,6 @@ shared_ptr<SerializedMessage> ClientConnection::sendRequest(BaseMessage* message
void ClientConnection::getNextMessage() void ClientConnection::getNextMessage()
{ {
//cout << "getNextMessage\n";
BaseMessage baseMessage; BaseMessage baseMessage;
size_t baseMsgSize = baseMessage.getSize(); size_t baseMsgSize = baseMessage.getSize();
vector<char> buffer(baseMsgSize); vector<char> buffer(baseMsgSize);
@ -163,8 +161,7 @@ void ClientConnection::getNextMessage()
baseMessage.received = t; baseMessage.received = t;
{ {
std::unique_lock<std::mutex> mlock(mutex_);//, std::defer_lock); std::unique_lock<std::mutex> mlock(mutex_);
// if (mlock.try_lock_for(std::chrono::milliseconds(1000)))
{ {
for (auto req: pendingRequests) for (auto req: pendingRequests)
{ {
@ -174,7 +171,6 @@ void ClientConnection::getNextMessage()
req->response->message = baseMessage; req->response->message = baseMessage;
req->response->buffer = (char*)malloc(baseMessage.size); req->response->buffer = (char*)malloc(baseMessage.size);
memcpy(req->response->buffer, &buffer[0], baseMessage.size); memcpy(req->response->buffer, &buffer[0], baseMessage.size);
// std::unique_lock<std::mutex> lck(m);
req->cv.notify_one(); req->cv.notify_one();
return; return;
} }

View file

@ -64,6 +64,8 @@ void Controller::stop()
active_ = false; active_ = false;
controllerThread->join(); controllerThread->join();
clientConnection->stop(); clientConnection->stop();
delete controllerThread;
delete clientConnection;
} }
@ -72,6 +74,7 @@ void Controller::worker()
// Decoder* decoder; // Decoder* decoder;
active_ = true; active_ = true;
decoder = NULL; decoder = NULL;
stream = NULL;
while (active_) while (active_)
{ {
@ -120,35 +123,24 @@ void Controller::worker()
shared_ptr<AckMsg> ackMsg(NULL); shared_ptr<AckMsg> ackMsg(NULL);
while (active_ && !(ackMsg = clientConnection->sendReq<AckMsg>(&startStream))); while (active_ && !(ackMsg = clientConnection->sendReq<AckMsg>(&startStream)));
try while (active_)
{ {
while (active_) usleep(500*1000);
{ shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq);
usleep(500*1000); if (reply)
shared_ptr<TimeMsg> reply = clientConnection->sendReq<TimeMsg>(&timeReq); {
if (reply) double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
{ TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; }
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
// cout << "Median: " << TimeProvider::getInstance().getDiffToServer() << "\n";
}
}
}
catch (const std::exception& e)
{
cout << "Exception in Controller::worker(): " << e.what() << "\n";
cout << "Stopping player\n";
player.stop();
cout << "Deleting stream\n";
delete stream;
stream = NULL;
cout << "done\n";
throw;
} }
} }
catch (const std::exception& e) catch (const std::exception& e)
{ {
cout << "Exception in Controller::worker(): " << e.what() << "\n"; cout << "Exception in Controller::worker(): " << e.what() << "\n";
cout << "Deleting stream\n";
if (stream != NULL)
delete stream;
stream = NULL;
if (decoder != NULL) if (decoder != NULL)
delete decoder; delete decoder;
decoder = NULL; decoder = NULL;
@ -156,7 +148,7 @@ void Controller::worker()
clientConnection->stop(); clientConnection->stop();
cout << "done\n"; cout << "done\n";
if (active_) if (active_)
usleep(1000000); usleep(500*1000);
} }
} }
cout << "Thread stopped\n"; cout << "Thread stopped\n";

View file

@ -19,8 +19,7 @@
using namespace std; using namespace std;
namespace po = boost::program_options; namespace po = boost::program_options;
//bool g_terminated = false; bool g_terminated = false;
std::condition_variable terminateSignaled;
PcmDevice getPcmDevice(const std::string& soundcard) PcmDevice getPcmDevice(const std::string& soundcard)
{ {
@ -64,7 +63,6 @@ int main (int argc, char *argv[])
("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP") ("ip,i", po::value<string>(&ip)->default_value("192.168.0.2"), "server IP")
("port,p", po::value<size_t>(&port)->default_value(98765), "server port") ("port,p", po::value<size_t>(&port)->default_value(98765), "server port")
("soundcard,s", po::value<string>(&soundcard)->default_value("default"), "index or name of the soundcard") ("soundcard,s", po::value<string>(&soundcard)->default_value("default"), "index or name of the soundcard")
// ("buffer,b", po::value<int>(&bufferMs)->default_value(300), "buffer size [ms]")
("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize")
("latency", po::value<size_t>(&latency)->default_value(0), "latency") ("latency", po::value<size_t>(&latency)->default_value(0), "latency")
; ;
@ -112,10 +110,9 @@ int main (int argc, char *argv[])
Controller controller; Controller controller;
controller.start(pcmDevice, ip, port, latency); controller.start(pcmDevice, ip, port, latency);
while(!g_terminated)
usleep(100*1000);
std::mutex mtx;
std::unique_lock<std::mutex> lck(mtx);
terminateSignaled.wait(lck);
controller.stop(); controller.stop();
return 0; return 0;

View file

@ -3,9 +3,8 @@
#include <signal.h> #include <signal.h>
#include <syslog.h> #include <syslog.h>
#include <condition_variable>
extern std::condition_variable terminateSignaled; extern bool g_terminated;
void signal_handler(int sig) void signal_handler(int sig)
{ {
@ -17,11 +16,11 @@ void signal_handler(int sig)
break; break;
case SIGTERM: case SIGTERM:
syslog(LOG_WARNING, "Received SIGTERM signal."); syslog(LOG_WARNING, "Received SIGTERM signal.");
terminateSignaled.notify_all(); g_terminated = true;
break; break;
case SIGINT: case SIGINT:
syslog(LOG_WARNING, "Received SIGINT signal."); syslog(LOG_WARNING, "Received SIGINT signal.");
terminateSignaled.notify_all(); g_terminated = true;
break; break;
default: default:
syslog(LOG_WARNING, "Unhandled signal "); syslog(LOG_WARNING, "Unhandled signal ");

View file

@ -6,25 +6,27 @@
#include <cstring> // std::strlen, std::strcpy #include <cstring> // std::strlen, std::strcpy
// text_exception uses a dynamically-allocated internal c-string for what(): // text_exception uses a dynamically-allocated internal c-string for what():
class snapException : std::exception { class SnapException : public std::exception {
char* text_; char* text_;
public: public:
snapException(const char* text) SnapException(const char* text)
{ {
text_ = new char[std::strlen(text)]; std::strcpy (text_,text); text_ = new char[std::strlen(text)];
std::strcpy(text_, text);
} }
snapException(const snapException& e) SnapException(const SnapException& e)
{ {
text_ = new char[std::strlen(e.text_)]; std::strcpy (text_,e.text_); text_ = new char[std::strlen(e.text_)];
std::strcpy(text_, e.text_);
} }
~snapException() throw() virtual ~SnapException() throw()
{ {
delete[] text_; delete[] text_;
} }
const char* what() const noexcept virtual const char* what() const noexcept
{ {
return text_; return text_;
} }

View file

@ -8,7 +8,7 @@
#include "message/message.h" #include "message/message.h"
#include "pcmEncoder.h" #include "pcmEncoder.h"
#include "oggEncoder.h" #include "oggEncoder.h"
c#include "controlServer.h" #include "controlServer.h"
bool g_terminated = false; bool g_terminated = false;
@ -97,16 +97,17 @@ int main(int argc, char* argv[])
controlServer->setHeader(encoder->getHeader()); controlServer->setHeader(encoder->getHeader());
controlServer->start(); controlServer->start();
// StreamServer* server = new StreamServer(port + 1); signal(SIGHUP, signal_handler);
// server->start(); signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
while (!g_terminated) while (!g_terminated)
{ {
int fd = open(fifoName.c_str(), O_RDONLY); int fd = open(fifoName.c_str(), O_RDONLY | O_NONBLOCK);
try try
{ {
shared_ptr<PcmChunk> chunk;//(new WireChunk()); shared_ptr<PcmChunk> chunk;//(new WireChunk());
while (true)//cin.good()) while (!g_terminated)//cin.good())
{ {
chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE)); chunk.reset(new PcmChunk(sampleFormat, duration));//2*WIRE_CHUNK_SIZE));
int toRead = chunk->payloadSize; int toRead = chunk->payloadSize;
@ -114,12 +115,14 @@ int main(int argc, char* argv[])
do do
{ {
int count = read(fd, chunk->payload + len, toRead - len); int count = read(fd, chunk->payload + len, toRead - len);
if (count <= 0) if (count == 0)
throw ServerException("count = " + boost::lexical_cast<string>(count)); throw ServerException("count = 0");
else if (count == -1)
len += count; usleep(100*1000);
else
len += count;
} }
while (len < toRead); while ((len < toRead) && !g_terminated);
chunk->timestamp.sec = tvChunk.tv_sec; chunk->timestamp.sec = tvChunk.tv_sec;
chunk->timestamp.usec = tvChunk.tv_usec; chunk->timestamp.usec = tvChunk.tv_usec;
@ -157,6 +160,7 @@ int main(int argc, char* argv[])
} }
syslog (LOG_NOTICE, "First daemon terminated."); syslog (LOG_NOTICE, "First daemon terminated.");
cout << "Terminated\n";
closelog(); closelog();
} }