mirror of
https://github.com/penpot/penpot.git
synced 2025-05-30 05:16:10 +02:00
♻️ Integrate new storage subsystem.
This commit is contained in:
parent
3d88749976
commit
ab944fb9ae
48 changed files with 950 additions and 632 deletions
|
@ -16,39 +16,39 @@
|
|||
[app.common.uuid :as uuid]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.storage.db :as sdb]
|
||||
[app.storage.fs :as sfs]
|
||||
[app.storage.impl :as impl]
|
||||
[app.storage.s3 :as ss3]
|
||||
[app.storage.db :as sdb]
|
||||
[app.util.time :as dt]
|
||||
[lambdaisland.uri :as u]
|
||||
[app.worker :as wrk]
|
||||
[clojure.spec.alpha :as s]
|
||||
[cuerdas.core :as str]
|
||||
[integrant.core :as ig]))
|
||||
[integrant.core :as ig]
|
||||
[lambdaisland.uri :as u]
|
||||
[promesa.exec :as px]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Storage Module State
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(declare handler)
|
||||
|
||||
(s/def ::backend ::us/keyword)
|
||||
(s/def ::backends
|
||||
(s/map-of ::us/keyword
|
||||
(s/or :s3 ::ss3/backend
|
||||
:fs ::sfs/backend
|
||||
:db ::sdb/backend)))
|
||||
(s/map-of ::us/keyword (s/or :s3 (s/nilable ::ss3/backend)
|
||||
:fs (s/nilable ::sfs/backend)
|
||||
:db (s/nilable ::sdb/backend))))
|
||||
|
||||
(defmethod ig/pre-init-spec ::storage [_]
|
||||
(s/keys :req-un [::backend ::db/pool ::backends]))
|
||||
(s/keys :req-un [::backend ::wrk/executor ::db/pool ::backends]))
|
||||
|
||||
(defmethod ig/prep-key ::storage
|
||||
[_ {:keys [backends] :as cfg}]
|
||||
(assoc cfg :backends (d/without-nils backends)))
|
||||
(-> (d/without-nils cfg)
|
||||
(assoc :backends (d/without-nils backends))))
|
||||
|
||||
(defmethod ig/init-key ::storage
|
||||
[_ {:keys [backends] :as cfg}]
|
||||
(assoc cfg :handler (partial handler cfg)))
|
||||
[_ cfg]
|
||||
cfg)
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Database Objects
|
||||
|
@ -63,25 +63,35 @@
|
|||
returning *")
|
||||
|
||||
(defn- create-database-object
|
||||
[conn backend {:keys [content] :as object}]
|
||||
(let [id (uuid/next)
|
||||
mdata (dissoc object :content)
|
||||
result (db/exec-one! conn [sql:insert-storage-object id
|
||||
(count content)
|
||||
(name backend)
|
||||
(db/tjson mdata)])]
|
||||
(StorageObject. (:id result)
|
||||
(:size result)
|
||||
(:created-at result)
|
||||
backend
|
||||
mdata
|
||||
nil)))
|
||||
[{:keys [conn backend]} {:keys [content] :as object}]
|
||||
(if (instance? StorageObject object)
|
||||
(let [id (uuid/random)
|
||||
mdata (meta object)
|
||||
result (db/exec-one! conn [sql:insert-storage-object id
|
||||
(:size object)
|
||||
(name (:backend object))
|
||||
(db/tjson mdata)])]
|
||||
(assoc object
|
||||
:id (:id result)
|
||||
:created-at (:created-at result)))
|
||||
(let [id (uuid/random)
|
||||
mdata (dissoc object :content)
|
||||
result (db/exec-one! conn [sql:insert-storage-object id
|
||||
(count content)
|
||||
(name backend)
|
||||
(db/tjson mdata)])]
|
||||
(StorageObject. (:id result)
|
||||
(:size result)
|
||||
(:created-at result)
|
||||
backend
|
||||
mdata
|
||||
nil))))
|
||||
|
||||
(def ^:private sql:retrieve-storage-object
|
||||
"select * from storage_object where id = ? and deleted_at is null")
|
||||
|
||||
(defn- retrieve-database-object
|
||||
[conn id]
|
||||
[{:keys [conn] :as storage} id]
|
||||
(when-let [res (db/exec-one! conn [sql:retrieve-storage-object id])]
|
||||
(let [mdata (some-> (:metadata res) (db/decode-transit-pgobject))]
|
||||
(StorageObject. (:id res)
|
||||
|
@ -95,107 +105,90 @@
|
|||
"update storage_object set deleted_at=now() where id=? and deleted_at is null")
|
||||
|
||||
(defn- delete-database-object
|
||||
[conn id]
|
||||
[{:keys [conn] :as storage} id]
|
||||
(let [result (db/exec-one! conn [sql:delete-storage-object id])]
|
||||
(pos? (:next.jdbc/update-count result))))
|
||||
|
||||
(defn- register-recheck
|
||||
[{:keys [pool] :as storage} backend id]
|
||||
(db/insert! pool :storage-pending {:id id :backend (name backend)}))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; API
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(declare resolve-backend)
|
||||
|
||||
(defn content-object
|
||||
([data] (impl/content-object data nil))
|
||||
([data size] (impl/content-object data size)))
|
||||
(defn content
|
||||
([data] (impl/content data nil))
|
||||
([data size] (impl/content data size)))
|
||||
|
||||
(defn get-object
|
||||
[{:keys [conn pool]} id]
|
||||
(let [id (impl/coerce-id id)]
|
||||
(retrieve-database-object (or conn pool) id)))
|
||||
[{:keys [conn pool] :as storage} id]
|
||||
(-> (assoc storage :conn (or conn pool))
|
||||
(retrieve-database-object id)))
|
||||
|
||||
(defn put-object
|
||||
[{:keys [pool conn backend] :as storage} {:keys [content] :as object}]
|
||||
(us/assert impl/content-object? content)
|
||||
(let [conn (or conn pool)
|
||||
object (create-database-object conn backend object)]
|
||||
[{:keys [pool conn backend executor] :as storage} {:keys [content] :as object}]
|
||||
(us/assert impl/content? content)
|
||||
(let [storage (assoc storage :conn (or conn pool))
|
||||
object (create-database-object storage object)]
|
||||
|
||||
;; Schedule to execute in background; in an other transaction and
|
||||
;; register the currently created storage object id for a later
|
||||
;; recheck.
|
||||
(px/run! executor #(register-recheck storage backend (:id object)))
|
||||
|
||||
;; Store the data finally on the underlying storage subsystem.
|
||||
(-> (resolve-backend storage backend)
|
||||
(assoc :conn conn)
|
||||
(impl/put-object object content))
|
||||
|
||||
object))
|
||||
|
||||
(defn clone-object
|
||||
[{:keys [pool conn executor] :as storage} object]
|
||||
(let [storage (assoc storage :conn (or conn pool))
|
||||
object* (create-database-object storage object)]
|
||||
|
||||
(with-open [input (-> (resolve-backend storage (:backend object))
|
||||
(impl/get-object-data object))]
|
||||
(-> (resolve-backend storage (:backend storage))
|
||||
(impl/put-object object* (impl/content input (:size object))))
|
||||
|
||||
object*)))
|
||||
|
||||
(defn get-object-data
|
||||
[{:keys [pool conn] :as storage} object]
|
||||
(-> (resolve-backend storage (:backend object))
|
||||
(assoc :conn (or conn pool))
|
||||
(impl/get-object object)))
|
||||
(-> (assoc storage :conn (or conn pool))
|
||||
(resolve-backend (:backend object))
|
||||
(impl/get-object-data object)))
|
||||
|
||||
(defn get-object-url
|
||||
([storage object]
|
||||
(get-object-url storage object nil))
|
||||
([storage object options]
|
||||
([{:keys [conn pool] :as storage} object options]
|
||||
;; As this operation does not need the database connection, the
|
||||
;; assoc of the conn to backend is ommited.
|
||||
(-> (resolve-backend storage (:backend object))
|
||||
(-> (assoc storage :conn (or conn pool))
|
||||
(resolve-backend (:backend object))
|
||||
(impl/get-object-url object options))))
|
||||
|
||||
(defn del-object
|
||||
[{:keys [conn pool]} id]
|
||||
(let [conn (or conn pool)]
|
||||
(delete-database-object conn id)))
|
||||
[{:keys [conn pool] :as storage} id]
|
||||
(-> (assoc storage :conn (or conn pool))
|
||||
(delete-database-object id)))
|
||||
|
||||
;; --- impl
|
||||
|
||||
(defn- resolve-backend
|
||||
[storage backend]
|
||||
(let [backend* (get-in storage [:backends backend])]
|
||||
(when-not backend*
|
||||
(defn resolve-backend
|
||||
[{:keys [conn] :as storage} backend-id]
|
||||
(us/assert some? conn)
|
||||
(let [backend (get-in storage [:backends backend-id])]
|
||||
(when-not backend
|
||||
(ex/raise :type :internal
|
||||
:code :backend-not-configured
|
||||
:hint (str/fmt "backend '%s' not configured" backend)))
|
||||
backend*))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; HTTP Handler
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def cache-max-age
|
||||
(dt/duration {:hours 24}))
|
||||
|
||||
(def signature-max-age
|
||||
(dt/duration {:hours 24 :minutes 15}))
|
||||
|
||||
(defn- handler
|
||||
[storage request]
|
||||
(let [id (get-in request [:path-params :id])
|
||||
obj (get-object storage id)]
|
||||
(if obj
|
||||
(let [mdata (meta obj)
|
||||
backend (resolve-backend storage (:backend obj))]
|
||||
(case (:type backend)
|
||||
:db
|
||||
{:status 200
|
||||
:headers {"content-type" (:content-type mdata)
|
||||
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
|
||||
:body (get-object-data storage obj)}
|
||||
|
||||
:s3
|
||||
(let [url (get-object-url storage obj {:max-age signature-max-age})]
|
||||
{:status 307
|
||||
:headers {"location" (str url)
|
||||
"x-host" (:host url)
|
||||
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
|
||||
:body ""})
|
||||
|
||||
:fs
|
||||
(let [url (get-object-url storage obj)]
|
||||
{:status 200
|
||||
:headers {"x-accel-redirect" (:path url)
|
||||
"content-type" (:content-type mdata)
|
||||
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
|
||||
:body ""})))
|
||||
{:status 404
|
||||
:body ""})))
|
||||
:hint (str/fmt "backend '%s' not configured" backend-id)))
|
||||
(assoc backend :conn conn)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Garbage Collection Task
|
||||
|
@ -241,3 +234,49 @@
|
|||
returning *;")
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Recheck Stalled Task
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(declare sql:retrieve-pending)
|
||||
(declare sql:exists-storage-object)
|
||||
|
||||
(defmethod ig/pre-init-spec ::recheck-task [_]
|
||||
(s/keys :req-un [::storage ::db/pool]))
|
||||
|
||||
(defmethod ig/init-key ::recheck-task
|
||||
[_ {:keys [pool storage] :as cfg}]
|
||||
(letfn [(retrieve-pending [conn]
|
||||
(->> (db/exec! conn [sql:retrieve-pending])
|
||||
(map (fn [{:keys [backend] :as row}]
|
||||
(assoc row :backend (keyword backend))))
|
||||
(seq)))
|
||||
|
||||
(exists-on-database? [conn id]
|
||||
(:exists (db/exec-one! conn [sql:exists-storage-object id])))
|
||||
|
||||
(recheck-item [conn {:keys [id backend]}]
|
||||
(when-not (exists-on-database? conn id)
|
||||
(let [backend (resolve-backend storage backend)
|
||||
backend (assoc backend :conn conn)]
|
||||
(impl/del-objects-in-bulk backend [id]))))]
|
||||
|
||||
(fn [task]
|
||||
(db/with-atomic [conn pool]
|
||||
(loop [items (retrieve-pending conn)]
|
||||
(when items
|
||||
(run! (partial recheck-item conn) items)
|
||||
(recur (retrieve-pending conn))))))))
|
||||
|
||||
(def sql:retrieve-pending
|
||||
"with items_part as (
|
||||
select s.id from storage_pending as s
|
||||
order by s.created_at
|
||||
limit 100
|
||||
)
|
||||
delete from storage_pending
|
||||
where id in (select id from items_part)
|
||||
returning *;")
|
||||
|
||||
(def sql:exists-storage-object
|
||||
"select exists (select id from storage_object where id = ?) as exists")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue