diff --git a/client/controller.cpp b/client/controller.cpp index 64793478..61c2a94f 100644 --- a/client/controller.cpp +++ b/client/controller.cpp @@ -10,6 +10,7 @@ #include "common/serverSettings.h" #include "common/timeMsg.h" #include "common/requestMsg.h" +#include "common/ackMsg.h" using namespace std; @@ -109,6 +110,10 @@ void Controller::worker() Player player(stream); player.start(); + RequestMsg startStream("startStream"); + shared_ptr ackMsg(NULL); + while (!(ackMsg = clientConnection->sendReq(&startStream, 1000))); + try { while (active_) diff --git a/common/ackMsg.h b/common/ackMsg.h new file mode 100644 index 00000000..c383eace --- /dev/null +++ b/common/ackMsg.h @@ -0,0 +1,42 @@ +#ifndef ACK_MSG_H +#define ACK_MSG_H + +#include "message.h" + + +class AckMsg : public BaseMessage +{ +public: + AckMsg() : BaseMessage(message_type::ackMsg) + { + } + + virtual ~AckMsg() + { + } + + virtual void read(std::istream& stream) + { +// stream.read(reinterpret_cast(&latency), sizeof(double)); + } + + virtual uint32_t getSize() + { + return 0;//sizeof(double); + } + +// double latency; + +protected: + virtual void doserialize(std::ostream& stream) + { +// stream.write(reinterpret_cast(&latency), sizeof(double)); + } +}; + + + + +#endif + + diff --git a/common/message.h b/common/message.h index 109685ab..58571608 100644 --- a/common/message.h +++ b/common/message.h @@ -36,7 +36,8 @@ enum message_type sampleformat = 3, serversettings = 4, timemsg = 5, - requestmsg = 6 + requestmsg = 6, + ackMsg = 7 }; diff --git a/common/queue.h b/common/queue.h index 882128f8..4af2c009 100644 --- a/common/queue.h +++ b/common/queue.h @@ -60,7 +60,7 @@ public: { std::unique_lock mlock(mutex_); queue_.push(item); -// mlock.unlock(); + mlock.unlock(); cond_.notify_one(); } @@ -68,7 +68,7 @@ public: { std::unique_lock mlock(mutex_); queue_.push(std::move(item)); -// mlock.unlock(); + mlock.unlock(); cond_.notify_one(); } diff --git a/server/controlServer.cpp b/server/controlServer.cpp index d43e3317..94b91fe9 100644 --- a/server/controlServer.cpp +++ b/server/controlServer.cpp @@ -1,5 +1,6 @@ #include "controlServer.h" #include "common/timeMsg.h" +#include "common/ackMsg.h" #include "common/requestMsg.h" #include @@ -13,6 +14,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL void ControlServer::send(shared_ptr message) { + std::unique_lock mlock(mutex); for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) { if (!(*it)->active()) @@ -63,6 +65,13 @@ void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessa headerChunk->refersTo = requestMsg.id; connection->send(headerChunk); } + else if (requestMsg.request == "startStream") + { + AckMsg ackMsg; + ackMsg.refersTo = requestMsg.id; + connection->send(&ackMsg); + connection->setStreamActive(true); + } } } @@ -81,8 +90,11 @@ void ControlServer::acceptor() setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n"; ServerSession* session = new ServerSession(this, sock); - sessions.insert(shared_ptr(session)); - session->start(); + { + std::unique_lock mlock(mutex); + sessions.insert(shared_ptr(session)); + session->start(); + } } } diff --git a/server/controlServer.h b/server/controlServer.h index 0eb0aae5..dabcc3a2 100644 --- a/server/controlServer.h +++ b/server/controlServer.h @@ -7,6 +7,7 @@ #include #include #include +#include #include "serverSession.h" #include "common/timeUtils.h" @@ -37,6 +38,7 @@ public: private: void acceptor(); + mutable std::mutex mutex; set> sessions; boost::asio::io_service io_service_; unsigned short port_; diff --git a/server/serverSession.cpp b/server/serverSession.cpp index 93516d95..6edbd9b2 100644 --- a/server/serverSession.cpp +++ b/server/serverSession.cpp @@ -18,6 +18,7 @@ ServerSession::ServerSession(MessageReceiver* _receiver, std::shared_ptr message) { - if (!message) + if (!message || !streamActive) return; while (messages.size() > 100)// chunk->getDuration() > 10000) diff --git a/server/serverSession.h b/server/serverSession.h index d14333ce..9ff7cc2d 100644 --- a/server/serverSession.h +++ b/server/serverSession.h @@ -43,6 +43,11 @@ public: return active_; } + virtual void setStreamActive(bool active) + { + streamActive = active; + } + protected: void socketRead(void* _to, size_t _bytes); @@ -51,6 +56,7 @@ protected: void writer(); std::atomic active_; + std::atomic streamActive; mutable std::mutex mutex_; std::thread* readerThread; std::thread* writerThread;