diff --git a/test/Makefile b/test/Makefile new file mode 100644 index 00000000..170331ee --- /dev/null +++ b/test/Makefile @@ -0,0 +1,19 @@ +VERSION = 0.01 +CC = /usr/bin/g++ +CFLAGS = -std=gnu++0x -Wall -Wno-unused-function -D_REENTRANT -DVERSION=\"$(VERSION)\" -I.. +LDFLAGS = -lrt -lpthread -lboost_system -lboost_program_options -lvorbis -lvorbisenc -logg + +OBJ = test.o message.o +BIN = test + +all: server + +server: $(OBJ) + $(CC) $(CFLAGS) -o $(BIN) $(OBJ) $(LDFLAGS) + +%.o: %.cpp + $(CC) $(CFLAGS) -c $< -o $@ + +clean: + rm -rf $(BIN) $(OBJ) + diff --git a/test/message.cpp b/test/message.cpp new file mode 100644 index 00000000..a3a34357 --- /dev/null +++ b/test/message.cpp @@ -0,0 +1,73 @@ +#include "message.h" +#include +#include +#include "common/log.h" + + +Chunk::Chunk(const SampleFormat& sampleFormat, size_t ms) : format(sampleFormat), idx(0) +{ + payloadSize = format.rate*format.frameSize*ms / 1000; + payload = (char*)malloc(payloadSize); +} + + +Chunk::~Chunk() +{ +} + + +bool Chunk::isEndOfChunk() const +{ + return idx >= getFrameCount(); +} + + +double Chunk::getFrameCount() const +{ + return (payloadSize / format.frameSize); +} + + + +double Chunk::getDuration() const +{ + return getFrameCount() / format.msRate(); +} + + + +double Chunk::getTimeLeft() const +{ + return (getFrameCount() - idx) / format.msRate(); +} + + + +int Chunk::seek(int frames) +{ + idx += frames; + if (idx > getFrameCount()) + idx = getFrameCount(); + if (idx < 0) + idx = 0; + return idx; +} + + +int Chunk::read(void* outputBuffer, size_t frameCount) +{ +//logd << "read: " << frameCount << ", total: " << (wireChunk->length / format.frameSize) << ", idx: " << idx;// << std::endl; + int result = frameCount; + if (idx + frameCount > (payloadSize / format.frameSize)) + result = (payloadSize / format.frameSize) - idx; + +//logd << ", from: " << format.frameSize*idx << ", to: " << format.frameSize*idx + format.frameSize*result; + if (outputBuffer != NULL) + memcpy((char*)outputBuffer, (char*)(payload) + format.frameSize*idx, format.frameSize*result); + + idx += result; +//logd << ", new idx: " << idx << ", result: " << result << ", wireChunk->length: " << wireChunk->length << ", format.frameSize: " << format.frameSize << "\n";//std::endl; + return result; +} + + diff --git a/test/message.h b/test/message.h new file mode 100644 index 00000000..b69153f4 --- /dev/null +++ b/test/message.h @@ -0,0 +1,311 @@ +#ifndef CHUNK_H +#define CHUNK_H + +#include +#include +#include +#include +#include "common/sampleFormat.h" + + +typedef std::chrono::time_point time_point_ms; +using namespace std; + + +enum message_type +{ + header = 0, + payload = 1 +}; + + + +struct BaseMessage +{ + BaseMessage() + { + } + + BaseMessage(message_type type_) + { + type = type; + }; + + virtual void read(istream& stream) + { + stream.read(reinterpret_cast(&type), sizeof(uint16_t)); + cout << type << "\n"; + stream.read(reinterpret_cast(&size), sizeof(uint32_t)); + cout << size << "\n"; + } + + virtual void serialize(ostream& stream) + { + stream.write(reinterpret_cast(&type), sizeof(uint16_t)); + size = getSize(); + stream.write(reinterpret_cast(&size), sizeof(uint32_t)); + doserialize(stream); + } + + virtual uint32_t getSize() + { + return sizeof(uint16_t) + sizeof(uint32_t); + }; + + uint16_t type; + uint32_t size; +protected: + virtual void doserialize(ostream& stream) + { + }; +}; + + + +class TestMessage : public BaseMessage +{ +public: + TestMessage() : BaseMessage(message_type::header) + { + } + + TestMessage(int8_t logLevel_, char* text_) : BaseMessage(message_type::header), logLevel(logLevel_), length(sizeof(text_)), text(text_) + { + } + + virtual ~TestMessage() + { + } + + virtual void read(istream& stream) + { + stream.read(reinterpret_cast(&logLevel), sizeof(int8_t)); + stream.read(reinterpret_cast(&length), sizeof(int16_t)); + text = (char*)malloc(length); + stream.read(text, length); + } + + virtual uint32_t getSize() + { + return sizeof(int8_t) + sizeof(int16_t) + length; + } + + int8_t logLevel; + int16_t length; + char* text; + +protected: + virtual void doserialize(ostream& stream) + { + stream.write(reinterpret_cast(&logLevel), sizeof(int8_t)); + stream.write(reinterpret_cast(&length), sizeof(int16_t)); + stream.write(text, length); + } +}; + + +class WireChunk : public BaseMessage +{ +public: + WireChunk() : BaseMessage(message_type::payload) + { + } + +// WireChunk(int8_t logLevel_, char* text_) : BaseMessage(message_type::payload), logLevel(logLevel_), length(sizeof(text_)), text(text_) +// { +// } + + virtual ~WireChunk() + { + } + + virtual void read(istream& stream) + { + stream.read(reinterpret_cast(&tv_sec), sizeof(int32_t)); + stream.read(reinterpret_cast(&tv_usec), sizeof(int32_t)); + stream.read(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + payload = (char*)malloc(payloadSize); + stream.read(payload, payloadSize); + } + + virtual uint32_t getSize() + { + return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize; + } + + int32_t tv_sec; + int32_t tv_usec; + uint32_t payloadSize; + char* payload; + +protected: + virtual void doserialize(ostream& stream) + { + stream.write(reinterpret_cast(&tv_sec), sizeof(int32_t)); + stream.write(reinterpret_cast(&tv_usec), sizeof(int32_t)); + stream.write(reinterpret_cast(&payloadSize), sizeof(uint32_t)); + stream.write(payload, payloadSize); + } +}; + + + + +/* + + virtual ~BaseMessage(){}; + virtual message_type getType() = 0; + + virtual char* serialize() + { + char* outStream = (char*)malloc(getSize()); + size_t pos = 0; + pos = add(outStream, pos, getType()); + pos = add(outStream, pos, getSize()); + doSerialize(outStream + pos); + return outStream; + } + + virtual void deserialize(istream& stream) + { + stream.read(reinterpret_cast(&drvId), sizeof(int)); + stream.read(reinterpret_cast(&frameSize), sizeof(int)); + msg = (byte*)malloc((sizeof(byte)) * frameSize); + stream.read(reinterpret_cast(&msg[0]), frameSize); + stream.read(reinterpret_cast(&offset), sizeof(unsigned int)); + stream.read(reinterpret_cast(&isLive), sizeof(bool)); + + size_t pos = 0; + memcpy((char*)&type, inStream, sizeof(uint16_t)); + pos += sizeof(uint16_t); + memcpy(&length, inStream + pos, sizeof(uint32_t)); + pos += sizeof(uint32_t); + doDeserialize(inStream + pos); + } + + virtual size_t getSize() + { + return sizeof(int16_t) + sizeof(int32_t) + doGetSize(); + } + +protected: + template + size_t add(char* stream, size_t pos, T* t) + { + memcpy(stream + pos, reinterpret_cast(t), sizeof(T)); + return pos + sizeof(T); + } + + virtual size_t doGetSize() = 0; + virtual void doSerialize(char* outStream) = 0; + virtual void doDeserialize(const char* inStream) = 0; +}; + + +struct WireChunk : public BaseMessage +{ + int32_t tv_sec; + int32_t tv_usec; + uint32_t payloadSize; + char* payload; + + virtual ~WireChunk() + { + free(payload); + } + + virtual message_type getType() + { + return message_type::payload; + } + +protected: + virtual size_t doGetSize() + { + return sizeof(int32_t) + sizeof(int32_t) + sizeof(uint32_t) + payloadSize; + } + + virtual void doSerialize(char* outStream) + { + size_t pos = 0; + pos = add(outStream, pos, tv_sec); + pos = add(outStream, pos, tv_usec); + pos = add(outStream, pos, payloadSize); + memcpy(outStream + pos, payload, payloadSize); + } + + virtual void doDeserialize(const char* inStream) + { + size_t pos = 0; + memcpy(&tv_sec, inStream + pos, sizeof(int32_t)); + pos += sizeof(int32_t); + memcpy(&tv_usec, inStream + pos, sizeof(int32_t)); + pos += sizeof(int32_t); + memcpy(&payloadSize, inStream + pos, sizeof(uint32_t)); + pos += sizeof(uint32_t); + payload = (char*)malloc(payloadSize); + memcpy(payload, inStream + pos, payloadSize); + } +}; +*/ + +class Chunk : public WireChunk +{ +public: + Chunk(const SampleFormat& sampleFormat, size_t ms); + ~Chunk(); + + int read(void* outputBuffer, size_t frameCount); + bool isEndOfChunk() const; + + inline time_point_ms timePoint() const + { + time_point_ms tp; + std::chrono::milliseconds::rep relativeIdxTp = ((double)idx / ((double)format.rate/1000.)); + return + tp + + std::chrono::seconds(tv_sec) + + std::chrono::milliseconds(tv_usec / 1000) + + std::chrono::milliseconds(relativeIdxTp); + } + + template + inline T getAge() const + { + return getAge(timePoint()); + } + + inline long getAge() const + { + return getAge().count(); + } + + inline static long getAge(const time_point_ms& time_point) + { + return getAge(time_point).count(); + } + + template + static inline T getAge(const std::chrono::time_point& time_point) + { + return std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - time_point); + } + + int seek(int frames); + double getDuration() const; + double getDurationUs() const; + double getTimeLeft() const; + double getFrameCount() const; + + SampleFormat format; + +private: +// SampleFormat format_; + uint32_t idx; +}; + + + +#endif + + diff --git a/test/test.cpp b/test/test.cpp new file mode 100644 index 00000000..0fa207b7 --- /dev/null +++ b/test/test.cpp @@ -0,0 +1,415 @@ +// +// blocking_tcp_echo_server.cpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +/* +#include +#include +#include +#include +#include +#include +#include // localtime +#include // stringstream +#include +#include +#include +#include +#include "common/chunk.h" +#include "common/timeUtils.h" +#include "common/queue.h" +#include "common/signalHandler.h" +#include "common/utils.h" +#include "common/sampleFormat.h" +#include "pcmEncoder.h" +#include "oggEncoder.h" +#include +*/ + +#include +#include +#include +#include +#include +#include +#include // localtime +#include // stringstream +#include +#include +#include +#include +#include +#include "message.h" + +using boost::asio::ip::tcp; +namespace po = boost::program_options; + + +typedef boost::shared_ptr socket_ptr; +using namespace std; +using namespace std::chrono; + + +bool g_terminated = false; + + +int main(int argc, char* argv[]) +{ + TestMessage* chunk = new TestMessage(1, (char*)"Hallo"); + stringstream ss; + chunk->serialize(ss); + + BaseMessage header; + ss.seekg(0, ss.beg); + header.read(ss); +// cout << ss.str() << "\n"; +// ss.read(reinterpret_cast(&header), sizeof(MessageHeader)); + cout << "Header: " << header.type << ", " << header.size << "\n"; + delete chunk; + chunk = new TestMessage(); + chunk->read(ss); + cout << "Header: " << chunk->type << ", " << chunk->size << ", " << (int)chunk->logLevel << ", " << chunk->text << "\n"; + + +/* chunk->tv_sec = 21; + chunk->tv_usec = 2; + chunk->payloadSize = 5; + chunk->payload = (char*)malloc(5); +chunk->payload[0] = 99; + char* stream = chunk->serialize(); +cout << "1\n"; +for (size_t n=0; n<24; ++n) + cout << (int)stream[n] << " "; + delete chunk; +cout << "\n3\n"; + chunk = new WireChunk(); +cout << "4\n"; + chunk->deserialize(stream); +cout << "5\n"; + cout << chunk->tv_sec << ", " << chunk->tv_usec << ", " << chunk->payloadSize << ", " << chunk->payload << "\n"; +*/ +return 0; +} + + +/* +std::string return_current_time_and_date() +{ + auto now = system_clock::now(); + auto in_time_t = system_clock::to_time_t(now); + system_clock::duration ms = now.time_since_epoch(); + char buff[20]; + strftime(buff, 20, "%Y-%m-%d %H:%M:%S", localtime(&in_time_t)); + stringstream ss; + ss << buff << "." << std::setw(3) << std::setfill('0') << ((ms / milliseconds(1)) % 1000); + return ss.str(); +} + + +class Session +{ +public: + Session(socket_ptr sock) : active_(false), socket_(sock) + { + } + + void sender() + { + try + { + for (;;) + { + shared_ptr chunk(chunks.pop()); + char* stream = chunk->serialize(); + size_t written(0); + size_t toWrite = sizeof(stream); + do + { + written += boost::asio::write(*socket_, boost::asio::buffer(stream + written, toWrite - written));//, error); + } + while (written < toWrite); + } + } + catch (std::exception& e) + { + std::cerr << "Exception in thread: " << e.what() << "\n"; + active_ = false; + } + } + + void start() + { + active_ = true; + senderThread = new thread(&Session::sender, this); +// readerThread.join(); + } + + void send(shared_ptr chunk) + { + if (!chunk) + return; + + while (chunks.size() * chunk->getDuration() > 10000) + chunks.pop(); + chunks.push(chunk); + } + + bool isActive() const + { + return active_; + } + +private: + bool active_; + socket_ptr socket_; + thread* senderThread; + Queue> chunks; +}; + + +class Server +{ +public: + Server(unsigned short port) : port_(port), headerChunk(NULL) + { + } + + void acceptor() + { + tcp::acceptor a(io_service_, tcp::endpoint(tcp::v4(), port_)); + for (;;) + { + socket_ptr sock(new tcp::socket(io_service_)); + a.accept(*sock); +// cout << "New connection: " << sock->remote_endpoint().address().to_string() << "\n"; + Session* session = new Session(sock); +cout << "Sending header: " << headerChunk->wireChunk->length << "\n"; + session->send(headerChunk); + session->start(); + sessions.insert(shared_ptr(session)); + } + } + + void setHeader(shared_ptr chunk) + { + if (chunk && (chunk->wireChunk != NULL)) + headerChunk = shared_ptr(chunk); + } + + void send(shared_ptr chunk) + { +// fwrite(chunk->wireChunk->payload, 1, chunk->wireChunk->length, stdout); + + for (std::set>::iterator it = sessions.begin(); it != sessions.end(); ) + { + if (!(*it)->isActive()) + { + cout << "Session inactive. Removing\n"; + sessions.erase(it++); + } + else + ++it; + } + + for (auto s : sessions) + s->send(chunk); + } + + void start() + { + acceptThread = new thread(&Server::acceptor, this); + } + + void stop() + { +// acceptThread->join(); + } + +private: + set> sessions; + boost::asio::io_service io_service_; + unsigned short port_; + shared_ptr headerChunk; + thread* acceptThread; +}; + + +class ServerException : public std::exception +{ +public: + ServerException(const std::string& what) : what_(what) + { + } + + virtual ~ServerException() throw() + { + } + + virtual const char* what() const throw() + { + return what_.c_str(); + } + +private: + std::string what_; +}; + + +int main(int argc, char* argv[]) +{ + WireChunk* chunk = new WireChunk(); + chunk->tv_sec = 21; + chunk->tv_usec = 2; + chunk->payloadSize = 5; + chunk->payload = (char*)malloc(5); +chunk->payload[0] = 99; + char* stream = chunk->serialize(); +cout << "1\n"; +for (size_t n=0; n<24; ++n) + cout << (int)stream[n] << " "; + delete chunk; +cout << "\n3\n"; + chunk = new WireChunk(); +cout << "4\n"; + chunk->deserialize(stream); +cout << "5\n"; + cout << chunk->tv_sec << ", " << chunk->tv_usec << ", " << chunk->payloadSize << ", " << chunk->payload << "\n"; +return 0; + + try + { + string sampleFormat; + + size_t port; + string fifoName; + string codec; + bool runAsDaemon; + + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port,p", po::value(&port)->default_value(98765), "port to listen on") + ("sampleformat,s", po::value(&sampleFormat)->default_value("48000:16:2"), "sample format") + ("codec,c", po::value(&codec)->default_value("ogg"), "transport codec [ogg|pcm]") + ("fifo,f", po::value(&fifoName)->default_value("/tmp/snapfifo"), "name of fifo file") + ("daemon,d", po::bool_switch(&runAsDaemon)->default_value(false), "daemonize") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) + { + cout << desc << "\n"; + return 1; + } + + + if (runAsDaemon) + { + daemonize(); + syslog (LOG_NOTICE, "First daemon started."); + } + + openlog ("firstdaemon", LOG_PID, LOG_DAEMON); + + using namespace std; // For atoi. + Server* server = new Server(port); + server->start(); + + timeval tvChunk; + gettimeofday(&tvChunk, NULL); + long nextTick = getTickCount(); + + mkfifo(fifoName.c_str(), 0777); +size_t duration = 50; + + SampleFormat format(sampleFormat); + std::auto_ptr encoder; + if (codec == "ogg") + encoder.reset(new OggEncoder(sampleFormat)); + else if (codec == "pcm") + encoder.reset(new PcmEncoder(sampleFormat)); + else + { + cout << "unknown codec: " << codec << "\n"; + return 1; + } + + shared_ptr header(new Chunk(format, encoder->getHeader())); + server->setHeader(header); + + while (!g_terminated) + { + int fd = open(fifoName.c_str(), O_RDONLY); + try + { + shared_ptr chunk;//(new WireChunk()); + while (true)//cin.good()) + { + chunk.reset(new Chunk(format, duration));//2*WIRE_CHUNK_SIZE)); + WireChunk* wireChunk = chunk->wireChunk; + int toRead = wireChunk->length; +// cout << "tr: " << toRead << ", size: " << WIRE_CHUNK_SIZE << "\t"; +// char* payload = (char*)(&chunk->payload[0]); + int len = 0; + do + { + int count = read(fd, wireChunk->payload + len, toRead - len); + if (count <= 0) + throw ServerException("count = " + boost::lexical_cast(count)); + + len += count; + } + while (len < toRead); + + wireChunk->tv_sec = tvChunk.tv_sec; + wireChunk->tv_usec = tvChunk.tv_usec; + double chunkDuration = encoder->encode(chunk.get()); + if (chunkDuration > 0) + server->send(chunk); +//cout << wireChunk->tv_sec << ", " << wireChunk->tv_usec / 1000 << "\n"; +// addUs(tvChunk, 1000*chunk->getDuration()); + addUs(tvChunk, chunkDuration * 1000); + nextTick += duration; + long currentTick = getTickCount(); + if (nextTick > currentTick) + { + usleep((nextTick - currentTick) * 1000); + } + else + { + cin.sync(); + gettimeofday(&tvChunk, NULL); + nextTick = getTickCount(); + } + } + } + catch(const std::exception& e) + { + std::cerr << "Exception: " << e.what() << std::endl; + } + close(fd); + } + + server->stop(); + } + catch (const std::exception& e) + { + std::cerr << "Exception: " << e.what() << std::endl; + } + + syslog (LOG_NOTICE, "First daemon terminated."); + closelog(); +} + + +*/ + +