;; This Source Code Form is subject to the terms of the Mozilla Public ;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; ;; This Source Code Form is "Incompatible With Secondary Licenses", as ;; defined by the Mozilla Public License, v. 2.0. ;; ;; Copyright (c) 2020-2021 UXBOX Labs SL (ns app.notifications "A websocket based notifications mechanism." (:require [app.common.spec :as us] [app.db :as db] [app.metrics :as mtx] [app.redis :as rd] [app.util.async :as aa] [app.util.transit :as t] [clojure.core.async :as a] [clojure.spec.alpha :as s] [clojure.tools.logging :as log] [integrant.core :as ig] [ring.adapter.jetty9 :as jetty] [ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.keyword-params :refer [wrap-keyword-params]] [ring.middleware.params :refer [wrap-params]])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Http Handler ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (declare retrieve-file) (declare websocket) (declare handler) (s/def ::session map?) 
;; Configuration contract of the ::handler component: the config map must
;; contain the unqualified keys :redis, :pool, :session and :metrics.
(defmethod ig/pre-init-spec ::handler [_]
  (s/keys :req-un [::rd/redis ::db/pool ::session ::mtx/metrics]))

;; Builds the ring handler of the notifications endpoint.
;;
;; Creates the three metrics used by the websocket code (an active
;; connections gauge plus receive/send timing summaries), stores them in
;; `cfg` under :mtx-active-connections, :mtx-message-recv and
;; :mtx-message-send, and wraps `handler` with the session middleware taken
;; from (:middleware session) and the params/cookies/keyword-params ring
;; middlewares.
(defmethod ig/init-key ::handler
  [_ {:keys [session metrics] :as cfg}]
  (let [wrap-session (:middleware session)
        registry     (:registry metrics)

        mtx-active-connections
        (mtx/create
         {:name "websocket_notifications_active_connections"
          :registry registry
          :type :gauge
          :help "Active websocket connections on notifications service."})

        mtx-message-recv
        (mtx/create
         {:name "websocket_notifications_message_recv_timing"
          :registry registry
          :type :summary
          :help "Message receive summary timing (ms)."})

        ;; The help text used to be a copy of the receive metric's text,
        ;; which made both summaries read the same in the metrics output.
        mtx-message-send
        (mtx/create
         {:name "websocket_notifications_message_send_timing"
          :registry registry
          :type :summary
          :help "Message send summary timing (ms)."})

        cfg (assoc cfg
                   :mtx-active-connections mtx-active-connections
                   :mtx-message-recv mtx-message-recv
                   :mtx-message-send mtx-message-send)]
    ;; `->` nests inside-out: wrap-params runs first on an incoming request
    ;; and the session middleware runs last, right before `handler`.
    (-> #(handler cfg %)
        (wrap-session)
        (wrap-keyword-params)
        (wrap-cookies)
        (wrap-params))))

(s/def ::file-id ::us/uuid)
(s/def ::session-id ::us/uuid)
(s/def ::websocket-handler-params
  (s/keys :req-un [::file-id ::session-id]))

(defn- handler
  "Entry point of a notifications request.

  Returns an `:error` map with code 403 when the request carries no
  `:profile-id`, an `:error` map with code 404 when the requested file is
  not found, and otherwise the result of `websocket` called with `cfg`
  extended with the conformed params, `:profile-id` and the `:team-id` of
  the file.

  The authentication check runs first so that unauthenticated requests
  are rejected without conforming the params or querying the database."
  [{:keys [pool] :as cfg} {:keys [profile-id params]}]
  (if-not profile-id
    {:error {:code 403 :message "Authentication required"}}
    (let [params (us/conform ::websocket-handler-params params)
          file   (retrieve-file pool (:file-id params))]
      (if-not file
        {:error {:code 404 :message "File does not exists"}}
        (websocket (merge cfg params
                          {:profile-id profile-id
                           :team-id (:team-id file)}))))))

;; Selects the id of a file together with the team_id of its project.
(def ^:private sql:retrieve-file
  "select f.id as id, p.team_id as team_id from file as f join project as p on (p.id = f.project_id) where f.id = ?")

;; Runs `sql:retrieve-file` for `id` on `conn` through `db/exec-one!`;
;; `handler` treats a falsy result as "file not found".
(defn- retrieve-file
  [conn id]
  (db/exec-one!
conn [sql:retrieve-file id]))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; WebSocket Http Handler
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(declare handle-connect)

;; State of a single websocket connection: the jetty connection (`conn`),
;; the `in` and `out` channels and the redis subscription (`sub`).
;; `websocket` below builds it with six positional arguments; per the
;; defrecord constructor contract the two extra ones are the metadata map
;; (nil) and the extension-field map (`cfg`).
(defrecord WebSocket [conn in out sub])

;; Sends `data` through `conn` when the connection is still open.
;; Returns true after sending, nil when `conn` is not connected and false
;; when a NullPointerException is thrown while checking or sending.
;; NOTE(review): presumably the NPE comes from the adapter when the
;; underlying session is already gone -- confirm.
(defn- ws-send
  [conn data]
  (try
    (when (jetty/connected? conn)
      (jetty/send! conn data)
      true)
    (catch java.lang.NullPointerException _e
      false)))

;; Builds the map of websocket callbacks (:on-connect, :on-error,
;; :on-close, :on-text, :on-bytes) for one connection.
;;
;; Creates the `in` and `out` channels (buffer size 32), reads the three
;; metric objects from `cfg` and shadows `ws-send` with a version wrapped
;; by `mtx/wrap-summary` on the send metric. On connect it calls the
;; active-connections metric with :inc, subscribes on redis to the
;; `file-id` and `team-id` topics (decoding payloads with `t/decode-str`)
;; and creates the `WebSocket` record. Binary frames are ignored.
;;
;; NOTE(review): the text of this function is damaged in this copy of the
;; file. Everything between the start of the forwarding loop and the end
;; of the message callback is missing, including the definitions of
;; `on-error`, `on-close` and `on-message` that the returned map refers
;; to. The damaged tokens are kept exactly as found; restore them from
;; version control before relying on this form.
(defn websocket
  [{:keys [file-id team-id redis] :as cfg}]
  (let [in (a/chan 32)
        out (a/chan 32)
        mtx-active-connections (:mtx-active-connections cfg)
        mtx-message-send (:mtx-message-send cfg)
        mtx-message-recv (:mtx-message-recv cfg)
        ws-send (mtx/wrap-summary ws-send mtx-message-send)]
    (letfn [(on-connect [conn]
              (mtx-active-connections :inc)
              (let [sub (rd/subscribe redis {:xform (map t/decode-str)
                                             :topics [file-id team-id]})
                    ws (WebSocket. conn in out sub nil cfg)]
                ;; message forwarding loop
                (a/go-loop [] (let [val (a/!! in message)))]
      {:on-connect on-connect
       :on-error on-error
       :on-close on-close
       :on-text (mtx/wrap-summary on-message mtx-message-recv)
       :on-bytes (constantly nil)})))

(declare handle-message)
(declare start-loop!)

;; NOTE(review): this form is damaged as well. The body of
;; `handle-connect` runs straight into what looks like the tail of a
;; `cond` inside a loop (it puts {:type :ping} on `out` when the selected
;; port is `timeout` and recurs), which presumably belongs to the declared
;; `start-loop!` -- confirm against version control. Tokens are kept
;; exactly as found.
(defn- handle-connect
  [{:keys [conn] :as ws}]
  (a/go (try (aa/! out val)) (recur))
  ;; Timeout channel signaling
  (= port timeout) (do (a/>! out {:type :ping}) (recur)) :else nil)))))

;; Incoming Messages Handling

;; Encodes `message` with `t/encode-str` inside an `aa/go-try` block; the
;; rest of the body lies beyond the visible part of the file.
(defn- publish
  [redis channel message]
  (aa/go-try
    (let [message (t/encode-str message)]
      (aa/