Add upstream to control desktop

This commit is contained in:
John Nguyen 2023-05-03 22:53:27 +10:00
parent 5959d056f3
commit 900b4e53b1
9 changed files with 310 additions and 14 deletions

38
Dockerfile Normal file
View file

@ -0,0 +1,38 @@
#
# STAGE 1: SERVER
#
FROM golang:1.20-bullseye as server
WORKDIR /src
#
# install dependencies
RUN set -eux; apt-get update; \
apt-get install -y --no-install-recommends git cmake make libx11-dev libxrandr-dev libxtst-dev \
libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly; \
#
# install libclipboard
set -eux; \
cd /tmp; \
git clone --depth=1 https://github.com/jtanx/libclipboard; \
cd libclipboard; \
cmake .; \
make -j4; \
make install; \
rm -rf /tmp/libclipboard; \
#
# clean up
apt-get clean -y; \
rm -rf /var/lib/apt/lists/* /var/cache/apt/*
COPY server/go.mod ./
COPY server/go.sum ./
RUN go mod download
#
# build server
COPY server/ .
RUN ./build
FROM ghcr.io/m1k1o/neko/intel-firefox
COPY --from=server /src/bin/neko /usr/bin/neko

View file

@ -1,15 +1,19 @@
version: "3.4" version: "3.4"
services: services:
neko: neko:
image: "m1k1o/neko:firefox" build: .
restart: "unless-stopped" restart: "unless-stopped"
shm_size: "2gb" shm_size: "2gb"
ports: ports:
- "8080:8080" - "8080:8080"
- "52000-52100:52000-52100/udp" - "52000-52100:52000-52100/udp"
extra_hosts:
- "host.docker.internal:host-gateway"
environment: environment:
NEKO_SCREEN: 1920x1080@30 NEKO_SCREEN: 1920x1080@60
NEKO_PASSWORD: neko NEKO_PASSWORD: neko
NEKO_PASSWORD_ADMIN: admin NEKO_PASSWORD_ADMIN: admin
NEKO_EPR: 52000-52100 NEKO_EPR: 52000-52100
NEKO_ICELITE: 1 NEKO_ICELITE: 1
NEKO_BROADCAST_URL: rtmp://ome.thuan.au:1935/app/stream-neko
NEKO_NAT1TO1: 192.168.0.34

View file

@ -8,6 +8,9 @@
}, },
{ {
"path": "." "path": "."
},
{
"path": "upstream"
} }
], ],
"settings": { "settings": {

View file

@ -35,7 +35,7 @@ const (
) )
func NewBroadcastPipeline(device string, display string, pipelineSrc string, url string) (string, error) { func NewBroadcastPipeline(device string, display string, pipelineSrc string, url string) (string, error) {
video := fmt.Sprintf(videoSrc, display, 25) video := fmt.Sprintf(videoSrc, display, 60)
audio := fmt.Sprintf(audioSrc, device) audio := fmt.Sprintf(audioSrc, device)
var pipelineStr string var pipelineStr string
@ -47,7 +47,7 @@ func NewBroadcastPipeline(device string, display string, pipelineSrc string, url
// replace display // replace display
pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1) pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1)
} else { } else {
pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video) pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! x264enc bitrate=%d bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video, 70000)
} }
return pipelineStr, nil return pipelineStr, nil

View file

@ -0,0 +1,144 @@
package websocket
import (
"bytes"
"encoding/binary"
"m1k1o/neko/internal/webrtc"
"strconv"
"time"
"github.com/gorilla/websocket"
)
func (ws *WebSocketHandler) connectUpstream() {
upstreamURL := "ws://cave.thuan.au:4001/?type=host"
retryTicker := time.NewTicker(5 * time.Second)
for {
select {
case <-ws.shutdown:
return
case <-retryTicker.C:
ws.logger.Debug().Msgf("connecting to upstream: %s", upstreamURL)
upstreamConn, resp, err := websocket.DefaultDialer.Dial(upstreamURL, nil)
if err != nil {
if err == websocket.ErrBadHandshake {
ws.logger.Err(err).Msgf("failed to connect to upstream, status: %d", resp.StatusCode)
} else {
ws.logger.Err(err).Msg("failed to connect to upstream")
}
} else {
defer func () {
upstreamConn.Close()
}()
for {
_, raw, err := upstreamConn.ReadMessage()
if err != nil {
ws.logger.Err(err).Msg("failed to read message from upstream")
break
}
buffer := bytes.NewBuffer(raw)
header, err := ws.readHeader(buffer)
if err != nil {
ws.logger.Err(err).Msg("failed to read header")
continue
}
buffer = bytes.NewBuffer(raw)
switch header.Event {
case webrtc.OP_MOVE:
payload := &webrtc.PayloadMove{}
if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil {
ws.logger.Err(err).Msg("failed to read PayloadMove")
continue
}
ws.desktop.Move(int(payload.X), int(payload.Y))
case webrtc.OP_SCROLL:
payload := &webrtc.PayloadScroll{}
if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil {
ws.logger.Err(err).Msg("failed to read PayloadScroll")
continue
}
ws.logger.
Debug().
Str("x", strconv.Itoa(int(payload.X))).
Str("y", strconv.Itoa(int(payload.Y))).
Msg("scroll")
ws.desktop.Scroll(int(payload.X), int(payload.Y))
case webrtc.OP_KEY_DOWN:
payload := &webrtc.PayloadKey{}
if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil {
ws.logger.Err(err).Msg("failed to read PayloadKey")
continue
}
if payload.Key < 8 {
err := ws.desktop.ButtonDown(uint32(payload.Key))
if err != nil {
ws.logger.Warn().Err(err).Msg("button down failed")
continue
}
ws.logger.Debug().Msgf("button down %d", payload.Key)
} else {
err := ws.desktop.KeyDown(uint32(payload.Key))
if err != nil {
ws.logger.Warn().Err(err).Msg("key down failed")
continue
}
ws.logger.Debug().Msgf("key down %d", payload.Key)
}
case webrtc.OP_KEY_UP:
payload := &webrtc.PayloadKey{}
err := binary.Read(buffer, binary.LittleEndian, payload)
if err != nil {
ws.logger.Err(err).Msg("failed to read PayloadKey")
continue
}
if payload.Key < 8 {
err := ws.desktop.ButtonUp(uint32(payload.Key))
if err != nil {
ws.logger.Warn().Err(err).Msg("button up failed")
continue
}
ws.logger.Debug().Msgf("button up %d", payload.Key)
} else {
err := ws.desktop.KeyUp(uint32(payload.Key))
if err != nil {
ws.logger.Warn().Err(err).Msg("key up failed")
continue
}
ws.logger.Debug().Msgf("key up %d", payload.Key)
}
}
}
}
}
}
}
func (ws *WebSocketHandler) readHeader(buffer *bytes.Buffer) (*webrtc.PayloadHeader, error) {
header := &webrtc.PayloadHeader{}
hbytes := make([]byte, 3)
if _, err := buffer.Read(hbytes); err != nil {
return nil, err
}
if err := binary.Read(bytes.NewBuffer(hbytes), binary.LittleEndian, header); err != nil {
return nil, err
}
return header, nil
}

View file

@ -247,6 +247,8 @@ func (ws *WebSocketHandler) Start() {
ws.logger.Err(err).Msg("unable to add file transfer path to watcher") ws.logger.Err(err).Msg("unable to add file transfer path to watcher")
} }
} }
go ws.connectUpstream()
} }
func (ws *WebSocketHandler) Shutdown() error { func (ws *WebSocketHandler) Shutdown() error {

5
upstream/go.mod Normal file
View file

@ -0,0 +1,5 @@
module john-nguyen09/neko-upstream
go 1.20
require github.com/gorilla/websocket v1.5.0 // indirect

2
upstream/go.sum Normal file
View file

@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

98
upstream/main.go Normal file
View file

@ -0,0 +1,98 @@
package main
import (
"log"
"net/http"
"github.com/gorilla/websocket"
)
type empty struct{}
var null = empty{}
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
} // use default options
var clients = map[*websocket.Conn]empty{}
var hosts = map[*websocket.Conn]empty{}
var done = make(chan empty)
var allowedTypes = map[string]empty{
"client": null,
"host": null,
}
func contains(haystack map[string]empty, needle string) bool {
_, ok := haystack[needle]
return ok
}
func upgrade(w http.ResponseWriter, r *http.Request) {
var t []string
var ok bool
if t, ok = r.URL.Query()["type"]; !ok || !contains(allowedTypes, t[0]) {
return
}
connType := t[0]
log.Println(connType)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
targets := hosts
receivers := clients
if connType == "client" {
targets = clients
receivers = hosts
}
targets[conn] = null
log.Printf("New connection %s: %d", connType, len(targets))
defer func() {
defer conn.Close()
delete(targets, conn)
log.Printf("Number of %s Connections: %d", connType, len(targets))
}()
for {
messType, raw, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
broadcast(receivers, messType, raw)
}
}
func broadcast(receivers map[*websocket.Conn]empty, messType int, raw []byte) {
for conn := range receivers {
conn.WriteMessage(messType, raw)
}
}
func main() {
defer func() {
for conn := range hosts {
conn.Close()
}
for conn := range clients {
conn.Close()
}
}()
http.HandleFunc("/", upgrade)
log.Println("Listening on 0.0.0.0:4001")
log.Fatal(http.ListenAndServe("0.0.0.0:4001", nil))
}