From 50a4380616553c12bb24a017e607739e2ac36b2d Mon Sep 17 00:00:00 2001 From: badaix Date: Tue, 1 Nov 2016 13:00:04 +0100 Subject: [PATCH] process stream --- server/streamreader/fileStream.h | 2 +- server/streamreader/pipeStream.cpp | 1 - server/streamreader/pipeStream.h | 2 +- server/streamreader/process.hpp | 228 ++++++++++++++++++++++++++ server/streamreader/processStream.cpp | 115 ++++++++++++- server/streamreader/processStream.h | 18 +- 6 files changed, 357 insertions(+), 9 deletions(-) create mode 100644 server/streamreader/process.hpp diff --git a/server/streamreader/fileStream.h b/server/streamreader/fileStream.h index 2acbd466..676c5827 100644 --- a/server/streamreader/fileStream.h +++ b/server/streamreader/fileStream.h @@ -37,7 +37,7 @@ public: virtual ~FileStream(); protected: - void worker(); + virtual void worker(); std::ifstream ifs; }; diff --git a/server/streamreader/pipeStream.cpp b/server/streamreader/pipeStream.cpp index 8eb3f080..4dbb3c9f 100644 --- a/server/streamreader/pipeStream.cpp +++ b/server/streamreader/pipeStream.cpp @@ -104,7 +104,6 @@ void PipeStream::worker() if (nextTick >= currentTick) { -// logO << "sleep: " << nextTick - currentTick << "\n"; setState(kPlaying); chronos::sleep(nextTick - currentTick); } diff --git a/server/streamreader/pipeStream.h b/server/streamreader/pipeStream.h index 466174d8..7771270e 100644 --- a/server/streamreader/pipeStream.h +++ b/server/streamreader/pipeStream.h @@ -37,7 +37,7 @@ public: virtual ~PipeStream(); protected: - void worker(); + virtual void worker(); int fd_; }; diff --git a/server/streamreader/process.hpp b/server/streamreader/process.hpp new file mode 100644 index 00000000..68867880 --- /dev/null +++ b/server/streamreader/process.hpp @@ -0,0 +1,228 @@ +#ifndef TINY_PROCESS_LIBRARY_HPP_ +#define TINY_PROCESS_LIBRARY_HPP_ + +#include +#include +#include +#include +#include + + + +///Create a new process given command and run path. +///TODO: on Windows it is harder to specify which pipes to redirect. +///Thus, at the moment, if read_stdout==nullptr, read_stderr==nullptr and open_stdin==false, +///the stdout, stderr and stdin are sent to the parent process instead. +///Compile with -DMSYS_PROCESS_USE_SH to run command using "sh -c [command]" on Windows as well. +class Process { + +public: + typedef int fd_type; + + Process(const std::string &command, const std::string &path = "") : closed(true) + { + open(command, path); + } + + ~Process() + { + close_fds(); + } + + ///Get the process id of the started process. + pid_t getPid() + { + return pid; + } + + ///Write to stdin. Convenience function using write(const char *, size_t). + bool write(const std::string &data) + { + return write(data.c_str(), data.size()); + } + + ///Wait until process is finished, and return exit status. + int get_exit_status() + { + if (pid <= 0) + return -1; + + int exit_status; + waitpid(pid, &exit_status, 0); + { + std::lock_guard lock(close_mutex); + closed=true; + } + close_fds(); + + if (exit_status >= 256) + exit_status = exit_status>>8; + + return exit_status; + } + + ///Write to stdin. + bool write(const char *bytes, size_t n) + { + std::lock_guard lock(stdin_mutex); + if (::write(stdin_fd, bytes, n)>=0) + return true; + else + return false; + } + + ///Close stdin. If the process takes parameters from stdin, use this to notify that all parameters have been sent. + void close_stdin() + { + std::lock_guard lock(stdin_mutex); + if (pid > 0) + close(stdin_fd); + } + + ///Kill the process. + void kill(bool force=false) + { + std::lock_guard lock(close_mutex); + if (pid > 0 && !closed) + { + if(force) + ::kill(-pid, SIGTERM); + else + ::kill(-pid, SIGINT); + } + } + + ///Kill a given process id. Use kill(bool force) instead if possible. + static void kill(pid_t id, bool force=false) + { + if (id <= 0) + return; + if (force) + ::kill(-id, SIGTERM); + else + ::kill(-id, SIGINT); + } + + fd_type getStdout() + { + return stdout_fd; + } + + fd_type getStderr() + { + return stderr_fd; + } + + fd_type getStdin() + { + return stdin_fd; + } + + +private: + pid_t pid; + bool closed; + std::mutex close_mutex; + std::mutex stdin_mutex; + + fd_type stdout_fd, stderr_fd, stdin_fd; + + void closePipe(int pipefd[2]) + { + close(pipefd[0]); + close(pipefd[1]); + } + + pid_t open(const std::string &command, const std::string &path) + { + int stdin_p[2], stdout_p[2], stderr_p[2]; + + if (pipe(stdin_p) != 0) + return -1; + + if (pipe(stdout_p) != 0) + { + closePipe(stdin_p); + return -1; + } + + if (pipe(stderr_p) != 0) + { + closePipe(stdin_p); + closePipe(stdout_p); + return -1; + } + + pid = fork(); + + if (pid < 0) + { + closePipe(stdin_p); + closePipe(stdout_p); + closePipe(stderr_p); + return pid; + } + else if (pid == 0) + { + dup2(stdin_p[0], 0); + dup2(stdout_p[1], 1); + dup2(stderr_p[1], 2); + + closePipe(stdin_p); + closePipe(stdout_p); + closePipe(stderr_p); + + //Based on http://stackoverflow.com/a/899533/3808293 + int fd_max = sysconf(_SC_OPEN_MAX); + for (int fd=3; fd 0) + { + close(stdout_fd); + close(stderr_fd); + } + } + +}; + +#endif // TINY_PROCESS_LIBRARY_HPP_ + diff --git a/server/streamreader/processStream.cpp b/server/streamreader/processStream.cpp index f7f07914..5fb18fa0 100644 --- a/server/streamreader/processStream.cpp +++ b/server/streamreader/processStream.cpp @@ -17,7 +17,11 @@ ***/ +#include +#include #include "processStream.h" +#include "common/snapException.h" +#include "common/utils.h" #include "common/log.h" @@ -26,13 +30,95 @@ using namespace std; -ProcessStream::ProcessStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri) +bool ProcessStream::fileExists(const std::string& name) { + struct stat buffer; + return (stat (name.c_str(), &buffer) == 0); +} + + +std::string ProcessStream::findExe(const std::string& name) +{ + if (fileExists(name)) + return name; + + std::string exe = name; + if (exe.find("/") != string::npos) + exe = exe.substr(exe.find_last_of("/") + 1); + + string whereis = execGetOutput("whereis " + exe); + if (whereis.find(":") != std::string::npos) + { + whereis = trim_copy(whereis.substr(whereis.find(":") + 1)); + if (!whereis.empty()) + return whereis; + } + + char buff[PATH_MAX]; + char szTmp[32]; + sprintf(szTmp, "/proc/%d/exe", getpid()); + ssize_t len = readlink(szTmp, buff, sizeof(buff)-1); + if (len != -1) + { + buff[len] = '\0'; + return string(buff) + "/" + exe; + } + + return ""; +} + + +ProcessStream::ProcessStream(PcmListener* pcmListener, const StreamUri& uri) : PcmStream(pcmListener, uri), path(""), process_(nullptr) +{ + logO << "ProcessStream: " << uri_.getQuery("params") << "\n"; + + exe = findExe(uri.path); + if (exe.find("/") != string::npos) + path = exe.substr(0, exe.find_last_of("/")); + + if (!fileExists(exe)) + throw SnapException("file not found: \"" + uri.path + "\""); +// const auto& queries = uri_.query; } ProcessStream::~ProcessStream() { + process_->kill(); +} + + +void ProcessStream::start() +{ + PcmStream::start(); +} + + +void ProcessStream::stop() +{ + if (process_) + process_->kill(); + PcmStream::stop(); + stderrReaderThread_.join(); +} + + +void ProcessStream::stderrReader() +{ + size_t buffer_size = 8192; + auto buffer = std::unique_ptr(new char[buffer_size]); + ssize_t n; + stringstream message; + while (active_ && (n=read(process_->getStderr(), buffer.get(), buffer_size)) > 0) + { + string logmsg(buffer.get(), n); + if ((logmsg.find("allocated stream") == string::npos) && + (logmsg.find("Got channel") == string::npos) && + (logmsg.size() > 4)) + { + logO << logmsg; + } + } } @@ -45,6 +131,10 @@ void ProcessStream::worker() while (active_) { + process_.reset(new Process(exe + " " + uri_.getQuery("params"), path)); + stderrReaderThread_ = thread(&ProcessStream::stderrReader, this); + stderrReaderThread_.detach(); + gettimeofday(&tvChunk, NULL); tvEncodedChunk_ = tvChunk; long nextTick = chronos::getTickCount(); @@ -54,10 +144,22 @@ void ProcessStream::worker() { chunk->timestamp.sec = tvChunk.tv_sec; chunk->timestamp.usec = tvChunk.tv_usec; - size_t toRead = chunk->payloadSize; - size_t count = 0; - -//// read + int toRead = chunk->payloadSize; + int len = 0; + do + { + int count = read(process_->getStdout(), chunk->payload + len, toRead - len); + if (count < 0) + { + setState(kIdle); + chronos::sleep(100); + } + else if (count == 0) + throw SnapException("end of file"); + else + len += count; + } + while ((len < toRead) && active_); encoder_->encode(chunk.get()); nextTick += pcmReadMs_; @@ -66,6 +168,7 @@ void ProcessStream::worker() if (nextTick >= currentTick) { + setState(kPlaying); chronos::sleep(nextTick - currentTick); } else @@ -80,6 +183,8 @@ void ProcessStream::worker() catch(const std::exception& e) { logE << "Exception: " << e.what() << std::endl; + process_->kill(); + chronos::sleep(100); } } } diff --git a/server/streamreader/processStream.h b/server/streamreader/processStream.h index 56002dcb..33d448c8 100644 --- a/server/streamreader/processStream.h +++ b/server/streamreader/processStream.h @@ -19,7 +19,11 @@ #ifndef PROCESS_STREAM_H #define PROCESS_STREAM_H +#include +#include + #include "pcmStream.h" +#include "process.hpp" /// Starts an external process and reads and PCM data from stdout @@ -35,8 +39,20 @@ public: ProcessStream(PcmListener* pcmListener, const StreamUri& uri); virtual ~ProcessStream(); + virtual void start(); + virtual void stop(); + protected: - void worker(); + std::string exe; + std::string path; + std::unique_ptr process_; + std::thread stderrReaderThread_; + + virtual void worker(); + void stderrReader(); + + bool fileExists(const std::string& name); + std::string findExe(const std::string& name); };