Codec in header message

git-svn-id: svn://elaine/murooma/trunk@286 d8a302eb-03bc-478d-80e4-98257eca68ef
This commit is contained in:
(no author) 2014-09-21 10:37:14 +00:00
parent a833a00cb1
commit e0a6ac9467
9 changed files with 36 additions and 31 deletions

View file

@ -83,14 +83,15 @@ void Controller::worker()
while (!(sampleFormat = clientConnection->sendReq<SampleFormat>(&requestMsg, 1000))); while (!(sampleFormat = clientConnection->sendReq<SampleFormat>(&requestMsg, 1000)));
cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n"; cout << "SampleFormat rate: " << sampleFormat->rate << ", bits: " << sampleFormat->bits << ", channels: " << sampleFormat->channels << "\n";
decoder = new OggDecoder();
if (decoder != NULL)
{
requestMsg.request = "headerChunk"; requestMsg.request = "headerChunk";
shared_ptr<HeaderMessage> headerChunk(NULL); shared_ptr<HeaderMessage> headerChunk(NULL);
while (!(headerChunk = clientConnection->sendReq<HeaderMessage>(&requestMsg, 1000))); while (!(headerChunk = clientConnection->sendReq<HeaderMessage>(&requestMsg, 1000)));
cout << "Codec: " << headerChunk->codec << "\n";
if (headerChunk->codec == "ogg")
decoder = new OggDecoder();
else if (headerChunk->codec == "pcm")
decoder = new PcmDecoder();
decoder->setHeader(headerChunk.get()); decoder->setHeader(headerChunk.get());
}
RequestMsg timeReq("time"); RequestMsg timeReq("time");
for (size_t n=0; n<10; ++n) for (size_t n=0; n<10; ++n)
@ -124,7 +125,7 @@ void Controller::worker()
{ {
double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.; double latency = (reply->received.sec - reply->sent.sec) + (reply->received.usec - reply->sent.usec) / 1000000.;
TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2); TimeProvider::getInstance().setDiffToServer((reply->latency - latency) * 1000 / 2);
cout << "Median: " << TimeProvider::getInstance().getDiffToServer() << "\n"; // cout << "Median: " << TimeProvider::getInstance().getDiffToServer() << "\n";
} }
} }
} }

View file

