Merge remote-tracking branch 'origin/staging' into develop

This commit is contained in:
Andrey Antukh 2022-08-12 09:45:58 +02:00
commit 4f0cc3d0d8
22 changed files with 349 additions and 297 deletions

View file

@ -11,14 +11,7 @@
<Logger name="com.zaxxer.hikari" level="error" />
<Logger name="org.postgresql" level="error" />
<Logger name="app.rpc.commands.binfile" level="info" />
<Logger name="app.storage.tmp" level="info" />
<Logger name="app.worker" level="info" />
<Logger name="app.msgbus" level="info" />
<Logger name="app.http.websocket" level="info" />
<Logger name="app.util.websocket" level="info" />
<Logger name="app" level="debug" additivity="false">
<Logger name="app" level="info" additivity="false">
<AppenderRef ref="console" />
</Logger>

View file

@ -308,6 +308,7 @@
(def default-flags
[:enable-backend-api-doc
:enable-backend-worker
:enable-secure-session-cookies])
(defn- parse-flags

View file

@ -79,8 +79,7 @@
:app.storage/gc-deleted-task
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)
:executor (ig/ref [::worker :app.worker/executor])
:min-age (dt/duration {:hours 2})}
:executor (ig/ref [::worker :app.worker/executor])}
:app.storage/gc-touched-task
{:pool (ig/ref :app.db/pool)}
@ -236,54 +235,6 @@
:app.rpc/routes
{:methods (ig/ref :app.rpc/methods)}
:app.worker/worker
{:executor (ig/ref [::worker :app.worker/executor])
:tasks (ig/ref :app.worker/registry)
:metrics (ig/ref :app.metrics/metrics)
:pool (ig/ref :app.db/pool)}
:app.worker/cron
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)
:tasks (ig/ref :app.worker/registry)
:pool (ig/ref :app.db/pool)
:entries
[{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :file-gc}
{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :file-xlog-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :storage-deleted-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :storage-touched-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :session-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :objects-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :tasks-gc}
{:cron #app/cron "0 30 */3,23 * * ?"
:task :telemetry}
(when (cf/get :fdata-storage-backed)
{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :file-offload})
(when (contains? cf/flags :audit-log-archive)
{:cron #app/cron "0 */5 * * * ?" ;; every 5m
:task :audit-log-archive})
(when (contains? cf/flags :audit-log-gc)
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :audit-log-gc})]}
:app.worker/registry
{:metrics (ig/ref :app.metrics/metrics)
:tasks
@ -291,12 +242,11 @@
:objects-gc (ig/ref :app.tasks.objects-gc/handler)
:file-gc (ig/ref :app.tasks.file-gc/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:storage-deleted-gc (ig/ref :app.storage/gc-deleted-task)
:storage-touched-gc (ig/ref :app.storage/gc-touched-task)
:storage-gc-deleted (ig/ref :app.storage/gc-deleted-task)
:storage-gc-touched (ig/ref :app.storage/gc-touched-task)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
:file-offload (ig/ref :app.tasks.file-offload/handler)
:audit-log-archive (ig/ref :app.loggers.audit/archive-task)
:audit-log-gc (ig/ref :app.loggers.audit/gc-task)}}
@ -317,22 +267,13 @@
:app.tasks.objects-gc/handler
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)
:max-age cf/deletion-delay}
:storage (ig/ref :app.storage/storage)}
:app.tasks.file-gc/handler
{:pool (ig/ref :app.db/pool)
:max-age cf/deletion-delay}
{:pool (ig/ref :app.db/pool)}
:app.tasks.file-xlog-gc/handler
{:pool (ig/ref :app.db/pool)
:max-age (dt/duration {:hours 72})}
:app.tasks.file-offload/handler
{:pool (ig/ref :app.db/pool)
:max-age (dt/duration {:seconds 5})
:storage (ig/ref :app.storage/storage)
:backend (cf/get :fdata-storage-backed :fdata-s3)}
{:pool (ig/ref :app.db/pool)}
:app.tasks.telemetry/handler
{:pool (ig/ref :app.db/pool)
@ -413,14 +354,62 @@
{:directory (cf/get :storage-assets-fs-directory)}
})
(def worker-config
{ :app.worker/cron
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)
:tasks (ig/ref :app.worker/registry)
:pool (ig/ref :app.db/pool)
:entries
[{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :file-gc}
{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :file-xlog-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :storage-gc-deleted}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :storage-gc-touched}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :session-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :objects-gc}
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :tasks-gc}
{:cron #app/cron "0 30 */3,23 * * ?"
:task :telemetry}
(when (contains? cf/flags :audit-log-archive)
{:cron #app/cron "0 */5 * * * ?" ;; every 5m
:task :audit-log-archive})
(when (contains? cf/flags :audit-log-gc)
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :audit-log-gc})]}
:app.worker/worker
{:executor (ig/ref [::worker :app.worker/executor])
:tasks (ig/ref :app.worker/registry)
:metrics (ig/ref :app.metrics/metrics)
:pool (ig/ref :app.db/pool)}})
(def system nil)
(defn start
[]
(ig/load-namespaces system-config)
(ig/load-namespaces (merge system-config worker-config))
(alter-var-root #'system (fn [sys]
(when sys (ig/halt! sys))
(-> system-config
(cond-> (contains? cf/flags :backend-worker)
(merge worker-config))
(ig/prep)
(ig/init))))
(l/info :msg "welcome to penpot"

View file

@ -241,6 +241,9 @@
{:name "0077-mod-comment-thread-table"
:fn (mg/resource "app/migrations/sql/0077-mod-comment-thread-table.sql")}
{:name "0078-mod-file-media-object-table-drop-cascade"
:fn (mg/resource "app/migrations/sql/0078-mod-file-media-object-table-drop-cascade.sql")}
])

