git-svn-id: svn://elaine/murooma/trunk@14 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-06-26 15:53:34 +00:00
parent 6ff3dc4980
commit e876455033
2 changed files with 47 additions and 41 deletions

View file

@ -1,7 +1,7 @@
VERSION = 0.01 VERSION = 0.01
CC = /usr/bin/g++ CC = /usr/bin/g++
CFLAGS = -Wall -g -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\" CFLAGS = -std=gnu++0x -Wall -g -O3 -D_REENTRANT -DVERSION=\"$(VERSION)\"
LDFLAGS = -lzmq LDFLAGS = -lzmq -lpthread
OBJ_SERVER = server.o OBJ_SERVER = server.o
BIN_SERVER = server BIN_SERVER = server
@ -17,7 +17,7 @@ server: $(OBJ)
client: $(OBJ) client: $(OBJ)
$(CC) $(CFLAGS) -o $(BIN_CLIENT) $(OBJ_CLIENT) $(LDFLAGS) $(CC) $(CFLAGS) -o $(BIN_CLIENT) $(OBJ_CLIENT) $(LDFLAGS)
%.o: %.c %.o: %.cpp
$(CC) $(CFLAGS) -c $< $(CC) $(CFLAGS) -c $<
clean: clean:

View file

@ -12,8 +12,11 @@
#include <deque> #include <deque>
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
#include <thread>
#include <mutex>
const size_t size(1764); //44100 / 20 = 2205
const size_t size(4*2205);
struct Chunk struct Chunk
{ {
@ -24,7 +27,9 @@ struct Chunk
std::deque<Chunk*> chunks; std::deque<Chunk*> chunks;
std::deque<int> timeDiffs; std::deque<int> timeDiffs;
std::mutex mtx;
std::mutex mutex;
std::condition_variable cv;
std::string timeToStr(const timeval& timestamp) std::string timeToStr(const timeval& timestamp)
{ {
@ -51,40 +56,59 @@ int getAge(const Chunk& chunk)
timeval now; timeval now;
gettimeofday(&now, NULL); gettimeofday(&now, NULL);
timeval ts; timeval ts;
ts.tv_sec = chunks.front()->tv_sec; ts.tv_sec = chunk.tv_sec;
ts.tv_usec = chunks.front()->tv_usec; ts.tv_usec = chunk.tv_usec;
return diff_ms(now, ts); return diff_ms(now, ts);
} }
void player()
{
std::unique_lock<std::mutex> lck(mtx);
bool playing = true;
while (1)
{
if (chunks.empty())
cv.wait(lck);
mutex.lock();
Chunk* chunk = chunks.front();
chunks.pop_front();
mutex.unlock();
// playing = playing || (getAge(*chunks.front()) > 200);
std::cerr << "Chunk: " << getAge(*chunk) << "\n";
if (playing)
{
for (size_t n=0; n<size; ++n)
{
std::cout << chunk->payload[n] << std::flush;
// if (size % 100 == 0)
// std::cout << std::flush;
}
}
delete chunk;
}
}
int main (int argc, char *argv[]) int main (int argc, char *argv[])
{ {
zmq::context_t context (1); zmq::context_t context (1);
// Socket to talk to server
// std::cout << "Collecting updates from weather server…\n" << std::endl;
zmq::socket_t subscriber (context, ZMQ_SUB); zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://192.168.0.2:123458"); subscriber.connect("tcp://192.168.0.2:123458");
// Subscribe to zipcode, default is NYC, 10001
const char* filter = ""; const char* filter = "";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter)); subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
std::thread playerThread(player);
// Process 100 updates
int update_nbr;
long total_temp = 0;
int i = 0;
bool playing = false;
while (1) while (1)
{ {
zmq::message_t update; zmq::message_t update;
subscriber.recv(&update); subscriber.recv(&update);
// std::cerr << "received\n";
// std::istringstream iss(static_cast<char*>(update.data()));
// iss >> zipcode >> relhumidity;
Chunk* chunk = new Chunk(); Chunk* chunk = new Chunk();
memcpy(chunk, update.data(), sizeof(Chunk)); memcpy(chunk, update.data(), sizeof(Chunk));
/* timeDiffs.push_back(diff_ms(now, ts)); /* timeDiffs.push_back(diff_ms(now, ts));
if (timeDiffs.size() > 100) if (timeDiffs.size() > 100)
timeDiffs.pop_front(); timeDiffs.pop_front();
@ -93,28 +117,10 @@ int main (int argc, char *argv[])
std::cerr << "Median: " << v[v.size()/2] << "\n"; std::cerr << "Median: " << v[v.size()/2] << "\n";
*/ */
/* if (false && (i++ == 100)) mutex.lock();
{
std::cerr << diff_ms(now, ts) << "\n" << std::flush;//timeToStr(ts) << "\t" << chunk->tv_usec << "\n";
i = 0;
}
*/
// std::cout << "update\n";
chunks.push_back(chunk); chunks.push_back(chunk);
playing = playing || (getAge(*chunks.front()) > 200); mutex.unlock();
cv.notify_all();
if (playing)
{
// std::cerr << "Chunk: " << getAge(*chunks.front()) << "\n";
for (size_t n=0; n<size; ++n)
std::cout << chunks.front()->payload[n] << std::flush;
chunks.pop_front();
}
// std::cerr << (chunk->timestamp).tv_sec << ":" << (chunk->timestamp).tv_usec << "\n";
// delete chunk;
// std::cout << std::flush;
// std::cerr << "flushed\n";
} }
return 0; return 0;
} }