mirror of
https://github.com/m1k1o/neko.git
synced 2025-08-02 08:19:14 +02:00
move server to server directory.
This commit is contained in:
parent
da45f62ca8
commit
cfb423b13d
211 changed files with 18 additions and 10 deletions
203
server/internal/webrtc/track.go
Normal file
203
server/internal/webrtc/track.go
Normal file
|
@ -0,0 +1,203 @@
|
|||
package webrtc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/webrtc/v3"
|
||||
"github.com/pion/webrtc/v3/pkg/media"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/demodesk/neko/pkg/types"
|
||||
"github.com/demodesk/neko/pkg/types/codec"
|
||||
)
|
||||
|
||||
type Track struct {
|
||||
logger zerolog.Logger
|
||||
track *webrtc.TrackLocalStaticSample
|
||||
|
||||
rtcpCh chan []rtcp.Packet
|
||||
sample chan types.Sample
|
||||
|
||||
paused bool
|
||||
stream types.StreamSinkManager
|
||||
streamMu sync.Mutex
|
||||
}
|
||||
|
||||
type trackOption func(*Track)
|
||||
|
||||
func WithRtcpChan(rtcp chan []rtcp.Packet) trackOption {
|
||||
return func(t *Track) {
|
||||
t.rtcpCh = rtcp
|
||||
}
|
||||
}
|
||||
|
||||
func NewTrack(logger zerolog.Logger, codec codec.RTPCodec, connection *webrtc.PeerConnection, opts ...trackOption) (*Track, error) {
|
||||
id := codec.Type.String()
|
||||
track, err := webrtc.NewTrackLocalStaticSample(codec.Capability, id, "stream")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
t := &Track{
|
||||
logger: logger.With().Str("id", id).Logger(),
|
||||
track: track,
|
||||
rtcpCh: nil,
|
||||
sample: make(chan types.Sample),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(t)
|
||||
}
|
||||
|
||||
sender, err := connection.AddTrack(t.track)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go t.rtcpReader(sender)
|
||||
go t.sampleReader()
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *Track) Shutdown() {
|
||||
t.RemoveStream()
|
||||
close(t.sample)
|
||||
}
|
||||
|
||||
func (t *Track) rtcpReader(sender *webrtc.RTPSender) {
|
||||
for {
|
||||
packets, _, err := sender.ReadRTCP()
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
|
||||
t.logger.Debug().Msg("track rtcp reader closed")
|
||||
return
|
||||
}
|
||||
|
||||
t.logger.Warn().Err(err).Msg("failed to read track rtcp")
|
||||
continue
|
||||
}
|
||||
|
||||
if t.rtcpCh != nil {
|
||||
t.rtcpCh <- packets
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- sample ---
|
||||
|
||||
func (t *Track) sampleReader() {
|
||||
for {
|
||||
sample, ok := <-t.sample
|
||||
if !ok {
|
||||
t.logger.Debug().Msg("track sample reader closed")
|
||||
return
|
||||
}
|
||||
|
||||
err := t.track.WriteSample(media.Sample{
|
||||
Data: sample.Data,
|
||||
Duration: sample.Duration,
|
||||
Timestamp: sample.Timestamp,
|
||||
})
|
||||
|
||||
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
|
||||
t.logger.Warn().Err(err).Msg("failed to write sample to track")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Track) WriteSample(sample types.Sample) {
|
||||
t.sample <- sample
|
||||
}
|
||||
|
||||
// --- stream ---
|
||||
|
||||
func (t *Track) SetStream(stream types.StreamSinkManager) (bool, error) {
|
||||
t.streamMu.Lock()
|
||||
defer t.streamMu.Unlock()
|
||||
|
||||
// if we already listen to the stream, do nothing
|
||||
if t.stream == stream {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// if paused, we switch the stream but don't add the listener
|
||||
if t.paused {
|
||||
t.stream = stream
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var err error
|
||||
if t.stream != nil {
|
||||
err = t.stream.MoveListenerTo(t, stream)
|
||||
} else {
|
||||
err = stream.AddListener(t)
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
t.stream = stream
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (t *Track) RemoveStream() {
|
||||
t.streamMu.Lock()
|
||||
defer t.streamMu.Unlock()
|
||||
|
||||
// if there is no stream, or paused we don't need to remove the listener
|
||||
if t.stream == nil || t.paused {
|
||||
t.stream = nil
|
||||
return
|
||||
}
|
||||
|
||||
err := t.stream.RemoveListener(t)
|
||||
if err != nil {
|
||||
t.logger.Warn().Err(err).Msg("failed to remove listener from stream")
|
||||
}
|
||||
|
||||
t.stream = nil
|
||||
}
|
||||
|
||||
func (t *Track) Stream() (types.StreamSinkManager, bool) {
|
||||
t.streamMu.Lock()
|
||||
defer t.streamMu.Unlock()
|
||||
|
||||
return t.stream, t.stream != nil
|
||||
}
|
||||
|
||||
// --- paused ---
|
||||
|
||||
func (t *Track) SetPaused(paused bool) {
|
||||
t.streamMu.Lock()
|
||||
defer t.streamMu.Unlock()
|
||||
|
||||
// if there is no state change or no stream, do nothing
|
||||
if t.paused == paused || t.stream == nil {
|
||||
t.paused = paused
|
||||
return
|
||||
}
|
||||
|
||||
var err error
|
||||
if paused {
|
||||
err = t.stream.RemoveListener(t)
|
||||
} else {
|
||||
err = t.stream.AddListener(t)
|
||||
}
|
||||
if err != nil {
|
||||
t.logger.Warn().Err(err).Msg("failed to change listener state")
|
||||
return
|
||||
}
|
||||
|
||||
t.paused = paused
|
||||
}
|
||||
|
||||
func (t *Track) Paused() bool {
|
||||
t.streamMu.Lock()
|
||||
defer t.streamMu.Unlock()
|
||||
|
||||
return t.paused
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue