From 325af8fc5acddc116c86efaf87ec346e560f33e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Wed, 29 Sep 2021 01:03:39 +0200 Subject: [PATCH] RemoveListener add dispatcher. --- internal/capture/stream.go | 15 +++++++++++++-- internal/types/capture.go | 2 +- internal/webrtc/peertrack.go | 15 +++++++++++---- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/internal/capture/stream.go b/internal/capture/stream.go index 1c0ee341..085078b9 100644 --- a/internal/capture/stream.go +++ b/internal/capture/stream.go @@ -135,9 +135,9 @@ func (manager *StreamManagerCtx) NewListener(listener *func(sample types.Sample) return dispatcher, nil } -func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) { +func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Sample)) (dispatcher chan interface{}) { if listener == nil { - return + return dispatcher } ptr := reflect.ValueOf(listener).Pointer() @@ -152,7 +152,16 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp manager.listenersCount-- manager.mu.Unlock() + dispatcher = make(chan interface{}, 1) go func() { + select { + case <-time.After(newListenerTimeout): + manager.logger.Warn().Msgf("remote listener channel was not called, timeouted") + break + case <-dispatcher: + break + } + manager.mu.Lock() defer manager.mu.Unlock() @@ -166,6 +175,8 @@ func (manager *StreamManagerCtx) RemoveListener(listener *func(sample types.Samp manager.logger.Error().Int("listeners-count", manager.listenersCount).Msgf("listener counter is < 0, something is wrong") } }() + + return dispatcher } func (manager *StreamManagerCtx) ListenersCount() int { diff --git a/internal/types/capture.go b/internal/types/capture.go index e308daa0..1644f81b 100644 --- a/internal/types/capture.go +++ b/internal/types/capture.go @@ -39,7 +39,7 @@ type StreamManager interface { // and returns dispatcher channel NewListener(listener *func(sample Sample)) (dispatcher chan interface{}, err error) // stops pipeline if it was last listener - RemoveListener(listener *func(sample Sample)) + RemoveListener(listener *func(sample Sample)) (dispatcher chan interface{}) ListenersCount() int Started() bool diff --git a/internal/webrtc/peertrack.go b/internal/webrtc/peertrack.go index a2d1efc8..597181fa 100644 --- a/internal/webrtc/peertrack.go +++ b/internal/webrtc/peertrack.go @@ -52,18 +52,24 @@ func (peer *PeerTrack) SetStream(stream types.StreamManager) error { defer peer.streamMu.Unlock() // prepare new listener - dispatcher, err := stream.NewListener(&peer.listener) + addDispatcher, err := stream.NewListener(&peer.listener) if err != nil { return err } // remove previous listener (in case it existed) + var stopDispatcher chan interface{} if peer.stream != nil { - peer.stream.RemoveListener(&peer.listener) + stopDispatcher = peer.stream.RemoveListener(&peer.listener) } // add new listener - close(dispatcher) + close(addDispatcher) + + // stop old pipeline (in case it existed) + if stopDispatcher != nil { + close(stopDispatcher) + } peer.stream = stream return nil @@ -74,7 +80,8 @@ func (peer *PeerTrack) RemoveStream() { defer peer.streamMu.Unlock() if peer.stream != nil { - peer.stream.RemoveListener(&peer.listener) + dispatcher := peer.stream.RemoveListener(&peer.listener) + close(dispatcher) } }