diff --git a/common/chunk.cpp b/common/chunk.cpp index 798453d5..3a9ceeb9 100644 --- a/common/chunk.cpp +++ b/common/chunk.cpp @@ -3,40 +3,50 @@ #include -Chunk::Chunk(WireChunk* _wireChunk) : idx(0), wireChunk(_wireChunk) +Chunk::Chunk(size_t hz, size_t channels, size_t bitPerSample, WireChunk* _wireChunk) : wireChunk(_wireChunk), hz_(hz), channels_(channels), bytesPerSample_(bitPerSample/8), frameSize_(bytesPerSample_*channels_), idx(0) { } +Chunk::Chunk(size_t hz, size_t channels, size_t bitPerSample, size_t ms) : hz_(hz), channels_(channels), bytesPerSample_(bitPerSample/8), frameSize_(bytesPerSample_*channels_), idx(0) +{ + wireChunk = new WireChunk; + wireChunk->length = hz*channels*bytesPerSample_*ms / 1000; + wireChunk->payload = (char*)malloc(wireChunk->length); +} + + Chunk::~Chunk() { + free(wireChunk->payload); delete wireChunk; } -bool Chunk::isEndOfChunk() +bool Chunk::isEndOfChunk() const { - return idx >= WIRE_CHUNK_SIZE; + return idx >= (wireChunk->length / frameSize_); } -bool Chunk::getNext(int16_t& _result) + +double Chunk::getDuration() const { - if (isEndOfChunk()) - return false; - _result = wireChunk->payload[idx++]; - return true; +// std::cout << "len: " << wireChunk->length << ", channels: " << channels_ << ", bytesPerSample: " << bytesPerSample_ << ", hz: " << hz_ << "\n"; + return wireChunk->length / (channels_ * bytesPerSample_ * hz_ / 1000.); } -int Chunk::read(short* _outputBuffer, int _count) -{ - int result = _count; - if (idx + _count > WIRE_CHUNK_SIZE) - result = WIRE_CHUNK_SIZE - idx; - if (_outputBuffer != NULL) - memcpy(_outputBuffer, &wireChunk->payload[idx], sizeof(int16_t)*result); +int Chunk::read(void* outputBuffer, size_t frameCount) +{ +//std::cout << "read: " << frameCount << std::endl; + int result = frameCount; + if (idx + frameCount > wireChunk->length / frameSize_) + result = (wireChunk->length / frameSize_) - idx; + + if (outputBuffer != NULL) + memcpy(outputBuffer, wireChunk->payload + frameSize_*idx, frameSize_*result); idx += result; return result; diff --git a/common/chunk.h b/common/chunk.h index af034e88..746196c7 100644 --- a/common/chunk.h +++ b/common/chunk.h @@ -2,20 +2,8 @@ #define CHUNK_H #include +#include -#define SAMPLE_RATE (48000) -//#define SAMPLE_BIT (16) -#define CHANNELS (2) - -#define WIRE_CHUNK_MS (50) -#define WIRE_CHUNK_SIZE ((SAMPLE_RATE*CHANNELS*WIRE_CHUNK_MS)/1000) -#define WIRE_CHUNK_MS_SIZE ((SAMPLE_RATE*CHANNELS)/1000) -#define SAMPLE_SIZE (CHANNELS) - -#define PLAYER_CHUNK_MS (20) -#define PLAYER_CHUNK_SIZE ((SAMPLE_RATE*CHANNELS*PLAYER_CHUNK_MS)/1000) -#define PLAYER_CHUNK_MS_SIZE ((SAMPLE_RATE*CHANNELS)/1000) -#define FRAMES_PER_BUFFER ((SAMPLE_RATE*PLAYER_CHUNK_MS)/1000) typedef std::chrono::time_point time_point_ms; @@ -25,7 +13,8 @@ struct WireChunk { int32_t tv_sec; int32_t tv_usec; - int16_t payload[WIRE_CHUNK_SIZE]; + uint32_t length; + char* payload; }; @@ -33,27 +22,39 @@ struct WireChunk class Chunk { public: - Chunk(WireChunk* _wireChunk); + Chunk(size_t hz, size_t channels, size_t bitPerSample, WireChunk* _wireChunk); + Chunk(size_t hz, size_t channels, size_t bitPerSample, size_t ms); ~Chunk(); - int read(short* _outputBuffer, int _count); - bool isEndOfChunk(); +/* static WireChunk* make_chunk(size_t size, size_t bytesPerSample) + { + WireChunk* wireChunk = new WireChunk(bytesPerSample*size); + wireChunk->length = bytesPerSample*size; + wireChunk->payload = (char*)malloc(wireChunk->length); + return wireChunk; + } +*/ + static size_t getHeaderSize() + { + return sizeof(WireChunk::tv_sec) + sizeof(WireChunk::tv_usec) + sizeof(WireChunk::length); + } - bool getNext(int16_t& _result); + int read(void* outputBuffer, size_t frameCount); + bool isEndOfChunk() const; - inline time_point_ms timePoint() + inline time_point_ms timePoint() const { time_point_ms tp; - return tp + std::chrono::seconds(wireChunk->tv_sec) + std::chrono::milliseconds(wireChunk->tv_usec / 1000) + std::chrono::milliseconds(idx / WIRE_CHUNK_MS_SIZE); + return tp + std::chrono::seconds(wireChunk->tv_sec) + std::chrono::milliseconds(wireChunk->tv_usec / 1000) + std::chrono::milliseconds(idx / (hz_*channels_/1000)); } template - inline T getAge() + inline T getAge() const { return getAge(timePoint()); } - inline long getAge() + inline long getAge() const { return getAge().count(); } @@ -69,9 +70,17 @@ public: return std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - time_point); } -private: - int32_t idx; + double getDuration() const; + WireChunk* wireChunk; + size_t hz_; + size_t channels_; + size_t bytesPerSample_; + size_t frameSize_; + +private: + + int32_t idx; }; diff --git a/server/Makefile b/server/Makefile index d2e956fd..36d13cbc 100644 --- a/server/Makefile +++ b/server/Makefile @@ -3,7 +3,7 @@ CC = /usr/bin/g++ CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. LDFLAGS = -lrt -lpthread -lportaudio -lboost_system -lboost_program_options -OBJ = snapServer.o +OBJ = snapServer.o ../common/chunk.o BIN = snapserver all: server diff --git a/server/snapServer.cpp b/server/snapServer.cpp index a645c854..39c25b2a 100644 --- a/server/snapServer.cpp +++ b/server/snapServer.cpp @@ -66,13 +66,23 @@ public: { for (;;) { - shared_ptr chunk(chunks.pop()); + shared_ptr chunk(chunks.pop()); size_t written = 0; + size_t toWrite = Chunk::getHeaderSize();// + chunk->length; + WireChunk* wireChunk = chunk->wireChunk; do { - written += boost::asio::write(*socket_, boost::asio::buffer(chunk.get() + written, sizeof(WireChunk) - written));//, error); + written += boost::asio::write(*socket_, boost::asio::buffer(wireChunk + written, toWrite - written));//, error); } - while (written < sizeof(WireChunk)); + while (written < toWrite); + + written = 0; + toWrite = wireChunk->length; + do + { + written += boost::asio::write(*socket_, boost::asio::buffer(wireChunk->payload + written, toWrite - written));//, error); + } + while (written < toWrite); } } catch (std::exception& e) @@ -89,9 +99,9 @@ public: // readerThread.join(); } - void send(shared_ptr chunk) + void send(shared_ptr chunk) { - while (chunks.size() * WIRE_CHUNK_MS > 10000) + while (chunks.size() * chunk->getDuration() > 10000) chunks.pop(); chunks.push(chunk); } @@ -105,7 +115,7 @@ private: bool active_; socket_ptr socket_; thread* senderThread; - Queue> chunks; + Queue> chunks; }; @@ -130,7 +140,7 @@ public: } } - void send(shared_ptr chunk) + void send(shared_ptr chunk) { for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) { @@ -165,11 +175,35 @@ private: }; +class ServerException : public std::exception +{ +public: + ServerException(const std::string& what) : what_(what) + { + } + + virtual ~ServerException() throw() + { + } + + virtual const char* what() const throw() + { + return what_.c_str(); + } + +private: + std::string what_; +}; + int main(int argc, char* argv[]) { try { + uint16_t sampleRate; + short channels; + uint16_t bps; + size_t port; string fifoName; bool runAsDaemon; @@ -178,6 +212,9 @@ int main(int argc, char* argv[]) desc.add_options() ("help,h", "produce help message") ("port,p", po::value(&port)->default_value(98765), "port to listen on") + ("channels,c", po::value(&channels)->default_value(2), "number of channels") + ("samplerate,r", po::value(&sampleRate)->default_value(48000), "sample rate") + ("bps,b", po::value(&bps)->default_value(16), "bit per sample") ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") ; @@ -207,8 +244,11 @@ int main(int argc, char* argv[]) } */ - if (runAsDaemon) - daemonize(); +// if (runAsDaemon) + { +// daemonize(); +// syslog (LOG_NOTICE, "First daemon started."); + } openlog ("firstdaemon", LOG_PID, LOG_DAEMON); @@ -221,37 +261,38 @@ int main(int argc, char* argv[]) long nextTick = getTickCount(); mkfifo(fifoName.c_str(), 0777); +size_t duration = 50; while (!g_terminated) { -syslog (LOG_NOTICE, "First daemon started."); int fd = open(fifoName.c_str(), O_RDONLY); try { - shared_ptr chunk;//(new WireChunk()); + shared_ptr chunk;//(new WireChunk()); while (true)//cin.good()) { - chunk.reset(new WireChunk()); - int toRead = sizeof(WireChunk::payload); + chunk.reset(new Chunk(sampleRate, channels, bps, duration));//2*WIRE_CHUNK_SIZE)); + WireChunk* wireChunk = chunk->wireChunk; + int toRead = wireChunk->length; // cout << "tr: " << toRead << ", size: " << WIRE_CHUNK_SIZE << "\t"; - char* payload = (char*)(&chunk->payload[0]); +// char* payload = (char*)(&chunk->payload[0]); int len = 0; do { - int count = read(fd, payload + len, toRead - len); - cout.flush(); + int count = read(fd, wireChunk->payload + len, toRead - len); if (count <= 0) - throw new std::exception(); + throw ServerException("count = " + boost::lexical_cast(count)); + len += count; } while (len < toRead); - chunk->tv_sec = tvChunk.tv_sec; - chunk->tv_usec = tvChunk.tv_usec; + wireChunk->tv_sec = tvChunk.tv_sec; + wireChunk->tv_usec = tvChunk.tv_usec; server->send(chunk); - addMs(tvChunk, WIRE_CHUNK_MS); - nextTick += WIRE_CHUNK_MS; + addMs(tvChunk, duration); + nextTick += duration; long currentTick = getTickCount(); if (nextTick > currentTick) { @@ -265,18 +306,18 @@ syslog (LOG_NOTICE, "First daemon started."); } } } - catch(const std::exception&) + catch(const std::exception& e) { - cout << "Exception\n"; + std::cerr << "Exception: " << e.what() << std::endl; } close(fd); } server->stop(); } - catch (std::exception& e) + catch (const std::exception& e) { - std::cerr << "Exception: " << e.what() << "\n"; + std::cerr << "Exception: " << e.what() << std::endl; } syslog (LOG_NOTICE, "First daemon terminated.");