View file

@ -0,0 +1,9 @@
ALTER TABLE file_media_object
DROP CONSTRAINT file_media_object_media_id_fkey,
ADD CONSTRAINT file_media_object_media_id_fkey
FOREIGN KEY (media_id) REFERENCES storage_object(id) ON DELETE NO ACTION DEFERRABLE;
ALTER TABLE file_media_object
DROP CONSTRAINT file_media_object_thumbnail_id_fkey,
ADD CONSTRAINT file_media_object_thumbnail_id_fkey
FOREIGN KEY (thumbnail_id) REFERENCES storage_object(id) ON DELETE NO ACTION DEFERRABLE;

View file

@ -186,7 +186,7 @@
spec (or (::sv/spec mdata) (s/spec any?))
auth? (:auth mdata true)]
(l/trace :action "register" :name (::sv/name mdata))
(l/debug :hint "register method" :name (::sv/name mdata))
(with-meta
(fn [{:keys [::request] :as params}]
;; Raise authentication error when rpc method requires auth but

View file

@ -88,7 +88,7 @@
The `on-file` parameter should be a function that receives the file
and the previous state and returns the new state."
[system {:keys [chunk-size on-file] :or {chunk-size 10}}]
[system & {:keys [chunk-size on-file] :or {chunk-size 10}}]
(letfn [(get-chunk [conn cursor]
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
[(some->> rows peek :created-at) (seq rows)]))
@ -109,6 +109,21 @@
(recur state (rest files)))
state)))))
(defn analyze-file-data
[system & {:keys [id on-form on-data]}]
(let [file (get-file system id)]
(cond
(fn? on-data)
(on-data (:data file))
(fn? on-form)
(walk/postwalk (fn [form]
(on-form form)
form)
(:data file)))
nil))
(defn update-pages
"Apply a function to all pages of one file. The function receives a page and returns an updated page."
[data f]

View file

@ -9,7 +9,24 @@
#_:clj-kondo/ignore
(:require
[app.common.logging :as l]
[app.common.pprint :as p]
[app.srepl.fixes :as f]
[app.srepl.helpers :as h]
[app.srepl.fixes :as f]))
[clojure.pprint :refer [pprint]]))
;; Empty namespace as main entry point for Server REPL
(defn print-available-tasks
[system]
(let [tasks (:app.worker/registry system)]
(p/pprint (keys tasks) :level 200)))
(defn run-task!
([system name]
(run-task! system name {}))
([system name params]
(let [tasks (:app.worker/registry system)]
(if-let [task-fn (get tasks name)]
(task-fn params)
(l/warn :hint "no task found" :name name)))))

View file

@ -270,39 +270,48 @@
(defmethod ig/pre-init-spec ::gc-deleted-task [_]
(s/keys :req-un [::storage ::db/pool ::min-age ::wrk/executor]))
(defmethod ig/prep-key ::gc-deleted-task
[_ cfg]
(merge {:min-age (dt/duration {:hours 2})}
(d/without-nils cfg)))
(defmethod ig/init-key ::gc-deleted-task
[_ {:keys [pool storage min-age] :as cfg}]
(letfn [(retrieve-deleted-objects-chunk [conn cursor]
[_ {:keys [pool storage] :as cfg}]
(letfn [(retrieve-deleted-objects-chunk [conn min-age cursor]
(let [min-age (db/interval min-age)
rows (db/exec! conn [sql:retrieve-deleted-objects-chunk min-age cursor])]
[(some-> rows peek :created-at)
(some->> (seq rows) (d/group-by #(-> % :backend keyword) :id #{}) seq)]))
(retrieve-deleted-objects [conn]
(->> (d/iteration (fn [cursor]
(retrieve-deleted-objects-chunk conn cursor))
(retrieve-deleted-objects [conn min-age]
(->> (d/iteration (partial retrieve-deleted-objects-chunk conn min-age)
:initk (dt/now)
:vf second
:kf first)
(sequence cat)))
(delete-in-bulk [conn backend ids]
(let [backend (impl/resolve-backend storage backend)
(delete-in-bulk [conn backend-name ids]
(let [backend (impl/resolve-backend storage backend-name)
backend (assoc backend :conn conn)]
(doseq [id ids]
(l/debug :hint "permanently delete storage object" :task "gc-deleted" :backend backend-name :id id))
@(impl/del-objects-in-bulk backend ids)))]
(fn [_]
(fn [params]
(let [min-age (or (:min-age params) (:min-age cfg))]
(db/with-atomic [conn pool]
(loop [total 0
groups (retrieve-deleted-objects conn)]
groups (retrieve-deleted-objects conn min-age)]
(if-let [[backend ids] (first groups)]
(do
(delete-in-bulk conn backend ids)
(recur (+ total (count ids))
(rest groups)))
(do
(l/info :task "gc-deleted" :count total)
{:deleted total})))))))
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :task "gc-deleted" :total total)
{:deleted total}))))))))
(def sql:retrieve-deleted-objects-chunk
"with items_part as (
@ -345,14 +354,14 @@
(defmethod ig/init-key ::gc-touched-task
[_ {:keys [pool] :as cfg}]
(letfn [(has-team-font-variant-nrefs? [conn id]
(-> (db/exec-one! conn [sql:retrieve-team-font-variant-nrefs id id id id]) :nrefs pos?))
(letfn [(get-team-font-variant-nrefs [conn id]
(-> (db/exec-one! conn [sql:retrieve-team-font-variant-nrefs id id id id]) :nrefs))
(has-file-media-object-nrefs? [conn id]
(-> (db/exec-one! conn [sql:retrieve-file-media-object-nrefs id id]) :nrefs pos?))
(get-file-media-object-nrefs [conn id]
(-> (db/exec-one! conn [sql:retrieve-file-media-object-nrefs id id]) :nrefs))
(has-profile-nrefs? [conn id]
(-> (db/exec-one! conn [sql:retrieve-profile-nrefs id id]) :nrefs pos?))
(get-profile-nrefs [conn id]
(-> (db/exec-one! conn [sql:retrieve-profile-nrefs id id]) :nrefs))
(mark-freeze-in-bulk [conn ids]
(db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)"
@ -395,15 +404,23 @@
:kf first)
(sequence cat)))
(process-objects! [conn pred-fn ids]
(process-objects! [conn get-fn ids bucket]
(loop [to-freeze #{}
to-delete #{}
ids (seq ids)]
(if-let [id (first ids)]
(if (pred-fn conn id)
(recur (conj to-freeze id) to-delete (rest ids))
(recur to-freeze (conj to-delete id) (rest ids)))
(let [nrefs (get-fn conn id)]
(if (pos? nrefs)
(do
(l/debug :hint "processing storage object"
:task "gc-touched" :id id :status "freeze"
:bucket bucket :refs nrefs)
(recur (conj to-freeze id) to-delete (rest ids)))
(do
(l/debug :hint "processing storage object"
:task "gc-touched" :id id :status "delete"
:bucket bucket :refs nrefs)
(recur to-freeze (conj to-delete id) (rest ids)))))
(do
(some->> (seq to-freeze) (mark-freeze-in-bulk conn))
(some->> (seq to-delete) (mark-delete-in-bulk conn))
@ -417,9 +434,9 @@
groups (retrieve-touched conn)]
(if-let [[bucket ids] (first groups)]
(let [[f d] (case bucket
"file-media-object" (process-objects! conn has-file-media-object-nrefs? ids)
"team-font-variant" (process-objects! conn has-team-font-variant-nrefs? ids)
"profile" (process-objects! conn has-profile-nrefs? ids)
"file-media-object" (process-objects! conn get-file-media-object-nrefs ids bucket)
"team-font-variant" (process-objects! conn get-team-font-variant-nrefs ids bucket)
"profile" (process-objects! conn get-profile-nrefs ids bucket)
(ex/raise :type :internal
:code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference %" bucket)))]
@ -427,15 +444,16 @@
(+ to-delete d)
(rest groups)))
(do
(l/info :task "gc-touched" :to-freeze to-freeze :to-delete to-delete)
(l/info :hint "task finished" :task "gc-touched" :to-freeze to-freeze :to-delete to-delete)
{:freeze to-freeze :delete to-delete})))))))
(def sql:retrieve-touched-objects-chunk
"select so.* from storage_object as so
where so.touched_at is not null
and so.created_at < ?
order by so.created_at desc
limit 500;")
"SELECT so.*
FROM storage_object AS so
WHERE so.touched_at IS NOT NULL
AND so.created_at < ?
ORDER by so.created_at DESC
LIMIT 500;")
(def sql:retrieve-file-media-object-nrefs
"select ((select count(*) from file_media_object where media_id = ?) +

View file

@ -14,6 +14,7 @@
[app.common.logging :as l]
[app.common.pages.migrations :as pmg]
[app.common.types.shape-tree :as ctt]
[app.config :as cf]
[app.db :as db]
[app.util.blob :as blob]
[app.util.time :as dt]
@ -29,16 +30,22 @@
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::max-age ::dt/duration)
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::max-age]))
(s/keys :req-un [::db/pool ::min-age]))
(defmethod ig/prep-key ::handler
[_ cfg]
(merge {:min-age cf/deletion-delay}
(d/without-nils cfg)))
(defmethod ig/init-key ::handler
[_ {:keys [pool] :as cfg}]
(fn [_]
(fn [params]
(db/with-atomic [conn pool]
(let [cfg (assoc cfg :conn conn)]
(let [min-age (or (:min-age params) (:min-age cfg))
cfg (assoc cfg :min-age min-age :conn conn)]
(loop [total 0
files (retrieve-candidates cfg)]
(if-let [file (first files)]
@ -47,7 +54,12 @@
(recur (inc total)
(rest files)))
(do
(l/debug :msg "finished processing files" :processed total)
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :total total)
;; Allow optional rollback passed by params
(when (:rollback? params)
(db/rollback! conn))
{:processed total})))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -69,18 +81,20 @@
for update skip locked")
(defn- retrieve-candidates
[{:keys [conn max-age] :as cfg}]
(let [interval (db/interval max-age)
get-chunk
(fn [cursor]
[{:keys [conn min-age id] :as cfg}]
(if id
(do
(l/warn :hint "explicit file id passed on params" :id id)
(db/query conn :file {:id id}))
(let [interval (db/interval min-age)
get-chunk (fn [cursor]
(let [rows (db/exec! conn [sql:retrieve-candidates-chunk interval cursor])]
[(some->> rows peek :modified-at) (seq rows)]))]
(sequence cat (d/iteration get-chunk
:vf second
:kf first
:initk (dt/now)))))
:initk (dt/now))))))
(defn collect-used-media
[data]
@ -142,14 +156,14 @@
"delete from file_object_thumbnail "
" where file_id=? and object_id=ANY(?)")
res (db/exec-one! conn [sql file-id (db/create-array conn "text" unused)])]
(l/debug :hint "delete object thumbnails" :total (:next.jdbc/update-count res))))))
(l/debug :hint "delete file object thumbnails" :file-id file-id :total (:next.jdbc/update-count res))))))
(defn- clean-file-thumbnails!
[conn file-id revn]
(let [sql (str "delete from file_thumbnail "
" where file_id=? and revn < ?")
res (db/exec-one! conn [sql file-id revn])]
(l/debug :hint "delete file thumbnails" :total (:next.jdbc/update-count res))))
(l/debug :hint "delete file thumbnails" :file-id file-id :total (:next.jdbc/update-count res))))
(defn- process-file
[{:keys [conn] :as cfg} {:keys [id data revn modified-at] :as file}]

View file

@ -1,63 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.file-offload
"A maintenance task that offloads file data to an external storage (S3)."
(:require
[app.common.logging :as l]
[app.common.spec :as us]
[app.db :as db]
[app.storage :as sto]
[app.storage.impl :as simpl]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(def sql:offload-candidates-chunk
"select f.id, f.data from file as f
where f.data is not null
and f.modified_at < now() - ?::interval
order by f.modified_at
limit 10")
(defn- retrieve-candidates
[{:keys [conn max-age]}]
(db/exec! conn [sql:offload-candidates-chunk max-age]))
(defn- offload-candidate
[{:keys [storage conn backend] :as cfg} {:keys [id data] :as file}]
(l/debug :hint "offload file data" :id id)
(let [backend (simpl/resolve-backend storage backend)]
(->> (simpl/content data)
(simpl/put-object backend file))
(db/update! conn :file
{:data nil
:data-backend (name (:id backend))}
{:id id})))
;; ---- STATE INIT
(s/def ::max-age ::dt/duration)
(s/def ::backend ::us/keyword)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::max-age ::sto/storage ::backend]))
(defmethod ig/init-key ::handler
[_ {:keys [pool max-age] :as cfg}]
(fn [_]
(db/with-atomic [conn pool]
(let [max-age (db/interval max-age)
cfg (-> cfg
(assoc :conn conn)
(assoc :max-age max-age))]
(loop [n 0]
(let [candidates (retrieve-candidates cfg)]
(if (seq candidates)
(do
(run! (partial offload-candidate cfg) candidates)
(recur (+ n (count candidates))))
(l/debug :hint "offload summary" :count n))))))))

View file

@ -8,6 +8,7 @@
"A maintenance task that performs a garbage collection of the file
change (transaction) log."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.db :as db]
[app.util.time :as dt]
@ -16,21 +17,31 @@
(declare sql:delete-files-xlog)
(s/def ::max-age ::dt/duration)
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::max-age]))
(s/keys :req-un [::db/pool]
:opt-un [::min-age]))
(defmethod ig/prep-key ::handler
[_ cfg]
(merge {:min-age (dt/duration {:hours 72})}
(d/without-nils cfg)))
(defmethod ig/init-key ::handler
[_ {:keys [pool max-age] :as cfg}]
(fn [_]
[_ {:keys [pool] :as cfg}]
(fn [params]
(let [min-age (or (:min-age params) (:min-age cfg))]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age)
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-files-xlog interval])
result (:next.jdbc/update-count result)]
(l/info :hint "remove old file changes"
:removed result)
result))))
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :total result)
(when (:rollback? params)
(db/rollback! conn))
result)))))
(def ^:private
sql:delete-files-xlog

View file

@ -8,7 +8,9 @@
"A maintenance task that performs a general purpose garbage collection
of deleted objects."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.config :as cf]
[app.db :as db]
[app.media :as media]
[app.storage :as sto]
@ -41,38 +43,38 @@
;; --- IMPL: generic object deletion
(defmethod delete-objects :default
[{:keys [conn max-age table] :as cfg}]
[{:keys [conn min-age table] :as cfg}]
(let [sql (str/fmt sql:delete-objects
{:table table :limit 50})
result (db/exec! conn [sql max-age])]
result (db/exec! conn [sql min-age])]
(doseq [{:keys [id] :as item} result]
(l/trace :hint "delete object" :table table :id id))
(l/debug :hint "permanently delete object" :table table :id id))
(count result)))
;; --- IMPL: file deletion
(defmethod delete-objects "file"
[{:keys [conn max-age table] :as cfg}]
[{:keys [conn min-age table] :as cfg}]
(let [sql (str/fmt sql:delete-objects {:table table :limit 50})
result (db/exec! conn [sql max-age])]
result (db/exec! conn [sql min-age])]
(doseq [{:keys [id] :as item} result]
(l/trace :hint "delete object" :table table :id id))
(l/debug :hint "permanently delete object" :table table :id id))
(count result)))
;; --- IMPL: team-font-variant deletion
(defmethod delete-objects "team_font_variant"
[{:keys [conn max-age storage table] :as cfg}]
[{:keys [conn min-age storage table] :as cfg}]
(let [sql (str/fmt sql:delete-objects
{:table table :limit 50})
fonts (db/exec! conn [sql max-age])
fonts (db/exec! conn [sql min-age])
storage (media/configure-assets-storage storage conn)]
(doseq [{:keys [id] :as font} fonts]
(l/trace :hint "delete object" :table table :id id)
(l/debug :hint "permanently delete object" :table table :id id)
(some->> (:woff1-file-id font) (sto/touch-object! storage) deref)
(some->> (:woff2-file-id font) (sto/touch-object! storage) deref)
(some->> (:otf-file-id font) (sto/touch-object! storage) deref)
@ -82,14 +84,14 @@
;; --- IMPL: team deletion
(defmethod delete-objects "team"
[{:keys [conn max-age storage table] :as cfg}]
[{:keys [conn min-age storage table] :as cfg}]
(let [sql (str/fmt sql:delete-objects
{:table table :limit 50})
teams (db/exec! conn [sql max-age])
teams (db/exec! conn [sql min-age])
storage (assoc storage :conn conn)]
(doseq [{:keys [id] :as team} teams]
(l/trace :hint "delete object" :table table :id id)
(l/debug :hint "permanently delete object" :table table :id id)
(some->> (:photo-id team) (sto/touch-object! storage) deref))
(count teams)))
@ -115,17 +117,17 @@
where id in (select id from owned)")
(defmethod delete-objects "profile"
[{:keys [conn max-age storage table] :as cfg}]
[{:keys [conn min-age storage table] :as cfg}]
(let [sql (str/fmt sql:retrieve-deleted-profiles {:limit 50})
profiles (db/exec! conn [sql max-age])
profiles (db/exec! conn [sql min-age])
storage (assoc storage :conn conn)]
(doseq [{:keys [id] :as profile} profiles]
(l/trace :hint "delete object" :table table :id id)
(l/debug :hint "permanently delete object" :table table :id id)
;; Mark the owned teams as deleted; this enables them to be processed
;; in the same transaction in the "team" table step.
(db/exec-one! conn [sql:mark-owned-teams-deleted id max-age])
(db/exec-one! conn [sql:mark-owned-teams-deleted id min-age])
;; Mark as deleted the storage object related with the photo-id
;; field.
@ -144,22 +146,40 @@
(let [res (delete-objects cfg)]
(if (pos? res)
(recur (+ n res))
(l/debug :hint "table gc summary" :table table :deleted n)))))
(do
(l/debug :hint "delete summary" :table table :total n)
n)))))
(s/def ::max-age ::dt/duration)
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::sto/storage ::max-age]))
(s/keys :req-un [::db/pool ::sto/storage]
:opt-un [::min-age]))
(defmethod ig/prep-key ::handler
[_ cfg]
(merge {:min-age cf/deletion-delay}
(d/without-nils cfg)))
(defmethod ig/init-key ::handler
[_ {:keys [pool max-age] :as cfg}]
(fn [task]
[_ {:keys [pool] :as cfg}]
(fn [params]
;; Checking first on task argument allows properly testing it.
(let [max-age (get task :max-age max-age)]
(let [min-age (or (:min-age params) (:min-age cfg))]
(db/with-atomic [conn pool]
(let [max-age (db/interval max-age)
cfg (-> cfg
(assoc :max-age max-age)
(let [cfg (-> cfg
(assoc :min-age (db/interval min-age))
(assoc :conn conn))]
(doseq [table target-tables]
(process-table (assoc cfg :table table))))))))
(loop [tables (seq target-tables)
total 0]
(if-let [table (first tables)]
(recur (rest tables)
(+ total (process-table (assoc cfg :table table))))
(do
(l/info :hint "task finished" :min-age (dt/format-duration min-age) :total total)
(when (:rollback? params)
(db/rollback! conn))
{:processed total}))))))))

