From 18b6fa0a032bc17ad7b0f7367875ba788ccc80c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 5 Feb 2021 13:58:02 +0100 Subject: [PATCH] remove config from capture managers. --- internal/capture/broadcast.go | 47 +++++++++++-------------- internal/capture/gst/gst.go | 53 +++++++++++++--------------- internal/capture/manager.go | 49 +++++++++++++++++++++----- internal/capture/screencast.go | 64 +++++++++++++--------------------- internal/capture/stream.go | 52 ++++++++++++--------------- 5 files changed, 133 insertions(+), 132 deletions(-) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index d481b07c..e28ac174 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -3,30 +3,29 @@ package capture import ( "fmt" "sync" + "strings" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "demodesk/neko/internal/config" "demodesk/neko/internal/capture/gst" ) type BroacastManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - config *config.Capture - pipeline *gst.Pipeline - enabled bool - url string + logger zerolog.Logger + mu sync.Mutex + pipelineStr string + pipeline *gst.Pipeline + enabled bool + url string } -func broadcastNew(config *config.Capture) *BroacastManagerCtx { +func broadcastNew(pipelineStr string) *BroacastManagerCtx { return &BroacastManagerCtx{ - logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), - mu: sync.Mutex{}, - config: config, - enabled: false, - url: "", + logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), + pipelineStr: pipelineStr, + enabled: false, + url: "", } } @@ -68,30 +67,24 @@ func (manager *BroacastManagerCtx) Url() string { func (manager *BroacastManagerCtx) createPipeline() error { if manager.pipeline != nil { - return fmt.Errorf("pipeline already running") + return fmt.Errorf("pipeline already exists") } var err error + // replace {url} with valid URL + pipelineStr := strings.Replace(manager.pipelineStr, "{url}", manager.url, 1) + manager.logger.Info(). - Str("audio_device", manager.config.Device). - Str("video_display", manager.config.Display). - Str("broadcast_pipeline", manager.config.BroadcastPipeline). - Msgf("creating pipeline") - - manager.pipeline, err = gst.CreateRTMPPipeline( - manager.config.Device, - manager.config.Display, - manager.config.BroadcastPipeline, - manager.url, - ) + Str("str", pipelineStr). + Msgf("starting pipeline") + manager.pipeline, err = gst.CreatePipeline(pipelineStr) if err != nil { return err } manager.pipeline.Play() - manager.logger.Info().Msgf("starting pipeline") return nil } @@ -101,6 +94,6 @@ func (manager *BroacastManagerCtx) destroyPipeline() { } manager.pipeline.Stop() - manager.logger.Info().Msgf("stopping pipeline") + manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } diff --git a/internal/capture/gst/gst.go b/internal/capture/gst/gst.go index 48a17ba7..c439528c 100644 --- a/internal/capture/gst/gst.go +++ b/internal/capture/gst/gst.go @@ -39,35 +39,36 @@ func init() { registry = C.gst_registry_get() } -// CreateRTMPPipeline creates a GStreamer Pipeline -func CreateRTMPPipeline(pipelineDevice string, pipelineDisplay string, pipelineSrc string, pipelineRTMP string) (*Pipeline, error) { - video := fmt.Sprintf(videoSrc, pipelineDisplay) - audio := fmt.Sprintf(audioSrc, pipelineDevice) +func GetRTMPPipeline(audioDevice string, videoDisplay string, pipelineSrc string) string { + video := fmt.Sprintf(videoSrc, videoDisplay) + audio := fmt.Sprintf(audioSrc, audioDevice) var pipelineStr string if pipelineSrc != "" { - pipelineStr = fmt.Sprintf(pipelineSrc, pipelineRTMP, pipelineDevice, pipelineDisplay) + pipelineStr = fmt.Sprintf(pipelineSrc, audioDevice, videoDisplay) } else { - pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", pipelineRTMP, audio, video) + pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='{url} live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", audio, video) } - return CreatePipeline(pipelineStr) + return pipelineStr } -// CreateJPEGPipeline creates a GStreamer Pipeline -func CreateJPEGPipeline(pipelineDisplay string, pipelineSrc string, rate string, quality string) (*Pipeline, error) { +func GetJPEGPipeline(videoDisplay string, pipelineSrc string, rate string, quality string) string { var pipelineStr string if pipelineSrc != "" { - pipelineStr = fmt.Sprintf(pipelineSrc, pipelineDisplay) + pipelineStr = fmt.Sprintf(pipelineSrc, videoDisplay) } else { - pipelineStr = fmt.Sprintf("ximagesrc display-name=%s show-pointer=true use-damage=false ! videoconvert ! videoscale ! videorate ! video/x-raw,framerate=%s ! jpegenc quality=%s" + appSink, pipelineDisplay, rate, quality) + pipelineStr = fmt.Sprintf("ximagesrc display-name=%s show-pointer=true use-damage=false ! videoconvert ! videoscale ! videorate ! video/x-raw,framerate=%s ! jpegenc quality=%s" + appSink, videoDisplay, rate, quality) } - return CreatePipeline(pipelineStr) + return pipelineStr } -// CreateAppPipeline creates a GStreamer Pipeline -func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineSrc string) (*Pipeline, error) { +func GetAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineSrc string) (string, error) { + if pipelineSrc != "" { + return fmt.Sprintf(pipelineSrc + appSink, pipelineDevice), nil + } + var pipelineStr string switch codecRTP.Name { @@ -75,7 +76,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS // https://gstreamer.freedesktop.org/documentation/vpx/vp8enc.html // gstreamer1.0-plugins-good if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(videoSrc + "vp8enc cpu-used=16 threads=4 deadline=1 error-resilient=partitions keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice) @@ -83,14 +84,14 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS // https://gstreamer.freedesktop.org/documentation/vpx/vp9enc.html // gstreamer1.0-plugins-good if err := CheckPlugins([]string{"ximagesrc", "vpx"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(videoSrc + "vp9enc cpu-used=16 threads=4 deadline=1 keyframe-max-dist=15 static-threshold=20" + appSink, pipelineDevice) case "h264": var err error if err = CheckPlugins([]string{"ximagesrc"}); err != nil { - return nil, err + return "", err } // https://gstreamer.freedesktop.org/documentation/x264/index.html @@ -107,12 +108,12 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS break } - return nil, err + return "", err case "opus": // https://gstreamer.freedesktop.org/documentation/opus/opusenc.html // gstreamer1.0-plugins-base if err := CheckPlugins([]string{"pulseaudio", "opus"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(audioSrc + "opusenc bitrate=128000" + appSink, pipelineDevice) @@ -120,7 +121,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS // https://gstreamer.freedesktop.org/documentation/libav/avenc_g722.html // gstreamer1.0-libav if err := CheckPlugins([]string{"pulseaudio", "libav"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(audioSrc + "avenc_g722" + appSink, pipelineDevice) @@ -128,7 +129,7 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS // https://gstreamer.freedesktop.org/documentation/mulaw/mulawenc.html // gstreamer1.0-plugins-good if err := CheckPlugins([]string{"pulseaudio", "mulaw"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! mulawenc" + appSink, pipelineDevice) @@ -136,19 +137,15 @@ func CreateAppPipeline(codecRTP codec.RTPCodec, pipelineDevice string, pipelineS // https://gstreamer.freedesktop.org/documentation/alaw/alawenc.html // gstreamer1.0-plugins-good if err := CheckPlugins([]string{"pulseaudio", "alaw"}); err != nil { - return nil, err + return "", err } pipelineStr = fmt.Sprintf(audioSrc + "audio/x-raw, rate=8000 ! alawenc" + appSink, pipelineDevice) default: - return nil, fmt.Errorf("unknown codec %s", codecRTP.Name) + return "", fmt.Errorf("unknown codec %s", codecRTP.Name) } - if pipelineSrc != "" { - pipelineStr = fmt.Sprintf(pipelineSrc + appSink, pipelineDevice) - } - - return CreatePipeline(pipelineStr) + return pipelineStr, nil } // CreatePipeline creates a GStreamer Pipeline diff --git a/internal/capture/manager.go b/internal/capture/manager.go index e729aab2..a75434af 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -8,12 +8,12 @@ import ( "demodesk/neko/internal/types" "demodesk/neko/internal/config" + "demodesk/neko/internal/capture/gst" ) type CaptureManagerCtx struct { logger zerolog.Logger mu sync.Mutex - config *config.Capture desktop types.DesktopManager streaming bool broadcast *BroacastManagerCtx @@ -23,16 +23,49 @@ type CaptureManagerCtx struct { } func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { + logger := log.With().Str("module", "capture").Logger() + + broadcastPipeline := gst.GetRTMPPipeline( + config.Device, + config.Display, + config.BroadcastPipeline, + ) + + screencastPipeline := gst.GetJPEGPipeline( + config.Display, + config.ScreencastPipeline, + config.ScreencastRate, + config.ScreencastQuality, + ) + + audioPipeline, err := gst.GetAppPipeline( + config.AudioCodec, + config.Device, + config.AudioParams, + ) + + if err != nil { + logger.Panic().Err(err).Msg("unable to get pipeline") + } + + videoPipeline, err := gst.GetAppPipeline( + config.VideoCodec, + config.Display, + config.VideoParams, + ) + + if err != nil { + logger.Panic().Err(err).Msg("unable to get pipeline") + } + return &CaptureManagerCtx{ - logger: log.With().Str("module", "capture").Logger(), - mu: sync.Mutex{}, - config: config, + logger: logger, desktop: desktop, streaming: false, - broadcast: broadcastNew(config), - screencast: screencastNew(config), - audio: streamNew(config.AudioCodec, config.Device, config.AudioParams), - video: streamNew(config.VideoCodec, config.Display, config.VideoParams), + broadcast: broadcastNew(broadcastPipeline), + screencast: screencastNew(config.Screencast, screencastPipeline), + audio: streamNew(config.AudioCodec, audioPipeline), + video: streamNew(config.VideoCodec, videoPipeline), } } diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index bbac4766..cfce69f9 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -9,41 +9,38 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "demodesk/neko/internal/config" "demodesk/neko/internal/types" "demodesk/neko/internal/capture/gst" ) type ScreencastManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - config *config.Capture - pipeline *gst.Pipeline - enabled bool - started bool - emitStop chan bool - emitUpdate chan bool - expired int32 - sample chan types.Sample - image types.Sample + logger zerolog.Logger + mu sync.Mutex + pipelineStr string + pipeline *gst.Pipeline + enabled bool + started bool + emitStop chan bool + emitUpdate chan bool + expired int32 + sample chan types.Sample + image types.Sample } -func screencastNew(config *config.Capture) *ScreencastManagerCtx { - manager := &ScreencastManagerCtx{ - logger: log.With().Str("module", "capture").Str("submodule", "screencast").Logger(), - config: config, - enabled: config.Screencast, - started: false, - emitStop: make(chan bool), - emitUpdate: make(chan bool), - } +const screencastTimeout = 5 * time.Second - if !manager.enabled { - return manager +func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { + manager := &ScreencastManagerCtx{ + logger: log.With().Str("module", "capture").Str("submodule", "screencast").Logger(), + pipelineStr: pipelineStr, + enabled: enabled, + started: false, + emitStop: make(chan bool), + emitUpdate: make(chan bool), } go func() { - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(screencastTimeout) manager.logger.Debug().Msg("started emitting samples") for { @@ -133,31 +130,20 @@ func (manager *ScreencastManagerCtx) stop() { func (manager *ScreencastManagerCtx) createPipeline() error { if manager.pipeline != nil { - return fmt.Errorf("pipeline already running") + return fmt.Errorf("pipeline already exists") } var err error manager.logger.Info(). - Str("video_display", manager.config.Display). - Str("screencast_pipeline", manager.config.ScreencastPipeline). + Str("str", manager.pipelineStr). Msgf("creating pipeline") - manager.pipeline, err = gst.CreateJPEGPipeline( - manager.config.Display, - manager.config.ScreencastPipeline, - manager.config.ScreencastRate, - manager.config.ScreencastQuality, - ) - + manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr) if err != nil { return err } - manager.logger.Info(). - Str("src", manager.pipeline.Src). - Msgf("starting pipeline") - manager.pipeline.Start() manager.sample = manager.pipeline.Sample manager.emitUpdate <-true @@ -170,6 +156,6 @@ func (manager *ScreencastManagerCtx) destroyPipeline() { } manager.pipeline.Stop() - manager.logger.Info().Msgf("stopping pipeline") + manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil } diff --git a/internal/capture/stream.go b/internal/capture/stream.go index b374b032..5c7df80e 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -14,30 +14,27 @@ import ( ) type StreamManagerCtx struct { - logger zerolog.Logger - mu sync.Mutex - codec codec.RTPCodec - pipelineDevice string - pipelineSrc string - pipeline *gst.Pipeline - sample chan types.Sample - emmiter events.EventEmmiter - emitUpdate chan bool - emitStop chan bool - enabled bool + logger zerolog.Logger + mu sync.Mutex + codec codec.RTPCodec + pipelineStr string + pipeline *gst.Pipeline + sample chan types.Sample + emmiter events.EventEmmiter + emitUpdate chan bool + emitStop chan bool + enabled bool } -func streamNew(codec codec.RTPCodec, pipelineDevice string, pipelineSrc string) *StreamManagerCtx { +func streamNew(codec codec.RTPCodec, pipelineStr string) *StreamManagerCtx { manager := &StreamManagerCtx{ - logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), - mu: sync.Mutex{}, - codec: codec, - pipelineDevice: pipelineDevice, - pipelineSrc: pipelineSrc, - emmiter: events.New(), - emitUpdate: make(chan bool), - emitStop: make(chan bool), - enabled: false, + logger: log.With().Str("module", "capture").Str("submodule", "stream").Logger(), + codec: codec, + pipelineStr: pipelineStr, + emmiter: events.New(), + emitUpdate: make(chan bool), + emitStop: make(chan bool), + enabled: false, } go func() { @@ -103,7 +100,7 @@ func (manager *StreamManagerCtx) Enabled() bool { func (manager *StreamManagerCtx) createPipeline() error { if manager.pipeline != nil { - return fmt.Errorf("pipeline already running") + return fmt.Errorf("pipeline already exists") } var err error @@ -111,19 +108,14 @@ func (manager *StreamManagerCtx) createPipeline() error { codec := manager.Codec() manager.logger.Info(). Str("codec", codec.Name). - Str("device", manager.pipelineDevice). - Str("src", manager.pipelineSrc). + Str("src", manager.pipelineStr). Msgf("creating pipeline") - manager.pipeline, err = gst.CreateAppPipeline(codec, manager.pipelineDevice, manager.pipelineSrc) + manager.pipeline, err = gst.CreatePipeline(manager.pipelineStr) if err != nil { return err } - manager.logger.Info(). - Str("src", manager.pipeline.Src). - Msgf("starting pipeline") - manager.pipeline.Start() manager.sample = manager.pipeline.Sample @@ -137,6 +129,6 @@ func (manager *StreamManagerCtx) destroyPipeline() { } manager.pipeline.Stop() - manager.logger.Info().Msgf("stopping pipeline") + manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil }