From 555fd803bcc431d7abc82f28a07738796a5976b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Sun, 19 Jun 2022 01:03:16 +0200 Subject: [PATCH] add active pipelines. --- internal/capture/broadcast.go | 16 +++++++++++++++ internal/capture/screencast.go | 36 ++++++++++++++++++++++++---------- internal/capture/streamsink.go | 17 ++++++++++++++++ internal/capture/streamsrc.go | 20 ++++++++++++++++++- 4 files changed, 78 insertions(+), 11 deletions(-) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index 4e860f03..70f955de 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -26,6 +26,7 @@ type BroacastManagerCtx struct { // metrics pipelinesCounter prometheus.Counter + pipelinesActive prometheus.Gauge } func broadcastNew(pipelineStr string) *BroacastManagerCtx { @@ -53,6 +54,18 @@ func broadcastNew(pipelineStr string) *BroacastManagerCtx { "codec_type": "-", }, }), + pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pipelines_active", + Namespace: "neko", + Subsystem: "capture", + Help: "Total number of active pipelines.", + ConstLabels: map[string]string{ + "submodule": "broadcast", + "video_id": "main", + "codec_name": "-", + "codec_type": "-", + }, + }), } } @@ -122,6 +135,7 @@ func (manager *BroacastManagerCtx) createPipeline() error { manager.pipeline.Play() manager.pipelinesCounter.Inc() + manager.pipelinesActive.Set(1) return nil } @@ -137,4 +151,6 @@ func (manager *BroacastManagerCtx) destroyPipeline() { manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil + + manager.pipelinesActive.Set(0) } diff --git a/internal/capture/screencast.go b/internal/capture/screencast.go index a8314d31..7bb73e60 100644 --- a/internal/capture/screencast.go +++ b/internal/capture/screencast.go @@ -38,6 +38,7 @@ type ScreencastManagerCtx struct { // metrics imagesCounter prometheus.Counter pipelinesCounter prometheus.Counter + pipelinesActive prometheus.Gauge } func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { @@ -72,6 +73,18 @@ func screencastNew(enabled bool, pipelineStr string) *ScreencastManagerCtx { "codec_type": "-", }, }), + pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pipelines_active", + Namespace: "neko", + Subsystem: "capture", + Help: "Total number of active pipelines.", + ConstLabels: map[string]string{ + "submodule": "screencast", + "video_id": "main", + "codec_name": "-", + "codec_type": "-", + }, + }), } manager.wg.Add(1) @@ -185,6 +198,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error { manager.pipeline.AttachAppsink("appsink") manager.pipeline.Play() manager.pipelinesCounter.Inc() + manager.pipelinesActive.Set(1) // get first image select { @@ -192,11 +206,7 @@ func (manager *ScreencastManagerCtx) createPipeline() error { if !ok { return errors.New("unable to get first image") } else { - manager.imageMu.Lock() - manager.image = image - manager.imageMu.Unlock() - - manager.imagesCounter.Inc() + manager.setImage(image) } case <-time.After(1 * time.Second): return errors.New("timeouted while waiting for first image") @@ -216,17 +226,21 @@ func (manager *ScreencastManagerCtx) createPipeline() error { return } - manager.imageMu.Lock() - manager.image = image - manager.imageMu.Unlock() - - manager.imagesCounter.Inc() + manager.setImage(image) } }() return nil } +func (manager *ScreencastManagerCtx) setImage(image types.Sample) { + manager.imageMu.Lock() + manager.image = image + manager.imageMu.Unlock() + + manager.imagesCounter.Inc() +} + func (manager *ScreencastManagerCtx) destroyPipeline() { manager.pipelineMu.Lock() defer manager.pipelineMu.Unlock() @@ -238,4 +252,6 @@ func (manager *ScreencastManagerCtx) destroyPipeline() { manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil + + manager.pipelinesActive.Set(0) } diff --git a/internal/capture/streamsink.go b/internal/capture/streamsink.go index 8de12b9b..354e829b 100644 --- a/internal/capture/streamsink.go +++ b/internal/capture/streamsink.go @@ -34,6 +34,7 @@ type StreamSinkManagerCtx struct { // metrics currentListeners prometheus.Gauge pipelinesCounter prometheus.Counter + pipelinesActive prometheus.Gauge } func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id string) *StreamSinkManagerCtx { @@ -72,6 +73,18 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str "codec_type": codec.Type.String(), }, }), + pipelinesActive: promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pipelines_active", + Namespace: "neko", + Subsystem: "capture", + Help: "Total number of active pipelines.", + ConstLabels: map[string]string{ + "submodule": "streamsink", + "video_id": video_id, + "codec_name": codec.Name, + "codec_type": codec.Type.String(), + }, + }), } return manager @@ -275,6 +288,8 @@ func (manager *StreamSinkManagerCtx) createPipeline() error { }() manager.pipelinesCounter.Inc() + manager.pipelinesActive.Set(1) + return nil } @@ -289,4 +304,6 @@ func (manager *StreamSinkManagerCtx) destroyPipeline() { manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil + + manager.pipelinesActive.Set(0) } diff --git a/internal/capture/streamsrc.go b/internal/capture/streamsrc.go index fbf4303a..e31f2834 100644 --- a/internal/capture/streamsrc.go +++ b/internal/capture/streamsrc.go @@ -27,6 +27,7 @@ type StreamSrcManagerCtx struct { // metrics pushedData map[string]prometheus.Summary pipelinesCounter map[string]prometheus.Counter + pipelinesActive map[string]prometheus.Gauge } func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string) *StreamSrcManagerCtx { @@ -37,6 +38,7 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string pushedData := map[string]prometheus.Summary{} pipelinesCounter := map[string]prometheus.Counter{} + pipelinesActive := map[string]prometheus.Gauge{} for codecName, pipeline := range codecPipeline { codec, ok := codec.ParseStr(codecName) if !ok { @@ -69,6 +71,18 @@ func streamSrcNew(enabled bool, codecPipeline map[string]string, video_id string "codec_type": codec.Type.String(), }, }) + pipelinesActive[codecName] = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pipelines_active", + Namespace: "neko", + Subsystem: "capture", + Help: "Total number of active pipelines.", + ConstLabels: map[string]string{ + "submodule": "streamsrc", + "video_id": video_id, + "codec_name": codec.Name, + "codec_type": codec.Type.String(), + }, + }) } return &StreamSrcManagerCtx{ @@ -133,7 +147,9 @@ func (manager *StreamSrcManagerCtx) Start(codec codec.RTPCodec) error { manager.pipeline.AttachAppsrc("appsrc") manager.pipeline.Play() - manager.pipelinesCounter[codec.Name].Inc() + manager.pipelinesCounter[manager.codec.Name].Inc() + manager.pipelinesActive[manager.codec.Name].Set(1) + return nil } @@ -148,6 +164,8 @@ func (manager *StreamSrcManagerCtx) Stop() { manager.pipeline.Destroy() manager.logger.Info().Msgf("destroying pipeline") manager.pipeline = nil + + manager.pipelinesActive[manager.codec.Name].Set(0) } func (manager *StreamSrcManagerCtx) Push(bytes []byte) {