♻️ Refactor storage and assets related modules

- improve internal error handling
- add more specs and more asserts
This commit is contained in:
Andrey Antukh 2023-02-06 12:27:53 +01:00
parent 4b4f78b4cc
commit ab3b9cba45
20 changed files with 547 additions and 511 deletions

View file

@ -9,7 +9,9 @@
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage :as-alias sto]
[app.storage.impl :as impl]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.fs :as fs]
@ -28,42 +30,49 @@
(s/def ::directory ::us/string)
(defmethod ig/pre-init-spec ::backend [_]
(s/keys :opt-un [::directory]))
(s/keys :opt [::directory]))
(defmethod ig/init-key ::backend
[_ cfg]
;; Return a valid backend data structure only if all optional
;; parameters are provided.
(when (string? (:directory cfg))
(let [dir (fs/normalize (:directory cfg))]
(when (string? (::directory cfg))
(let [dir (fs/normalize (::directory cfg))]
(assoc cfg
:type :fs
:directory (str dir)
:uri (u/uri (str "file://" dir))))))
::sto/type :fs
::directory (str dir)
::uri (u/uri (str "file://" dir))))))
(s/def ::type ::us/keyword)
(s/def ::uri u/uri?)
(s/def ::backend
(s/keys :req-un [::type ::directory ::uri]))
(s/keys :req [::directory
::uri]
:opt [::sto/type
::sto/id
::wrk/executor]))
;; --- API IMPL
(defmethod impl/put-object :fs
[{:keys [executor] :as backend} {:keys [id] :as object} content]
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object} content]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (:directory backend))
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
full (fs/normalize (fs/join base path))]
(when-not (fs/exists? (fs/parent full))
(fs/create-dir (fs/parent full)))
(with-open [^InputStream src (io/input-stream content)
^OutputStream dst (io/output-stream full)]
(io/copy! src dst)))))
(io/copy! src dst))
object)))
(defmethod impl/get-object-data :fs
[{:keys [executor] :as backend} {:keys [id] :as object}]
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object}]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [^Path base (fs/path (:directory backend))
(let [^Path base (fs/path (::directory backend))
^Path path (fs/path (impl/id->path id))
^Path full (fs/normalize (fs/join base path))]
(when-not (fs/exists? full)
@ -74,33 +83,37 @@
(defmethod impl/get-object-bytes :fs
[backend object]
(p/let [input (impl/get-object-data backend object)]
(try
(io/read-as-bytes input)
(finally
(io/close! input)))))
(->> (impl/get-object-data backend object)
(p/fmap (fn [input]
(try
(io/read-as-bytes input)
(finally
(io/close! input)))))))
(defmethod impl/get-object-url :fs
[{:keys [uri executor] :as backend} {:keys [id] :as object} _]
(px/with-dispatch executor
(update uri :path
(fn [existing]
(if (str/ends-with? existing "/")
(str existing (impl/id->path id))
(str existing "/" (impl/id->path id)))))))
[{:keys [::uri] :as backend} {:keys [id] :as object} _]
(us/assert! ::backend backend)
(p/resolved
(update uri :path
(fn [existing]
(if (str/ends-with? existing "/")
(str existing (impl/id->path id))
(str existing "/" (impl/id->path id)))))))
(defmethod impl/del-object :fs
[{:keys [executor] :as backend} {:keys [id] :as object}]
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object}]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (:directory backend))
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
path (fs/join base path)]
(Files/deleteIfExists ^Path path))))
(defmethod impl/del-objects-in-bulk :fs
[{:keys [executor] :as backend} ids]
[{:keys [::wrk/executor] :as backend} ids]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (:directory backend))]
(let [base (fs/path (::directory backend))]
(doseq [id ids]
(let [path (fs/path (impl/id->path id))
path (fs/join base path)]

View file

@ -9,9 +9,13 @@
(:require
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.db :as-alias db]
[app.storage :as-alias sto]
[app.worker :as-alias wrk]
[buddy.core.codecs :as bc]
[buddy.core.hash :as bh]
[clojure.java.io :as jio]
[clojure.spec.alpha :as s]
[datoteka.io :as io])
(:import
java.nio.ByteBuffer
@ -21,7 +25,7 @@
;; --- API Definition
(defmulti put-object (fn [cfg _ _] (:type cfg)))
(defmulti put-object (fn [cfg _ _] (::sto/type cfg)))
(defmethod put-object :default
[cfg _ _]
@ -29,7 +33,7 @@
:code :invalid-storage-backend
:context cfg))
(defmulti get-object-data (fn [cfg _] (:type cfg)))
(defmulti get-object-data (fn [cfg _] (::sto/type cfg)))
(defmethod get-object-data :default
[cfg _]
@ -37,7 +41,7 @@
:code :invalid-storage-backend
:context cfg))
(defmulti get-object-bytes (fn [cfg _] (:type cfg)))
(defmulti get-object-bytes (fn [cfg _] (::sto/type cfg)))
(defmethod get-object-bytes :default
[cfg _]
@ -45,7 +49,7 @@
:code :invalid-storage-backend
:context cfg))
(defmulti get-object-url (fn [cfg _ _] (:type cfg)))
(defmulti get-object-url (fn [cfg _ _] (::sto/type cfg)))
(defmethod get-object-url :default
[cfg _ _]
@ -54,7 +58,7 @@
:context cfg))
(defmulti del-object (fn [cfg _] (:type cfg)))
(defmulti del-object (fn [cfg _] (::sto/type cfg)))
(defmethod del-object :default
[cfg _]
@ -62,7 +66,7 @@
:code :invalid-storage-backend
:context cfg))
(defmulti del-objects-in-bulk (fn [cfg _] (:type cfg)))
(defmulti del-objects-in-bulk (fn [cfg _] (::sto/type cfg)))
(defmethod del-objects-in-bulk :default
[cfg _]
@ -189,10 +193,6 @@
(make-output-stream [_ opts]
(jio/make-output-stream content opts))))
(defn content?
[v]
(satisfies? IContentObject v))
(defn calculate-hash
[resource]
(let [result (with-open [input (io/input-stream resource)]
@ -201,13 +201,37 @@
(str "blake2b:" result)))
(defn resolve-backend
[{:keys [conn pool executor] :as storage} backend-id]
(let [backend (get-in storage [:backends backend-id])]
[{:keys [::db/pool ::wrk/executor] :as storage} backend-id]
(let [backend (get-in storage [::sto/backends backend-id])]
(when-not backend
(ex/raise :type :internal
:code :backend-not-configured
:hint (dm/fmt "backend '%' not configured" backend-id)))
(assoc backend
:executor executor
:conn (or conn pool)
:id backend-id)))
(-> backend
(assoc ::sto/id backend-id)
(assoc ::wrk/executor executor)
(assoc ::db/pool pool))))
(defrecord StorageObject [id size created-at expired-at touched-at backend])
(ns-unmap *ns* '->StorageObject)
(ns-unmap *ns* 'map->StorageObject)
(defn storage-object
([id size created-at expired-at touched-at backend]
(StorageObject. id size created-at expired-at touched-at backend))
([id size created-at expired-at touched-at backend mdata]
(StorageObject. id size created-at expired-at touched-at backend mdata nil)))
(defn object?
[v]
(instance? StorageObject v))
(defn content?
[v]
(satisfies? IContentObject v))
(s/def ::object object?)
(s/def ::content content?)

View file

@ -8,9 +8,12 @@
"S3 Storage backend implementation."
(:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage :as-alias sto]
[app.storage.impl :as impl]
[app.storage.tmp :as tmp]
[app.util.time :as dt]
@ -64,6 +67,9 @@
(declare build-s3-client)
(declare build-s3-presigner)
;; (set! *warn-on-reflection* true)
;; (set! *unchecked-math* :warn-on-boxed)
;; --- BACKEND INIT
(s/def ::region ::us/keyword)
@ -72,26 +78,26 @@
(s/def ::endpoint ::us/string)
(defmethod ig/pre-init-spec ::backend [_]
(s/keys :opt-un [::region ::bucket ::prefix ::endpoint ::wrk/executor]))
(s/keys :opt [::region ::bucket ::prefix ::endpoint ::wrk/executor]))
(defmethod ig/prep-key ::backend
[_ {:keys [prefix region] :as cfg}]
[_ {:keys [::prefix ::region] :as cfg}]
(cond-> (d/without-nils cfg)
(some? prefix) (assoc :prefix prefix)
(nil? region) (assoc :region :eu-central-1)))
(some? prefix) (assoc ::prefix prefix)
(nil? region) (assoc ::region :eu-central-1)))
(defmethod ig/init-key ::backend
[_ cfg]
;; Return a valid backend data structure only if all optional
;; parameters are provided.
(when (and (contains? cfg :region)
(string? (:bucket cfg)))
(when (and (contains? cfg ::region)
(string? (::bucket cfg)))
(let [client (build-s3-client cfg)
presigner (build-s3-presigner cfg)]
(assoc cfg
:client @client
:presigner presigner
:type :s3
::sto/type :s3
::client @client
::presigner presigner
::close-fn #(.close ^java.lang.AutoCloseable client)))))
(defmethod ig/halt-key! ::backend
@ -99,21 +105,27 @@
(when (fn? close-fn)
(px/run! close-fn)))
(s/def ::type ::us/keyword)
(s/def ::client #(instance? S3AsyncClient %))
(s/def ::presigner #(instance? S3Presigner %))
(s/def ::backend
(s/keys :req-un [::region ::bucket ::client ::type ::presigner]
:opt-un [::prefix]))
(s/keys :req [::region
::bucket
::client
::presigner]
:opt [::prefix
::sto/id
::wrk/executor]))
;; --- API IMPL
(defmethod impl/put-object :s3
[backend object content]
(us/assert! ::backend backend)
(put-object backend object content))
(defmethod impl/get-object-data :s3
[backend object]
(us/assert! ::backend backend)
(letfn [(no-such-key? [cause]
(instance? software.amazon.awssdk.services.s3.model.NoSuchKeyException cause))
(handle-not-found [cause]
@ -127,18 +139,22 @@
(defmethod impl/get-object-bytes :s3
[backend object]
(us/assert! ::backend backend)
(get-object-bytes backend object))
(defmethod impl/get-object-url :s3
[backend object options]
(us/assert! ::backend backend)
(get-object-url backend object options))
(defmethod impl/del-object :s3
[backend object]
(us/assert! ::backend backend)
(del-object backend object))
(defmethod impl/del-objects-in-bulk :s3
[backend ids]
(us/assert! ::backend backend)
(del-object-in-bulk backend ids))
;; --- HELPERS
@ -152,8 +168,8 @@
[region]
(Region/of (name region)))
(defn build-s3-client
[{:keys [region endpoint executor]}]
(defn- build-s3-client
[{:keys [::region ::endpoint ::wrk/executor]}]
(let [aconfig (-> (ClientAsyncConfiguration/builder)
(.advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR executor)
(.build))
@ -188,8 +204,8 @@
(.close ^NettyNioAsyncHttpClient hclient)
(.close ^S3AsyncClient client)))))
(defn build-s3-presigner
[{:keys [region endpoint]}]
(defn- build-s3-presigner
[{:keys [::region ::endpoint]}]
(let [config (-> (S3Configuration/builder)
(cond-> (some? endpoint) (.pathStyleAccessEnabled true))
(.build))]
@ -200,65 +216,87 @@
(.serviceConfiguration ^S3Configuration config)
(.build))))
(defn- upload-thread
[id subscriber sem content]
(px/thread
{:name "penpot/s3/uploader"
:daemon true}
(l/trace :hint "start upload thread"
:object-id (str id)
:size (impl/get-size content)
::l/sync? true)
(let [stream (io/input-stream content)
bsize (* 1024 64)
tpoint (dt/tpoint)]
(try
(loop []
(.acquire ^Semaphore sem 1)
(let [buffer (byte-array bsize)
readed (.read ^InputStream stream buffer)]
(when (pos? readed)
(let [data (ByteBuffer/wrap ^bytes buffer 0 readed)]
(.onNext ^Subscriber subscriber ^ByteBuffer data)
(when (= readed bsize)
(recur))))))
(.onComplete ^Subscriber subscriber)
(catch InterruptedException _
(l/trace :hint "interrupted upload thread"
:object-:id (str id)
::l/sync? true)
nil)
(catch Throwable cause
(.onError ^Subscriber subscriber cause))
(finally
(l/trace :hint "end upload thread"
:object-id (str id)
:elapsed (dt/format-duration (tpoint))
::l/sync? true)
(.close ^InputStream stream))))))
(defn- make-request-body
[content]
(let [is (io/input-stream content)
buff-size (* 1024 64)
sem (Semaphore. 0)
[id content]
(reify
AsyncRequestBody
(contentLength [_]
(Optional/of (long (impl/get-size content))))
writer-fn (fn [^Subscriber s]
(try
(loop []
(.acquire sem 1)
(let [buffer (byte-array buff-size)
readed (.read is buffer)]
(when (pos? readed)
(.onNext ^Subscriber s (ByteBuffer/wrap buffer 0 readed))
(when (= readed buff-size)
(recur)))))
(.onComplete s)
(catch Throwable cause
(.onError s cause))
(finally
(.close ^InputStream is))))]
(reify
AsyncRequestBody
(contentLength [_]
(Optional/of (long (impl/get-size content))))
(^void subscribe [_ ^Subscriber s]
(let [thread (Thread. #(writer-fn s))]
(.setDaemon thread true)
(.setName thread "penpot/storage:s3")
(.start thread)
(.onSubscribe s (reify Subscription
(cancel [_]
(.interrupt thread)
(.release sem 1))
(request [_ n]
(.release sem (int n))))))))))
(^void subscribe [_ ^Subscriber subscriber]
(let [sem (Semaphore. 0)
thr (upload-thread id subscriber sem content)]
(.onSubscribe subscriber
(reify Subscription
(cancel [_]
(px/interrupt! thr)
(.release sem 1))
(request [_ n]
(.release sem (int n)))))))))
(defn put-object
[{:keys [client bucket prefix]} {:keys [id] :as object} content]
(p/let [path (str prefix (impl/id->path id))
mdata (meta object)
mtype (:content-type mdata "application/octet-stream")
request (.. (PutObjectRequest/builder)
(bucket bucket)
(contentType mtype)
(key path)
(build))]
(defn- put-object
[{:keys [::client ::bucket ::prefix]} {:keys [id] :as object} content]
(let [path (dm/str prefix (impl/id->path id))
mdata (meta object)
mtype (:content-type mdata "application/octet-stream")
rbody (make-request-body id content)
request (.. (PutObjectRequest/builder)
(bucket bucket)
(contentType mtype)
(key path)
(build))]
(->> (.putObject ^S3AsyncClient client
^PutObjectRequest request
^AsyncRequestBody rbody)
(p/fmap (constantly object)))))
(let [content (make-request-body content)]
(.putObject ^S3AsyncClient client
^PutObjectRequest request
^AsyncRequestBody content))))
(defn- path->stream
[path]
(proxy [FilterInputStream] [(io/input-stream path)]
(close []
(fs/delete path)
(proxy-super close))))
(defn get-object-data
[{:keys [client bucket prefix]} {:keys [id size]}]
(defn- get-object-data
[{:keys [::client ::bucket ::prefix]} {:keys [id size]}]
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
@ -267,83 +305,83 @@
;; If the file size is greater than 2MiB then stream the content
;; to the filesystem and then read with buffered inputstream; if
;; not, read the contento into memory using bytearrays.
(if (> size (* 1024 1024 2))
(p/let [path (tmp/tempfile :prefix "penpot.storage.s3.")
rxf (AsyncResponseTransformer/toFile ^Path path)
_ (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(proxy [FilterInputStream] [(io/input-stream path)]
(close []
(fs/delete path)
(proxy-super close))))
(if (> ^long size (* 1024 1024 2))
(let [path (tmp/tempfile :prefix "penpot.storage.s3.")
rxf (AsyncResponseTransformer/toFile ^Path path)]
(->> (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)
(p/fmap (constantly path))
(p/fmap path->stream)))
(p/let [rxf (AsyncResponseTransformer/toBytes)
obj (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(.asInputStream ^ResponseBytes obj)))))
(let [rxf (AsyncResponseTransformer/toBytes)]
(->> (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)
(p/fmap #(.asInputStream ^ResponseBytes %)))))))
(defn get-object-bytes
[{:keys [client bucket prefix]} {:keys [id]}]
(p/let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
rxf (AsyncResponseTransformer/toBytes)
obj (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(.asByteArray ^ResponseBytes obj)))
(defn- get-object-bytes
[{:keys [::client ::bucket ::prefix]} {:keys [id]}]
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
rxf (AsyncResponseTransformer/toBytes)]
(->> (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)
(p/fmap #(.asByteArray ^ResponseBytes %)))))
(def default-max-age
(dt/duration {:minutes 10}))
(defn get-object-url
[{:keys [presigner bucket prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}]
(defn- get-object-url
[{:keys [::presigner ::bucket ::prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}]
(us/assert dt/duration? max-age)
(p/do
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
gopr (.. (GetObjectPresignRequest/builder)
(signatureDuration ^Duration max-age)
(getObjectRequest ^GetObjectRequest gor)
(build))
pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)]
(u/uri (str (.url ^PresignedGetObjectRequest pgor))))))
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (dm/str prefix (impl/id->path id)))
(build))
gopr (.. (GetObjectPresignRequest/builder)
(signatureDuration ^Duration max-age)
(getObjectRequest ^GetObjectRequest gor)
(build))
pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)]
(p/resolved
(u/uri (str (.url ^PresignedGetObjectRequest pgor))))))
(defn del-object
[{:keys [bucket client prefix]} {:keys [id] :as obj}]
(p/let [dor (.. (DeleteObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))]
(.deleteObject ^S3AsyncClient client
^DeleteObjectRequest dor)))
(defn- del-object
[{:keys [::bucket ::client ::prefix]} {:keys [id] :as obj}]
(let [dor (.. (DeleteObjectRequest/builder)
(bucket bucket)
(key (dm/str prefix (impl/id->path id)))
(build))]
(->> (.deleteObject ^S3AsyncClient client ^DeleteObjectRequest dor)
(p/fmap (constantly nil)))))
(defn del-object-in-bulk
[{:keys [bucket client prefix]} ids]
(p/let [oids (map (fn [id]
(.. (ObjectIdentifier/builder)
(key (str prefix (impl/id->path id)))
(build)))
ids)
delc (.. (Delete/builder)
(objects ^Collection oids)
(build))
dor (.. (DeleteObjectsRequest/builder)
(bucket bucket)
(delete ^Delete delc)
(build))
dres (.deleteObjects ^S3AsyncClient client
^DeleteObjectsRequest dor)]
(when (.hasErrors ^DeleteObjectsResponse dres)
(let [errors (seq (.errors ^DeleteObjectsResponse dres))]
(ex/raise :type :internal
:code :error-on-s3-bulk-delete
:s3-errors (mapv (fn [^S3Error error]
{:key (.key error)
:msg (.message error)})
errors))))))
(defn- del-object-in-bulk
[{:keys [::bucket ::client ::prefix]} ids]
(let [oids (map (fn [id]
(.. (ObjectIdentifier/builder)
(key (str prefix (impl/id->path id)))
(build)))
ids)
delc (.. (Delete/builder)
(objects ^Collection oids)
(build))
dor (.. (DeleteObjectsRequest/builder)
(bucket bucket)
(delete ^Delete delc)
(build))]
(->> (.deleteObjects ^S3AsyncClient client ^DeleteObjectsRequest dor)
(p/fmap (fn [dres]
(when (.hasErrors ^DeleteObjectsResponse dres)
(let [errors (seq (.errors ^DeleteObjectsResponse dres))]
(ex/raise :type :internal
:code :error-on-s3-bulk-delete
:s3-errors (mapv (fn [^S3Error error]
{:key (.key error)
:msg (.message error)})
errors)))))))))