thread safe queue

git-svn-id: svn://elaine/murooma/trunk@146 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-08-06 20:50:13 +00:00
parent 2b3400e78b
commit a499f6561d
4 changed files with 120 additions and 27 deletions

83
queue.h Normal file
View file

@ -0,0 +1,83 @@
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
template <typename T>
class Queue
{
public:
T pop()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
auto val = queue_.front();
queue_.pop();
return val;
}
T front()
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
cond_.wait(mlock);
return queue_.front();
}
bool try_pop(T& item, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> mlock(mutex_);
if(!cond_.wait_for(mlock, timeout, [this] { return !queue_.empty(); }))
return false;
item = std::move(queue_.front());
queue_.pop();
return true;
}
void pop(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
while (queue_.empty())
{
cond_.wait(mlock);
}
item = queue_.front();
queue_.pop();
}
void push(const T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
mlock.unlock();
cond_.notify_one();
}
void push(T&& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(std::move(item));
mlock.unlock();
cond_.notify_one();
}
Queue()=default;
Queue(const Queue&) = delete; // disable copying
Queue& operator=(const Queue&) = delete; // disable assignment
private:
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};

View file

@ -16,7 +16,7 @@
#include "boost/date_time/posix_time/posix_time_types.hpp" #include "boost/date_time/posix_time/posix_time_types.hpp"
const short multicast_port = 30001; const short multicast_port = 30001;
const int max_message_count = 10; const int max_message_count = 100;
class sender class sender
{ {
@ -40,16 +40,28 @@ public:
void handle_send_to(const boost::system::error_code& error) void handle_send_to(const boost::system::error_code& error)
{ {
if (!error && message_count_ < max_message_count) if (!error)
{
std::ostringstream os;
os << "Message " << message_count_++;
message_ = os.str();
socket_.async_send_to(
boost::asio::buffer(message_), endpoint_,
boost::bind(&sender::handle_send_to, this,
boost::asio::placeholders::error));
sleep
}
/* if (!error && message_count_ < max_message_count)
{ {
timer_.expires_from_now(boost::posix_time::seconds(1)); timer_.expires_from_now(boost::posix_time::seconds(1));
timer_.async_wait( timer_.async_wait(
boost::bind(&sender::handle_timeout, this, boost::bind(&sender::handle_timeout, this,
boost::asio::placeholders::error)); boost::asio::placeholders::error));
} }
} */ }
void handle_timeout(const boost::system::error_code& error) /* void handle_timeout(const boost::system::error_code& error)
{ {
if (!error) if (!error)
{ {
@ -63,7 +75,7 @@ public:
boost::asio::placeholders::error)); boost::asio::placeholders::error));
} }
} }
*/
private: private:
boost::asio::ip::udp::endpoint endpoint_; boost::asio::ip::udp::endpoint endpoint_;
boost::asio::ip::udp::socket socket_; boost::asio::ip::udp::socket socket_;

View file

@ -3,12 +3,13 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
using namespace std;
Stream::Stream() : sleep(0), median(0), shortMedian(0), lastUpdate(0) Stream::Stream() : sleep(0), median(0), shortMedian(0), lastUpdate(0)
{ {
pBuffer = new DoubleBuffer<int>(15000 / PLAYER_CHUNK_MS); pBuffer = new DoubleBuffer<int>(15000 / PLAYER_CHUNK_MS);
pShortBuffer = new DoubleBuffer<int>(5000 / PLAYER_CHUNK_MS); pShortBuffer = new DoubleBuffer<int>(5000 / PLAYER_CHUNK_MS);
pMiniBuffer = new DoubleBuffer<int>(10); pMiniBuffer = new DoubleBuffer<int>(10);
pLock = new std::unique_lock<std::mutex>(mtx);
bufferMs = 500; bufferMs = 500;
} }
@ -25,22 +26,16 @@ void Stream::addChunk(Chunk* chunk)
{ {
// Chunk* c = new Chunk(*chunk); // Chunk* c = new Chunk(*chunk);
// mutex.lock(); // mutex.lock();
chunks.push_back(chunk); chunks.push(shared_ptr<Chunk>(chunk));
// mutex.unlock(); // mutex.unlock();
cv.notify_all();
} }
Chunk* Stream::getNextChunk() shared_ptr<Chunk> Stream::getNextChunk()
{ {
Chunk* chunk = NULL; if (!chunk)
if (chunks.empty()) chunk = chunks.pop();
cv.wait(*pLock);
// mutex.lock();
chunk = chunks.front();
// mutex.unlock();
return chunk; return chunk;
} }
@ -55,7 +50,7 @@ void Stream::getSilentPlayerChunk(short* outputBuffer)
time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction) time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction)
{ {
Chunk* chunk = getNextChunk(); chunk = getNextChunk();
time_point_ms tp = chunk->timePoint(); time_point_ms tp = chunk->timePoint();
int read = 0; int read = 0;
int toRead = PLAYER_CHUNK_SIZE + correction*PLAYER_CHUNK_MS_SIZE; int toRead = PLAYER_CHUNK_SIZE + correction*PLAYER_CHUNK_MS_SIZE;
@ -71,8 +66,7 @@ time_point_ms Stream::getNextPlayerChunk(short* outputBuffer, int correction)
read += chunk->read(buffer + read, toRead - read); read += chunk->read(buffer + read, toRead - read);
if (chunk->isEndOfChunk()) if (chunk->isEndOfChunk())
{ {
chunks.pop_front(); chunk = NULL;
delete chunk;
chunk = getNextChunk(); chunk = getNextChunk();
} }
} }
@ -112,8 +106,8 @@ void Stream::getChunk(short* outputBuffer, double outputBufferDacTime, unsigned
} }
else else
{ {
for (size_t i=0; i<chunks.size(); ++i) // for (size_t i=0; i<chunks.size(); ++i)
std::cerr << "Chunk " << i << ": " << chunks[i]->getAge() - bufferMs << "\n"; // std::cerr << "Chunk " << i << ": " << chunks[i]->getAge() - bufferMs << "\n";
while (true)// (int i=0; i<(int)(round((float)sleep / (float)PLAYER_CHUNK_MS)) + 1; ++i) while (true)// (int i=0; i<(int)(round((float)sleep / (float)PLAYER_CHUNK_MS)) + 1; ++i)
{ {
int age = Chunk::getAge(getNextPlayerChunk(outputBuffer)) - bufferMs; int age = Chunk::getAge(getNextPlayerChunk(outputBuffer)) - bufferMs;

View file

@ -6,9 +6,11 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <vector> #include <vector>
#include <memory>
#include "doubleBuffer.h" #include "doubleBuffer.h"
#include "chunk.h" #include "chunk.h"
#include "timeUtils.h" #include "timeUtils.h"
#include "queue.h"
class Stream class Stream
@ -16,7 +18,7 @@ class Stream
public: public:
Stream(); Stream();
void addChunk(Chunk* chunk); void addChunk(Chunk* chunk);
Chunk* getNextChunk(); std::shared_ptr<Chunk> getNextChunk();
time_point_ms getNextPlayerChunk(short* outputBuffer, int correction = 0); time_point_ms getNextPlayerChunk(short* outputBuffer, int correction = 0);
void getSilentPlayerChunk(short* outputBuffer); void getSilentPlayerChunk(short* outputBuffer);
void getChunk(short* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer); void getChunk(short* outputBuffer, double outputBufferDacTime, unsigned long framesPerBuffer);
@ -26,14 +28,16 @@ private:
void sleepMs(int ms); void sleepMs(int ms);
int sleep; int sleep;
std::deque<Chunk*> chunks; // std::deque<Chunk*> chunks;
std::mutex mtx; // std::mutex mtx;
std::mutex mutex; // std::mutex mutex;
std::unique_lock<std::mutex>* pLock; // std::unique_lock<std::mutex>* pLock;
std::condition_variable cv; // std::condition_variable cv;
Queue<std::shared_ptr<Chunk>> chunks;
DoubleBuffer<int>* pMiniBuffer; DoubleBuffer<int>* pMiniBuffer;
DoubleBuffer<int>* pBuffer; DoubleBuffer<int>* pBuffer;
DoubleBuffer<int>* pShortBuffer; DoubleBuffer<int>* pShortBuffer;
std::shared_ptr<Chunk> chunk;
int median; int median;
int shortMedian; int shortMedian;