diff --git a/backend/src/app/binfile/v1.clj b/backend/src/app/binfile/v1.clj index 183c3ac69..1e3a7b917 100644 --- a/backend/src/app/binfile/v1.clj +++ b/backend/src/app/binfile/v1.clj @@ -20,7 +20,6 @@ [app.common.uuid :as uuid] [app.config :as cf] [app.db :as db] - [app.http.sse :as sse] [app.loggers.audit :as-alias audit] [app.loggers.webhooks :as-alias webhooks] [app.media :as media] @@ -30,6 +29,7 @@ [app.storage :as sto] [app.storage.tmp :as tmp] [app.tasks.file-gc] + [app.util.events :as events] [app.util.time :as dt] [app.worker :as-alias wrk] [clojure.java.io :as jio] @@ -475,9 +475,6 @@ features (cfeat/get-team-enabled-features cf/flags team)] - (sse/tap {:type :import-progress - :section :read-import}) - ;; Process all sections (run! (fn [section] (l/dbg :hint "reading section" :section section ::l/sync? true) @@ -487,8 +484,7 @@ (assoc ::section section) (assoc ::input input))] (binding [bfc/*options* options] - (sse/tap {:type :import-progress - :section section}) + (events/tap :progress {:op :import :section section}) (read-section options)))) [:v1/metadata :v1/files :v1/rels :v1/sobjects]) diff --git a/backend/src/app/binfile/v2.clj b/backend/src/app/binfile/v2.clj index 33a92a03b..8ec2e1921 100644 --- a/backend/src/app/binfile/v2.clj +++ b/backend/src/app/binfile/v2.clj @@ -18,12 +18,12 @@ [app.config :as cf] [app.db :as db] [app.db.sql :as sql] - [app.http.sse :as sse] [app.loggers.audit :as-alias audit] [app.loggers.webhooks :as-alias webhooks] [app.media :as media] [app.storage :as sto] [app.storage.tmp :as tmp] + [app.util.events :as events] [app.util.time :as dt] [app.worker :as-alias wrk] [clojure.set :as set] @@ -116,13 +116,15 @@ (defn- write-team! [cfg team-id] - (sse/tap {:type :export-progress - :section :write-team - :id team-id}) - (let [team (bfc/get-team cfg team-id) fonts (bfc/get-fonts cfg team-id)] + (events/tap :progress + {:op :export + :section :write-team + :id team-id + :name (:name team)}) + (l/trc :hint "write" :obj "team" :id (str team-id) :fonts (count fonts)) @@ -138,28 +140,29 @@ (defn- write-project! [cfg project-id] - - (sse/tap {:type :export-progress - :section :write-project - :id project-id}) - (let [project (bfc/get-project cfg project-id)] + (events/tap :progress + {:op :export + :section :write-project + :id project-id + :name (:name project)}) (l/trc :hint "write" :obj "project" :id (str project-id)) (write! cfg :project (str project-id) project) (vswap! bfc/*state* update :projects conj project-id))) (defn- write-file! [cfg file-id] - - (sse/tap {:type :export-progress - :section :write-file - :id file-id}) - (let [file (bfc/get-file cfg file-id) thumbs (bfc/get-file-object-thumbnails cfg file-id) media (bfc/get-file-media cfg file) rels (bfc/get-files-rels cfg #{file-id})] + (events/tap :progress + {:op :export + :section :write-file + :id file-id + :name (:name file)}) + (vswap! bfc/*state* (fn [state] (-> state (update :files conj file-id) @@ -218,10 +221,6 @@ [{:keys [::db/conn ::bfc/timestamp] :as cfg} team-id] (l/trc :hint "read" :obj "team" :id (str team-id)) - (sse/tap {:type :import-progress - :section :read-team - :id team-id}) - (let [team (read-obj cfg :team team-id) team (-> team (update :id bfc/lookup-index) @@ -229,6 +228,12 @@ (assoc :created-at timestamp) (assoc :modified-at timestamp))] + (events/tap :progress + {:op :import + :section :read-team + :id team-id + :name (:name team)}) + (db/insert! conn :team (update team :features db/encode-pgarray conn "text") ::db/return-keys false) @@ -253,10 +258,6 @@ [{:keys [::db/conn ::bfc/timestamp] :as cfg} project-id] (l/trc :hint "read" :obj "project" :id (str project-id)) - (sse/tap {:type :import-progress - :section :read-project - :id project-id}) - (let [project (read-obj cfg :project project-id) project (-> project (update :id bfc/lookup-index) @@ -264,6 +265,12 @@ (assoc :created-at timestamp) (assoc :modified-at timestamp))] + (events/tap :progress + {:op :import + :section :read-project + :id project-id + :name (:name project)}) + (db/insert! conn :project project ::db/return-keys false))) @@ -271,15 +278,17 @@ [{:keys [::db/conn ::bfc/timestamp] :as cfg} file-id] (l/trc :hint "read" :obj "file" :id (str file-id)) - (sse/tap {:type :import-progress - :section :read-file - :id file-id}) - (let [file (-> (read-obj cfg :file file-id) (update :id bfc/lookup-index) (update :project-id bfc/lookup-index) (bfc/process-file))] + (events/tap :progress + {:op :import + :section :read-file + :id file-id + :name (:name file)}) + ;; All features that are enabled and requires explicit migration are ;; added to the state for a posterior migration step. (doseq [feature (-> (::bfc/features cfg) diff --git a/backend/src/app/features/components_v2.clj b/backend/src/app/features/components_v2.clj index 03240a904..548255a5a 100644 --- a/backend/src/app/features/components_v2.clj +++ b/backend/src/app/features/components_v2.clj @@ -41,7 +41,6 @@ [app.db :as db] [app.db.sql :as sql] [app.features.fdata :as fdata] - [app.http.sse :as sse] [app.media :as media] [app.rpc.commands.files :as files] [app.rpc.commands.files-snapshot :as fsnap] @@ -51,6 +50,7 @@ [app.svgo :as svgo] [app.util.blob :as blob] [app.util.cache :as cache] + [app.util.events :as events] [app.util.pointer-map :as pmap] [app.util.time :as dt] [buddy.core.codecs :as bc] @@ -767,8 +767,6 @@ backup', generate main instances for all components there and remove shapes from library components. Mark the file with the :components-v2 option." [file-data libraries] - (sse/tap {:type :migration-progress - :section :components}) (let [file-data (prepare-file-data file-data libraries) components (ctkl/components-seq file-data)] (if (empty? components) @@ -843,9 +841,9 @@ add-instance-grid (fn [fdata frame-id grid assets] (reduce (fn [result [component position]] - (sse/tap {:type :migration-progress - :section :components - :name (:name component)}) + (events/tap :progress {:op :migrate-component + :id (:id component) + :name (:name component)}) (add-main-instance result component frame-id (gpt/add position (gpt/point grid-gap grid-gap)))) fdata @@ -881,9 +879,9 @@ (gpt/add position (gpt/point 0 (+ height (* 2 grid-gap) frame-gap)))))))))] (let [total (count components)] - (some-> *stats* (swap! update :processed/components (fnil + 0) total)) - (some-> *team-stats* (swap! update :processed/components (fnil + 0) total)) - (some-> *file-stats* (swap! assoc :processed/components total))) + (some-> *stats* (swap! update :processed-components (fnil + 0) total)) + (some-> *team-stats* (swap! update :processed-components (fnil + 0) total)) + (some-> *file-stats* (swap! assoc :processed-components total))) (add-instance-grids file-data))))) @@ -1143,16 +1141,14 @@ (->> (d/zip media-group grid) (reduce (fn [fdata [mobj position]] - (sse/tap {:type :migration-progress - :section :graphics - :name (:name mobj)}) + (events/tap :progress {:op :migrate-graphic + :id (:id mobj) + :name (:name mobj)}) (or (process fdata mobj position) fdata)) (assoc-in fdata [:options :components-v2] true))))) (defn- migrate-graphics [fdata] - (sse/tap {:type :migration-progress - :section :graphics}) (if (empty? (:media fdata)) fdata (let [[fdata page-id start-pos] @@ -1167,9 +1163,9 @@ groups (get-asset-groups media "Graphics")] (let [total (count media)] - (some-> *stats* (swap! update :processed/graphics (fnil + 0) total)) - (some-> *team-stats* (swap! update :processed/graphics (fnil + 0) total)) - (some-> *file-stats* (swap! assoc :processed/graphics total))) + (some-> *stats* (swap! update :processed-graphics (fnil + 0) total)) + (some-> *team-stats* (swap! update :processed-graphics (fnil + 0) total)) + (some-> *file-stats* (swap! assoc :processed-graphics total))) (loop [groups (seq groups) fdata fdata @@ -1236,10 +1232,8 @@ (cfv/validate-file-schema! file)) (defn- process-file - [{:keys [::db/conn] :as system} id & {:keys [validate?]}] - (let [file (get-file system id) - - libs (->> (files/get-file-libraries conn id) + [{:keys [::db/conn] :as system} {:keys [id] :as file} & {:keys [validate?]}] + (let [libs (->> (files/get-file-libraries conn id) (into [file] (comp (map :id) (map (partial get-file system)))) (d/index-by :id)) @@ -1314,7 +1308,13 @@ (when (string? label) (fsnap/take-file-snapshot! system {:file-id file-id :label (str "migration/" label)})) - (process-file system file-id :validate? validate?)) + (let [file (get-file system file-id)] + (events/tap :progress + {:op :migrate-file + :name (:name file) + :id (:id file)}) + + (process-file system file :validate? validate?))) (catch Throwable cause (let [team-id *team-id*] @@ -1325,8 +1325,8 @@ (finally (let [elapsed (tpoint) - components (get @*file-stats* :processed/components 0) - graphics (get @*file-stats* :processed/graphics 0)] + components (get @*file-stats* :processed-components 0) + graphics (get @*file-stats* :processed-graphics 0)] (l/dbg :hint "migrate:file:end" :file-id (str file-id) @@ -1335,8 +1335,8 @@ :validate validate? :elapsed (dt/format-duration elapsed)) - (some-> *stats* (swap! update :processed/files (fnil inc 0))) - (some-> *team-stats* (swap! update :processed/files (fnil inc 0))))))))) + (some-> *stats* (swap! update :processed-files (fnil inc 0))) + (some-> *team-stats* (swap! update :processed-files (fnil inc 0))))))))) (defn migrate-team! [system team-id & {:keys [validate? skip-on-graphic-error? label]}] @@ -1355,7 +1355,7 @@ :skip-on-graphic-error? skip-on-graphic-error?)) migrate-team (fn [{:keys [::db/conn] :as system} team-id] - (let [{:keys [id features]} (get-team system team-id)] + (let [{:keys [id features name]} (get-team system team-id)] (if (contains? features "components/v2") (l/inf :hint "team already migrated") (let [features (-> features @@ -1364,6 +1364,11 @@ (conj "layout/grid") (conj "styles/v2"))] + (events/tap :progress + {:op :migrate-team + :name name + :id id}) + (run! (partial migrate-file system) (get-and-lock-files conn id)) @@ -1380,11 +1385,12 @@ (finally (let [elapsed (tpoint) - components (get @*team-stats* :processed/components 0) - graphics (get @*team-stats* :processed/graphics 0) - files (get @*team-stats* :processed/files 0)] + components (get @*team-stats* :processed-components 0) + graphics (get @*team-stats* :processed-graphics 0) + files (get @*team-stats* :processed-files 0)] - (some-> *stats* (swap! update :processed/teams (fnil inc 0))) + (when-not @err + (some-> *stats* (swap! update :processed-teams (fnil inc 0)))) (if (cache/cache? *cache*) (let [cache-stats (cache/stats *cache*)] diff --git a/backend/src/app/http/sse.clj b/backend/src/app/http/sse.clj index 0ece3b329..ec80df72d 100644 --- a/backend/src/app/http/sse.clj +++ b/backend/src/app/http/sse.clj @@ -9,11 +9,10 @@ (:refer-clojure :exclude [tap]) (:require [app.common.data :as d] - [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] [app.http.errors :as errors] - [promesa.core :as p] + [app.util.events :as events] [promesa.exec :as px] [promesa.exec.csp :as sp] [promesa.util :as pu] @@ -21,26 +20,12 @@ (:import java.io.OutputStream)) -(def ^:dynamic *channel* nil) - (defn- write! - [^OutputStream output ^bytes data] + [^OutputStream output ^bytes data] (l/trc :hint "writting data" :data data :length (alength data)) (.write output data) (.flush output)) -(defn- create-writer-loop - [^OutputStream output] - (try - (loop [] - (when-let [event (sp/take! *channel*)] - (let [result (ex/try! (write! output event))] - (if (ex/exception? result) - (l/wrn :hint "unexpected exception on sse writer" :cause result) - (recur))))) - (finally - (pu/close! output)))) - (defn- encode [[name data]] (try @@ -61,13 +46,6 @@ "Cache-Control" "no-cache, no-store, max-age=0, must-revalidate" "Pragma" "no-cache"}) -(defn tap - ([data] (tap "event" data)) - ([name data] - (when-let [channel *channel*] - (sp/put! channel [name data]) - nil))) - (defn response [handler & {:keys [buf] :or {buf 32} :as opts}] (fn [request] @@ -75,15 +53,18 @@ ::rres/status 200 ::rres/body (reify rres/StreamableResponseBody (-write-body-to-stream [_ _ output] - (binding [*channel* (sp/chan :buf buf :xf (keep encode))] - (let [writer (px/run! :virtual (partial create-writer-loop output))] + (binding [events/*channel* (sp/chan :buf buf :xf (keep encode))] + (let [listener (events/start-listener + (partial write! output) + (partial pu/close! output))] (try - (tap "end" (handler)) + (let [result (handler)] + (events/tap :end result)) (catch Throwable cause (binding [l/*context* (errors/request->context request)] (l/err :hint "unexpected error process streaming response" :cause cause)) - (tap "error" (errors/handle' cause request))) + (events/tap :error (errors/handle' cause request))) (finally - (sp/close! *channel*) - (p/await! writer)))))))})) + (sp/close! events/*channel*) + (px/await! listener)))))))})) diff --git a/backend/src/app/srepl/cli.clj b/backend/src/app/srepl/cli.clj index 9b4943bdc..6bcca5c0c 100644 --- a/backend/src/app/srepl/cli.clj +++ b/backend/src/app/srepl/cli.clj @@ -13,7 +13,8 @@ [app.db :as db] [app.main :as main] [app.rpc.commands.auth :as cmd.auth] - [app.srepl.components-v2] + [app.srepl.components-v2 :refer [migrate-teams!]] + [app.util.events :as events] [app.util.json :as json] [app.util.time :as dt] [cuerdas.core :as str])) @@ -106,25 +107,36 @@ (defmethod exec-command :migrate-v2 [_] - (letfn [(on-start [{:keys [total rollback]}] - (println - (str/ffmt "The components/v2 migration started (rollback:%, teams:%)" - (if rollback "on" "off") - total))) + (letfn [(on-progress-report [{:keys [elapsed completed errors]}] + (println (str/ffmt "-> Progress: completed: %, errors: %, elapsed: %" + completed errors elapsed))) + + (on-progress [{:keys [op name]}] + (case op + :migrate-team + (println (str/ffmt "-> Migrating team: \"%\"" name)) + :migrate-file + (println (str/ffmt "=> Migrating file: \"%\"" name)) + nil)) + + (on-event [[type payload]] + (case type + :progress-report (on-progress-report payload) + :progress (on-progress payload) + :error (on-error payload) + nil)) - (on-progress [{:keys [total elapsed progress completed]}] - (println (str/ffmt "Progress % (total: %, completed: %, elapsed: %)" - progress total completed elapsed))) (on-error [cause] - (println "ERR:" (ex-message cause))) + (println "EE:" (ex-message cause)))] - (on-end [_] - (println "Migration finished"))] - (app.srepl.components-v2/migrate-teams! main/system - :on-start on-start - :on-error on-error - :on-progress on-progress - :on-end on-end))) + (println "The components/v2 migration started...") + + (try + (let [result (-> (partial migrate-teams! main/system {:rollback? true}) + (events/run-with! on-event))] + (println (str/ffmt "Migration process finished (elapsed: %)" (:elapsed result)))) + (catch Throwable cause + (on-error cause))))) (defmethod exec-command :default [{:keys [::cmd]}] diff --git a/backend/src/app/srepl/components_v2.clj b/backend/src/app/srepl/components_v2.clj index 5e6a697bb..3db774600 100644 --- a/backend/src/app/srepl/components_v2.clj +++ b/backend/src/app/srepl/components_v2.clj @@ -8,13 +8,13 @@ (:require [app.common.data :as d] [app.common.logging :as l] - [app.common.pprint :as pp] [app.common.uuid :as uuid] [app.db :as db] [app.features.components-v2 :as feat] [app.main :as main] [app.svgo :as svgo] [app.util.cache :as cache] + [app.util.events :as events] [app.util.time :as dt] [app.worker :as-alias wrk] [cuerdas.core :as str] @@ -29,32 +29,30 @@ ;; PRIVATE HELPERS ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defn- print-stats! - [stats] - (->> stats - (into (sorted-map)) - (pp/pprint))) - (defn- report-progress-files [tpoint] (fn [_ _ oldv newv] - (when (not= (:processed/files oldv) - (:processed/files newv)) + (when (not= (:processed-files oldv) + (:processed-files newv)) (let [elapsed (tpoint)] (l/dbg :hint "progress" - :completed (:processed/files newv) + :completed (:processed-files newv) :elapsed (dt/format-duration elapsed)))))) (defn- report-progress-teams - [tpoint on-progress] + [tpoint] (fn [_ _ oldv newv] - (when (not= (:processed/teams oldv) - (:processed/teams newv)) - (let [completed (:processed/teams newv) + (when (or (not= (:processed-teams oldv) + (:processed-teams newv)) + (not= (:errors oldv) + (:errors newv))) + (let [completed (:processed-teams newv 0) + errors (:errors newv 0) elapsed (dt/format-duration (tpoint))] - (when (fn? on-progress) - (on-progress {:elapsed elapsed - :completed completed})) + (events/tap :progress-report + {:elapsed elapsed + :completed completed + :errors errors}) (l/dbg :hint "progress" :completed completed :elapsed elapsed))))) @@ -235,10 +233,10 @@ (feat/migrate-team! team-id :label label :validate? validate? - :skip-on-graphic-error? skip-on-graphic-error?)) - (print-stats! - (-> (deref feat/*stats*) - (assoc :elapsed (dt/format-duration (tpoint))))) + :skip-on-graphics-error? skip-on-graphic-error?)) + + (-> (deref feat/*stats*) + (assoc :elapsed (dt/format-duration (tpoint)))) (catch Throwable cause (l/dbg :hint "migrate:error" :cause cause)) @@ -261,8 +259,8 @@ a correct `:label`. That label is also used for persist a file snaphot before continue with the migration." [& {:keys [max-jobs max-items max-time rollback? validate? query - pred max-procs cache on-start on-progress on-error on-end - skip-on-graphic-error? label partitions current-partition] + pred max-procs cache skip-on-graphic-error? + label partitions current-partition] :or {validate? false rollback? true max-jobs 1 @@ -310,6 +308,14 @@ (l/wrn :hint "unexpected error on processing team (skiping)" :team-id (str team-id) :cause cause) + + (events/tap :error + (ex-info "unexpected error on processing team (skiping)" + {:team-id team-id} + cause)) + + (swap! stats update :errors (fnil inc 0)) + (when (string? label) (report! main/system team-id label (tpoint) (ex-message cause)))) @@ -336,15 +342,12 @@ :max-jobs max-jobs :max-items max-items) - (add-watch stats :progress-report (report-progress-teams tpoint on-progress)) + (add-watch stats :progress-report (report-progress-teams tpoint)) (binding [feat/*stats* stats feat/*cache* cache svgo/*semaphore* sprocs] (try - (when (fn? on-start) - (on-start {:rollback rollback?})) - (when (string? label) (create-report-table! main/system) (clean-reports! main/system label)) @@ -367,20 +370,12 @@ ;; Close and await tasks (pu/close! executor))) - (if (fn? on-end) - (-> (deref stats) - (assoc :elapsed/total (tpoint)) - (on-end)) - (-> (deref stats) - (assoc :elapsed/total (tpoint)) - (update :elapsed/total dt/format-duration) - (dissoc :total/teams) - (print-stats!))) + (-> (deref stats) + (assoc :elapsed (dt/format-duration (tpoint)))) (catch Throwable cause (l/dbg :hint "migrate:error" :cause cause) - (when (fn? on-error) - (on-error cause))) + (events/tap :error cause)) (finally (let [elapsed (dt/format-duration (tpoint))] diff --git a/backend/src/app/util/events.clj b/backend/src/app/util/events.clj new file mode 100644 index 000000000..a41843c6b --- /dev/null +++ b/backend/src/app/util/events.clj @@ -0,0 +1,64 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.util.events + "A generic asynchronous events notifications subsystem; used mainly + for mark event points in functions and be able to attach listeners + to them. Mainly used in http.sse for progress reporting." + (:refer-clojure :exclude [tap run!]) + (:require + [app.common.data.macros :as dm] + [app.common.exceptions :as ex] + [app.common.logging :as l] + [promesa.exec :as px] + [promesa.exec.csp :as sp])) + +(def ^:dynamic *channel* nil) + +(defn channel + [] + (sp/chan :buf 32)) + +(defn tap + [type data] + (when-let [channel *channel*] + (sp/put! channel [type data]) + nil)) + +(defn start-listener + [on-event on-close] + + (dm/assert! + "expected active events channel" + (sp/chan? *channel*)) + + (px/thread + {:virtual true} + (try + (loop [] + (when-let [event (sp/take! *channel*)] + (let [result (ex/try! (on-event event))] + (if (ex/exception? result) + (do + (l/wrn :hint "unexpected exception" :cause result) + (sp/close! *channel*)) + (recur))))) + (finally + (on-close))))) + +(defn run-with! + "A high-level facility for to run a function in context of event + emiter." + [f on-event] + + (binding [*channel* (sp/chan :buf 32)] + (let [listener (start-listener on-event (constantly nil))] + (try + (f) + (finally + (sp/close! *channel*) + (px/await! listener)))))) + diff --git a/backend/test/backend_tests/rpc_management_test.clj b/backend/test/backend_tests/rpc_management_test.clj index f9b62d1b0..325831480 100644 --- a/backend/test/backend_tests/rpc_management_test.clj +++ b/backend/test/backend_tests/rpc_management_test.clj @@ -612,7 +612,7 @@ (t/is (fn? result)) (let [events (th/consume-sse result)] - (t/is (= 8 (count events))) + (t/is (= 6 (count events))) (t/is (= :end (first (last events)))))))) (t/deftest get-list-of-buitin-templates diff --git a/frontend/src/app/main/data/dashboard.cljs b/frontend/src/app/main/data/dashboard.cljs index c782b49fa..0511e335d 100644 --- a/frontend/src/app/main/data/dashboard.cljs +++ b/frontend/src/app/main/data/dashboard.cljs @@ -1013,7 +1013,7 @@ (rx/tap (fn [event] (let [payload (sse/get-payload event) type (sse/get-type event)] - (if (= type "event") + (if (= type "progress") (log/dbg :hint "clone-template: progress" :section (:section payload) :name (:name payload)) (log/dbg :hint "clone-template: end"))))) diff --git a/frontend/src/app/worker/import.cljs b/frontend/src/app/worker/import.cljs index daea0c241..f43df1cf2 100644 --- a/frontend/src/app/worker/import.cljs +++ b/frontend/src/app/worker/import.cljs @@ -735,7 +735,7 @@ (rx/tap (fn [event] (let [payload (sse/get-payload event) type (sse/get-type event)] - (if (= type "event") + (if (= type "progress") (log/dbg :hint "import-binfile: progress" :section (:section payload) :name (:name payload)) (log/dbg :hint "import-binfile: end"))))) (rx/filter sse/end-of-stream?)