diff --git a/client/clientConnection.cpp b/client/clientConnection.cpp index cd039c9e..4401fe06 100644 --- a/client/clientConnection.cpp +++ b/client/clientConnection.cpp @@ -9,7 +9,7 @@ using namespace std; -ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0), ip(_ip), port(_port) +ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0), ip(_ip), port(_port), readerThread(NULL) { } @@ -29,8 +29,8 @@ void ClientConnection::socketRead(void* _to, size_t _bytes) { // cout << "/"; // cout.flush(); - boost::system::error_code error; - len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error); +// boost::system::error_code error; + len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead)); //cout << "len: " << len << ", error: " << error << endl; toRead = _bytes - len; // cout << "\\"; @@ -45,16 +45,49 @@ void ClientConnection::start() tcp::resolver resolver(io_service); tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast(port)); iterator = resolver.resolve(query); - receiverThread = new thread(&ClientConnection::worker, this); + cout << "connecting\n"; + socket.reset(new tcp::socket(io_service)); +// struct timeval tv; +// tv.tv_sec = 5; +// tv.tv_usec = 0; +// cout << "socket: " << socket->native() << "\n"; +// setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); +// setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + socket->connect(*iterator); + connected_ = true; + cout << "connected\n"; + std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; + active_ = true; + readerThread = new thread(&ClientConnection::reader, this); } void ClientConnection::stop() { + connected_ = false; active_ = false; - socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both); - socket->close(); - receiverThread->join(); + try + { + boost::system::error_code ec; + if (socket) + { + socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + if (ec) cout << "Error in socket shutdown: " << ec << "\n"; + socket->close(ec); + if (ec) cout << "Error in socket close: " << ec << "\n"; + } + if (readerThread) + { + cout << "joining readerThread\n"; + readerThread->join(); + delete readerThread; + } + } + catch(...) + { + } + readerThread = NULL; + cout << "readerThread terminated\n"; } @@ -127,9 +160,6 @@ void ClientConnection::getNextMessage() { if (req->id == baseMessage.refersTo) { -//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n"; -//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec); -//cout << "latency: " << latency << "\n"; req->response.reset(new SerializedMessage()); req->response->message = baseMessage; req->response->buffer = (char*)malloc(baseMessage.size); @@ -147,45 +177,25 @@ void ClientConnection::getNextMessage() -void ClientConnection::worker() +void ClientConnection::reader() { - active_ = true; - while (active_) + try { - connected_ = false; - try + while(active_) { - { -// std::unique_lock mlock(mutex_); - cout << "connecting\n"; - socket.reset(new tcp::socket(io_service)); - struct timeval tv; - tv.tv_sec = 5; - tv.tv_usec = 0; - cout << "socket: " << socket->native() << "\n"; - setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - socket->connect(*iterator); - connected_ = true; - cout << "connected\n"; - std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl; - } - while(active_) - { -// cout << "."; -// cout.flush(); - getNextMessage(); -// cout << "|"; -// cout.flush(); - } - } - catch (const std::exception& e) - { - connected_ = false; - cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl; - usleep(1000*1000); + getNextMessage(); } } + catch (const std::exception& e) + { + if (messageReceiver != NULL) + messageReceiver->onException(this, e); + } + catch (...) + { + } + connected_ = false; + active_ = false; } diff --git a/client/clientConnection.h b/client/clientConnection.h index 65ff2039..c89042ac 100644 --- a/client/clientConnection.h +++ b/client/clientConnection.h @@ -32,6 +32,7 @@ class MessageReceiver { public: virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0; + virtual void onException(ClientConnection* connection, const std::exception& exception) = 0; }; @@ -68,7 +69,7 @@ public: } protected: - virtual void worker(); + virtual void reader(); void socketRead(void* _to, size_t _bytes); std::shared_ptr socket; @@ -80,13 +81,13 @@ protected: void getNextMessage(); boost::asio::io_service io_service; tcp::resolver::iterator iterator; - std::thread* receiverThread; mutable std::mutex mutex_; std::mutex m; std::set> pendingRequests; uint16_t reqId; std::string ip; size_t port; + std::thread* readerThread; }; diff --git a/client/controller.cpp b/client/controller.cpp index 93aa4db1..f041339f 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -19,12 +19,19 @@ Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL) } +void Controller::onException(ClientConnection* connection, const std::exception& exception) +{ + cout << "onException: " << exception.what() << "\n"; +} + + void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) { if (baseMessage.type == message_type::payload) { if ((stream != NULL) && (decoder != NULL)) { +cout << "."; PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0); pcmChunk->deserialize(baseMessage, buffer); //cout << "chunk: " << pcmChunk->payloadSize; @@ -64,9 +71,9 @@ void Controller::worker() while (active_) { - clientConnection->start(); try { + clientConnection->start(); RequestMsg requestMsg("serverSettings"); shared_ptr serverSettings(NULL); while (!(serverSettings = clientConnection->sendReq(&requestMsg, 1000))); @@ -119,13 +126,14 @@ void Controller::worker() } catch (const std::exception& e) { + cout << "Exception in Controller::worker(): " << e.what() << "\n"; cout << "Stopping player\n"; player.stop(); cout << "Deleting stream\n"; delete stream; stream = NULL; cout << "done\n"; - throw e; + throw; } } catch (const std::exception& e) @@ -134,6 +142,9 @@ void Controller::worker() if (decoder != NULL) delete decoder; decoder = NULL; + cout << "Stopping clientConnection\n"; + clientConnection->stop(); + cout << "done\n"; usleep(1000000); } } diff --git a/client/controller.h b/client/controller.h index ee8bea88..3737af5a 100644 --- a/client/controller.h +++ b/client/controller.h @@ -16,6 +16,7 @@ public: void start(const std::string& _ip, size_t _port, int _bufferMs); void stop(); virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer); + virtual void onException(ClientConnection* connection, const std::exception& exception); private: void worker(); diff --git a/client/oggDecoder.cpp b/client/oggDecoder.cpp index 60e93e5f..206e7284 100644 --- a/client/oggDecoder.cpp +++ b/client/oggDecoder.cpp @@ -8,7 +8,7 @@ using namespace std; -OggDecoder::OggDecoder() : Decoder() +OggDecoder::OggDecoder() : Decoder(), buffer(NULL) { ogg_sync_init(&oy); /* Now we can read pages */ convsize = 4096; @@ -19,7 +19,22 @@ OggDecoder::OggDecoder() : Decoder() OggDecoder::~OggDecoder() { // ogg_sync_init(&oy); /* Now we can read pages */ - delete convbuffer; + free(convbuffer); +// if (buffer != NULL) +// free(buffer); +cout << "1\n" << flush; + vorbis_block_clear(&vb); +cout << "2\n" << flush; + vorbis_dsp_clear(&vd); +cout << "3\n" << flush; + ogg_stream_clear(&os); +cout << "4\n" << flush; + vorbis_comment_clear(&vc); +cout << "5\n" << flush; + vorbis_info_clear(&vi); /* must be called last */ +cout << "6\n" << flush; + ogg_sync_clear(&oy); +cout << "7\n" << flush; }