@ -8,7 +8,7 @@
class HeaderMessage : public BaseMessage class HeaderMessage : public BaseMessage
{ {
public: public:
HeaderMessage(size_t size = 0) : BaseMessage(message_type::header), payloadSize(size) HeaderMessage(const std::string& codecName = "", size_t size = 0) : BaseMessage(message_type::header), payloadSize(size), codec(codecName)
{ {
payload = (char*)malloc(size); payload = (char*)malloc(size);
} }
@ -20,6 +20,11 @@ public:
virtual void read(std::istream& stream) virtual void read(std::istream& stream)
{ {
int16_t size;
stream.read(reinterpret_cast<char *>(&size), sizeof(int16_t));
codec.resize(size);
stream.read(&codec[0], size);
stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t)); stream.read(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
payload = (char*)realloc(payload, payloadSize); payload = (char*)realloc(payload, payloadSize);
stream.read(payload, payloadSize); stream.read(payload, payloadSize);
@ -27,15 +32,19 @@ public:
virtual uint32_t getSize() virtual uint32_t getSize()
{ {
return sizeof(uint32_t) + payloadSize; return sizeof(int16_t) + codec.size() + sizeof(uint32_t) + payloadSize;
} }
uint32_t payloadSize; uint32_t payloadSize;
char* payload; char* payload;
std::string codec;
protected: protected:
virtual void doserialize(std::ostream& stream) virtual void doserialize(std::ostream& stream)
{ {
int16_t size(codec.size());
stream.write(reinterpret_cast<char *>(&size), sizeof(int16_t));
stream.write(codec.c_str(), size);
stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t)); stream.write(reinterpret_cast<char *>(&payloadSize), sizeof(uint32_t));
stream.write(payload, payloadSize); stream.write(payload, payloadSize);
} }

View file

@ -40,7 +40,7 @@ void ControlServer::onMessageReceived(ServerSession* connection, const BaseMessa
{ {
RequestMsg requestMsg; RequestMsg requestMsg;
requestMsg.deserialize(baseMessage, buffer); requestMsg.deserialize(baseMessage, buffer);
cout << "request: " << requestMsg.request << "\n"; // cout << "request: " << requestMsg.request << "\n";
if (requestMsg.request == "time") if (requestMsg.request == "time")
{ {
// timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec); // timeMsg.latency = (timeMsg.received.sec - timeMsg.sent.sec) * 1000000 + (timeMsg.received.usec - timeMsg.sent.usec);

View file

@ -8,18 +8,26 @@
class Encoder class Encoder
{ {
public: public:
Encoder(const SampleFormat& format) : sampleFormat(format) Encoder(const SampleFormat& format) : sampleFormat(format), headerChunk(NULL)
{ {
} }
virtual ~Encoder()
{
if (headerChunk != NULL)
delete headerChunk;
}
virtual double encode(PcmChunk* chunk) = 0; virtual double encode(PcmChunk* chunk) = 0;
virtual HeaderMessage* getHeader() virtual HeaderMessage* getHeader()
{ {
return NULL; return headerChunk;
} }
protected: protected:
SampleFormat sampleFormat; SampleFormat sampleFormat;
HeaderMessage* headerChunk;
}; };

View file

@ -5,18 +5,13 @@
using namespace std; using namespace std;
OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format), headerChunk(NULL) OggEncoder::OggEncoder(const SampleFormat& format) : Encoder(format)
{ {
init(); init();
lastGranulepos = -1; lastGranulepos = -1;
} }
HeaderMessage* OggEncoder::getHeader()
{
return headerChunk;
}
double OggEncoder::encode(PcmChunk* chunk) double OggEncoder::encode(PcmChunk* chunk)
{ {
@ -170,7 +165,7 @@ void OggEncoder::init()
*/ */
// while(!eos){ // while(!eos){
size_t pos(0); size_t pos(0);
headerChunk = new HeaderMessage(); headerChunk = new HeaderMessage("ogg");
while (true) while (true)
{ {
int result=ogg_stream_flush(&os,&og); int result=ogg_stream_flush(&os,&og);

View file

@ -9,7 +9,6 @@ class OggEncoder : public Encoder
public: public:
OggEncoder(const SampleFormat& format); OggEncoder(const SampleFormat& format);
virtual double encode(PcmChunk* chunk); virtual double encode(PcmChunk* chunk);
virtual HeaderMessage* getHeader();
private: private:
void init(); void init();
@ -31,7 +30,6 @@ private:
ogg_packet header_code; ogg_packet header_code;
ogg_int64_t lastGranulepos; ogg_int64_t lastGranulepos;
HeaderMessage* headerChunk;
int eos=0,ret; int eos=0,ret;
int i, founddata; int i, founddata;

View file

@ -2,6 +2,7 @@
PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format) PcmEncoder::PcmEncoder(const SampleFormat& format) : Encoder(format)
{ {
headerChunk = new HeaderMessage("pcm");
} }

View file

@ -92,10 +92,8 @@ void ServerSession::add(shared_ptr<BaseMessage> message)
bool ServerSession::send(BaseMessage* message) bool ServerSession::send(BaseMessage* message)
{ {
std::unique_lock<std::mutex> mlock(mutex_); std::unique_lock<std::mutex> mlock(mutex_);
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n"; if (!socket)
if (!connected())
return false; return false;
//cout << "send: " << message->type << ", size: " << message->getSize() << "\n";
boost::asio::streambuf streambuf; boost::asio::streambuf streambuf;
std::ostream stream(&streambuf); std::ostream stream(&streambuf);
tv t; tv t;

View file

@ -34,11 +34,6 @@ public:
void stop(); void stop();
bool send(BaseMessage* message); bool send(BaseMessage* message);
void add(std::shared_ptr<BaseMessage> message); void add(std::shared_ptr<BaseMessage> message);
virtual bool connected()
{
return (socket != 0);
// return (connected_ && socket);
}
virtual bool active() virtual bool active()
{ {