diff --git a/internal/api/room/broadcast.go b/internal/api/room/broadcast.go index de2c72f6..817c07b7 100644 --- a/internal/api/room/broadcast.go +++ b/internal/api/room/broadcast.go @@ -14,9 +14,10 @@ type BroadcastStatusPayload struct { } func (h *RoomHandler) broadcastStatus(w http.ResponseWriter, r *http.Request) { + broadcast := h.capture.Broadcast() utils.HttpSuccess(w, BroadcastStatusPayload{ - IsActive: h.capture.BroadcastEnabled(), - URL: h.capture.BroadcastUrl(), + IsActive: broadcast.Enabled(), + URL: broadcast.Url(), }) } @@ -31,12 +32,13 @@ func (h *RoomHandler) boradcastStart(w http.ResponseWriter, r *http.Request) { return } - if h.capture.BroadcastEnabled() { + broadcast := h.capture.Broadcast() + if broadcast.Enabled() { utils.HttpUnprocessableEntity(w, "Server is already broadcasting.") return } - if err := h.capture.StartBroadcast(data.URL); err != nil { + if err := broadcast.Start(data.URL); err != nil { utils.HttpInternalServerError(w, err) return } @@ -44,26 +46,27 @@ func (h *RoomHandler) boradcastStart(w http.ResponseWriter, r *http.Request) { h.sessions.AdminBroadcast( message.BroadcastStatus{ Event: event.BORADCAST_STATUS, - IsActive: h.capture.BroadcastEnabled(), - URL: h.capture.BroadcastUrl(), + IsActive: broadcast.Enabled(), + URL: broadcast.Url(), }, nil) utils.HttpSuccess(w) } func (h *RoomHandler) boradcastStop(w http.ResponseWriter, r *http.Request) { - if !h.capture.BroadcastEnabled() { + broadcast := h.capture.Broadcast() + if !broadcast.Enabled() { utils.HttpUnprocessableEntity(w, "Server is not broadcasting.") return } - h.capture.StopBroadcast() + broadcast.Stop() h.sessions.AdminBroadcast( message.BroadcastStatus{ Event: event.BORADCAST_STATUS, - IsActive: h.capture.BroadcastEnabled(), - URL: h.capture.BroadcastUrl(), + IsActive: broadcast.Enabled(), + URL: broadcast.Url(), }, nil) utils.HttpSuccess(w) diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index d0d419be..4fd4c20e 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -1,59 +1,80 @@ package capture import ( + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "demodesk/neko/internal/config" "demodesk/neko/internal/capture/gst" ) -func (manager *CaptureManagerCtx) StartBroadcast(url string) error { - manager.broadcast_url = url - manager.broadcasting = true - return manager.createBroadcastPipeline() +type BroacastManagerCtx struct { + logger zerolog.Logger + config *config.Capture + pipeline *gst.Pipeline + enabled bool + url string } -func (manager *CaptureManagerCtx) StopBroadcast() { - manager.broadcasting = false - manager.destroyBroadcastPipeline() +func broadcastNew(config *config.Capture) *BroacastManagerCtx { + return &BroacastManagerCtx{ + logger: log.With().Str("module", "capture").Str("submodule", "broadcast").Logger(), + config: config, + enabled: false, + url: "", + } } -func (manager *CaptureManagerCtx) BroadcastEnabled() bool { - return manager.broadcasting +func (manager *BroacastManagerCtx) Start(url string) error { + manager.url = url + manager.enabled = true + return manager.createPipeline() } -func (manager *CaptureManagerCtx) BroadcastUrl() string { - return manager.broadcast_url +func (manager *BroacastManagerCtx) Stop() { + manager.enabled = false + manager.destroyPipeline() } -func (manager *CaptureManagerCtx) createBroadcastPipeline() error { +func (manager *BroacastManagerCtx) Enabled() bool { + return manager.enabled +} + +func (manager *BroacastManagerCtx) Url() string { + return manager.url +} + +func (manager *BroacastManagerCtx) createPipeline() error { var err error manager.logger.Info(). Str("audio_device", manager.config.Device). Str("video_display", manager.config.Display). Str("broadcast_pipeline", manager.config.BroadcastPipeline). - Msgf("creating broadcast pipeline") + Msgf("creating pipeline") - manager.broadcast, err = gst.CreateRTMPPipeline( + manager.pipeline, err = gst.CreateRTMPPipeline( manager.config.Device, manager.config.Display, manager.config.BroadcastPipeline, - manager.broadcast_url, + manager.url, ) if err != nil { return err } - manager.broadcast.Play() - manager.logger.Info().Msgf("starting broadcast pipeline") + manager.pipeline.Play() + manager.logger.Info().Msgf("starting pipeline") return nil } -func (manager *CaptureManagerCtx) destroyBroadcastPipeline() { - if manager.broadcast == nil { +func (manager *BroacastManagerCtx) destroyPipeline() { + if manager.pipeline == nil { return } - manager.broadcast.Stop() - manager.logger.Info().Msgf("stopping broadcast pipeline") - manager.broadcast = nil + manager.pipeline.Stop() + manager.logger.Info().Msgf("stopping pipeline") + manager.pipeline = nil } diff --git a/internal/capture/manager.go b/internal/capture/manager.go index ed1d8d61..4b15fc69 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -17,7 +17,6 @@ type CaptureManagerCtx struct { mu sync.Mutex video *gst.Pipeline audio *gst.Pipeline - broadcast *gst.Pipeline config *config.Capture emit_update chan bool emit_stop chan bool @@ -25,9 +24,8 @@ type CaptureManagerCtx struct { audio_sample chan types.Sample emmiter events.EventEmmiter streaming bool - broadcasting bool - broadcast_url string desktop types.DesktopManager + broadcast *BroacastManagerCtx } func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx { @@ -39,15 +37,14 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt emmiter: events.New(), config: config, streaming: false, - broadcasting: false, - broadcast_url: "", desktop: desktop, + broadcast: broadcastNew(config), } } func (manager *CaptureManagerCtx) Start() { - if manager.BroadcastEnabled() { - if err := manager.createBroadcastPipeline(); err != nil { + if manager.broadcast.Enabled() { + if err := manager.broadcast.createPipeline(); err != nil { manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline") } } @@ -57,8 +54,8 @@ func (manager *CaptureManagerCtx) Start() { manager.destroyVideoPipeline() } - if manager.BroadcastEnabled() { - manager.destroyBroadcastPipeline() + if manager.broadcast.Enabled() { + manager.broadcast.destroyPipeline() } }) @@ -67,8 +64,8 @@ func (manager *CaptureManagerCtx) Start() { manager.createVideoPipeline() } - if manager.BroadcastEnabled() { - if err := manager.createBroadcastPipeline(); err != nil { + if manager.broadcast.Enabled() { + if err := manager.broadcast.createPipeline(); err != nil { manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline") } } @@ -100,14 +97,18 @@ func (manager *CaptureManagerCtx) Shutdown() error { manager.StopStream() } - if manager.BroadcastEnabled() { - manager.destroyBroadcastPipeline() + if manager.broadcast.Enabled() { + manager.broadcast.destroyPipeline() } manager.emit_stop <- true return nil } +func (manager *CaptureManagerCtx) Broadcast() types.BroadcastManager { + return manager.broadcast +} + func (manager *CaptureManagerCtx) VideoCodec() string { return manager.config.VideoCodec } diff --git a/internal/types/capture.go b/internal/types/capture.go index a2d47aa9..3fd5bbe2 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -5,10 +5,19 @@ type Sample struct { Samples uint32 } +type BroadcastManager interface { + Start(url string) error + Stop() + Enabled() bool + Url() string +} + type CaptureManager interface { Start() Shutdown() error + Broadcast() BroadcastManager + VideoCodec() string AudioCodec() string @@ -18,10 +27,4 @@ type CaptureManager interface { StartStream() StopStream() Streaming() bool - - // broacast - StartBroadcast(url string) error - StopBroadcast() - BroadcastEnabled() bool - BroadcastUrl() string } diff --git a/internal/websocket/handler/system.go b/internal/websocket/handler/system.go index f9bdf978..ae82707c 100644 --- a/internal/websocket/handler/system.go +++ b/internal/websocket/handler/system.go @@ -75,13 +75,14 @@ func (h *MessageHandlerCtx) systemAdmin(session types.Session) error { } } + broadcast := h.capture.Broadcast() return session.Send( message.SystemAdmin{ Event: event.SYSTEM_ADMIN, ScreenSizesList: screenSizesList, BroadcastStatus: message.BroadcastStatus{ - IsActive: h.capture.BroadcastEnabled(), - URL: h.capture.BroadcastUrl(), + IsActive: broadcast.Enabled(), + URL: broadcast.Url(), }, }) }