mirror of
https://github.com/badaix/snapcast.git
synced 2025-05-21 04:56:13 +02:00
sockets and mem leaks
git-svn-id: svn://elaine/murooma/trunk@279 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
parent
c7655f3e55
commit
76557f091c
5 changed files with 88 additions and 50 deletions
|
@ -9,7 +9,7 @@
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
|
||||||
ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0), ip(_ip), port(_port)
|
ClientConnection::ClientConnection(MessageReceiver* _receiver, const std::string& _ip, size_t _port) : active_(false), connected_(false), messageReceiver(_receiver), reqId(0), ip(_ip), port(_port), readerThread(NULL)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,8 +29,8 @@ void ClientConnection::socketRead(void* _to, size_t _bytes)
|
||||||
{
|
{
|
||||||
// cout << "/";
|
// cout << "/";
|
||||||
// cout.flush();
|
// cout.flush();
|
||||||
boost::system::error_code error;
|
// boost::system::error_code error;
|
||||||
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead), error);
|
len += socket->read_some(boost::asio::buffer((char*)_to + len, toRead));
|
||||||
//cout << "len: " << len << ", error: " << error << endl;
|
//cout << "len: " << len << ", error: " << error << endl;
|
||||||
toRead = _bytes - len;
|
toRead = _bytes - len;
|
||||||
// cout << "\\";
|
// cout << "\\";
|
||||||
|
@ -45,16 +45,49 @@ void ClientConnection::start()
|
||||||
tcp::resolver resolver(io_service);
|
tcp::resolver resolver(io_service);
|
||||||
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
|
tcp::resolver::query query(tcp::v4(), ip, boost::lexical_cast<string>(port));
|
||||||
iterator = resolver.resolve(query);
|
iterator = resolver.resolve(query);
|
||||||
receiverThread = new thread(&ClientConnection::worker, this);
|
cout << "connecting\n";
|
||||||
|
socket.reset(new tcp::socket(io_service));
|
||||||
|
// struct timeval tv;
|
||||||
|
// tv.tv_sec = 5;
|
||||||
|
// tv.tv_usec = 0;
|
||||||
|
// cout << "socket: " << socket->native() << "\n";
|
||||||
|
// setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
||||||
|
// setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
|
||||||
|
socket->connect(*iterator);
|
||||||
|
connected_ = true;
|
||||||
|
cout << "connected\n";
|
||||||
|
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
|
||||||
|
active_ = true;
|
||||||
|
readerThread = new thread(&ClientConnection::reader, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::stop()
|
void ClientConnection::stop()
|
||||||
{
|
{
|
||||||
|
connected_ = false;
|
||||||
active_ = false;
|
active_ = false;
|
||||||
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
|
try
|
||||||
socket->close();
|
{
|
||||||
receiverThread->join();
|
boost::system::error_code ec;
|
||||||
|
if (socket)
|
||||||
|
{
|
||||||
|
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||||
|
if (ec) cout << "Error in socket shutdown: " << ec << "\n";
|
||||||
|
socket->close(ec);
|
||||||
|
if (ec) cout << "Error in socket close: " << ec << "\n";
|
||||||
|
}
|
||||||
|
if (readerThread)
|
||||||
|
{
|
||||||
|
cout << "joining readerThread\n";
|
||||||
|
readerThread->join();
|
||||||
|
delete readerThread;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch(...)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
readerThread = NULL;
|
||||||
|
cout << "readerThread terminated\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -127,9 +160,6 @@ void ClientConnection::getNextMessage()
|
||||||
{
|
{
|
||||||
if (req->id == baseMessage.refersTo)
|
if (req->id == baseMessage.refersTo)
|
||||||
{
|
{
|
||||||
//cout << "getNextMessage response: " << baseMessage.type << ", size: " << baseMessage.size << "\n";
|
|
||||||
//long latency = (baseMessage.received.sec - baseMessage.sent.sec) * 1000000 + (baseMessage.received.usec - baseMessage.sent.usec);
|
|
||||||
//cout << "latency: " << latency << "\n";
|
|
||||||
req->response.reset(new SerializedMessage());
|
req->response.reset(new SerializedMessage());
|
||||||
req->response->message = baseMessage;
|
req->response->message = baseMessage;
|
||||||
req->response->buffer = (char*)malloc(baseMessage.size);
|
req->response->buffer = (char*)malloc(baseMessage.size);
|
||||||
|
@ -147,45 +177,25 @@ void ClientConnection::getNextMessage()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void ClientConnection::worker()
|
void ClientConnection::reader()
|
||||||
{
|
{
|
||||||
active_ = true;
|
|
||||||
while (active_)
|
|
||||||
{
|
|
||||||
connected_ = false;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
{
|
|
||||||
// std::unique_lock<std::mutex> mlock(mutex_);
|
|
||||||
cout << "connecting\n";
|
|
||||||
socket.reset(new tcp::socket(io_service));
|
|
||||||
struct timeval tv;
|
|
||||||
tv.tv_sec = 5;
|
|
||||||
tv.tv_usec = 0;
|
|
||||||
cout << "socket: " << socket->native() << "\n";
|
|
||||||
setsockopt(socket->native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
|
||||||
setsockopt(socket->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
|
|
||||||
socket->connect(*iterator);
|
|
||||||
connected_ = true;
|
|
||||||
cout << "connected\n";
|
|
||||||
std::clog << kLogNotice << "connected\n";// to " << ip << ":" << port << std::endl;
|
|
||||||
}
|
|
||||||
while(active_)
|
while(active_)
|
||||||
{
|
{
|
||||||
// cout << ".";
|
|
||||||
// cout.flush();
|
|
||||||
getNextMessage();
|
getNextMessage();
|
||||||
// cout << "|";
|
|
||||||
// cout.flush();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
{
|
{
|
||||||
|
if (messageReceiver != NULL)
|
||||||
|
messageReceiver->onException(this, e);
|
||||||
|
}
|
||||||
|
catch (...)
|
||||||
|
{
|
||||||
|
}
|
||||||
connected_ = false;
|
connected_ = false;
|
||||||
cout << kLogNotice << "Exception: " << e.what() << ", trying to reconnect" << std::endl;
|
active_ = false;
|
||||||
usleep(1000*1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ class MessageReceiver
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
|
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer) = 0;
|
||||||
|
virtual void onException(ClientConnection* connection, const std::exception& exception) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -68,7 +69,7 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void worker();
|
virtual void reader();
|
||||||
|
|
||||||
void socketRead(void* _to, size_t _bytes);
|
void socketRead(void* _to, size_t _bytes);
|
||||||
std::shared_ptr<tcp::socket> socket;
|
std::shared_ptr<tcp::socket> socket;
|
||||||
|
@ -80,13 +81,13 @@ protected:
|
||||||
void getNextMessage();
|
void getNextMessage();
|
||||||
boost::asio::io_service io_service;
|
boost::asio::io_service io_service;
|
||||||
tcp::resolver::iterator iterator;
|
tcp::resolver::iterator iterator;
|
||||||
std::thread* receiverThread;
|
|
||||||
mutable std::mutex mutex_;
|
mutable std::mutex mutex_;
|
||||||
std::mutex m;
|
std::mutex m;
|
||||||
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
|
std::set<std::shared_ptr<PendingRequest>> pendingRequests;
|
||||||
uint16_t reqId;
|
uint16_t reqId;
|
||||||
std::string ip;
|
std::string ip;
|
||||||
size_t port;
|
size_t port;
|
||||||
|
std::thread* readerThread;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,12 +19,19 @@ Controller::Controller() : MessageReceiver(), active_(false), sampleFormat(NULL)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Controller::onException(ClientConnection* connection, const std::exception& exception)
|
||||||
|
{
|
||||||
|
cout << "onException: " << exception.what() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer)
|
void Controller::onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer)
|
||||||
{
|
{
|
||||||
if (baseMessage.type == message_type::payload)
|
if (baseMessage.type == message_type::payload)
|
||||||
{
|
{
|
||||||
if ((stream != NULL) && (decoder != NULL))
|
if ((stream != NULL) && (decoder != NULL))
|
||||||
{
|
{
|
||||||
|
cout << ".";
|
||||||
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
|
PcmChunk* pcmChunk = new PcmChunk(*sampleFormat, 0);
|
||||||
pcmChunk->deserialize(baseMessage, buffer);
|
pcmChunk->deserialize(baseMessage, buffer);
|
||||||
//cout << "chunk: " << pcmChunk->payloadSize;
|
//cout << "chunk: " << pcmChunk->payloadSize;
|
||||||
|
@ -64,9 +71,9 @@ void Controller::worker()
|
||||||
|
|
||||||
while (active_)
|
while (active_)
|
||||||
{
|
{
|
||||||
clientConnection->start();
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
clientConnection->start();
|
||||||
RequestMsg requestMsg("serverSettings");
|
RequestMsg requestMsg("serverSettings");
|
||||||
shared_ptr<ServerSettings> serverSettings(NULL);
|
shared_ptr<ServerSettings> serverSettings(NULL);
|
||||||
while (!(serverSettings = clientConnection->sendReq<ServerSettings>(&requestMsg, 1000)));
|
while (!(serverSettings = clientConnection->sendReq<ServerSettings>(&requestMsg, 1000)));
|
||||||
|
@ -119,13 +126,14 @@ void Controller::worker()
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
{
|
{
|
||||||
|
cout << "Exception in Controller::worker(): " << e.what() << "\n";
|
||||||
cout << "Stopping player\n";
|
cout << "Stopping player\n";
|
||||||
player.stop();
|
player.stop();
|
||||||
cout << "Deleting stream\n";
|
cout << "Deleting stream\n";
|
||||||
delete stream;
|
delete stream;
|
||||||
stream = NULL;
|
stream = NULL;
|
||||||
cout << "done\n";
|
cout << "done\n";
|
||||||
throw e;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const std::exception& e)
|
catch (const std::exception& e)
|
||||||
|
@ -134,6 +142,9 @@ void Controller::worker()
|
||||||
if (decoder != NULL)
|
if (decoder != NULL)
|
||||||
delete decoder;
|
delete decoder;
|
||||||
decoder = NULL;
|
decoder = NULL;
|
||||||
|
cout << "Stopping clientConnection\n";
|
||||||
|
clientConnection->stop();
|
||||||
|
cout << "done\n";
|
||||||
usleep(1000000);
|
usleep(1000000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@ public:
|
||||||
void start(const std::string& _ip, size_t _port, int _bufferMs);
|
void start(const std::string& _ip, size_t _port, int _bufferMs);
|
||||||
void stop();
|
void stop();
|
||||||
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer);
|
virtual void onMessageReceived(ClientConnection* connection, const BaseMessage& baseMessage, char* buffer);
|
||||||
|
virtual void onException(ClientConnection* connection, const std::exception& exception);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void worker();
|
void worker();
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
|
||||||
OggDecoder::OggDecoder() : Decoder()
|
OggDecoder::OggDecoder() : Decoder(), buffer(NULL)
|
||||||
{
|
{
|
||||||
ogg_sync_init(&oy); /* Now we can read pages */
|
ogg_sync_init(&oy); /* Now we can read pages */
|
||||||
convsize = 4096;
|
convsize = 4096;
|
||||||
|
@ -19,7 +19,22 @@ OggDecoder::OggDecoder() : Decoder()
|
||||||
OggDecoder::~OggDecoder()
|
OggDecoder::~OggDecoder()
|
||||||
{
|
{
|
||||||
// ogg_sync_init(&oy); /* Now we can read pages */
|
// ogg_sync_init(&oy); /* Now we can read pages */
|
||||||
delete convbuffer;
|
free(convbuffer);
|
||||||
|
// if (buffer != NULL)
|
||||||
|
// free(buffer);
|
||||||
|
cout << "1\n" << flush;
|
||||||
|
vorbis_block_clear(&vb);
|
||||||
|
cout << "2\n" << flush;
|
||||||
|
vorbis_dsp_clear(&vd);
|
||||||
|
cout << "3\n" << flush;
|
||||||
|
ogg_stream_clear(&os);
|
||||||
|
cout << "4\n" << flush;
|
||||||
|
vorbis_comment_clear(&vc);
|
||||||
|
cout << "5\n" << flush;
|
||||||
|
vorbis_info_clear(&vi); /* must be called last */
|
||||||
|
cout << "6\n" << flush;
|
||||||
|
ogg_sync_clear(&oy);
|
||||||
|
cout << "7\n" << flush;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue