Add performance enhacements to storage/gc-touched task

This commit is contained in:
Andrey Antukh 2024-08-02 12:25:47 +02:00
parent 253b9e5bd8
commit f6bfe3931c
2 changed files with 74 additions and 79 deletions

View file

@ -121,5 +121,3 @@
:total total) :total total)
{:deleted total})))))) {:deleted total}))))))

View file

@ -28,58 +28,53 @@
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig])) [integrant.core :as ig]))
(def ^:private sql:get-team-font-variant-nrefs (def ^:private sql:has-team-font-variant-refs
"SELECT ((SELECT count(*) FROM team_font_variant WHERE woff1_file_id = ?) + "SELECT ((SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff1_file_id = ?)) OR
(SELECT count(*) FROM team_font_variant WHERE woff2_file_id = ?) + (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff2_file_id = ?)) OR
(SELECT count(*) FROM team_font_variant WHERE otf_file_id = ?) + (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE otf_file_id = ?)) OR
(SELECT count(*) FROM team_font_variant WHERE ttf_file_id = ?)) AS nrefs") (SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE ttf_file_id = ?))) AS has_refs")
(defn- get-team-font-variant-nrefs (defn- has-team-font-variant-refs?
[conn id] [conn id]
(-> (db/exec-one! conn [sql:get-team-font-variant-nrefs id id id id]) (-> (db/exec-one! conn [sql:has-team-font-variant-refs id id id id])
(get :nrefs))) (get :has-refs)))
(def ^:private (def ^:private
sql:get-file-media-object-nrefs sql:has-file-media-object-refs
"SELECT ((SELECT count(*) FROM file_media_object WHERE media_id = ?) + "SELECT ((SELECT EXISTS (SELECT 1 FROM file_media_object WHERE media_id = ?)) OR
(SELECT count(*) FROM file_media_object WHERE thumbnail_id = ?)) AS nrefs") (SELECT EXISTS (SELECT 1 FROM file_media_object WHERE thumbnail_id = ?))) AS has_refs")
(defn- get-file-media-object-nrefs (defn- has-file-media-object-refs?
[conn id] [conn id]
(-> (db/exec-one! conn [sql:get-file-media-object-nrefs id id]) (-> (db/exec-one! conn [sql:has-file-media-object-refs id id])
(get :nrefs))) (get :has-refs)))
(def ^:private sql:has-profile-refs
"SELECT ((SELECT EXISTS (SELECT 1 FROM profile WHERE photo_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM team WHERE photo_id = ?))) AS has_refs")
(def ^:private sql:get-profile-nrefs (defn- has-profile-refs?
"SELECT ((SELECT count(*) FROM profile WHERE photo_id = ?) +
(SELECT count(*) FROM team WHERE photo_id = ?)) AS nrefs")
(defn- get-profile-nrefs
[conn id] [conn id]
(-> (db/exec-one! conn [sql:get-profile-nrefs id id]) (-> (db/exec-one! conn [sql:has-profile-refs id id])
(get :nrefs))) (get :has-refs)))
(def ^:private (def ^:private
sql:get-file-object-thumbnail-nrefs sql:has-file-object-thumbnail-refs
"SELECT (SELECT count(*) FROM file_tagged_object_thumbnail WHERE media_id = ?) AS nrefs") "SELECT EXISTS (SELECT 1 FROM file_tagged_object_thumbnail WHERE media_id = ?) AS has_refs")
(defn- get-file-object-thumbnails (defn- has-file-object-thumbnails-refs?
[conn id] [conn id]
(-> (db/exec-one! conn [sql:get-file-object-thumbnail-nrefs id]) (-> (db/exec-one! conn [sql:has-file-object-thumbnail-refs id])
(get :nrefs))) (get :has-refs)))
(def ^:private (def ^:private
sql:get-file-thumbnail-nrefs sql:has-file-thumbnail-refs
"SELECT (SELECT count(*) FROM file_thumbnail WHERE media_id = ?) AS nrefs") "SELECT EXISTS (SELECT 1 FROM file_thumbnail WHERE media_id = ?) AS has_refs")
(defn- get-file-thumbnails (defn- has-file-thumbnails-refs?
[conn id] [conn id]
(-> (db/exec-one! conn [sql:get-file-thumbnail-nrefs id]) (-> (db/exec-one! conn [sql:has-file-thumbnail-refs id])
(get :nrefs))) (get :has-refs)))
(def ^:private sql:mark-freeze-in-bulk (def ^:private sql:mark-freeze-in-bulk
"UPDATE storage_object "UPDATE storage_object
@ -91,7 +86,6 @@
(let [ids (db/create-array conn "uuid" ids)] (let [ids (db/create-array conn "uuid" ids)]
(db/exec-one! conn [sql:mark-freeze-in-bulk ids]))) (db/exec-one! conn [sql:mark-freeze-in-bulk ids])))
(def ^:private sql:mark-delete-in-bulk (def ^:private sql:mark-delete-in-bulk
"UPDATE storage_object "UPDATE storage_object
SET deleted_at = now(), SET deleted_at = now(),
@ -123,25 +117,24 @@
"file-media-object")) "file-media-object"))
(defn- process-objects! (defn- process-objects!
[conn get-fn ids bucket] [conn has-refs? ids bucket]
(loop [to-freeze #{} (loop [to-freeze #{}
to-delete #{} to-delete #{}
ids (seq ids)] ids (seq ids)]
(if-let [id (first ids)] (if-let [id (first ids)]
(let [nrefs (get-fn conn id)] (if (has-refs? conn id)
(if (pos? nrefs)
(do (do
(l/debug :hint "processing object" (l/debug :hint "processing object"
:id (str id) :id (str id)
:status "freeze" :status "freeze"
:bucket bucket :refs nrefs) :bucket bucket)
(recur (conj to-freeze id) to-delete (rest ids))) (recur (conj to-freeze id) to-delete (rest ids)))
(do (do
(l/debug :hint "processing object" (l/debug :hint "processing object"
:id (str id) :id (str id)
:status "delete" :status "delete"
:bucket bucket :refs nrefs) :bucket bucket)
(recur to-freeze (conj to-delete id) (rest ids))))) (recur to-freeze (conj to-delete id) (rest ids))))
(do (do
(some->> (seq to-freeze) (mark-freeze-in-bulk! conn)) (some->> (seq to-freeze) (mark-freeze-in-bulk! conn))
(some->> (seq to-delete) (mark-delete-in-bulk! conn)) (some->> (seq to-delete) (mark-delete-in-bulk! conn))
@ -150,15 +143,23 @@
(defn- process-bucket! (defn- process-bucket!
[conn bucket ids] [conn bucket ids]
(case bucket (case bucket
"file-media-object" (process-objects! conn get-file-media-object-nrefs ids bucket) "file-media-object" (process-objects! conn has-file-media-object-refs? ids bucket)
"team-font-variant" (process-objects! conn get-team-font-variant-nrefs ids bucket) "team-font-variant" (process-objects! conn has-team-font-variant-refs? ids bucket)
"file-object-thumbnail" (process-objects! conn get-file-object-thumbnails ids bucket) "file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket)
"file-thumbnail" (process-objects! conn get-file-thumbnails ids bucket) "file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket)
"profile" (process-objects! conn get-profile-nrefs ids bucket) "profile" (process-objects! conn has-profile-refs? ids bucket)
(ex/raise :type :internal (ex/raise :type :internal
:code :unexpected-unknown-reference :code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference %" bucket)))) :hint (dm/fmt "unknown reference '%'" bucket))))
(defn process-chunk!
[{:keys [::db/conn]} chunk]
(reduce-kv (fn [[nfo ndo] bucket ids]
(let [[nfo' ndo'] (process-bucket! conn bucket ids)]
[(+ nfo nfo')
(+ ndo ndo')]))
[0 0]
(d/group-by lookup-bucket :id #{} chunk)))
(def ^:private (def ^:private
sql:get-touched-storage-objects sql:get-touched-storage-objects
@ -167,29 +168,22 @@
WHERE so.touched_at IS NOT NULL WHERE so.touched_at IS NOT NULL
ORDER BY touched_at ASC ORDER BY touched_at ASC
FOR UPDATE FOR UPDATE
SKIP LOCKED") SKIP LOCKED
LIMIT 10")
(defn- group-by-bucket (defn get-chunk
[row]
(d/group-by lookup-bucket :id #{} row))
(defn- get-buckets
[conn] [conn]
(sequence (->> (db/exec! conn [sql:get-touched-storage-objects])
(comp (map impl/decode-row) (map impl/decode-row)
(partition-all 25) (not-empty)))
(mapcat group-by-bucket))
(db/cursor conn sql:get-touched-storage-objects)))
(defn- process-touched! (defn- process-touched!
[{:keys [::db/conn]}] [{:keys [::db/pool] :as cfg}]
(loop [buckets (get-buckets conn) (loop [freezed 0
freezed 0
deleted 0] deleted 0]
(if-let [[bucket ids] (first buckets)] (if-let [chunk (get-chunk pool)]
(let [[nfo ndo] (process-bucket! conn bucket ids)] (let [[nfo ndo] (db/tx-run! cfg process-chunk! chunk)]
(recur (rest buckets) (recur (+ freezed nfo)
(+ freezed nfo)
(+ deleted ndo))) (+ deleted ndo)))
(do (do
(l/inf :hint "task finished" (l/inf :hint "task finished"
@ -198,11 +192,14 @@
{:freeze freezed :delete deleted})))) {:freeze freezed :delete deleted}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool])) (s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler (defmethod ig/init-key ::handler
[_ cfg] [_ cfg]
(fn [_] (fn [_] (process-touched! cfg)))
(db/tx-run! cfg process-touched!)))