- made unit testing easier by (mostly) removing the event queue singleton.

- fixed code style in many places (mostly indentation).
This commit is contained in:
Nick Bolton 2013-06-29 14:17:49 +00:00
parent 13b2649fa0
commit 608074c041
143 changed files with 2220 additions and 2163 deletions

View file

@ -35,7 +35,9 @@
// CTCPSocket
//
CTCPSocket::CTCPSocket() :
CTCPSocket::CTCPSocket(IEventQueue* events) :
IDataSocket(events),
m_events(events),
m_mutex(),
m_flushed(&m_mutex, true)
{
@ -49,7 +51,9 @@ CTCPSocket::CTCPSocket() :
init();
}
CTCPSocket::CTCPSocket(CArchSocket socket) :
CTCPSocket::CTCPSocket(IEventQueue* events, CArchSocket socket) :
IDataSocket(events),
m_events(events),
m_mutex(),
m_socket(socket),
m_flushed(&m_mutex, true)
@ -96,7 +100,7 @@ CTCPSocket::close()
// clear buffers and enter disconnected state
if (m_connected) {
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
}
onDisconnected();
@ -136,7 +140,7 @@ CTCPSocket::read(void* buffer, UInt32 n)
// if no more data and we cannot read or write then send disconnected
if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
@ -152,7 +156,7 @@ CTCPSocket::write(const void* buffer, UInt32 n)
// must not have shutdown output
if (!m_writable) {
sendEvent(getOutputErrorEvent());
sendEvent(m_events->forIStream().outputError());
return;
}
@ -201,7 +205,7 @@ CTCPSocket::shutdownInput()
// shutdown buffer for reading
if (m_readable) {
sendEvent(getInputShutdownEvent());
sendEvent(m_events->forIStream().inputShutdown());
onInputShutdown();
useNewJob = true;
}
@ -228,7 +232,7 @@ CTCPSocket::shutdownOutput()
// shutdown buffer for writing
if (m_writable) {
sendEvent(getOutputShutdownEvent());
sendEvent(m_events->forIStream().outputShutdown());
onOutputShutdown();
useNewJob = true;
}
@ -266,7 +270,7 @@ CTCPSocket::connect(const CNetworkAddress& addr)
try {
if (ARCH->connectSocket(m_socket, addr.getAddress())) {
sendEvent(getConnectedEvent());
sendEvent(m_events->forIDataSocket().connected());
onConnected();
}
else {
@ -351,14 +355,14 @@ void
CTCPSocket::sendConnectionFailedEvent(const char* msg)
{
CConnectionFailedInfo* info = new CConnectionFailedInfo(msg);
EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
m_events->addEvent(CEvent(m_events->forIDataSocket().connectionFailed(),
getEventTarget(), info, CEvent::kDontFreeData));
}
void
CTCPSocket::sendEvent(CEvent::Type type)
{
EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
m_events->addEvent(CEvent(type, getEventTarget(), NULL));
}
void
@ -432,7 +436,7 @@ CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
}
if (write) {
sendEvent(getConnectedEvent());
sendEvent(m_events->forIDataSocket().connected());
onConnected();
return newJob();
}
@ -447,7 +451,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
CLock lock(&m_mutex);
if (error) {
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
return newJob();
}
@ -465,7 +469,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
if (n > 0) {
m_outputBuffer.pop(n);
if (m_outputBuffer.getSize() == 0) {
sendEvent(getOutputFlushedEvent());
sendEvent(m_events->forIStream().outputFlushed());
m_flushed = true;
m_flushed.broadcast();
needNewJob = true;
@ -476,9 +480,9 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
// remote read end of stream hungup. our output side
// has therefore shutdown.
onOutputShutdown();
sendEvent(getOutputShutdownEvent());
sendEvent(m_events->forIStream().outputShutdown());
if (!m_readable && m_inputBuffer.getSize() == 0) {
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
needNewJob = true;
@ -486,15 +490,15 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
catch (XArchNetworkDisconnected&) {
// stream hungup
onDisconnected();
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
needNewJob = true;
}
catch (XArchNetwork& e) {
// other write error
LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
onDisconnected();
sendEvent(getOutputErrorEvent());
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forIStream().outputError());
sendEvent(m_events->forISocket().disconnected());
needNewJob = true;
}
}
@ -514,16 +518,16 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
// send input ready if input buffer was empty
if (wasEmpty) {
sendEvent(getInputReadyEvent());
sendEvent(m_events->forIStream().inputReady());
}
}
else {
// remote write end of stream hungup. our input side
// has therefore shutdown but don't flush our buffer
// since there's still data to be read.
sendEvent(getInputShutdownEvent());
sendEvent(m_events->forIStream().inputShutdown());
if (!m_writable && m_inputBuffer.getSize() == 0) {
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
m_connected = false;
}
m_readable = false;
@ -532,7 +536,7 @@ CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
}
catch (XArchNetworkDisconnected&) {
// stream hungup
sendEvent(getDisconnectedEvent());
sendEvent(m_events->forISocket().disconnected());
onDisconnected();
needNewJob = true;
}