Use helper function for async_wait

PosixStream waits 100ms before reconnecting
This commit is contained in:
badaix 2020-01-05 14:36:46 +01:00
parent 5c79d9447d
commit 9353c61d7b
5 changed files with 36 additions and 68 deletions

View file

@ -160,18 +160,7 @@ void AirplayStream::pipeReadLine()
{
LOG(ERROR, LOG_TAG) << "Error opening pipe: " << e.what() << "\n";
pipe_fd_ = nullptr;
auto self = this->shared_from_this();
pipe_open_timer_.expires_after(std::chrono::milliseconds(500));
pipe_open_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
pipeReadLine();
}
});
wait(pipe_open_timer_, 500ms, [this] { pipeReadLine(); });
return;
}
}

View file

@ -47,6 +47,9 @@ protected:
virtual void do_read();
void check_state();
template <typename Timer, typename Rep, typename Period>
void wait(Timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler);
std::unique_ptr<msg::PcmChunk> chunk_;
timeval tv_chunk_;
bool first_;
@ -59,6 +62,24 @@ protected:
};
template <typename ReadStream>
template <typename Timer, typename Rep, typename Period>
void AsioStream<ReadStream>::wait(Timer& timer, const std::chrono::duration<Rep, Period>& duration, std::function<void()> handler)
{
auto self = this->shared_from_this();
timer.expires_after(duration);
timer.async_wait([self, handler = std::move(handler)](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, "AsioStream") << "Error during async wait: " << ec.message() << "\n";
}
else
{
handler();
}
});
}
template <typename ReadStream>
AsioStream<ReadStream>::AsioStream(PcmListener* pcmListener, boost::asio::io_context& ioc, const StreamUri& uri)
@ -82,18 +103,13 @@ template <typename ReadStream>
void AsioStream<ReadStream>::check_state()
{
uint64_t last_read = bytes_read_;
auto self = this->shared_from_this();
state_timer_.expires_after(std::chrono::milliseconds(500 + chunk_ms_));
state_timer_.async_wait([self, this, last_read](const boost::system::error_code& ec) {
if (!ec)
{
wait(state_timer_, std::chrono::milliseconds(500 + chunk_ms_), [this, last_read] {
LOG(DEBUG, "AsioStream") << "check state last: " << last_read << ", read: " << bytes_read_ << "\n";
if (bytes_read_ != last_read)
setState(ReaderState::kPlaying);
else
setState(ReaderState::kIdle);
check_state();
}
});
}

View file

@ -31,7 +31,6 @@
#include <map>
#include <mutex>
#include <string>
#include <thread>
namespace streamreader

View file

@ -29,6 +29,7 @@
using namespace std;
using namespace std::chrono_literals;
namespace streamreader
{
@ -60,18 +61,7 @@ void PosixStream::connect()
catch (const std::exception& e)
{
LOG(ERROR, LOG_TAG) << "Connect exception: " << e.what() << "\n";
auto self = this->shared_from_this();
read_timer_.expires_after(std::chrono::milliseconds(100));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
connect();
}
});
wait(read_timer_, 100ms, [this] { connect(); });
}
}
@ -109,18 +99,7 @@ void PosixStream::do_read()
else if (count < 0)
{
// nothing to read, try again (chunk_ms_ / 2) later
auto self = this->shared_from_this();
read_timer_.expires_after(std::chrono::milliseconds(chunk_ms_ / 2));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
wait(read_timer_, std::chrono::milliseconds(chunk_ms_ / 2), [this] { do_read(); });
return;
}
else if (count == 0)
@ -149,18 +128,7 @@ void PosixStream::do_read()
if (nextTick_ >= currentTick)
{
// synchronize reads to an interval of chunk_ms_
auto self = this->shared_from_this();
read_timer_.expires_after(std::chrono::milliseconds(nextTick_ - currentTick));
read_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (ec)
{
LOG(ERROR, LOG_TAG) << "Error during async wait: " << ec.message() << "\n";
}
else
{
do_read();
}
});
wait(read_timer_, std::chrono::milliseconds(nextTick_ - currentTick), [this] { do_read(); });
return;
}
else
@ -182,7 +150,7 @@ void PosixStream::do_read()
lastException_ = e.what();
}
disconnect();
connect();
wait(read_timer_, 100ms, [this] { connect(); });
}
}

View file

@ -98,11 +98,7 @@ void TcpStream::do_connect()
else
{
LOG(DEBUG) << "Connect failed: " << ec.message() << "\n";
reconnect_timer_.expires_after(std::chrono::milliseconds(1000));
reconnect_timer_.async_wait([self, this](const boost::system::error_code& ec) {
if (!ec)
connect();
});
wait(reconnect_timer_, 1s, [this] { connect(); });
}
});
}
@ -117,4 +113,4 @@ void TcpStream::do_disconnect()
acceptor_->cancel();
reconnect_timer_.cancel();
}
}
} // namespace streamreader