View file

@ -8,7 +8,9 @@
"A maintenance task that performs a cleanup of already executed tasks
from the database table."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.config :as cf]
[app.db :as db]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@ -16,20 +18,31 @@
(declare sql:delete-completed-tasks)
(s/def ::max-age ::dt/duration)
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::max-age]))
(s/keys :req-un [::db/pool]
:opt-un [::min-age]))
(defmethod ig/prep-key ::handler
[_ cfg]
(merge {:min-age cf/deletion-delay}
(d/without-nils cfg)))
(defmethod ig/init-key ::handler
[_ {:keys [pool max-age] :as cfg}]
(fn [_]
[_ {:keys [pool] :as cfg}]
(fn [params]
(let [min-age (or (:min-age params) (:min-age cfg))]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age)
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (:next.jdbc/update-count result)]
(l/debug :hint "trim completed tasks table" :removed result)
result))))
(l/debug :hint "task finished" :total result)
(when (:rollback? params)
(db/rollback! conn))
result)))))
(def ^:private
sql:delete-completed-tasks

View file

@ -116,6 +116,9 @@
Duration
(-edn [o] (pr-str o)))
(defn format-duration
[o]
(str/lower (subs (str o) 2)))
;; --- INSTANT

View file

@ -224,7 +224,7 @@
:name (d/name name)
:queue (d/name queue))
(do
(l/info :hint "worker started"
(l/info :hint "worker initialized"
:name (d/name name)
:queue (d/name queue))
(event-loop cfg)))
@ -565,7 +565,7 @@
(l/info :hint "registry initialized" :tasks (count tasks))
(reduce-kv (fn [res k v]
(let [tname (name k)]
(l/trace :hint "register task" :name tname)
(l/debug :hint "register task" :name tname)
(assoc res k (wrap-task-handler metrics tname v))))
{}
tasks))

View file

@ -192,23 +192,18 @@
;; freeze because of the deduplication (we have uploaded 2 times
;; 2 two same files).
(let [task (:app.storage/gc-touched-task th/*system*)
res (task {})]
res (task {:min-age (dt/duration 0)})]
(t/is (= 2 (:freeze res)))
(t/is (= 0 (:delete res))))
;; run the task immediately
;; run the file-gc task immediately without forced min-age
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {})]
(t/is (= 0 (:processed res))))
;; make the file eligible for GC waiting 300ms (configured
;; timeout for testing)
(th/sleep 300)
;; run the task again
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {})]
res (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed res))))
;; retrieve file and check trimmed attribute
@ -225,22 +220,36 @@
(t/is (some? @(sto/get-object storage (:media-id fmo1))))
(t/is (some? @(sto/get-object storage (:thumbnail-id fmo1))))
;; now, we have deleted the unused file-media-object, if we
;; execute the touched-gc task, we should see that two of them
;; are marked to be deleted.
;; proceed to remove usage of the file
(update-file {:file-id (:id file)
:profile-id (:id profile)
:revn 0
:changes [{:type :del-obj
:page-id (first (get-in file [:data :pages]))
:id shid}]})
;; Now, we have deleted the usag of pointers to the
;; file-media-objects, if we pase file-gc, they should be marked
;; as deleted.
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed res))))
;; Now that file-gc have deleted the file-media-object usage,
;; lets execute the touched-gc task, we should see that two of
;; them are marked to be deleted.
(let [task (:app.storage/gc-touched-task th/*system*)
res (task {})]
(t/is (= 2 (:freeze res)))
(t/is (= 0 (:delete res))))
res (task {:min-age (dt/duration 0)})]
(t/is (= 0 (:freeze res)))
(t/is (= 2 (:delete res))))
;; Finally, check that some of the objects that are marked as
;; deleted we are unable to retrieve them using standard storage
;; public api.
(t/is (some? @(sto/get-object storage (:media-id fmo2))))
(t/is (some? @(sto/get-object storage (:thumbnail-id fmo2))))
(t/is (some? @(sto/get-object storage (:media-id fmo1))))
(t/is (some? @(sto/get-object storage (:thumbnail-id fmo1))))
(t/is (nil? @(sto/get-object storage (:media-id fmo2))))
(t/is (nil? @(sto/get-object storage (:thumbnail-id fmo2))))
(t/is (nil? @(sto/get-object storage (:media-id fmo1))))
(t/is (nil? @(sto/get-object storage (:thumbnail-id fmo1))))
)))
(t/deftest permissions-checks-creating-file
@ -359,8 +368,8 @@
:profile-id (:id profile1)})]
;; file is not deleted because it does not meet all
;; conditions to be deleted.
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 0 (:processed result))))
;; query the list of files
(let [data {::th/type :project-files
@ -390,8 +399,8 @@
(t/is (= 0 (count result)))))
;; run permanent deletion (should be noop)
(let [result (task {:max-age (dt/duration {:minutes 1})})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration {:minutes 1})})]
(t/is (= 0 (:processed result))))
;; query the list of file libraries of a after hard deletion
(let [data {::th/type :file-libraries
@ -404,8 +413,8 @@
(t/is (= 0 (count result)))))
;; run permanent deletion
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed result))))
;; query the list of file libraries of a after hard deletion
(let [data {::th/type :file-libraries
@ -603,7 +612,7 @@
;; run the task again
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {})]
res (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed res))))
;; check that object thumbnails are still here
@ -630,7 +639,7 @@
;; run the task again
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {})]
res (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed res))))
;; check that the unknown frame thumbnail is deleted
@ -714,7 +723,7 @@
;; run the task again
(let [task (:app.tasks.file-gc/handler th/*system*)
res (task {})]
res (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed res))))
;; Then query the specific revn

View file

@ -126,8 +126,8 @@
;; profile is not deleted because it does not meet all
;; conditions to be deleted.
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 0 (:processed result))))
;; Request profile to be deleted
(let [params {::th/type :delete-profile
@ -145,8 +145,8 @@
(t/is (= 1 (count (:result out)))))
;; execute permanent deletion task
(let [result (task {:max-age (dt/duration "-1m")})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration "-1m")})]
(t/is (= 1 (:processed result))))
;; query profile after delete
(let [params {::th/type :profile

View file

@ -179,8 +179,8 @@
;; project is not deleted because it does not meet all
;; conditions to be deleted.
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 0 (:processed result))))
;; query the list of projects
(let [data {::th/type :projects
@ -210,8 +210,8 @@
(t/is (= 1 (count result)))))
;; run permanent deletion (should be noop)
(let [result (task {:max-age (dt/duration {:minutes 1})})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration {:minutes 1})})]
(t/is (= 0 (:processed result))))
;; query the list of files of a after soft deletion
(let [data {::th/type :project-files
@ -224,8 +224,8 @@
(t/is (= 0 (count result)))))
;; run permanent deletion
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed result))))
;; query the list of files of a after hard deletion
(let [data {::th/type :project-files

View file

@ -99,8 +99,8 @@
;; team is not deleted because it does not meet all
;; conditions to be deleted.
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 0 (:processed result))))
;; query the list of teams
(let [data {::th/type :teams
@ -132,8 +132,8 @@
(t/is (= (:default-team-id profile1) (get-in result [0 :id])))))
;; run permanent deletion (should be noop)
(let [result (task {:max-age (dt/duration {:minutes 1})})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration {:minutes 1})})]
(t/is (= 0 (:processed result))))
;; query the list of projects after hard deletion
(let [data {::th/type :projects
@ -147,8 +147,8 @@
(t/is (= (:type error-data) :not-found))))
;; run permanent deletion
(let [result (task {:max-age (dt/duration 0)})]
(t/is (nil? result)))
(let [result (task {:min-age (dt/duration 0)})]
(t/is (= 1 (:processed result))))
;; query the list of projects of a after hard deletion
(let [data {::th/type :projects

View file

@ -57,8 +57,8 @@
:file-uri "test"
:thumbnail-uri "test"
:path (-> "app/test_files/template.penpot" io/resource fs/path)}]
config (-> main/system-config
(merge main/worker-config)
(assoc-in [:app.msgbus/msgbus :redis-uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config))
@ -85,9 +85,7 @@
:app.loggers.database/reporter
:app.loggers.zmq/receiver
:app.worker/cron
:app.worker/worker)
(d/deep-merge
{:app.tasks.file-gc/handler {:max-age (dt/duration 300)}}))
:app.worker/worker))
_ (ig/load-namespaces config)
system (-> (ig/prep config)
(ig/init))]

View file

@ -7,6 +7,7 @@
(ns app.main.ui.dashboard.import
(:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.logging :as log]
[app.main.data.dashboard :as dd]
[app.main.data.events :as ev]
@ -379,6 +380,7 @@
(let [editing? (and (some? (:file-id file))
(= (:file-id file) (:editing @state)))]
[:& import-entry {:state state
:key (dm/str (:id file))
:file file
:editing? editing?
:can-be-deleted? (> (count files) 1)}]))