diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..b492b726 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,38 @@ +# +# STAGE 1: SERVER +# +FROM golang:1.20-bullseye as server +WORKDIR /src + +# +# install dependencies +RUN set -eux; apt-get update; \ + apt-get install -y --no-install-recommends git cmake make libx11-dev libxrandr-dev libxtst-dev \ + libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-base gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly; \ + # + # install libclipboard + set -eux; \ + cd /tmp; \ + git clone --depth=1 https://github.com/jtanx/libclipboard; \ + cd libclipboard; \ + cmake .; \ + make -j4; \ + make install; \ + rm -rf /tmp/libclipboard; \ + # + # clean up + apt-get clean -y; \ + rm -rf /var/lib/apt/lists/* /var/cache/apt/* + +COPY server/go.mod ./ +COPY server/go.sum ./ +RUN go mod download + +# +# build server +COPY server/ . +RUN ./build + +FROM ghcr.io/m1k1o/neko/intel-firefox + +COPY --from=server /src/bin/neko /usr/bin/neko diff --git a/docker-compose.yaml b/docker-compose.yml similarity index 57% rename from docker-compose.yaml rename to docker-compose.yml index 8738b2b8..ade11332 100644 --- a/docker-compose.yaml +++ b/docker-compose.yml @@ -1,15 +1,19 @@ version: "3.4" services: neko: - image: "m1k1o/neko:firefox" + build: . restart: "unless-stopped" shm_size: "2gb" ports: - "8080:8080" - "52000-52100:52000-52100/udp" + extra_hosts: + - "host.docker.internal:host-gateway" environment: - NEKO_SCREEN: 1920x1080@30 + NEKO_SCREEN: 1920x1080@60 NEKO_PASSWORD: neko NEKO_PASSWORD_ADMIN: admin NEKO_EPR: 52000-52100 NEKO_ICELITE: 1 + NEKO_BROADCAST_URL: rtmp://ome.thuan.au:1935/app/stream-neko + NEKO_NAT1TO1: 192.168.0.34 diff --git a/neko.code-workspace b/neko.code-workspace index 997329c9..ae4f6b97 100644 --- a/neko.code-workspace +++ b/neko.code-workspace @@ -1,15 +1,18 @@ { "folders": [ - { - "path": "server" - }, - { - "path": "client" - }, - { - "path": "." - } - ], + { + "path": "server" + }, + { + "path": "client" + }, + { + "path": "." + }, + { + "path": "upstream" + } + ], "settings": { "editor.tabSize": 2, "editor.insertSpaces": true, diff --git a/server/internal/capture/pipelines.go b/server/internal/capture/pipelines.go index 654ddd31..c13fe68c 100644 --- a/server/internal/capture/pipelines.go +++ b/server/internal/capture/pipelines.go @@ -35,7 +35,7 @@ const ( ) func NewBroadcastPipeline(device string, display string, pipelineSrc string, url string) (string, error) { - video := fmt.Sprintf(videoSrc, display, 25) + video := fmt.Sprintf(videoSrc, display, 60) audio := fmt.Sprintf(audioSrc, device) var pipelineStr string @@ -47,7 +47,7 @@ func NewBroadcastPipeline(device string, display string, pipelineSrc string, url // replace display pipelineStr = strings.Replace(pipelineStr, "{display}", display, -1) } else { - pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s x264enc bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video) + pipelineStr = fmt.Sprintf("flvmux name=mux ! rtmpsink location='%s live=1' %s audio/x-raw,channels=2 ! audioconvert ! voaacenc ! mux. %s video/x-raw,format=NV12 ! x264enc bitrate=%d bframes=0 key-int-max=60 byte-stream=true tune=zerolatency speed-preset=veryfast ! mux.", url, audio, video, 70000) } return pipelineStr, nil diff --git a/server/internal/websocket/upstream.go b/server/internal/websocket/upstream.go new file mode 100644 index 00000000..db511f41 --- /dev/null +++ b/server/internal/websocket/upstream.go @@ -0,0 +1,144 @@ +package websocket + +import ( + "bytes" + "encoding/binary" + "m1k1o/neko/internal/webrtc" + "strconv" + "time" + + "github.com/gorilla/websocket" +) + +func (ws *WebSocketHandler) connectUpstream() { + upstreamURL := "ws://cave.thuan.au:4001/?type=host" + retryTicker := time.NewTicker(5 * time.Second) + + for { + select { + case <-ws.shutdown: + return + case <-retryTicker.C: + ws.logger.Debug().Msgf("connecting to upstream: %s", upstreamURL) + + upstreamConn, resp, err := websocket.DefaultDialer.Dial(upstreamURL, nil) + if err != nil { + if err == websocket.ErrBadHandshake { + ws.logger.Err(err).Msgf("failed to connect to upstream, status: %d", resp.StatusCode) + } else { + ws.logger.Err(err).Msg("failed to connect to upstream") + } + } else { + defer func () { + upstreamConn.Close() + }() + + for { + _, raw, err := upstreamConn.ReadMessage() + if err != nil { + ws.logger.Err(err).Msg("failed to read message from upstream") + break + } + + buffer := bytes.NewBuffer(raw) + header, err := ws.readHeader(buffer) + if err != nil { + ws.logger.Err(err).Msg("failed to read header") + continue + } + + buffer = bytes.NewBuffer(raw) + + switch header.Event { + case webrtc.OP_MOVE: + payload := &webrtc.PayloadMove{} + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { + ws.logger.Err(err).Msg("failed to read PayloadMove") + continue + } + + ws.desktop.Move(int(payload.X), int(payload.Y)) + case webrtc.OP_SCROLL: + payload := &webrtc.PayloadScroll{} + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { + ws.logger.Err(err).Msg("failed to read PayloadScroll") + continue + } + + ws.logger. + Debug(). + Str("x", strconv.Itoa(int(payload.X))). + Str("y", strconv.Itoa(int(payload.Y))). + Msg("scroll") + + ws.desktop.Scroll(int(payload.X), int(payload.Y)) + case webrtc.OP_KEY_DOWN: + payload := &webrtc.PayloadKey{} + if err := binary.Read(buffer, binary.LittleEndian, payload); err != nil { + ws.logger.Err(err).Msg("failed to read PayloadKey") + continue + } + + if payload.Key < 8 { + err := ws.desktop.ButtonDown(uint32(payload.Key)) + if err != nil { + ws.logger.Warn().Err(err).Msg("button down failed") + continue + } + + ws.logger.Debug().Msgf("button down %d", payload.Key) + } else { + err := ws.desktop.KeyDown(uint32(payload.Key)) + if err != nil { + ws.logger.Warn().Err(err).Msg("key down failed") + continue + } + + ws.logger.Debug().Msgf("key down %d", payload.Key) + } + case webrtc.OP_KEY_UP: + payload := &webrtc.PayloadKey{} + err := binary.Read(buffer, binary.LittleEndian, payload) + if err != nil { + ws.logger.Err(err).Msg("failed to read PayloadKey") + continue + } + + if payload.Key < 8 { + err := ws.desktop.ButtonUp(uint32(payload.Key)) + if err != nil { + ws.logger.Warn().Err(err).Msg("button up failed") + continue + } + + ws.logger.Debug().Msgf("button up %d", payload.Key) + } else { + err := ws.desktop.KeyUp(uint32(payload.Key)) + if err != nil { + ws.logger.Warn().Err(err).Msg("key up failed") + continue + } + + ws.logger.Debug().Msgf("key up %d", payload.Key) + } + } + } + } + } + } +} + +func (ws *WebSocketHandler) readHeader(buffer *bytes.Buffer) (*webrtc.PayloadHeader, error) { + header := &webrtc.PayloadHeader{} + hbytes := make([]byte, 3) + + if _, err := buffer.Read(hbytes); err != nil { + return nil, err + } + + if err := binary.Read(bytes.NewBuffer(hbytes), binary.LittleEndian, header); err != nil { + return nil, err + } + + return header, nil +} diff --git a/server/internal/websocket/websocket.go b/server/internal/websocket/websocket.go index 6958c6fe..c7f207e1 100644 --- a/server/internal/websocket/websocket.go +++ b/server/internal/websocket/websocket.go @@ -247,6 +247,8 @@ func (ws *WebSocketHandler) Start() { ws.logger.Err(err).Msg("unable to add file transfer path to watcher") } } + + go ws.connectUpstream() } func (ws *WebSocketHandler) Shutdown() error { diff --git a/upstream/go.mod b/upstream/go.mod new file mode 100644 index 00000000..a26c0bb3 --- /dev/null +++ b/upstream/go.mod @@ -0,0 +1,5 @@ +module john-nguyen09/neko-upstream + +go 1.20 + +require github.com/gorilla/websocket v1.5.0 // indirect diff --git a/upstream/go.sum b/upstream/go.sum new file mode 100644 index 00000000..e5a03d4d --- /dev/null +++ b/upstream/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/upstream/main.go b/upstream/main.go new file mode 100644 index 00000000..9153030e --- /dev/null +++ b/upstream/main.go @@ -0,0 +1,98 @@ +package main + +import ( + "log" + "net/http" + + "github.com/gorilla/websocket" +) + +type empty struct{} + +var null = empty{} + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} // use default options + +var clients = map[*websocket.Conn]empty{} +var hosts = map[*websocket.Conn]empty{} + +var done = make(chan empty) +var allowedTypes = map[string]empty{ + "client": null, + "host": null, +} + +func contains(haystack map[string]empty, needle string) bool { + _, ok := haystack[needle] + + return ok +} + +func upgrade(w http.ResponseWriter, r *http.Request) { + var t []string + var ok bool + if t, ok = r.URL.Query()["type"]; !ok || !contains(allowedTypes, t[0]) { + return + } + + connType := t[0] + log.Println(connType) + conn, err := upgrader.Upgrade(w, r, nil) + + if err != nil { + log.Print("upgrade:", err) + return + } + + targets := hosts + receivers := clients + if connType == "client" { + targets = clients + receivers = hosts + } + + targets[conn] = null + log.Printf("New connection %s: %d", connType, len(targets)) + + defer func() { + defer conn.Close() + delete(targets, conn) + log.Printf("Number of %s Connections: %d", connType, len(targets)) + }() + + for { + messType, raw, err := conn.ReadMessage() + + if err != nil { + log.Println(err) + return + } + + broadcast(receivers, messType, raw) + } +} + +func broadcast(receivers map[*websocket.Conn]empty, messType int, raw []byte) { + for conn := range receivers { + conn.WriteMessage(messType, raw) + } +} + +func main() { + defer func() { + for conn := range hosts { + conn.Close() + } + for conn := range clients { + conn.Close() + } + }() + + http.HandleFunc("/", upgrade) + log.Println("Listening on 0.0.0.0:4001") + log.Fatal(http.ListenAndServe("0.0.0.0:4001", nil)) +}