Airplay meta pipe reader uses asio event loop

This commit is contained in:
badaix 2020-01-04 12:34:59 +01:00
parent 8c6d703ec5
commit 9f2c256172
2 changed files with 83 additions and 3 deletions

View file

@ -49,7 +49,8 @@ string hex2str(string input)
* move to Makefile?
*/
AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri) : ProcessStream(pcmListener, ioc, uri), port_(5000)
AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
: ProcessStream(pcmListener, ioc, uri), port_(5000), pipe_open_timer_(ioc)
{
logStderr_ = true;
@ -68,8 +69,17 @@ AirplayStream::AirplayStream(PcmListener* pcmListener, boost::asio::io_context&
params_wo_port_ += " --metadata-pipename " + pipePath_;
params_ = params_wo_port_ + " --port=" + cpt::to_string(port_);
#ifdef HAS_EXPAT
createParser();
#endif
#if 0
// This thread is replaced with asio based "pipeReadLine".
// Also seems that this thread was leaking.
// The new implementation is _not tested_
pipeReaderThread_ = thread(&AirplayStream::pipeReader, this);
pipeReaderThread_.detach();
#endif
}
@ -129,6 +139,71 @@ void AirplayStream::push()
}
#endif
void AirplayStream::do_connect()
{
ProcessStream::do_connect();
pipeReadLine();
}
void AirplayStream::pipeReadLine()
{
if (!pipe_fd_ || !pipe_fd_->is_open())
{
try
{
int fd = open(pipePath_.c_str(), O_RDONLY | O_NONBLOCK);
pipe_fd_ = std::make_unique<boost::asio::posix::stream_descriptor>(ioc_, fd);
}
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Error opening pipe: " << e.what() << "\n";
pipe_fd_ = nullptr;
auto self = this->shared_from_this();
pipe_open_timer_.expires_after(std::chrono::milliseconds(500));
pipe_open_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
pipeReadLine();
}
});
return;
}
}
const std::string delimiter = "\n";
auto self = shared_from_this();
boost::asio::async_read_until(*pipe_fd_, streambuf_pipe_, delimiter, [this, self, delimiter](const std::error_code& ec, std::size_t bytes_transferred) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error while reading from control socket: " << ec.message() << "\n";
return;
}
// Extract up to the first delimiter.
std::string line{buffers_begin(streambuf_pipe_.data()), buffers_begin(streambuf_pipe_.data()) + bytes_transferred - delimiter.length()};
if (!line.empty())
{
if (line.back() == '\r')
line.resize(line.size() - 1);
#ifdef HAS_EXPAT
parse(line);
#endif
}
streambuf_pipe_.consume(bytes_transferred);
pipeReadLine();
});
}
#if 0
// This thread is replaced with asio based "pipeReadLine".
// Also seems that this thread was leaking.
// The new implementation is _not tested_
void AirplayStream::pipeReader()
{
#ifdef HAS_EXPAT
@ -155,6 +230,7 @@ void AirplayStream::pipeReader()
this_thread::sleep_for(chrono::milliseconds(500));
}
}
#endif
void AirplayStream::initExeAndPath(const string& filename)
{