Merge pull request #4731 from penpot/niwinz-bugfix-6

🐛 Fix many race conditions on thumbnail generation process
This commit is contained in:
Aitor Moreno 2024-06-14 13:15:10 +02:00 committed by GitHub
commit 22ede6b08e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 148 additions and 160 deletions

View file

@ -87,10 +87,7 @@
::sm/params [:map {:title "get-file-object-thumbnails"} ::sm/params [:map {:title "get-file-object-thumbnails"}
[:file-id ::sm/uuid] [:file-id ::sm/uuid]
[:tag {:optional true} :string]] [:tag {:optional true} :string]]
::sm/result [:map-of :string :string] ::sm/result [:map-of :string :string]}
::cond/get-object #(files/get-minimal-file %1 (:file-id %2))
::cond/reuse-key? true
::cond/key-fn files/get-file-etag}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id tag] :as params}] [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id tag] :as params}]
(dm/with-open [conn (db/open pool)] (dm/with-open [conn (db/open pool)]
(files/check-read-permissions! conn profile-id file-id) (files/check-read-permissions! conn profile-id file-id)

View file

@ -158,6 +158,10 @@
(avatars/generate {:name name}) (avatars/generate {:name name})
(dm/str (u/join public-uri "assets/by-id/" photo-id)))) (dm/str (u/join public-uri "assets/by-id/" photo-id))))
(defn resolve-media
[id]
(dm/str (u/join public-uri "assets/by-id/" (str id))))
(defn resolve-file-media (defn resolve-file-media
([media] ([media]
(resolve-file-media media false)) (resolve-file-media media false))

View file

@ -814,7 +814,7 @@
component (ctkl/get-component data component-id) component (ctkl/get-component data component-id)
page-id (:main-instance-page component) page-id (:main-instance-page component)
root-id (:main-instance-id component)] root-id (:main-instance-id component)]
(dwt/request-thumbnail file-id page-id root-id tag "update-component-thumbnail-sync"))) (dwt/update-thumbnail file-id page-id root-id tag "update-component-thumbnail-sync")))
(defn update-component-sync (defn update-component-sync
([shape-id file-id] (update-component-sync shape-id file-id nil)) ([shape-id file-id] (update-component-sync shape-id file-id nil))

View file

@ -10,6 +10,7 @@
[app.common.files.helpers :as cfh] [app.common.files.helpers :as cfh]
[app.common.logging :as l] [app.common.logging :as l]
[app.common.thumbnails :as thc] [app.common.thumbnails :as thc]
[app.config :as cf]
[app.main.data.changes :as dch] [app.main.data.changes :as dch]
[app.main.data.persistence :as-alias dps] [app.main.data.persistence :as-alias dps]
[app.main.data.workspace.notifications :as-alias wnt] [app.main.data.workspace.notifications :as-alias wnt]
@ -18,7 +19,6 @@
[app.main.refs :as refs] [app.main.refs :as refs]
[app.main.render :as render] [app.main.render :as render]
[app.main.repo :as rp] [app.main.repo :as rp]
[app.main.store :as st]
[app.util.http :as http] [app.util.http :as http]
[app.util.queue :as q] [app.util.queue :as q]
[app.util.time :as tp] [app.util.time :as tp]
@ -30,55 +30,36 @@
(l/set-level! :warn) (l/set-level! :warn)
(declare update-thumbnail) (defn- find-request
[params item]
(and (= (unchecked-get params "file-id")
(unchecked-get item "file-id"))
(= (unchecked-get params "page-id")
(unchecked-get item "page-id"))
(= (unchecked-get params "shape-id")
(unchecked-get item "shape-id"))
(= (unchecked-get params "tag")
(unchecked-get item "tag"))))
(defn resolve-request (defn- create-request
"Resolves the request to generate a thumbnail for the given ids." "Creates a request to generate a thumbnail for the given ids."
[item] [file-id page-id shape-id tag]
(let [file-id (unchecked-get item "file-id") #js {:file-id file-id
page-id (unchecked-get item "page-id") :page-id page-id
shape-id (unchecked-get item "shape-id") :shape-id shape-id
tag (unchecked-get item "tag")] :tag tag})
(st/emit! (update-thumbnail file-id page-id shape-id tag))))
;; Defines the thumbnail queue ;; Defines the thumbnail queue
(defonce queue (defonce queue
(q/create resolve-request (/ 1000 30))) (q/create find-request (/ 1000 30)))
(defn create-request
"Creates a request to generate a thumbnail for the given ids."
[file-id page-id shape-id tag]
#js {:file-id file-id :page-id page-id :shape-id shape-id :tag tag})
(defn find-request
"Returns true if the given item matches the given ids."
[file-id page-id shape-id tag item]
(and (= file-id (unchecked-get item "file-id"))
(= page-id (unchecked-get item "page-id"))
(= shape-id (unchecked-get item "shape-id"))
(= tag (unchecked-get item "tag"))))
(defn request-thumbnail
"Enqueues a request to generate a thumbnail for the given ids."
([file-id page-id shape-id tag]
(request-thumbnail file-id page-id shape-id tag "unknown"))
([file-id page-id shape-id tag requester]
(ptk/reify ::request-thumbnail
ptk/EffectEvent
(effect [_ _ _]
(l/dbg :hint "request thumbnail" :requester requester :file-id file-id :page-id page-id :shape-id shape-id :tag tag)
(q/enqueue-unique queue
(create-request file-id page-id shape-id tag)
(partial find-request file-id page-id shape-id tag))))))
;; This function first renders the HTML calling `render/render-frame` that ;; This function first renders the HTML calling `render/render-frame` that
;; returns HTML as a string, then we send that data to the iframe rasterizer ;; returns HTML as a string, then we send that data to the iframe rasterizer
;; that returns the image as a Blob. Finally we create a URI for that blob. ;; that returns the image as a Blob. Finally we create a URI for that blob.
(defn get-thumbnail (defn- render-thumbnail
"Returns the thumbnail for the given ids" "Returns the thumbnail for the given ids"
[state file-id page-id frame-id tag & {:keys [object-id]}] [state file-id page-id frame-id tag]
(let [object-id (thc/fmt-object-id file-id page-id frame-id tag)
(let [object-id (or object-id (thc/fmt-object-id file-id page-id frame-id tag))
tp (tp/tpoint-ms) tp (tp/tpoint-ms)
objects (wsh/lookup-objects state file-id page-id) objects (wsh/lookup-objects state file-id page-id)
shape (get objects frame-id)] shape (get objects frame-id)]
@ -87,10 +68,15 @@
(rx/take 1) (rx/take 1)
(rx/filter some?) (rx/filter some?)
(rx/mapcat thr/render) (rx/mapcat thr/render)
(rx/map (fn [blob] (wapi/create-uri blob)))
(rx/tap #(l/dbg :hint "thumbnail rendered" (rx/tap #(l/dbg :hint "thumbnail rendered"
:elapsed (dm/str (tp) "ms")))))) :elapsed (dm/str (tp) "ms"))))))
(defn- request-thumbnail
"Enqueues a request to generate a thumbnail for the given ids."
[state file-id page-id shape-id tag]
(let [request (create-request file-id page-id shape-id tag)]
(q/enqueue-unique queue request (partial render-thumbnail state file-id page-id shape-id tag))))
(defn clear-thumbnail (defn clear-thumbnail
([file-id page-id frame-id tag] ([file-id page-id frame-id tag]
(clear-thumbnail file-id (thc/fmt-object-id file-id page-id frame-id tag))) (clear-thumbnail file-id (thc/fmt-object-id file-id page-id frame-id tag)))
@ -154,8 +140,7 @@
(defn update-thumbnail (defn update-thumbnail
"Updates the thumbnail information for the given `id`" "Updates the thumbnail information for the given `id`"
[file-id page-id frame-id tag requester]
[file-id page-id frame-id tag]
(let [object-id (thc/fmt-object-id file-id page-id frame-id tag)] (let [object-id (thc/fmt-object-id file-id page-id frame-id tag)]
(ptk/reify ::update-thumbnail (ptk/reify ::update-thumbnail
cljs.core/IDeref cljs.core/IDeref
@ -163,23 +148,25 @@
ptk/WatchEvent ptk/WatchEvent
(watch [_ state stream] (watch [_ state stream]
(l/dbg :hint "update thumbnail" :object-id object-id :tag tag) (l/dbg :hint "update thumbnail" :requester requester :object-id object-id :tag tag)
;; Send the update to the back-end ;; Send the update to the back-end
(->> (get-thumbnail state file-id page-id frame-id tag) (->> (request-thumbnail state file-id page-id frame-id tag)
(rx/mapcat (fn [uri] (rx/mapcat (fn [blob]
(rx/merge ;; Send the data to backend
(rx/of (assoc-thumbnail object-id uri)) (let [params {:file-id file-id
(->> (http/send! {:uri uri :response-type :blob :method :get}) :object-id object-id
(rx/map :body) :media blob
(rx/mapcat (fn [blob] :tag (or tag "frame")}]
;; Send the data to backend (rp/cmd! :create-file-object-thumbnail params))))
(let [params {:file-id file-id
:object-id object-id (rx/mapcat (fn [{:keys [object-id media-id]}]
:media blob (let [uri (cf/resolve-media media-id)]
:tag (or tag "frame")}] ;; We perform this request just for
(rp/cmd! :create-file-object-thumbnail params)))) ;; populate the browser CACHE and avoid
(rx/catch rx/empty) ;; unnecesary image flickering
(rx/ignore))))) (->> (http/send! {:uri uri :method :get})
(rx/map #(assoc-thumbnail object-id uri))))))
(rx/catch (fn [cause] (rx/catch (fn [cause]
(.error js/console cause) (.error js/console cause)
(rx/empty))) (rx/empty)))
@ -260,31 +247,12 @@
(rx/observe-on :async) (rx/observe-on :async)
(rx/with-latest-from workspace-data-s) (rx/with-latest-from workspace-data-s)
(rx/merge-map (partial extract-root-frame-changes page-id)) (rx/merge-map (partial extract-root-frame-changes page-id))
(rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %))) (rx/tap #(l/trc :hint "inconming change" :origin "all" :frame-id (dm/str %)))
(rx/share)) (rx/share))
local-commits-s
(->> stream
(rx/filter dch/commit?)
(rx/map deref)
(rx/filter #(= :local (:source %)))
(rx/observe-on :async)
(rx/with-latest-from workspace-data-s)
(rx/merge-map (partial extract-root-frame-changes page-id))
(rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %)))
(rx/share))
;; BUFFER NOTIFIER: only on local changes, remote changes
;; we expect to receive thumbnail uri once it is
;; generated va notifications subsystem
notifier-s notifier-s
(->> stream (->> stream
(rx/filter (ptk/type? ::dps/commit-persisted)) (rx/filter (ptk/type? ::dps/commit-persisted))
(rx/map deref)
(rx/observe-on :async)
(rx/with-latest-from workspace-data-s)
(rx/merge-map (partial extract-root-frame-changes page-id))
(rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %)))
(rx/debounce 5000) (rx/debounce 5000)
(rx/tap #(l/trc :hint "buffer initialized")))] (rx/tap #(l/trc :hint "buffer initialized")))]
@ -296,11 +264,11 @@
(rx/map (fn [frame-id] (rx/map (fn [frame-id]
(clear-thumbnail file-id page-id frame-id "frame")))) (clear-thumbnail file-id page-id frame-id "frame"))))
;; Generate thumbnails in batchs, once user becomes ;; Generate thumbnails in batches, once user becomes
;; inactive for some instant only for local changes ;; inactive for some instant.
(->> local-commits-s (->> all-commits-s
(rx/buffer-until notifier-s) (rx/buffer-until notifier-s)
(rx/mapcat #(into #{} %)) (rx/mapcat #(into #{} %))
(rx/map #(request-thumbnail file-id page-id % "frame" "watch-state-changes")))) (rx/map #(update-thumbnail file-id page-id % "frame" "watch-state-changes"))))
(rx/take-until stopper-s)))))) (rx/take-until stopper-s))))))

View file

@ -109,7 +109,7 @@
(fn [{:keys [width height]}] (fn [{:keys [width height]}]
(when (or (not (mth/close? width fixed-width 5)) (when (or (not (mth/close? width fixed-width 5))
(not (mth/close? height fixed-height 5))) (not (mth/close? height fixed-height 5)))
(st/emit! (dwt/request-thumbnail file-id page-id frame-id "frame" "check-thumbnail-size")))))))) (st/emit! (dwt/update-thumbnail file-id page-id frame-id "frame" "check-thumbnail-size"))))))))
(defn root-frame-wrapper-factory (defn root-frame-wrapper-factory
[shape-wrapper] [shape-wrapper]
@ -176,7 +176,8 @@
(mf/with-effect [] (mf/with-effect []
(when-not (some? thumbnail-uri) (when-not (some? thumbnail-uri)
(tm/schedule-on-idle (tm/schedule-on-idle
#(st/emit! (dwt/request-thumbnail file-id page-id frame-id "frame" "root-frame")))) #(st/emit! (dwt/update-thumbnail file-id page-id frame-id "frame" "root-frame"))))
#(when-let [task (mf/ref-val task-ref)] #(when-let [task (mf/ref-val task-ref)]
(d/close! task))) (d/close! task)))

View file

@ -5,33 +5,26 @@
;; Copyright (c) KALEIDOS INC ;; Copyright (c) KALEIDOS INC
(ns app.util.queue (ns app.util.queue
(:require [app.common.logging :as l] "Low-Level queuing mechanism, mainly used for process thumbnails"
[app.common.math :as mth] (:require
[app.util.time :as t])) [app.common.logging :as l]
[app.common.math :as mth]
[app.util.time :as t]
[beicon.v2.core :as rx]))
(l/set-level! :info) (l/set-level! :info)
(declare process) (declare process)
(declare dequeue) (declare request-process)
(defrecord Queue [f items timeout time threshold max-iterations])
(defn create (defn create
[f threshold] [find-fn threshold]
(Queue. f #js {:find-fn find-fn
#js [] :items #js []
nil :timeout nil
0 :time 0
threshold :threshold threshold
##Inf)) :max-iterations ##Inf})
(defn- measure-fn
[f & args]
(let [tp (t/tpoint-ms)
_ (apply f args)
duration (tp)]
(l/dbg :hint "queue::measure-fn" :duration duration)
duration))
(defn- next-process-time (defn- next-process-time
[queue] [queue]
@ -40,76 +33,101 @@
max-time 5000 max-time 5000
min-time 1000 min-time 1000
calc-time (mth/min (mth/max (* (- time threshold) 10) min-time) max-time)] calc-time (mth/min (mth/max (* (- time threshold) 10) min-time) max-time)]
(l/dbg :hint "queue::next-process-time" :time time :threshold threshold :calc-time calc-time :max-time max-time :min-time min-time) (l/dbg :hint "queue::next-process-time"
:time time
:threshold threshold
:calc-time calc-time
:max-time max-time
:min-time min-time)
calc-time)) calc-time))
(defn- has-requested-process? (defn- has-requested-process?
[queue] [queue]
(not (nil? (unchecked-get queue "timeout")))) (some? (unchecked-get queue "timeout")))
(defn- request-process
[queue time]
(l/dbg :hint "queue::request-process" :time time)
(unchecked-set queue "timeout" (js/setTimeout (fn [] (process queue)) time)))
;; NOTE: Right now there are no cases where we need to cancel a process ;; NOTE: Right now there are no cases where we need to cancel a process
;; but if we do, we can use this function ;; but if we do, we can use this function
#_(defn- cancel-process ;; (defn- cancel-process
[queue] ;; [queue]
(l/dbg :hint "queue::cancel-process") ;; (l/dbg :hint "queue::cancel-process")
(let [timeout (unchecked-get queue "timeout")] ;; (let [timeout (unchecked-get queue "timeout")]
(when (some? timeout) ;; (when (some? timeout)
(js/clearTimeout timeout)) ;; (js/clearTimeout timeout))
(unchecked-set queue "timeout" nil))) ;; (unchecked-set queue "timeout" nil)))
(defn- process (defn- process
[queue] [queue iterations]
(unchecked-set queue "timeout" nil)
(unchecked-set queue "time" 0)
(let [threshold (unchecked-get queue "threshold") (let [threshold (unchecked-get queue "threshold")
max-iterations (unchecked-get queue "max-iterations") max-iterations (unchecked-get queue "max-iterations")
f (unchecked-get queue "f")] items (unchecked-get queue "items")
(loop [item (dequeue queue) item (.shift ^js items)]
iterations 0]
(l/dbg :hint "queue::process" :item item)
(when (some? item)
(let [duration (measure-fn f item)
time (unchecked-get queue "time")
time (unchecked-set queue "time" (+ time duration))]
(if (or (> time threshold) (>= iterations max-iterations))
(request-process queue (next-process-time queue))
(recur (dequeue queue) (inc iterations))))))))
(defn- dequeue (when (some? item)
[queue] (let [tp (t/tpoint-ms)
(let [items (unchecked-get queue "items")] f (unchecked-get item "f")
(.shift items))) res (unchecked-get item "result")]
(rx/subscribe (f)
(fn [o]
(rx/push! res o))
(fn [e]
(rx/error! res e))
(fn []
(rx/end! res)
(let [duration (tp)
time (unchecked-get queue "time")
time (+ time duration)]
(unchecked-set queue "time" time)
(if (or (> time threshold) (>= iterations max-iterations))
(request-process queue 0 (next-process-time queue))
(request-process queue (inc iterations) 0)))))))))
(defn enqueue-first (defn- request-process
[queue iterations time]
(l/dbg :hint "queue::request-process" :time time)
(unchecked-set queue "timeout"
(js/setTimeout
(fn []
(unchecked-set queue "timeout" nil)
(process queue iterations))
time)))
(defn- enqueue-first
[queue item] [queue item]
(assert (instance? Queue queue))
(let [items (unchecked-get queue "items")] (let [items (unchecked-get queue "items")]
(.unshift items item) (.unshift ^js items item)
(when-not (has-requested-process? queue) (when-not (has-requested-process? queue)
(request-process queue (next-process-time queue))))) (request-process queue 0 (next-process-time queue)))))
(defn enqueue-last (defn- enqueue-last
[queue item] [queue item]
(assert (instance? Queue queue))
(let [items (unchecked-get queue "items")] (let [items (unchecked-get queue "items")]
(.push items item) (.push ^js items item)
(when-not (has-requested-process? queue) (when-not (has-requested-process? queue)
(request-process queue (next-process-time queue))))) (request-process queue 0 (next-process-time queue)))))
(defn enqueue-unique (defn enqueue-unique
[queue item f] [queue request f]
(assert (instance? Queue queue)) (let [items (unchecked-get queue "items")
(let [items (unchecked-get queue "items")] find-fn (unchecked-get queue "find-fn")
result (rx/subject)]
(unchecked-set request "result" result)
(unchecked-set request "f" f)
;; If tag is "frame", then they are added to the front of the queue ;; If tag is "frame", then they are added to the front of the queue
;; so that they are processed first, anything else is added to the ;; so that they are processed first, anything else is added to the
;; end of the queue. ;; end of the queue.
(if (= (unchecked-get item "tag") "frame") (if (= (unchecked-get request "tag") "frame")
(when-not (.find ^js items f) (let [item (.find ^js items find-fn)]
(enqueue-first queue item)) (if item
(when-not (.findLast ^js items f) (let [other-result (unchecked-get item "result")]
(enqueue-last queue item))))) (rx/subscribe other-result result))
(enqueue-first queue request)))
(let [item (.findLast ^js items find-fn)]
(if item
(let [other-result (unchecked-get item "result")]
(rx/subscribe other-result result))
(enqueue-last queue request))))
(rx/to-observable result)))