mirror of
https://github.com/penpot/penpot.git
synced 2025-07-21 14:57:13 +02:00
🐛 Fix many race conditions on thumbnail generation process
This commit is contained in:
parent
9f7a002a78
commit
c1463ebd12
5 changed files with 147 additions and 156 deletions
|
@ -5,33 +5,26 @@
|
|||
;; Copyright (c) KALEIDOS INC
|
||||
|
||||
(ns app.util.queue
|
||||
(:require [app.common.logging :as l]
|
||||
[app.common.math :as mth]
|
||||
[app.util.time :as t]))
|
||||
"Low-Level queuing mechanism, mainly used for process thumbnails"
|
||||
(:require
|
||||
[app.common.logging :as l]
|
||||
[app.common.math :as mth]
|
||||
[app.util.time :as t]
|
||||
[beicon.v2.core :as rx]))
|
||||
|
||||
(l/set-level! :info)
|
||||
|
||||
(declare process)
|
||||
(declare dequeue)
|
||||
|
||||
(defrecord Queue [f items timeout time threshold max-iterations])
|
||||
(declare request-process)
|
||||
|
||||
(defn create
|
||||
[f threshold]
|
||||
(Queue. f
|
||||
#js []
|
||||
nil
|
||||
0
|
||||
threshold
|
||||
##Inf))
|
||||
|
||||
(defn- measure-fn
|
||||
[f & args]
|
||||
(let [tp (t/tpoint-ms)
|
||||
_ (apply f args)
|
||||
duration (tp)]
|
||||
(l/dbg :hint "queue::measure-fn" :duration duration)
|
||||
duration))
|
||||
[find-fn threshold]
|
||||
#js {:find-fn find-fn
|
||||
:items #js []
|
||||
:timeout nil
|
||||
:time 0
|
||||
:threshold threshold
|
||||
:max-iterations ##Inf})
|
||||
|
||||
(defn- next-process-time
|
||||
[queue]
|
||||
|
@ -40,76 +33,101 @@
|
|||
max-time 5000
|
||||
min-time 1000
|
||||
calc-time (mth/min (mth/max (* (- time threshold) 10) min-time) max-time)]
|
||||
(l/dbg :hint "queue::next-process-time" :time time :threshold threshold :calc-time calc-time :max-time max-time :min-time min-time)
|
||||
(l/dbg :hint "queue::next-process-time"
|
||||
:time time
|
||||
:threshold threshold
|
||||
:calc-time calc-time
|
||||
:max-time max-time
|
||||
:min-time min-time)
|
||||
calc-time))
|
||||
|
||||
(defn- has-requested-process?
|
||||
[queue]
|
||||
(not (nil? (unchecked-get queue "timeout"))))
|
||||
|
||||
(defn- request-process
|
||||
[queue time]
|
||||
(l/dbg :hint "queue::request-process" :time time)
|
||||
(unchecked-set queue "timeout" (js/setTimeout (fn [] (process queue)) time)))
|
||||
(some? (unchecked-get queue "timeout")))
|
||||
|
||||
;; NOTE: Right now there are no cases where we need to cancel a process
|
||||
;; but if we do, we can use this function
|
||||
#_(defn- cancel-process
|
||||
[queue]
|
||||
(l/dbg :hint "queue::cancel-process")
|
||||
(let [timeout (unchecked-get queue "timeout")]
|
||||
(when (some? timeout)
|
||||
(js/clearTimeout timeout))
|
||||
(unchecked-set queue "timeout" nil)))
|
||||
;; (defn- cancel-process
|
||||
;; [queue]
|
||||
;; (l/dbg :hint "queue::cancel-process")
|
||||
;; (let [timeout (unchecked-get queue "timeout")]
|
||||
;; (when (some? timeout)
|
||||
;; (js/clearTimeout timeout))
|
||||
;; (unchecked-set queue "timeout" nil)))
|
||||
|
||||
(defn- process
|
||||
[queue]
|
||||
(unchecked-set queue "timeout" nil)
|
||||
(unchecked-set queue "time" 0)
|
||||
[queue iterations]
|
||||
(let [threshold (unchecked-get queue "threshold")
|
||||
max-iterations (unchecked-get queue "max-iterations")
|
||||
f (unchecked-get queue "f")]
|
||||
(loop [item (dequeue queue)
|
||||
iterations 0]
|
||||
(l/dbg :hint "queue::process" :item item)
|
||||
(when (some? item)
|
||||
(let [duration (measure-fn f item)
|
||||
time (unchecked-get queue "time")
|
||||
time (unchecked-set queue "time" (+ time duration))]
|
||||
(if (or (> time threshold) (>= iterations max-iterations))
|
||||
(request-process queue (next-process-time queue))
|
||||
(recur (dequeue queue) (inc iterations))))))))
|
||||
items (unchecked-get queue "items")
|
||||
item (.shift ^js items)]
|
||||
|
||||
(defn- dequeue
|
||||
[queue]
|
||||
(let [items (unchecked-get queue "items")]
|
||||
(.shift items)))
|
||||
(when (some? item)
|
||||
(let [tp (t/tpoint-ms)
|
||||
f (unchecked-get item "f")
|
||||
res (unchecked-get item "result")]
|
||||
(rx/subscribe (f)
|
||||
(fn [o]
|
||||
(rx/push! res o))
|
||||
(fn [e]
|
||||
(rx/error! res e))
|
||||
(fn []
|
||||
(rx/end! res)
|
||||
(let [duration (tp)
|
||||
time (unchecked-get queue "time")
|
||||
time (+ time duration)]
|
||||
(unchecked-set queue "time" time)
|
||||
(if (or (> time threshold) (>= iterations max-iterations))
|
||||
(request-process queue 0 (next-process-time queue))
|
||||
(request-process queue (inc iterations) 0)))))))))
|
||||
|
||||
(defn enqueue-first
|
||||
(defn- request-process
|
||||
[queue iterations time]
|
||||
(l/dbg :hint "queue::request-process" :time time)
|
||||
(unchecked-set queue "timeout"
|
||||
(js/setTimeout
|
||||
(fn []
|
||||
(unchecked-set queue "timeout" nil)
|
||||
(process queue iterations))
|
||||
time)))
|
||||
|
||||
(defn- enqueue-first
|
||||
[queue item]
|
||||
(assert (instance? Queue queue))
|
||||
(let [items (unchecked-get queue "items")]
|
||||
(.unshift items item)
|
||||
(.unshift ^js items item)
|
||||
(when-not (has-requested-process? queue)
|
||||
(request-process queue (next-process-time queue)))))
|
||||
(request-process queue 0 (next-process-time queue)))))
|
||||
|
||||
(defn enqueue-last
|
||||
(defn- enqueue-last
|
||||
[queue item]
|
||||
(assert (instance? Queue queue))
|
||||
(let [items (unchecked-get queue "items")]
|
||||
(.push items item)
|
||||
(.push ^js items item)
|
||||
(when-not (has-requested-process? queue)
|
||||
(request-process queue (next-process-time queue)))))
|
||||
(request-process queue 0 (next-process-time queue)))))
|
||||
|
||||
(defn enqueue-unique
|
||||
[queue item f]
|
||||
(assert (instance? Queue queue))
|
||||
(let [items (unchecked-get queue "items")]
|
||||
[queue request f]
|
||||
(let [items (unchecked-get queue "items")
|
||||
find-fn (unchecked-get queue "find-fn")
|
||||
result (rx/subject)]
|
||||
|
||||
(unchecked-set request "result" result)
|
||||
(unchecked-set request "f" f)
|
||||
|
||||
;; If tag is "frame", then they are added to the front of the queue
|
||||
;; so that they are processed first, anything else is added to the
|
||||
;; end of the queue.
|
||||
(if (= (unchecked-get item "tag") "frame")
|
||||
(when-not (.find ^js items f)
|
||||
(enqueue-first queue item))
|
||||
(when-not (.findLast ^js items f)
|
||||
(enqueue-last queue item)))))
|
||||
(if (= (unchecked-get request "tag") "frame")
|
||||
(let [item (.find ^js items find-fn)]
|
||||
(if item
|
||||
(let [other-result (unchecked-get item "result")]
|
||||
(rx/subscribe other-result result))
|
||||
(enqueue-first queue request)))
|
||||
|
||||
(let [item (.findLast ^js items find-fn)]
|
||||
(if item
|
||||
(let [other-result (unchecked-get item "result")]
|
||||
(rx/subscribe other-result result))
|
||||
(enqueue-last queue request))))
|
||||
|
||||
(rx/to-observable result)))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue