♻️ Refactor exportation process, make it considerably faster

This commit is contained in:
Andrey Antukh 2022-03-29 12:34:11 +02:00 committed by Andrés Moya
parent d6abd2202c
commit 9140fc71b9
33 changed files with 1096 additions and 1090 deletions

View file

@ -6,34 +6,35 @@
(ns app.handlers.export-shapes
(:require
[app.common.exceptions :as exc :include-macros true]
["path" :as path]
[app.common.data :as d]
[app.common.exceptions :as exc]
[app.common.logging :as l]
[app.common.spec :as us]
[app.redis :as redis]
[app.handlers.resources :as rsc]
[app.renderer.bitmap :as rb]
[app.renderer.pdf :as rp]
[app.renderer.svg :as rs]
[app.redis :as redis]
[app.renderer :as rd]
[app.util.mime :as mime]
[app.util.shell :as sh]
[cljs.spec.alpha :as s]
[cuerdas.core :as str]
[promesa.core :as p]))
(declare ^:private handle-exports)
(declare ^:private handle-single-export)
(declare ^:private handle-multiple-export)
(declare ^:private run-export)
(declare ^:private assign-file-name)
(declare ^:private assoc-file-name)
(declare prepare-exports)
(s/def ::name ::us/string)
(s/def ::page-id ::us/uuid)
(s/def ::file-id ::us/uuid)
(s/def ::filename ::us/string)
(s/def ::name ::us/string)
(s/def ::object-id ::us/uuid)
(s/def ::page-id ::us/uuid)
(s/def ::profile-id ::us/uuid)
(s/def ::scale ::us/number)
(s/def ::suffix ::us/string)
(s/def ::type ::us/keyword)
(s/def ::suffix string?)
(s/def ::scale number?)
(s/def ::uri ::us/uri)
(s/def ::profile-id ::us/uuid)
(s/def ::wait ::us/boolean)
(s/def ::export
@ -47,13 +48,13 @@
:opt-un [::uri ::wait ::name]))
(defn handler
[{:keys [:request/auth-token] :as exchange} {:keys [exports] :as params}]
(let [xform (comp
(map #(assoc % :token auth-token))
(assign-file-name))
exports (into [] xform exports)]
(if (= 1 (count exports))
(handle-single-export exchange (assoc params :export (first exports)))
[{:keys [:request/auth-token] :as exchange} {:keys [exports uri] :as params}]
(let [exports (prepare-exports exports auth-token uri)]
(if (and (= 1 (count exports))
(= 1 (count (-> exports first :objects))))
(handle-single-export exchange (-> params
(assoc :export (first exports))
(dissoc :exports)))
(handle-multiple-export exchange (assoc params :exports exports)))))
(defn- handle-single-export
@ -61,87 +62,102 @@
(let [topic (str profile-id)
resource (rsc/create (:type export) (or name (:name export)))
on-progress (fn [progress]
(let [data {:type :export-update
:resource-id (:id resource)
:status "running"
:progress progress}]
(redis/pub! topic data)))
on-complete (fn [resource]
(let [data {:type :export-update
:resource-id (:id resource)
:size (:size resource)
:name (:name resource)
:status "ended"}]
(redis/pub! topic data)))
on-progress (fn [{:keys [path] :as object}]
(p/do
;; Move the generated path to the resource
;; path destination.
(sh/move! path (:path resource))
(when-not wait
(redis/pub! topic {:type :export-update
:resource-id (:id resource)
:status "running"
:total 1
:done 1})
(redis/pub! topic {:type :export-update
:resource-id (:id resource)
:filename (:filename resource)
:name (:name resource)
:status "ended"}))))
on-error (fn [cause]
(let [data {:type :export-update
:resource-id (:id resource)
:name (:name resource)
:status "error"
:cause (ex-message cause)}]
(redis/pub! topic data)))
(l/error :hint "unexpected error happened on export multiple process"
:cause cause)
(if wait
(p/rejected cause)
(redis/pub! topic {:type :export-update
:resource-id (:id resource)
:status "error"
:cause (ex-message cause)})))
proc (-> (rd/render export on-progress)
(p/then (constantly resource))
(p/catch on-error))]
proc (rsc/create-simple :task #(run-export export)
:resource resource
:on-progress on-progress
:on-error on-error
:on-complete on-complete)]
(if wait
(p/then proc #(assoc exchange :response/body (dissoc % :path)))
(assoc exchange :response/body (dissoc resource :path)))))
(defn- handle-multiple-export
[exchange {:keys [exports wait uri profile-id name] :as params}]
(let [tasks (map #(fn [] (run-export %)) exports)
(let [resource (rsc/create :zip (or name (-> exports first :name)))
total (count exports)
topic (str profile-id)
resource (rsc/create :zip (or name (-> exports first :name)))
on-progress (fn [progress]
(let [data {:type :export-update
:resource-id (:id resource)
:name (:name resource)
:status "running"
:progress progress}]
(redis/pub! topic data)))
to-delete (atom #{})
on-complete (fn [resource]
(let [data {:type :export-update
:resource-id (:id resource)
:name (:name resource)
:size (:size resource)
:status "ended"}]
(redis/pub! topic data)))
on-progress (fn [{:keys [done]}]
(when-not wait
(let [data {:type :export-update
:resource-id (:id resource)
:status "running"
:total total
:done done}]
(redis/pub! topic data))))
on-complete (fn []
(when-not wait
(let [data {:type :export-update
:name (:name resource)
:filename (:filename resource)
:resource-id (:id resource)
:status "ended"}]
(redis/pub! topic data))))
on-error (fn [cause]
(let [data {:type :export-update
:resource-id (:id resource)
:name (:name resource)
:status "error"
:cause (ex-message cause)}]
(redis/pub! topic data)))
(l/error :hint "unexpected error on multiple exportation" :cause cause)
(if wait
(p/rejected cause)
(redis/pub! topic {:type :export-update
:resource-id (:id resource)
:status "error"
:cause (ex-message cause)})))
proc (rsc/create-zip :resource resource
:tasks tasks
:on-progress on-progress
zip (rsc/create-zip :resource resource
:on-complete on-complete
:on-error on-error)]
:on-error on-error
:on-progress on-progress)
append (fn [{:keys [filename path] :as object}]
(swap! to-delete conj path)
(rsc/add-to-zip! zip path filename))
proc (-> (p/do
(p/loop [exports (seq exports)]
(when-let [export (first exports)]
(p/let [proc (rd/render export append)]
(p/recur (rest exports)))))
(.finalize zip))
(p/then (fn [_] (p/run! #(sh/rmdir! (path/dirname %)) @to-delete)))
(p/then (constantly resource))
(p/catch on-error))
]
(if wait
(p/then proc #(assoc exchange :response/body (dissoc % :path)))
(assoc exchange :response/body (dissoc resource :path)))))
(defn- run-export
[{:keys [type] :as params}]
(p/let [res (case type
:png (rb/render params)
:jpeg (rb/render params)
:svg (rs/render params)
:pdf (rp/render params))]
(assoc res :type type)))
(defn- assign-file-name
(defn- assoc-file-name
"A transducer that assocs a candidate filename and avoid duplicates."
[]
(letfn [(find-candidate [params used]
@ -149,12 +165,8 @@
(let [candidate (str (:name params)
(:suffix params "")
(when (pos? index)
(str "-" (inc index)))
(case (:type params)
:png ".png"
:jpeg ".jpg"
:svg ".svg"
:pdf ".pdf"))]
(str/concat "-" (inc index)))
(mime/get-extension (:type params)))]
(if (contains? used candidate)
(recur (inc index))
candidate))))]
@ -168,3 +180,37 @@
params (assoc params :filename candidate)]
(vswap! used conj candidate)
(rf result params))))))))
(def ^:const ^:private
default-partition-size 50)
(defn prepare-exports
[exports token uri]
(letfn [(process-group [group]
(sequence (comp (partition-all default-partition-size)
(map process-partition))
group))
(process-partition [[part1 :as part]]
{:file-id (:file-id part1)
:page-id (:page-id part1)
:name (:name part1)
:token token
:uri uri
:type (:type part1)
:scale (:scale part1)
:objects (mapv part-entry->object part)})
(part-entry->object [entry]
{:id (:object-id entry)
:filename (:filename entry)
:name (:name entry)
:suffix (:suffix entry)})]
(let [xform (comp
(map #(assoc % :token token))
(assoc-file-name))]
(->> (sequence xform exports)
(d/group-by (juxt :scale :type))
(map second)
(into [] (mapcat process-group))))))