diff --git a/server/stream_session.cpp b/server/stream_session.cpp index 4d3d8f4a..899513bc 100644 --- a/server/stream_session.cpp +++ b/server/stream_session.cpp @@ -21,7 +21,6 @@ #include "common/aixlog.hpp" #include "message/pcm_chunk.hpp" #include -#include using namespace std; @@ -57,15 +56,15 @@ void StreamSession::read_next() << ", refers: " << baseMessage_.refersTo << "\n"; if (baseMessage_.type > message_type::kLast) { - stringstream ss; - ss << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size; - throw std::runtime_error(ss.str().c_str()); + LOG(ERROR) << "unknown message type received: " << baseMessage_.type << ", size: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; } else if (baseMessage_.size > msg::max_size) { - stringstream ss; - ss << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size; - throw std::runtime_error(ss.str().c_str()); + LOG(ERROR) << "received message of type " << baseMessage_.type << " to large: " << baseMessage_.size << "\n"; + messageReceiver_->onDisconnect(this); + return; } if (baseMessage_.size > buffer_.size()) @@ -129,7 +128,6 @@ void StreamSession::send_next() auto buffer = messages_.front(); boost::asio::async_write(socket_, buffer, boost::asio::bind_executor(strand_, [this, self, buffer](boost::system::error_code ec, std::size_t length) { - // boost::asio::async_write(socket_, *buffer, [this, self](boost::system::error_code ec, std::size_t length) { messages_.pop_front(); if (ec) { @@ -143,10 +141,13 @@ void StreamSession::send_next() } -void StreamSession::sendAsync(shared_const_buffer const_buf) +void StreamSession::sendAsync(shared_const_buffer const_buf, bool send_now) { - strand_.post([this, const_buf]() { - messages_.push_back(const_buf); + strand_.post([this, const_buf, send_now]() { + if (send_now) + messages_.push_front(const_buf); + else + messages_.push_back(const_buf); if (messages_.size() > 1) { LOG(DEBUG) << "outstanding async_write\n"; @@ -166,7 +167,6 @@ void StreamSession::sendAsync(msg::message_ptr message, bool sendNow) message->sent = t; std::ostringstream oss; message->serialize(oss); - sendAsync(shared_const_buffer(oss.str())); } @@ -175,15 +175,3 @@ void StreamSession::setBufferMs(size_t bufferMs) { bufferMs_ = bufferMs; } - - -bool StreamSession::send(msg::message_ptr message) -{ - sendAsync(message); - // // TODO on exception: set active = false - // // LOG(INFO) << "send: " << message->type << ", size: " << message->getSize() << ", id: " << message->id << ", refers: " << message->refersTo << - // "\n"; boost::asio::streambuf streambuf; std::ostream stream(&streambuf); tv t; message->sent = t; message->serialize(stream); boost::asio::write(socket_, - // streambuf); - // // LOG(INFO) << "done: " << message->type << ", size: " << message->size << ", id: " << message->id << ", refers: " << message->refersTo << "\n"; - // return true; -} diff --git a/server/stream_session.hpp b/server/stream_session.hpp index b1e26e81..77db3288 100644 --- a/server/stream_session.hpp +++ b/server/stream_session.hpp @@ -92,16 +92,11 @@ public: void start(); void stop(); - /// Sends a message to the client (synchronous) - bool send(msg::message_ptr message); + /// Sends a message to the client (asynchronous) + void sendAsync(msg::message_ptr message, bool send_now = false); /// Sends a message to the client (asynchronous) - void sendAsync(msg::message_ptr message, bool sendNow = false); - - /// Sends a message to the client (asynchronous) - // void sendAsync(std::shared_ptr sb); - - void sendAsync(shared_const_buffer const_buf); + void sendAsync(shared_const_buffer const_buf, bool send_now = false); /// Max playout latency. No need to send PCM data that is older than bufferMs void setBufferMs(size_t bufferMs);