diff --git a/internal/session/manager.go b/internal/session/manager.go index ba0c5305..f5d39166 100644 --- a/internal/session/manager.go +++ b/internal/session/manager.go @@ -15,14 +15,12 @@ import ( func New(config *config.Session) *SessionManagerCtx { manager := &SessionManagerCtx{ - logger: log.With().Str("module", "session").Logger(), - config: config, - host: nil, - hostMu: sync.Mutex{}, - tokens: make(map[string]string), - sessions: make(map[string]*SessionCtx), - sessionsMu: sync.Mutex{}, - emmiter: events.New(), + logger: log.With().Str("module", "session").Logger(), + config: config, + tokens: make(map[string]string), + sessions: make(map[string]*SessionCtx), + cursors: make(map[types.Session]types.Cursor), + emmiter: events.New(), } // create API session @@ -48,13 +46,19 @@ func New(config *config.Session) *SessionManagerCtx { } type SessionManagerCtx struct { - logger zerolog.Logger - config *config.Session - host types.Session - hostMu sync.Mutex + logger zerolog.Logger + config *config.Session + tokens map[string]string sessions map[string]*SessionCtx sessionsMu sync.Mutex + + host types.Session + hostMu sync.Mutex + + cursors map[types.Session]types.Cursor + cursorsMu sync.Mutex + emmiter events.EventEmmiter apiSession *SessionCtx } @@ -193,6 +197,32 @@ func (manager *SessionManagerCtx) ClearHost() { manager.SetHost(nil) } +// --- +// cursors +// --- + +func (manager *SessionManagerCtx) SetCursor(x, y int, session types.Session) { + manager.cursorsMu.Lock() + defer manager.cursorsMu.Unlock() + + pos, ok := manager.cursors[session] + if ok { + pos.X, pos.Y = x, y + } else { + manager.cursors[session] = types.Cursor{X: x, Y: y} + } +} + +func (manager *SessionManagerCtx) PopCursors() map[types.Session]types.Cursor { + manager.cursorsMu.Lock() + defer manager.cursorsMu.Unlock() + + cursors := manager.cursors + manager.cursors = make(map[types.Session]types.Cursor) + + return cursors +} + // --- // broadcasts // --- diff --git a/internal/session/session.go b/internal/session/session.go index 5c84d726..85e16a30 100644 --- a/internal/session/session.go +++ b/internal/session/session.go @@ -16,10 +16,6 @@ type SessionCtx struct { profile types.MemberProfile state types.SessionState - positionX int - positionY int - positionMu sync.Mutex - websocketPeer types.WebSocketPeer websocketMu sync.Mutex @@ -57,16 +53,8 @@ func (session *SessionCtx) IsHost() bool { return session.manager.GetHost() == session } -// --- -// cursor position -// --- - -func (session *SessionCtx) SetPosition(x, y int) { - session.positionMu.Lock() - defer session.positionMu.Unlock() - - session.positionX = x - session.positionY = y +func (session *SessionCtx) SetCursor(x, y int) { + session.manager.SetCursor(x, y, session) } // --- diff --git a/internal/types/event/events.go b/internal/types/event/events.go index e16ed9ab..7583e258 100644 --- a/internal/types/event/events.go +++ b/internal/types/event/events.go @@ -21,6 +21,7 @@ const ( SESSION_DELETED = "session/deleted" SESSION_PROFILE = "session/profile" SESSION_STATE = "session/state" + SESSION_CURSORS = "session/cursors" ) const ( diff --git a/internal/types/message/messages.go b/internal/types/message/messages.go index bd64db7b..bff58a34 100644 --- a/internal/types/message/messages.go +++ b/internal/types/message/messages.go @@ -87,6 +87,12 @@ type SessionData struct { State types.SessionState `json:"state"` } +type SessionCursor struct { + ID string `json:"id"` + X uint16 `json:"x"` + Y uint16 `json:"y"` +} + ///////////////////////////// // Control ///////////////////////////// diff --git a/internal/types/session.go b/internal/types/session.go index c814fc5d..95063766 100644 --- a/internal/types/session.go +++ b/internal/types/session.go @@ -12,6 +12,11 @@ var ( ErrSessionLoginDisabled = errors.New("session login disabled") ) +type Cursor struct { + X int + Y int +} + type SessionState struct { IsConnected bool `json:"is_connected"` IsWatching bool `json:"is_watching"` @@ -23,8 +28,8 @@ type Session interface { State() SessionState IsHost() bool - // cursor position - SetPosition(x, y int) + // cursor + SetCursor(x, y int) // websocket SetWebSocketPeer(websocketPeer WebSocketPeer) @@ -50,6 +55,9 @@ type SessionManager interface { GetHost() Session ClearHost() + SetCursor(x, y int, session Session) + PopCursors() map[Session]Cursor + Broadcast(event string, payload interface{}, exclude interface{}) AdminBroadcast(event string, payload interface{}, exclude interface{}) diff --git a/internal/webrtc/handler.go b/internal/webrtc/handler.go index 3c06ae4f..0732206b 100644 --- a/internal/webrtc/handler.go +++ b/internal/webrtc/handler.go @@ -52,7 +52,7 @@ func (manager *WebRTCManagerCtx) handle(data []byte, session types.Session) erro // handle inactive cursor movement if session.Profile().CanHost { - session.SetPosition(x, y) + session.SetCursor(x, y) } return nil diff --git a/internal/websocket/manager.go b/internal/websocket/manager.go index 573aa20d..9295e253 100644 --- a/internal/websocket/manager.go +++ b/internal/websocket/manager.go @@ -132,6 +132,41 @@ func (manager *WebSocketManagerCtx) Start() { manager.fileChooserDialogEvents() + manager.wg.Add(1) + go func() { + defer manager.wg.Done() + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-manager.shutdown: + return + case <-ticker.C: + cursorsMap := manager.sessions.PopCursors() + if len(cursorsMap) == 0 { + continue + } + + cursors := []message.SessionCursor{} + for session, cursor := range cursorsMap { + cursors = append( + cursors, + message.SessionCursor{ + ID: session.ID(), + X: uint16(cursor.X), + Y: uint16(cursor.Y), + }, + ) + } + + // TODO: Send to subscribers only. + manager.sessions.AdminBroadcast(event.SESSION_CURSORS, cursors, nil) + } + } + }() + manager.logger.Info().Msg("websocket starting") }