Merge pull request #4389 from penpot/test

 Several improvements
This commit is contained in:
Alejandro 2024-04-11 12:35:04 +02:00 committed by GitHub
commit ac835bb655
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 232 additions and 485 deletions

View file

@ -101,6 +101,8 @@
(s/def ::audit-log-archive-uri ::us/string)
(s/def ::audit-log-http-handler-concurrency ::us/integer)
(s/def ::deletion-delay ::dt/duration)
(s/def ::admins ::us/set-of-valid-emails)
(s/def ::file-change-snapshot-every ::us/integer)
(s/def ::file-change-snapshot-timeout ::dt/duration)
@ -214,6 +216,7 @@
(s/keys :opt-un [::secret-key
::flags
::admins
::deletion-delay
::allow-demo-users
::audit-log-archive-uri
::audit-log-http-handler-concurrency
@ -335,7 +338,8 @@
:enable-backend-openapi-doc
:enable-backend-worker
:enable-secure-session-cookies
:enable-email-verification])
:enable-email-verification
:enable-v2-migration])
(defn- parse-flags
[config]
@ -380,7 +384,8 @@
(defonce ^:dynamic flags (parse-flags config))
(def deletion-delay
(dt/duration {:days 7}))
(or (c/get config :deletion-delay)
(dt/duration {:days 7})))
(defn get
"A configuration getter. Helps code be more testable."

View file

@ -53,7 +53,6 @@
[app.storage.tmp :as tmp]
[app.svgo :as svgo]
[app.util.blob :as blob]
[app.util.events :as events]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[buddy.core.codecs :as bc]
@ -1196,9 +1195,6 @@
add-instance-grid
(fn [fdata frame-id grid assets]
(reduce (fn [result [component position]]
(events/tap :progress {:op :migrate-component
:id (:id component)
:name (:name component)})
(add-main-instance result component frame-id (gpt/add position
(gpt/point grid-gap grid-gap))))
fdata
@ -1518,9 +1514,6 @@
(->> (d/zip media-group grid)
(reduce (fn [fdata [mobj position]]
(events/tap :progress {:op :migrate-graphic
:id (:id mobj)
:name (:name mobj)})
(or (process fdata mobj position) fdata))
(assoc-in fdata [:options :components-v2] true)))))
@ -1759,11 +1752,6 @@
(let [file (get-file system file-id)
file (process-file! system file :validate? validate?)]
(events/tap :progress
{:op :migrate-file
:name (:name file)
:id (:id file)})
(persist-file! system file)))))
(catch Throwable cause
@ -1791,10 +1779,11 @@
(some-> *team-stats* (swap! update :processed-files (fnil inc 0)))))))))
(defn migrate-team!
[system team-id & {:keys [validate? skip-on-graphic-error? label]}]
[system team-id & {:keys [validate? rown skip-on-graphic-error? label]}]
(l/dbg :hint "migrate:team:start"
:team-id (dm/str team-id))
:team-id (dm/str team-id)
:rown rown)
(let [tpoint (dt/tpoint)
err (volatile! false)
@ -1816,11 +1805,6 @@
(conj "layout/grid")
(conj "styles/v2"))]
(events/tap :progress
{:op :migrate-team
:name (:name team)
:id id})
(run! (partial migrate-file system)
(get-and-lock-team-files conn id))
@ -1849,6 +1833,7 @@
(l/dbg :hint "migrate:team:end"
:team-id (dm/str team-id)
:rown rown
:files files
:components components
:graphics graphics

View file

@ -99,7 +99,7 @@
(= code :invalid-image)
(binding [l/*context* (request->context request)]
(let [cause (or parent-cause err)]
(l/error :hint "unexpected error on processing image" :cause cause)
(l/warn :hint "unexpected error on processing image" :cause cause)
{::rres/status 400 ::rres/body data}))
:else

View file

@ -23,17 +23,20 @@
(defn- send-mattermost-notification!
[cfg {:keys [id public-uri] :as report}]
(let [text (str "Exception: " public-uri "/dbg/error/" id " "
(when-let [pid (:profile-id report)]
(str "(pid: #uuid-" pid ")"))
"\n"
"```\n"
"- host: `" (:host report) "`\n"
"- tenant: `" (:tenant report) "`\n"
"- host: #" (:host report) "\n"
"- tenant: #" (:tenant report) "\n"
"- logger: #" (:logger report) "\n"
"- request-path: `" (:request-path report) "`\n"
"- frontend-version: `" (:frontend-version report) "`\n"
"- backend-version: `" (:backend-version report) "`\n"
"\n"
"```\n"
"Trace:\n"
(:trace report)
"```")
@ -60,6 +63,7 @@
:frontend-version (:version/frontend context)
:profile-id (:request/profile-id context)
:request-path (:request/path context)
:logger (::l/logger record)
:trace (ex/format-throwable cause :detail? false :header? false)})
(defn handle-event

View file

@ -15,9 +15,9 @@
[app.config :as cf]
[app.db :as db]
[app.http.client :as http]
[app.util.json :as json]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.data.json :as json]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]))
@ -86,11 +86,9 @@
(declare interpret-exception)
(declare interpret-response)
(def ^:private json-mapper
(json/mapper
{:encode-key-fn str/camel
:decode-key-fn (comp keyword str/kebab)
:pretty true}))
(def json-write-opts
{:key-fn str/camel
:indent true})
(defmethod ig/pre-init-spec ::run-webhook-handler [_]
(s/keys :req [::http/client ::db/pool]))
@ -134,15 +132,15 @@
whook (::config props)
body (case (:mtype whook)
"application/json" (json/encode-str event json-mapper)
"application/json" (json/write-str event json-write-opts)
"application/transit+json" (t/encode-str event)
"application/x-www-form-urlencoded" (uri/map->query-string event))]
(l/debug :hint "run webhook"
:event-name (:name event)
:webhook-id (:id whook)
:webhook-uri (:uri whook)
:webhook-mtype (:mtype whook))
(l/dbg :hint "run webhook"
:event-name (:name event)
:webhook-id (:id whook)
:webhook-uri (:uri whook)
:webhook-mtype (:mtype whook))
(let [req {:uri (:uri whook)
:headers {"content-type" (:mtype whook)
@ -160,8 +158,8 @@
(report-delivery! whook req nil err)
(update-webhook! whook err)
(when (= err "unknown")
(l/error :hint "unknown error on webhook request"
:cause cause))))))))))
(l/err :hint "unknown error on webhook request"
:cause cause))))))))))
(defn interpret-response
[{:keys [status] :as response}]

View file

@ -24,6 +24,7 @@
[app.loggers.webhooks :as-alias webhooks]
[app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef]
[app.migrations.v2 :as migrations.v2]
[app.msgbus :as-alias mbus]
[app.redis :as-alias rds]
[app.rpc :as-alias rpc]
@ -527,6 +528,15 @@
:worker? (contains? cf/flags :backend-worker)
:version (:full cf/version)))
(defn start-custom
[config]
(ig/load-namespaces config)
(alter-var-root #'system (fn [sys]
(when sys (ig/halt! sys))
(-> config
(ig/prep)
(ig/init)))))
(defn stop
[]
(alter-var-root #'system (fn [sys]
@ -573,6 +583,11 @@
(nrepl/start-server :bind "0.0.0.0" :port 6064 :handler cider-nrepl-handler))
(start)
(when (contains? cf/flags :v2-migration)
(px/sleep 5000)
(migrations.v2/migrate app.main/system))
(deref p))
(catch Throwable cause
(binding [*out* *err*]

View file

@ -0,0 +1,104 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.migrations.v2
(:require
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.db :as db]
[app.features.components-v2 :as feat]
[app.setup :as setup]
[app.util.time :as dt]))
(def ^:private sql:get-teams
"SELECT id, features,
row_number() OVER (ORDER BY created_at DESC) AS rown
FROM team
WHERE deleted_at IS NULL
AND (features <@ '{components/v2}' OR features IS NULL)
ORDER BY created_at DESC")
(defn- get-teams
[conn]
(->> (db/cursor conn [sql:get-teams] {:chunk-size 1})
(map feat/decode-row)))
(defn- migrate-teams
[{:keys [::db/conn] :as system}]
;; Allow long running transaction for this connection
(db/exec-one! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
;; Do not allow other migration running in the same time
(db/xact-lock! conn 0)
;; Run teams migration
(run! (fn [{:keys [id rown]}]
(try
(-> (assoc system ::db/rollback true)
(feat/migrate-team! id
:rown rown
:label "v2-migration"
:validate? false
:skip-on-graphics-error? true))
(catch Throwable _
(swap! feat/*stats* update :errors (fnil inc 0))
(l/wrn :hint "error on migrating team (skiping)"))))
(get-teams conn))
(setup/set-prop! system :v2-migrated true))
(defn migrate
[system]
(let [tpoint (dt/tpoint)
stats (atom {})
migrated? (setup/get-prop system :v2-migrated false)]
(when-not migrated?
(l/inf :hint "v2 migration started"
:files (:processed-files stats))
(try
(binding [feat/*stats* stats]
(db/tx-run! system migrate-teams))
(let [stats (deref stats)
elapsed (dt/format-duration (tpoint))]
(l/inf :hint "v2 migration finished"
:files (:processed-files stats)
:teams (:processed-teams stats)
:errors (:errors stats)
:elapsed elapsed))
(catch Throwable cause
(l/err :hint "error on aplying v2 migration" :cause cause))))))
(def ^:private required-services
[[:app.main/assets :app.storage.s3/backend]
[:app.main/assets :app.storage.fs/backend]
:app.storage/storage
:app.db/pool
:app.setup/props
:app.svgo/optimizer
:app.metrics/metrics
:app.migrations/migrations
:app.http.client/client])
(defn -main
[& _args]
(try
(let [config-var (requiring-resolve 'app.main/system-config)
start-var (requiring-resolve 'app.main/start-custom)
stop-var (requiring-resolve 'app.main/stop)
system-var (requiring-resolve 'app.main/system)
config (select-keys @config-var required-services)]
(start-var config)
(migrate @system-var)
(stop-var)
(System/exit 0))
(catch Throwable cause
(ex/print-throwable cause)
(flush)
(System/exit -1))))

View file

@ -7,6 +7,7 @@
(ns app.setup
"Initial data setup of instance."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uuid :as uuid]
@ -25,7 +26,7 @@
(bc/bytes->b64u)
(bc/bytes->str)))
(defn- retrieve-all
(defn- get-all-props
[conn]
(->> (db/query conn :server-prop {:preload true})
(filter #(not= "secret-key" (:id %)))
@ -50,6 +51,28 @@
:cause cause))))
instance-id)))
(def sql:add-prop
"INSERT INTO server_prop (id, content, preload)
VALUES (?, ?, ?)
ON CONFLICT (id)
DO UPDATE SET content=?, preload=?")
(defn get-prop
([system prop] (get-prop system prop nil))
([system prop default]
(let [prop (d/name prop)]
(db/run! system (fn [{:keys [::db/conn]}]
(or (db/get* conn :server-prop {:id prop})
default))))))
(defn set-prop!
[system prop value]
(let [value (db/tjson value)
prop (d/name prop)]
(db/run! system (fn [{:keys [::db/conn]}]
(db/exec-one! conn [sql:add-prop prop value false value false])))))
(s/def ::key ::us/string)
(s/def ::props (s/map-of ::us/keyword some?))
@ -67,7 +90,7 @@
"PENPOT_SECRET_KEY environment variable")))
(let [secret (or key (generate-random-key))]
(-> (retrieve-all conn)
(-> (get-all-props conn)
(assoc :secret-key secret)
(assoc :tokens-key (keys/derive secret :salt "tokens"))
(update :instance-id handle-instance-id conn (db/read-only? pool))))))

View file

@ -11,10 +11,7 @@
[app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.db :as db]
[app.main :as main]
[app.rpc.commands.auth :as cmd.auth]
[app.srepl.components-v2 :refer [migrate-teams!]]
[app.util.events :as events]
[app.util.json :as json]
[app.util.time :as dt]
[cuerdas.core :as str]))
@ -105,39 +102,6 @@
[{:keys [password]}]
(auth/derive-password password))
(defmethod exec-command :migrate-v2
[_]
(letfn [(on-progress-report [{:keys [elapsed completed errors]}]
(println (str/ffmt "-> Progress: completed: %, errors: %, elapsed: %"
completed errors elapsed)))
(on-progress [{:keys [op name]}]
(case op
:migrate-team
(println (str/ffmt "-> Migrating team: \"%\"" name))
:migrate-file
(println (str/ffmt "=> Migrating file: \"%\"" name))
nil))
(on-event [[type payload]]
(case type
:progress-report (on-progress-report payload)
:progress (on-progress payload)
:error (on-error payload)
nil))
(on-error [cause]
(println "EE:" (ex-message cause)))]
(println "The components/v2 migration started...")
(try
(let [result (-> (partial migrate-teams! main/system {:rollback? true})
(events/run-with! on-event))]
(println (str/ffmt "Migration process finished (elapsed: %)" (:elapsed result))))
(catch Throwable cause
(on-error cause)))))
(defmethod exec-command :default
[{:keys [::cmd]}]
(ex/raise :type :internal

View file

@ -6,18 +6,15 @@
(ns app.srepl.components-v2
(:require
[app.common.data :as d]
[app.common.fressian :as fres]
[app.common.logging :as l]
[app.db :as db]
[app.features.components-v2 :as feat]
[app.main :as main]
[app.srepl.helpers :as h]
[app.svgo :as svgo]
[app.util.events :as events]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[promesa.exec :as px]
@ -31,86 +28,6 @@
;; PRIVATE HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- report-progress-files
[tpoint]
(fn [_ _ oldv newv]
(when (or (not= (:processed-files oldv)
(:processed-files newv))
(not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-files newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress"
:completed completed
:elapsed elapsed)))))
(defn- report-progress-teams
[tpoint]
(fn [_ _ oldv newv]
(when (or (not= (:processed-teams oldv)
(:processed-teams newv))
(not= (:errors oldv)
(:errors newv)))
(let [completed (:processed-teams newv 0)
errors (:errors newv 0)
elapsed (dt/format-duration (tpoint))]
(events/tap :progress-report
{:elapsed elapsed
:completed completed
:errors errors})
(l/dbg :hint "progress"
:completed completed
:elapsed elapsed)))))
(def ^:private sql:get-teams-by-created-at
"WITH teams AS (
SELECT id, features,
row_number() OVER (ORDER BY created_at) AS rown
FROM team
WHERE deleted_at IS NULL
ORDER BY created_at DESC
) SELECT * FROM TEAMS %(pred)s")
(def ^:private sql:get-teams-by-graphics
"WITH teams AS (
SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT count(*)
FROM file_media_object AS fmo
JOIN file AS f ON (f.id = fmo.file_id)
JOIN project AS p ON (p.id = f.project_id)
WHERE p.team_id = t.id
AND fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false) AS graphics
FROM team AS t
WHERE t.deleted_at IS NULL
ORDER BY 3 ASC
)
SELECT * FROM teams %(pred)s")
(def ^:private sql:get-teams-by-activity
"WITH teams AS (
SELECT t.id, t.features,
row_number() OVER (ORDER BY t.created_at) AS rown,
(SELECT coalesce(max(date_trunc('month', f.modified_at)), date_trunc('month', t.modified_at))
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS updated_at,
(SELECT coalesce(count(*), 0)
FROM file AS f
JOIN project AS p ON (f.project_id = p.id)
WHERE p.team_id = t.id) AS total_files
FROM team AS t
WHERE t.deleted_at IS NULL
ORDER BY 3 DESC, 4 DESC
)
SELECT * FROM teams %(pred)s")
(def ^:private sql:get-files-by-created-at
"SELECT id, features,
row_number() OVER (ORDER BY created_at DESC) AS rown
@ -118,87 +35,12 @@
WHERE deleted_at IS NULL
ORDER BY created_at DESC")
(def ^:private sql:get-files-by-modified-at
"SELECT id, features
row_number() OVER (ORDER BY modified_at DESC) AS rown
FROM file
WHERE deleted_at IS NULL
ORDER BY modified_at DESC")
(def ^:private sql:get-files-by-graphics
"WITH files AS (
SELECT f.id, f.features,
row_number() OVER (ORDER BY modified_at) AS rown,
(SELECT count(*) FROM file_media_object AS fmo
WHERE fmo.mtype = 'image/svg+xml'
AND fmo.is_local = false
AND fmo.file_id = f.id) AS graphics
FROM file AS f
WHERE f.deleted_at IS NULL
ORDER BY 3 ASC
) SELECT * FROM files %(pred)s")
(defn- read-pred
[entries]
(let [entries (if (and (vector? entries)
(keyword? (first entries)))
[entries]
entries)]
(loop [params []
queries []
entries (seq entries)]
(if-let [[op val field] (first entries)]
(let [field (name field)
cond (case op
:lt (str/ffmt "% < ?" field)
:lte (str/ffmt "% <= ?" field)
:gt (str/ffmt "% > ?" field)
:gte (str/ffmt "% >= ?" field)
:eq (str/ffmt "% = ?" field))]
(recur (conj params val)
(conj queries cond)
(rest entries)))
(let [sql (apply str "WHERE " (str/join " AND " queries))]
(apply vector sql params))))))
(defn- get-teams
[conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-teams-by-created-at
:activity sql:get-teams-by-activity
:graphics sql:get-teams-by-graphics)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
pred-params))
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2"))))))
(defn- get-files
[conn query pred]
(let [query (d/nilv query :created-at)
sql (case query
:created-at sql:get-files-by-created-at
:modified-at sql:get-files-by-modified-at
:graphics sql:get-files-by-graphics)
sql (if pred
(let [[pred-sql & pred-params] (read-pred pred)]
(apply vector
(str/format sql {:pred pred-sql})
pred-params))
[(str/format sql {:pred ""})])]
(->> (db/cursor conn sql {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2"))))))
[conn]
(->> (db/cursor conn [sql:get-files-by-created-at] {:chunk-size 500})
(map feat/decode-row)
(remove (fn [{:keys [features]}]
(contains? features "components/v2")))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
@ -244,8 +86,6 @@
stats (atom {})
tpoint (dt/tpoint)]
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache]
(try
@ -265,127 +105,6 @@
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end" :rollback rollback? :elapsed elapsed)))))))
(defn migrate-teams!
"A REPL helper for migrate all teams.
This function starts multiple concurrent team migration processes
until the maximum number of jobs is reached which by default has the
value of `1`. This is controled with the `:max-jobs` option.
If you want to run this on multiple machines you will need to specify
the total number of partitions and the current partition.
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
max-jobs 1
current-partition 1
skip-on-graphic-error? true
max-items Long/MAX_VALUE}}]
(when (int? partitions)
(when-not (int? current-partition)
(throw (IllegalArgumentException. "missing `current-partition` parameter")))
(when-not (<= 0 current-partition partitions)
(throw (IllegalArgumentException. "invalid value on `current-partition` parameter"))))
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
migrate-team
(fn [team-id]
(try
(db/tx-run! (assoc main/system ::db/rollback rollback?)
(fn [system]
(db/exec-one! system ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(feat/migrate-team! system team-id
:label label
:validate? validate?
:skip-on-graphic-error? skip-on-graphic-error?)))
(catch Throwable cause
(l/wrn :hint "unexpected error on processing team (skiping)"
:team-id (str team-id))
(events/tap :error
(ex-info "unexpected error on processing team (skiping)"
{:team-id team-id}
cause))
(swap! stats update :errors (fnil inc 0)))
(finally
(ps/release! sjobs))))
process-team
(fn [team-id]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(do
(l/inf :hint "max time constraint reached"
:team-id (str team-id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-team team-id)))))]
(l/dbg :hint "migrate:start"
:label label
:rollback rollback?
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-teams tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
(try
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
(db/exec! conn ["SET LOCAL statement_timeout = 0"])
(db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-team
(->> (get-teams conn query pred)
(filter (fn [{:keys [rown]}]
(if (int? partitions)
(= current-partition (inc (mod rown partitions)))
true)))
(map :id)
(take max-items)))
;; Close and await tasks
(pu/close! executor)))
(-> (deref stats)
(assoc :elapsed (dt/format-duration (tpoint))))
(catch Throwable cause
(l/dbg :hint "migrate:error" :cause cause)
(events/tap :error cause))
(finally
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "migrate:end"
:rollback rollback?
:elapsed elapsed)))))))
(defn migrate-files!
"A REPL helper for migrate all files.
@ -399,8 +118,8 @@
In order to get the report table populated, you will need to provide
a correct `:label`. That label is also used for persist a file
snaphot before continue with the migration."
[& {:keys [max-jobs max-items max-time rollback? validate? query
pred max-procs cache skip-on-graphic-error?
[& {:keys [max-jobs max-items rollback? validate?
cache skip-on-graphic-error?
label partitions current-partition]
:or {validate? false
rollback? true
@ -417,14 +136,10 @@
(let [stats (atom {})
tpoint (dt/tpoint)
mtime (some-> max-time dt/duration)
factory (px/thread-factory :virtual false :prefix "penpot/migration/")
executor (px/cached-executor :factory factory)
max-procs (or max-procs max-jobs)
sjobs (ps/create :permits max-jobs)
sprocs (ps/create :permits max-procs)
migrate-file
(fn [file-id rown]
@ -455,16 +170,7 @@
process-file
(fn [{:keys [id rown]}]
(ps/acquire! sjobs)
(let [ts (tpoint)]
(if (and mtime (neg? (compare mtime ts)))
(do
(l/inf :hint "max time constraint reached"
:file-id (str id)
:elapsed (dt/format-duration ts))
(ps/release! sjobs)
(reduced nil))
(px/run! executor (partial migrate-file id rown)))))]
(px/run! executor (partial migrate-file id rown)))]
(l/dbg :hint "migrate:start"
:label label
@ -472,11 +178,8 @@
:max-jobs max-jobs
:max-items max-items)
(add-watch stats :progress-report (report-progress-files tpoint))
(binding [feat/*stats* stats
feat/*cache* cache
svgo/*semaphore* sprocs]
feat/*cache* cache]
(try
(db/tx-run! main/system
(fn [{:keys [::db/conn] :as system}]
@ -484,7 +187,7 @@
(db/exec! conn ["SET LOCAL idle_in_transaction_session_timeout = 0"])
(run! process-file
(->> (get-files conn query pred)
(->> (get-files conn)
(filter (fn [{:keys [rown] :as row}]
(if (int? partitions)
(= current-partition (inc (mod rown partitions)))
@ -601,17 +304,3 @@
(let [elapsed (dt/format-duration (tpoint))]
(l/dbg :hint "populate:end"
:elapsed elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FILE PROCESS HELPERS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn delete-broken-files
[{:keys [id data] :as file}]
(if (-> data :options :components-v2 true?)
(do
(l/wrn :hint "found old components-v2 format"
:file-id (str id)
:file-name (:name file))
(assoc file :deleted-at (dt/now)))
file))

View file

@ -79,6 +79,7 @@
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
SKIP LOCKED")

View file

@ -210,6 +210,9 @@
:project-id (str project-id)
:deleted-at (dt/format-instant deleted-at))
;; NOTE: fragments not handled here because they have
;; cascade.
;; And finally, permanently delete the file.
(db/delete! conn :file {:id id})
@ -230,7 +233,6 @@
(inc total))
0)))
(def ^:private sql:get-file-thumbnails
"SELECT file_id, revn, media_id, deleted_at
FROM file_thumbnail