mirror of
https://github.com/m1k1o/neko.git
synced 2025-05-21 21:17:04 +02:00
emit /recevie samples on demand.
This commit is contained in:
parent
26ab7fa12d
commit
c2cf9277dc
2 changed files with 67 additions and 71 deletions
|
@ -25,10 +25,6 @@ type StreamSinkManagerCtx struct {
|
|||
pipelineMu sync.Mutex
|
||||
pipelineStr func() string
|
||||
|
||||
sample chan types.Sample
|
||||
sampleStop chan struct{}
|
||||
sampleUpdate chan struct{}
|
||||
|
||||
listeners map[uintptr]*func(sample types.Sample)
|
||||
listenersMu sync.Mutex
|
||||
}
|
||||
|
@ -40,37 +36,12 @@ func streamSinkNew(codec codec.RTPCodec, pipelineStr func() string, video_id str
|
|||
Str("video_id", video_id).Logger()
|
||||
|
||||
manager := &StreamSinkManagerCtx{
|
||||
logger: logger,
|
||||
codec: codec,
|
||||
pipelineStr: pipelineStr,
|
||||
sampleStop: make(chan struct{}),
|
||||
sampleUpdate: make(chan struct{}),
|
||||
listeners: map[uintptr]*func(sample types.Sample){},
|
||||
logger: logger,
|
||||
codec: codec,
|
||||
pipelineStr: pipelineStr,
|
||||
listeners: map[uintptr]*func(sample types.Sample){},
|
||||
}
|
||||
|
||||
manager.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
manager.logger.Debug().Msg("started emitting samples")
|
||||
defer manager.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-manager.sampleStop:
|
||||
manager.logger.Debug().Msg("stopped emitting samples")
|
||||
return
|
||||
case <-manager.sampleUpdate:
|
||||
manager.logger.Debug().Msg("update emitting samples")
|
||||
case sample := <-manager.sample:
|
||||
manager.listenersMu.Lock()
|
||||
for _, emit := range manager.listeners {
|
||||
(*emit)(sample)
|
||||
}
|
||||
manager.listenersMu.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
|
@ -84,8 +55,6 @@ func (manager *StreamSinkManagerCtx) shutdown() {
|
|||
manager.listenersMu.Unlock()
|
||||
|
||||
manager.destroyPipeline()
|
||||
|
||||
close(manager.sampleStop)
|
||||
manager.wg.Wait()
|
||||
}
|
||||
|
||||
|
@ -249,8 +218,27 @@ func (manager *StreamSinkManagerCtx) createPipeline() error {
|
|||
manager.pipeline.AttachAppsink("appsink")
|
||||
manager.pipeline.Play()
|
||||
|
||||
manager.sample = manager.pipeline.Sample
|
||||
manager.sampleUpdate <- struct{}{}
|
||||
manager.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
manager.logger.Debug().Msg("started emitting samples")
|
||||
defer manager.wg.Done()
|
||||
|
||||
for {
|
||||
sample, ok := <-manager.pipeline.Sample
|
||||
if !ok {
|
||||
manager.logger.Debug().Msg("stopped emitting samples")
|
||||
return
|
||||
}
|
||||
|
||||
manager.listenersMu.Lock()
|
||||
for _, emit := range manager.listeners {
|
||||
(*emit)(sample)
|
||||
}
|
||||
manager.listenersMu.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue