start stream

git-svn-id: svn://elaine/murooma/trunk@282 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-21 07:45:09 +00:00
parent ebfcbf5c26
commit 001bfa6aba
8 changed files with 75 additions and 6 deletions

View file

@ -10,6 +10,7 @@
#include "common/serverSettings.h" #include "common/serverSettings.h"
#include "common/timeMsg.h" #include "common/timeMsg.h"
#include "common/requestMsg.h" #include "common/requestMsg.h"
#include "common/ackMsg.h"
using namespace std; using namespace std;
@ -109,6 +110,10 @@ void Controller::worker()
Player player(stream); Player player(stream);
player.start(); player.start();
RequestMsg startStream("startStream");
shared_ptr<AckMsg> ackMsg(NULL);
while (!(ackMsg = clientConnection->sendReq<AckMsg>(&startStream, 1000)));
try try
{ {
while (active_) while (active_)

42
common/ackMsg.h Normal file
View file

@ -0,0 +1,42 @@
#ifndef ACK_MSG_H
#define ACK_MSG_H
#include "message.h"
class AckMsg : public BaseMessage
{
public:
AckMsg() : BaseMessage(message_type::ackMsg)
{
}
virtual ~AckMsg()
{
}
virtual void read(std::istream& stream)
{
// stream.read(reinterpret_cast<char *>(&latency), sizeof(double));
}
virtual uint32_t getSize()
{
return 0;//sizeof(double);
}
// double latency;
protected:
virtual void doserialize(std::ostream& stream)
{
// stream.write(reinterpret_cast<char *>(&latency), sizeof(double));
}
};
#endif

View file

@ -36,7 +36,8 @@ enum message_type
sampleformat = 3, sampleformat = 3,
serversettings = 4, serversettings = 4,
timemsg = 5, timemsg = 5,
requestmsg = 6 requestmsg = 6,
ackMsg = 7
}; };

View file

@ -60,7 +60,7 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item); queue_.push(item);
// mlock.unlock(); mlock.unlock();
cond_.notify_one(); cond_.notify_one();
} }
@ -68,7 +68,7 @@ public:
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(std::move(item)); queue_.push(std::move(item));
// mlock.unlock(); mlock.unlock();
cond_.notify_one(); cond_.notify_one();
} }

View file

@ -1,5 +1,6 @@
#include "controlServer.h" #include "controlServer.h"
#include "common/timeMsg.h" #include "common/timeMsg.h"
#include "common/ackMsg.h"
#include "common/requestMsg.h" #include "common/requestMsg.h"
#include <iostream> #include <iostream>
@ -13,6 +14,7 @@ ControlServer::ControlServer(unsigned short port) : port_(port), headerChunk(NUL
void ControlServer::send(shared_ptr<BaseMessage> message) void ControlServer::send(shared_ptr<BaseMessage> message)
{ {
std::unique_lock<std::mutex> mlock(mutex);
for (std::set<shared_ptr<ServerSession>>::iterator it = sessions.begin(); it != sessions.end(); ) for (std::set<shared_ptr<ServerSession>>::iterator it = sessions.begin(); it != sessions.end(); )
{ {
if (!(*it)->active()) if (!(*it)->active())
@ -63,6 +65,13 @@ void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessa
headerChunk->refersTo = requestMsg.id; headerChunk->refersTo = requestMsg.id;
connection->send(headerChunk); connection->send(headerChunk);
} }
else if (requestMsg.request == "startStream")
{
AckMsg ackMsg;
ackMsg.refersTo = requestMsg.id;
connection->send(&ackMsg);
connection->setStreamActive(true);
}
} }
} }
@ -81,9 +90,12 @@ void ControlServer::acceptor()
setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); setsockopt(sock->native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n"; cout << "ControlServer::NewConnection: " << sock->remote_endpoint().address().to_string() << "\n";
ServerSession* session = new ServerSession(this, sock); ServerSession* session = new ServerSession(this, sock);
{
std::unique_lock<std::mutex> mlock(mutex);
sessions.insert(shared_ptr<ServerSession>(session)); sessions.insert(shared_ptr<ServerSession>(session));
session->start(); session->start();
} }
}
} }

View file

@ -7,6 +7,7 @@
#include <memory> #include <memory>
#include <set> #include <set>
#include <sstream> #include <sstream>
#include <mutex>
#include "serverSession.h" #include "serverSession.h"
#include "common/timeUtils.h" #include "common/timeUtils.h"
@ -37,6 +38,7 @@ public:
private: private:
void acceptor(); void acceptor();
mutable std::mutex mutex;
set<shared_ptr<ServerSession>> sessions; set<shared_ptr<ServerSession>> sessions;
boost::asio::io_service io_service_; boost::asio::io_service io_service_;
unsigned short port_; unsigned short port_;

View file

@ -18,6 +18,7 @@ ServerSession::ServerSession(MessageReceiver* _receiver, std::shared_ptr<tcp::so
void ServerSession::start() void ServerSession::start()
{ {
active_ = true; active_ = true;
streamActive = false;
readerThread = new thread(&ServerSession::reader, this); readerThread = new thread(&ServerSession::reader, this);
writerThread = new thread(&ServerSession::writer, this); writerThread = new thread(&ServerSession::writer, this);
} }
@ -37,7 +38,7 @@ void ServerSession::socketRead(void* _to, size_t _bytes)
void ServerSession::add(shared_ptr<BaseMessage> message) void ServerSession::add(shared_ptr<BaseMessage> message)
{ {
if (!message) if (!message || !streamActive)
return; return;
while (messages.size() > 100)// chunk->getDuration() > 10000) while (messages.size() > 100)// chunk->getDuration() > 10000)

View file

@ -43,6 +43,11 @@ public:
return active_; return active_;
} }
virtual void setStreamActive(bool active)
{
streamActive = active;
}
protected: protected:
void socketRead(void* _to, size_t _bytes); void socketRead(void* _to, size_t _bytes);
@ -51,6 +56,7 @@ protected:
void writer(); void writer();
std::atomic<bool> active_; std::atomic<bool> active_;
std::atomic<bool> streamActive;
mutable std::mutex mutex_; mutable std::mutex mutex_;
std::thread* readerThread; std::thread* readerThread;
std::thread* writerThread; std::thread* writerThread;