diff --git a/docker-compose.yml b/docker-compose.yml index 2f6f5dea..f0aed4e9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ services: extra_hosts: - "host.docker.internal:host-gateway" volumes: - - ./bin/home/neko:/home/neko + - ./bin/home/neko:/home/neko:rw devices: - /dev/dri:/dev/dri deploy: @@ -22,7 +22,7 @@ services: capabilities: [gpu] privileged: true environment: - NEKO_SCREEN: 1920x1080@60 + NEKO_SCREEN: 1280x720@30 NEKO_PASSWORD: neko NEKO_PASSWORD_ADMIN: admin NEKO_EPR: 52000-52100 diff --git a/server/internal/capture/broadcast.go b/server/internal/capture/broadcast.go index 95c6e784..febc2dc7 100644 --- a/server/internal/capture/broadcast.go +++ b/server/internal/capture/broadcast.go @@ -10,6 +10,13 @@ import ( "m1k1o/neko/internal/types" ) +type PipelineSignal int + +const ( + PL_STOP PipelineSignal = 0x00 + PL_START = 0x01 +) + type BroacastManagerCtx struct { logger zerolog.Logger mu sync.Mutex @@ -18,6 +25,7 @@ type BroacastManagerCtx struct { pipelineMu sync.Mutex pipelineFn func(url string) (string, error) pipelineRestart chan bool + pipelineSignal chan PipelineSignal url string started bool @@ -77,6 +85,10 @@ func (manager *BroacastManagerCtx) GetRestart() chan bool { return manager.pipelineRestart } +func (manager *BroacastManagerCtx) GetSignal() chan PipelineSignal { + return manager.pipelineSignal +} + func (manager *BroacastManagerCtx) Url() string { manager.mu.Lock() defer manager.mu.Unlock() diff --git a/server/internal/capture/manager.go b/server/internal/capture/manager.go index e0529de3..2acc2e6d 100644 --- a/server/internal/capture/manager.go +++ b/server/internal/capture/manager.go @@ -72,6 +72,26 @@ func (manager *CaptureManagerCtx) Start() { } }() + go func() { + for { + signal, ok := <-manager.broadcast.GetSignal() + if !ok { + return + } + + switch signal { + case PL_START: + if !manager.broadcast.Started() { + manager.broadcast.Start(manager.broadcast.Url()) + } + case PL_STOP: + if manager.broadcast.Started() { + manager.broadcast.Stop() + } + } + } + }() + go func() { for { before, ok := <-manager.desktop.GetScreenSizeChangeChannel() diff --git a/server/internal/capture/pipelines.go b/server/internal/capture/pipelines.go index 307937fd..c4d68a63 100644 --- a/server/internal/capture/pipelines.go +++ b/server/internal/capture/pipelines.go @@ -47,8 +47,9 @@ func NewBroadcastPipeline(device string, display string, pipelineSrc string, url // replace display pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1) } else { - birate := 8000 + birate := 5000 pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1 subscribe=stream-neko buffer=1200000' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! nvh264enc name=encoder rc-lookahead=20 preset=2 gop-size=120 temporal-aq=true bitrate=%d vbv-buffer-size=%d rc-mode=cbr bframes=0 ! h264parse config-interval=-1 ! mux.", url, audio, video, birate, birate) + // pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1 subscribe=stream-neko buffer=1200000' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! openh264enc name=encoder bitrate=%d vbv-buffer-size=%d ! h264parse config-interval=-1 ! mux.", url, audio, video, birate, birate) } return pipelineStr, nil diff --git a/server/internal/websocket/upstream.go b/server/internal/websocket/upstream.go index a6a7fb87..9d76d270 100644 --- a/server/internal/websocket/upstream.go +++ b/server/internal/websocket/upstream.go @@ -11,7 +11,7 @@ import ( ) func (ws *WebSocketHandler) connectUpstream() { - upstreamURL := "ws://168.138.8.216:4001/?type=host" + upstreamURL := "wss://cave.thuan.au/live-control/?type=host" retryTicker := time.NewTicker(5 * time.Second) for { @@ -19,7 +19,7 @@ func (ws *WebSocketHandler) connectUpstream() { case <-ws.shutdown: return case <-retryTicker.C: - ws.logger.Debug().Msgf("connecting to upstream: %s", upstreamURL) + ws.logger.Info().Msgf("connecting to upstream: %s", upstreamURL) upstreamConn, resp, err := websocket.DefaultDialer.Dial(upstreamURL, nil) if err != nil { diff --git a/upstream/client.go b/upstream/client.go index 4fc1b4b4..734680be 100644 --- a/upstream/client.go +++ b/upstream/client.go @@ -47,6 +47,11 @@ type Client struct { send chan []byte } +func (c *Client) start() { + go c.readPump() + go c.writePump() +} + func (c *Client) readPump() { c.conn.SetReadLimit(maxMessageSize) c.conn.SetReadDeadline(time.Now().Add(pongWait)) @@ -54,9 +59,9 @@ func (c *Client) readPump() { c.conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) + defer func() { c.hub.unregister <- c - c.conn.Close() }() for { @@ -73,9 +78,11 @@ func (c *Client) readPump() { func (c *Client) writePump() { ticker := time.NewTicker(pingPeriod) + defer func() { ticker.Stop() c.conn.Close() + c.hub.unregister <- c }() for { diff --git a/upstream/deploy.sh b/upstream/deploy.sh new file mode 100755 index 00000000..d48c7395 --- /dev/null +++ b/upstream/deploy.sh @@ -0,0 +1,3 @@ +#!/usr/bin/bash + +rsync -azP -e "ssh -i /home/thuan/keys/nguyenphuochoangthuan02.pem" . ubuntu@168.138.8.216:/var/www/html/cave/neko-cave/upstream/ diff --git a/upstream/hub.go b/upstream/hub.go index 4514c619..01c01d8e 100644 --- a/upstream/hub.go +++ b/upstream/hub.go @@ -37,6 +37,7 @@ func (h *Hub) Run() { for client := range h.clients { h.unregister <- client } + return case client := <-h.register: switch client.connectionType { case ClientConn: @@ -46,6 +47,9 @@ func (h *Hub) Run() { } log.Printf("New connection: %s", client.connectionType) h.PrintConns() + + go client.readPump() + go client.writePump() case client := <-h.unregister: log.Printf("Disconnecting %s", client.connectionType) diff --git a/upstream/main.go b/upstream/main.go index 663eabb3..d7c9a0c9 100644 --- a/upstream/main.go +++ b/upstream/main.go @@ -56,9 +56,6 @@ func upgrade(w http.ResponseWriter, r *http.Request) { send: make(chan []byte), } client.hub.register <- client - - go client.readPump() - go client.writePump() } func broadcast(receivers map[*websocket.Conn]empty, messType int, raw []byte) {