diff --git a/internal/webrtc/manager.go b/internal/webrtc/manager.go index e220316c..e25f1c93 100644 --- a/internal/webrtc/manager.go +++ b/internal/webrtc/manager.go @@ -213,9 +213,13 @@ func (manager *WebRTCManagerCtx) newPeerConnection(bitrate int, codecs []codec.R congestionController, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) { if bitrate == 0 { - bitrate = 1000000 + bitrate = 1_000_000 } - return gcc.NewSendSideBWE(gcc.SendSideBWEInitialBitrate(bitrate)) + + return gcc.NewSendSideBWE( + gcc.SendSideBWEInitialBitrate(bitrate), + gcc.SendSideBWEPacer(gcc.NewNoOpPacer()), + ) }) if err != nil { return nil, nil, err @@ -625,12 +629,14 @@ func (manager *WebRTCManagerCtx) CreatePeer(session types.Session, bitrate int, }) }) - videoTrack.OnRTCP(func(p rtcp.Packet) { - if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok { - l := len(rtcpPacket.Reports) - if l > 0 { - // use only last report - manager.metrics.SetReceiverReport(session, rtcpPacket.Reports[l-1]) + videoTrack.OnRTCP(func(packets []rtcp.Packet) { + for _, p := range packets { + if rtcpPacket, ok := p.(*rtcp.ReceiverReport); ok { + l := len(rtcpPacket.Reports) + if l > 0 { + // use only last report + manager.metrics.SetReceiverReport(session, rtcpPacket.Reports[l-1]) + } } } }) diff --git a/internal/webrtc/track.go b/internal/webrtc/track.go index d3fb4082..6e53b44c 100644 --- a/internal/webrtc/track.go +++ b/internal/webrtc/track.go @@ -26,7 +26,7 @@ type Track struct { stream types.StreamSinkManager streamMu sync.Mutex - onRtcp func(rtcp.Packet) + onRtcp func([]rtcp.Packet) onRtcpMu sync.RWMutex bitrateChange func(int) (bool, error) @@ -84,9 +84,8 @@ func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.Pe } func (t *Track) rtcpReader(sender *webrtc.RTPSender) { - rtcpBuf := make([]byte, 1500) for { - n, _, err := sender.Read(rtcpBuf) + packets, _, err := sender.ReadRTCP() if err != nil { if err == io.EOF || err == io.ErrClosedPipe { return @@ -96,21 +95,11 @@ func (t *Track) rtcpReader(sender *webrtc.RTPSender) { continue } - packets, err := rtcp.Unmarshal(rtcpBuf[:n]) - if err != nil { - t.logger.Err(err).Msg("RTCP unmarshal error") - continue - } - t.onRtcpMu.RLock() - handler := t.onRtcp - t.onRtcpMu.RUnlock() - - for _, packet := range packets { - if handler != nil { - go handler(packet) - } + if t.onRtcp != nil { + go t.onRtcp(packets) } + t.onRtcpMu.RUnlock() } } @@ -152,7 +141,7 @@ func (t *Track) SetPaused(paused bool) { t.paused = paused } -func (t *Track) OnRTCP(f func(rtcp.Packet)) { +func (t *Track) OnRTCP(f func([]rtcp.Packet)) { t.onRtcpMu.Lock() defer t.onRtcpMu.Unlock()