GST pipelines refactor.

This commit is contained in:
Miroslav Šedivý 2020-11-01 20:18:19 +01:00
parent c10b2212d1
commit 16d762b6ae
5 changed files with 74 additions and 56 deletions

View file

@ -18,7 +18,8 @@ type CaptureManagerCtx struct {
audio *gst.Pipeline
broadcast *gst.Pipeline
config *config.Capture
shutdown chan bool
audio_stop chan bool
video_stop chan bool
emmiter events.EventEmmiter
streaming bool
broadcasting bool
@ -29,7 +30,8 @@ type CaptureManagerCtx struct {
func New(desktop types.DesktopManager, config *config.Capture) *CaptureManagerCtx {
return &CaptureManagerCtx{
logger: log.With().Str("module", "capture").Logger(),
shutdown: make(chan bool),
audio_stop: make(chan bool),
video_stop: make(chan bool),
emmiter: events.New(),
config: config,
streaming: false,
@ -48,35 +50,15 @@ func (manager *CaptureManagerCtx) Start() {
manager.logger.Warn().Err(err).Msg("unable to change screen size")
}
manager.CreateVideoPipeline()
manager.CreateAudioPipeline()
manager.StartBroadcastPipeline()
go func() {
defer func() {
manager.logger.Info().Msg("shutdown")
}()
for {
select {
case <-manager.shutdown:
return
case sample := <-manager.video.Sample:
manager.emmiter.Emit("video", sample)
case sample := <-manager.audio.Sample:
manager.emmiter.Emit("audio", sample)
}
}
}()
}
func (manager *CaptureManagerCtx) Shutdown() error {
manager.logger.Info().Msgf("capture shutting down")
manager.video.DestroyPipeline()
manager.audio.DestroyPipeline()
manager.audio_stop <- true
manager.video_stop <- true
manager.StopBroadcastPipeline()
manager.shutdown <- true
return nil
}
@ -103,16 +85,16 @@ func (manager *CaptureManagerCtx) OnAudioFrame(listener func(sample types.Sample
func (manager *CaptureManagerCtx) StartStream() {
manager.logger.Info().Msgf("Pipelines starting...")
manager.video.Start()
manager.audio.Start()
manager.createVideoPipeline()
manager.createAudioPipeline()
manager.streaming = true
}
func (manager *CaptureManagerCtx) StopStream() {
manager.logger.Info().Msgf("Pipelines shutting down...")
manager.logger.Info().Msgf("Pipelines stopping...")
manager.video.Stop()
manager.audio.Stop()
manager.audio_stop <- true
manager.video_stop <- true
manager.streaming = false
}
@ -120,7 +102,19 @@ func (manager *CaptureManagerCtx) Streaming() bool {
return manager.streaming
}
func (manager *CaptureManagerCtx) CreateVideoPipeline() {
func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error {
manager.video_stop <- true
manager.StopBroadcastPipeline()
defer func() {
manager.createVideoPipeline()
manager.StartBroadcastPipeline()
}()
return manager.desktop.ChangeScreenSize(width, height, rate)
}
func (manager *CaptureManagerCtx) createVideoPipeline() {
var err error
manager.logger.Info().
@ -138,9 +132,34 @@ func (manager *CaptureManagerCtx) CreateVideoPipeline() {
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create video pipeline")
}
manager.logger.Info().
Str("pipeline", manager.video.Src).
Msgf("Starting video pipeline...")
manager.video.Start()
go func() {
manager.logger.Debug().Msg("started emitting video data")
defer func() {
manager.logger.Debug().Msg("stopped emitting video data")
}()
for {
select {
case <-manager.video_stop:
manager.logger.Info().Msgf("Stopping video pipeline...")
manager.video.Stop()
return
case sample := <-manager.video.Sample:
manager.emmiter.Emit("video", sample)
}
}
}()
}
func (manager *CaptureManagerCtx) CreateAudioPipeline() {
func (manager *CaptureManagerCtx) createAudioPipeline() {
var err error
manager.logger.Info().
@ -158,20 +177,29 @@ func (manager *CaptureManagerCtx) CreateAudioPipeline() {
if err != nil {
manager.logger.Panic().Err(err).Msg("unable to create audio pipeline")
}
}
func (manager *CaptureManagerCtx) ChangeResolution(width int, height int, rate int) error {
manager.video.DestroyPipeline()
manager.StopBroadcastPipeline()
manager.logger.Info().
Str("pipeline", manager.audio.Src).
Msgf("Starting audio pipeline...")
defer func() {
manager.CreateVideoPipeline()
manager.video.Start()
manager.logger.Info().Msg("starting video pipeline...")
manager.StartBroadcastPipeline()
manager.audio.Start()
go func() {
manager.logger.Debug().Msg("started emitting audio data")
defer func() {
manager.logger.Debug().Msg("stopped emitting audio data")
}()
for {
select {
case <-manager.audio_stop:
manager.logger.Info().Msgf("Stopping audio pipeline...")
manager.audio.Stop()
return
case sample := <-manager.audio.Sample:
manager.emmiter.Emit("audio", sample)
}
}
}()
return manager.desktop.ChangeScreenSize(width, height, rate)
}