From db820806a48ed425ce6b7c4c24e733fbe2207f50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Wed, 18 Nov 2020 21:34:39 +0100 Subject: [PATCH] add broadcast endpoint & broadcast pipeline return error. --- internal/api/room/broadcast.go | 69 +++++++++++++++++++++++++ internal/api/room/handler.go | 6 +++ internal/capture/broadcast.go | 13 ++--- internal/capture/manager.go | 22 ++++++-- internal/session/manager.go | 21 ++++++++ internal/types/capture.go | 2 +- internal/types/session.go | 1 + internal/websocket/handler/broadcast.go | 5 +- 8 files changed, 124 insertions(+), 15 deletions(-) diff --git a/internal/api/room/broadcast.go b/internal/api/room/broadcast.go index 01912655..1423953c 100644 --- a/internal/api/room/broadcast.go +++ b/internal/api/room/broadcast.go @@ -1 +1,70 @@ package room + +import ( + "net/http" + + "demodesk/neko/internal/utils" + "demodesk/neko/internal/types/event" + "demodesk/neko/internal/types/message" +) + +type BroadcastStatusPayload struct { + URL string `json:"url,omitempty"` + IsActive bool `json:"is_active"` +} + +func (h *RoomHandler) BroadcastStatus(w http.ResponseWriter, r *http.Request) { + utils.HttpSuccess(w, BroadcastStatusPayload{ + IsActive: h.capture.BroadcastEnabled(), + URL: h.capture.BroadcastUrl(), + }) +} + +func (h *RoomHandler) BoradcastStart(w http.ResponseWriter, r *http.Request) { + data := &BroadcastStatusPayload{} + if !utils.HttpJsonRequest(w, r, data) { + return + } + + if data.URL == "" { + utils.HttpBadRequest(w, "Missing broadcast URL.") + return + } + + if h.capture.BroadcastEnabled() { + utils.HttpBadRequest(w, "Server is already broadcasting.") + return + } + + if err := h.capture.StartBroadcast(data.URL); err != nil { + utils.HttpInternalServer(w, err) + return + } + + h.sessions.AdminBroadcast( + message.BroadcastStatus{ + Event: event.BORADCAST_STATUS, + IsActive: h.capture.BroadcastEnabled(), + URL: h.capture.BroadcastUrl(), + }, nil) + + utils.HttpSuccess(w) +} + +func (h *RoomHandler) BoradcastStop(w http.ResponseWriter, r *http.Request) { + if !h.capture.BroadcastEnabled() { + utils.HttpBadRequest(w, "Server is not broadcasting.") + return + } + + h.capture.StopBroadcast() + + h.sessions.AdminBroadcast( + message.BroadcastStatus{ + Event: event.BORADCAST_STATUS, + IsActive: h.capture.BroadcastEnabled(), + URL: h.capture.BroadcastUrl(), + }, nil) + + utils.HttpSuccess(w) +} diff --git a/internal/api/room/handler.go b/internal/api/room/handler.go index 0b2c8676..4f1ce704 100644 --- a/internal/api/room/handler.go +++ b/internal/api/room/handler.go @@ -54,4 +54,10 @@ func (h *RoomHandler) Route(r chi.Router) { r.With(auth.AdminsOnly).Post("/take", h.ControlTake) r.With(auth.AdminsOnly).Post("/give", h.ControlGive) }) + + r.With(auth.AdminsOnly).Route("/broadcast", func(r chi.Router) { + r.Get("/", h.BroadcastStatus) + r.Post("/start", h.BoradcastStart) + r.Post("/stop", h.BoradcastStop) + }) } diff --git a/internal/capture/broadcast.go b/internal/capture/broadcast.go index 28a3a7d8..d0d419be 100644 --- a/internal/capture/broadcast.go +++ b/internal/capture/broadcast.go @@ -4,10 +4,10 @@ import ( "demodesk/neko/internal/capture/gst" ) -func (manager *CaptureManagerCtx) StartBroadcast(url string) { +func (manager *CaptureManagerCtx) StartBroadcast(url string) error { manager.broadcast_url = url manager.broadcasting = true - manager.createBroadcastPipeline() + return manager.createBroadcastPipeline() } func (manager *CaptureManagerCtx) StopBroadcast() { @@ -23,13 +23,9 @@ func (manager *CaptureManagerCtx) BroadcastUrl() string { return manager.broadcast_url } -func (manager *CaptureManagerCtx) createBroadcastPipeline() { +func (manager *CaptureManagerCtx) createBroadcastPipeline() error { var err error - if manager.broadcast != nil || !manager.BroadcastEnabled() { - return - } - manager.logger.Info(). Str("audio_device", manager.config.Device). Str("video_display", manager.config.Display). @@ -44,11 +40,12 @@ func (manager *CaptureManagerCtx) createBroadcastPipeline() { ) if err != nil { - manager.logger.Panic().Err(err).Msg("unable to create broadcast pipeline") + return err } manager.broadcast.Play() manager.logger.Info().Msgf("starting broadcast pipeline") + return nil } func (manager *CaptureManagerCtx) destroyBroadcastPipeline() { diff --git a/internal/capture/manager.go b/internal/capture/manager.go index d48d166c..3e9c280e 100644 --- a/internal/capture/manager.go +++ b/internal/capture/manager.go @@ -46,14 +46,18 @@ func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCt } func (manager *CaptureManagerCtx) Start() { - manager.createBroadcastPipeline() + if manager.BroadcastEnabled() { + manager.createBroadcastPipeline() + } manager.desktop.OnBeforeScreenSizeChange(func() { if manager.Streaming() { manager.destroyVideoPipeline() } - manager.destroyBroadcastPipeline() + if manager.BroadcastEnabled() { + manager.destroyBroadcastPipeline() + } }) manager.desktop.OnAfterScreenSizeChange(func() { @@ -61,7 +65,9 @@ func (manager *CaptureManagerCtx) Start() { manager.createVideoPipeline() } - manager.createBroadcastPipeline() + if manager.BroadcastEnabled() { + manager.createBroadcastPipeline() + } }) go func() { @@ -85,9 +91,15 @@ func (manager *CaptureManagerCtx) Start() { func (manager *CaptureManagerCtx) Shutdown() error { manager.logger.Info().Msgf("capture shutting down") - manager.StopStream() + + if manager.Streaming() { + manager.StopStream() + } + + if manager.BroadcastEnabled() { + manager.createBroadcastPipeline() + } - manager.destroyBroadcastPipeline() manager.emit_stop <- true return nil } diff --git a/internal/session/manager.go b/internal/session/manager.go index fd2ff379..46460a67 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -163,6 +163,27 @@ func (manager *SessionManagerCtx) Broadcast(v interface{}, exclude interface{}) } } +func (manager *SessionManagerCtx) AdminBroadcast(v interface{}, exclude interface{}) { + manager.membersMu.Lock() + defer manager.membersMu.Unlock() + + for id, session := range manager.members { + if !session.connected || !session.Admin() { + continue + } + + if exclude != nil { + if in, _ := utils.ArrayIn(id, exclude); in { + continue + } + } + + if err := session.Send(v); err != nil { + manager.logger.Warn().Err(err).Msgf("broadcasting admin event has failed") + } + } +} + // --- // events // --- diff --git a/internal/types/capture.go b/internal/types/capture.go index ca4ddc20..a2d47aa9 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -20,7 +20,7 @@ type CaptureManager interface { Streaming() bool // broacast - StartBroadcast(url string) + StartBroadcast(url string) error StopBroadcast() BroadcastEnabled() bool BroadcastUrl() string diff --git a/internal/types/session.go b/internal/types/session.go index f96e958a..3e3a0bc4 100644 --- a/internal/types/session.go +++ b/internal/types/session.go @@ -30,6 +30,7 @@ type SessionManager interface { Admins() []Session Members() []Session Broadcast(v interface{}, exclude interface{}) + AdminBroadcast(v interface{}, exclude interface{}) OnHost(listener func(session Session)) OnHostCleared(listener func(session Session)) diff --git a/internal/websocket/handler/broadcast.go b/internal/websocket/handler/broadcast.go index 4e3aba89..7eda5b5f 100644 --- a/internal/websocket/handler/broadcast.go +++ b/internal/websocket/handler/broadcast.go @@ -12,7 +12,10 @@ func (h *MessageHandlerCtx) boradcastCreate(session types.Session, payload *mess return nil } - h.capture.StartBroadcast(payload.URL) + if err := h.capture.StartBroadcast(payload.URL); err != nil { + return err + } + return h.boradcastStatus(session) }