remove StreamSession::send

This commit is contained in:
badaix 2019-10-23 09:07:46 +02:00
parent 2f06ebed04
commit b6142967c7
2 changed files with 15 additions and 32 deletions

View file

@ -21,7 +21,6 @@
#include "common/aixlog.hpp" #include "common/aixlog.hpp"
#include "message/pcm_chunk.hpp" #include "message/pcm_chunk.hpp"
#include <iostream> #include <iostream>
#include <mutex>
using namespace std; using namespace std;
@ -57,15 +56,15 @@ void StreamSession::read_next()
<< ", refers: " << baseMessage_.refersTo << "\n"; << ", refers: " << baseMessage_.refersTo << "\n";
if (baseMessage_.type > message_type::kLast) if (baseMessage_.type > message_type::kLast)
{ {
stringstream ss; LOG(ERROR) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n";
ss << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size; messageReceiver_->onDisconnect(this);
throw std::runtime_error(ss.str().c_str()); return;
} }
else if (baseMessage_.size > msg::max_size) else if (baseMessage_.size > msg::max_size)
{ {
stringstream ss; LOG(ERROR) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n";
ss << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size; messageReceiver_->onDisconnect(this);
throw std::runtime_error(ss.str().c_str()); return;
} }
if (baseMessage_.size > buffer_.size()) if (baseMessage_.size > buffer_.size())
@ -129,7 +128,6 @@ void StreamSession::send_next()
auto buffer = messages_.front(); auto buffer = messages_.front();
boost::asio::async_write(socket_, buffer, boost::asio::bind_executor(strand_, [this, self, buffer](boost::system::error_code ec, std::size_t length) { boost::asio::async_write(socket_, buffer, boost::asio::bind_executor(strand_, [this, self, buffer](boost::system::error_code ec, std::size_t length) {
// boost::asio::async_write(socket_, *buffer, [this, self](boost::system::error_code ec, std::size_t length) {
messages_.pop_front(); messages_.pop_front();
if (ec) if (ec)
{ {
@ -143,10 +141,13 @@ void StreamSession::send_next()
} }
void StreamSession::sendAsync(shared_const_buffer const_buf) void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now)
{ {
strand_.post([this, const_buf]() { strand_.post([this, const_buf, send_now]() {
messages_.push_back(const_buf); if (send_now)
messages_.push_front(const_buf);
else
messages_.push_back(const_buf);
if (messages_.size() > 1) if (messages_.size() > 1)
{ {
LOG(DEBUG) << "outstanding async_write\n"; LOG(DEBUG) << "outstanding async_write\n";
@ -166,7 +167,6 @@ void StreamSession::sendAsync(msg::message_ptr message, bool sendNow)
message->sent = t; message->sent = t;
std::ostringstream oss; std::ostringstream oss;
message->serialize(oss); message->serialize(oss);
sendAsync(shared_const_buffer(oss.str())); sendAsync(shared_const_buffer(oss.str()));
} }
@ -175,15 +175,3 @@ void StreamSession::setBufferMs(size_t bufferMs)
{ {
bufferMs_ = bufferMs; bufferMs_ = bufferMs;
} }
bool StreamSession::send(msg::message_ptr message)
{
sendAsync(message);
// // TODO on exception: set active = false
// // LOG(INFO) << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo <<
// "\n"; boost::asio::streambuf streambuf; std::ostream stream(&streambuf); tv t; message->sent = t; message->serialize(stream); boost::asio::write(socket_,
// streambuf);
// // LOG(INFO) << "done: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n";
// return true;
}

View file

@ -92,16 +92,11 @@ public:
void start(); void start();
void stop(); void stop();
/// Sends a message to the client (synchronous) /// Sends a message to the client (asynchronous)
bool send(msg::message_ptr message); void sendAsync(msg::message_ptr message, bool send_now = false);
/// Sends a message to the client (asynchronous) /// Sends a message to the client (asynchronous)
void sendAsync(msg::message_ptr message, bool sendNow = false); void sendAsync(shared_const_buffer const_buf, bool send_now = false);
/// Sends a message to the client (asynchronous)
// void sendAsync(std::shared_ptr<boost::asio::streambuf> sb);
void sendAsync(shared_const_buffer const_buf);
/// Max playout latency. No need to send PCM data that is older than bufferMs /// Max playout latency. No need to send PCM data that is older than bufferMs
void setBufferMs(size_t bufferMs); void setBufferMs(size_t bufferMs);