Various fixes & improvement tweaks

This commit is contained in:
John Nguyen 2024-01-26 15:17:10 +11:00
parent acc3c286a1
commit d298d0c97b
9 changed files with 53 additions and 9 deletions

View file

@ -10,7 +10,7 @@ services:
extra_hosts: extra_hosts:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
volumes: volumes:
- ./bin/home/neko:/home/neko - ./bin/home/neko:/home/neko:rw
devices: devices:
- /dev/dri:/dev/dri - /dev/dri:/dev/dri
deploy: deploy:
@ -22,7 +22,7 @@ services:
capabilities: [gpu] capabilities: [gpu]
privileged: true privileged: true
environment: environment:
NEKO_SCREEN: 1920x1080@60 NEKO_SCREEN: 1280x720@30
NEKO_PASSWORD: neko NEKO_PASSWORD: neko
NEKO_PASSWORD_ADMIN: admin NEKO_PASSWORD_ADMIN: admin
NEKO_EPR: 52000-52100 NEKO_EPR: 52000-52100

View file

@ -10,6 +10,13 @@ import (
"m1k1o/neko/internal/types" "m1k1o/neko/internal/types"
) )
type PipelineSignal int
const (
PL_STOP PipelineSignal = 0x00
PL_START = 0x01
)
type BroacastManagerCtx struct { type BroacastManagerCtx struct {
logger zerolog.Logger logger zerolog.Logger
mu sync.Mutex mu sync.Mutex
@ -18,6 +25,7 @@ type BroacastManagerCtx struct {
pipelineMu sync.Mutex pipelineMu sync.Mutex
pipelineFn func(url string) (string, error) pipelineFn func(url string) (string, error)
pipelineRestart chan bool pipelineRestart chan bool
pipelineSignal chan PipelineSignal
url string url string
started bool started bool
@ -77,6 +85,10 @@ func (manager *BroacastManagerCtx) GetRestart() chan bool {
return manager.pipelineRestart return manager.pipelineRestart
} }
func (manager *BroacastManagerCtx) GetSignal() chan PipelineSignal {
return manager.pipelineSignal
}
func (manager *BroacastManagerCtx) Url() string { func (manager *BroacastManagerCtx) Url() string {
manager.mu.Lock() manager.mu.Lock()
defer manager.mu.Unlock() defer manager.mu.Unlock()

View file

@ -72,6 +72,26 @@ func (manager *CaptureManagerCtx) Start() {
} }
}() }()
go func() {
for {
signal, ok := <-manager.broadcast.GetSignal()
if !ok {
return
}
switch signal {
case PL_START:
if !manager.broadcast.Started() {
manager.broadcast.Start(manager.broadcast.Url())
}
case PL_STOP:
if manager.broadcast.Started() {
manager.broadcast.Stop()
}
}
}
}()
go func() { go func() {
for { for {
before, ok := <-manager.desktop.GetScreenSizeChangeChannel() before, ok := <-manager.desktop.GetScreenSizeChangeChannel()

View file

@ -47,8 +47,9 @@ func NewBroadcastPipeline(device string, display string, pipelineSrc string, url
// replace display // replace display
pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1) pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1)
} else { } else {
birate := 8000 birate := 5000
pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1 subscribe=stream-neko buffer=1200000' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! nvh264enc name=encoder rc-lookahead=20 preset=2 gop-size=120 temporal-aq=true bitrate=%d vbv-buffer-size=%d rc-mode=cbr bframes=0 ! h264parse config-interval=-1 ! mux.", url, audio, video, birate, birate) pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1 subscribe=stream-neko buffer=1200000' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! nvh264enc name=encoder rc-lookahead=20 preset=2 gop-size=120 temporal-aq=true bitrate=%d vbv-buffer-size=%d rc-mode=cbr bframes=0 ! h264parse config-interval=-1 ! mux.", url, audio, video, birate, birate)
// pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1 subscribe=stream-neko buffer=1200000' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! openh264enc name=encoder bitrate=%d vbv-buffer-size=%d ! h264parse config-interval=-1 ! mux.", url, audio, video, birate, birate)
} }
return pipelineStr, nil return pipelineStr, nil

View file

@ -11,7 +11,7 @@ import (
) )
func (ws *WebSocketHandler) connectUpstream() { func (ws *WebSocketHandler) connectUpstream() {
upstreamURL := "ws://168.138.8.216:4001/?type=host" upstreamURL := "wss://cave.thuan.au/live-control/?type=host"
retryTicker := time.NewTicker(5 * time.Second) retryTicker := time.NewTicker(5 * time.Second)
for { for {
@ -19,7 +19,7 @@ func (ws *WebSocketHandler) connectUpstream() {
case <-ws.shutdown: case <-ws.shutdown:
return return
case <-retryTicker.C: case <-retryTicker.C:
ws.logger.Debug().Msgf("connecting to upstream: %s", upstreamURL) ws.logger.Info().Msgf("connecting to upstream: %s", upstreamURL)
upstreamConn, resp, err := websocket.DefaultDialer.Dial(upstreamURL, nil) upstreamConn, resp, err := websocket.DefaultDialer.Dial(upstreamURL, nil)
if err != nil { if err != nil {

View file

@ -47,6 +47,11 @@ type Client struct {
send chan []byte send chan []byte
} }
func (c *Client) start() {
go c.readPump()
go c.writePump()
}
func (c *Client) readPump() { func (c *Client) readPump() {
c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
@ -54,9 +59,9 @@ func (c *Client) readPump() {
c.conn.SetReadDeadline(time.Now().Add(pongWait)) c.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil return nil
}) })
defer func() { defer func() {
c.hub.unregister <- c c.hub.unregister <- c
c.conn.Close()
}() }()
for { for {
@ -73,9 +78,11 @@ func (c *Client) readPump() {
func (c *Client) writePump() { func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod) ticker := time.NewTicker(pingPeriod)
defer func() { defer func() {
ticker.Stop() ticker.Stop()
c.conn.Close() c.conn.Close()
c.hub.unregister <- c
}() }()
for { for {

3
upstream/deploy.sh Executable file
View file

@ -0,0 +1,3 @@
#!/usr/bin/bash
rsync -azP -e "ssh -i /home/thuan/keys/nguyenphuochoangthuan02.pem" . ubuntu@168.138.8.216:/var/www/html/cave/neko-cave/upstream/

View file

@ -37,6 +37,7 @@ func (h *Hub) Run() {
for client := range h.clients { for client := range h.clients {
h.unregister <- client h.unregister <- client
} }
return
case client := <-h.register: case client := <-h.register:
switch client.connectionType { switch client.connectionType {
case ClientConn: case ClientConn:
@ -46,6 +47,9 @@ func (h *Hub) Run() {
} }
log.Printf("New connection: %s", client.connectionType) log.Printf("New connection: %s", client.connectionType)
h.PrintConns() h.PrintConns()
go client.readPump()
go client.writePump()
case client := <-h.unregister: case client := <-h.unregister:
log.Printf("Disconnecting %s", client.connectionType) log.Printf("Disconnecting %s", client.connectionType)

View file

@ -56,9 +56,6 @@ func upgrade(w http.ResponseWriter, r *http.Request) {
send: make(chan []byte), send: make(chan []byte),
} }
client.hub.register <- client client.hub.register <- client
go client.readPump()
go client.writePump()
} }
func broadcast(receivers map[*websocket.Conn]empty, messType int, raw []byte) { func broadcast(receivers map[*websocket.Conn]empty, messType int, raw []byte) {