♻️ Refactor storage internal concurrency model

This commit is contained in:
Andrey Antukh 2023-03-03 14:05:26 +01:00
parent aafbf6bc15
commit dfdc9c9fa5
16 changed files with 261 additions and 290 deletions

View file

@ -6,22 +6,18 @@
(ns app.storage.fs
(:require
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage :as-alias sto]
[app.storage.impl :as impl]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.fs :as fs]
[datoteka.io :as io]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px])
[integrant.core :as ig])
(:import
java.io.InputStream
java.io.OutputStream
java.nio.file.Path
java.nio.file.Files))
@ -48,74 +44,66 @@
(s/keys :req [::directory
::uri]
:opt [::sto/type
::sto/id
::wrk/executor]))
::sto/id]))
;; --- API IMPL
(defmethod impl/put-object :fs
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object} content]
[backend {:keys [id] :as object} content]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
full (fs/normalize (fs/join base path))]
(when-not (fs/exists? (fs/parent full))
(fs/create-dir (fs/parent full)))
(with-open [^InputStream src (io/input-stream content)
^OutputStream dst (io/output-stream full)]
(io/copy! src dst))
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
full (fs/normalize (fs/join base path))]
object)))
(when-not (fs/exists? (fs/parent full))
(fs/create-dir (fs/parent full)))
(dm/with-open [src (io/input-stream content)
dst (io/output-stream full)]
(io/copy! src dst))
object))
(defmethod impl/get-object-data :fs
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object}]
[backend {:keys [id] :as object}]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [^Path base (fs/path (::directory backend))
^Path path (fs/path (impl/id->path id))
^Path full (fs/normalize (fs/join base path))]
(when-not (fs/exists? full)
(ex/raise :type :internal
:code :filesystem-object-does-not-exists
:path (str full)))
(io/input-stream full))))
(let [^Path base (fs/path (::directory backend))
^Path path (fs/path (impl/id->path id))
^Path full (fs/normalize (fs/join base path))]
(when-not (fs/exists? full)
(ex/raise :type :internal
:code :filesystem-object-does-not-exists
:path (str full)))
(io/input-stream full)))
(defmethod impl/get-object-bytes :fs
[backend object]
(->> (impl/get-object-data backend object)
(p/fmap (fn [input]
(try
(io/read-as-bytes input)
(finally
(io/close! input)))))))
(dm/with-open [input (impl/get-object-data backend object)]
(io/read-as-bytes input)))
(defmethod impl/get-object-url :fs
[{:keys [::uri] :as backend} {:keys [id] :as object} _]
(us/assert! ::backend backend)
(p/resolved
(update uri :path
(fn [existing]
(if (str/ends-with? existing "/")
(str existing (impl/id->path id))
(str existing "/" (impl/id->path id)))))))
(update uri :path
(fn [existing]
(if (str/ends-with? existing "/")
(str existing (impl/id->path id))
(str existing "/" (impl/id->path id))))))
(defmethod impl/del-object :fs
[{:keys [::wrk/executor] :as backend} {:keys [id] :as object}]
[backend {:keys [id] :as object}]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
path (fs/join base path)]
(Files/deleteIfExists ^Path path))))
(let [base (fs/path (::directory backend))
path (fs/path (impl/id->path id))
path (fs/join base path)]
(Files/deleteIfExists ^Path path)))
(defmethod impl/del-objects-in-bulk :fs
[{:keys [::wrk/executor] :as backend} ids]
[backend ids]
(us/assert! ::backend backend)
(px/with-dispatch executor
(let [base (fs/path (::directory backend))]
(doseq [id ids]
(let [path (fs/path (impl/id->path id))
path (fs/join base path)]
(Files/deleteIfExists ^Path path))))))
(let [base (fs/path (::directory backend))]
(doseq [id ids]
(let [path (fs/path (impl/id->path id))
path (fs/join base path)]
(Files/deleteIfExists ^Path path)))))

View file

@ -153,8 +153,8 @@
(content (.toPath ^java.io.File data) size)
(instance? String data)
(let [data (.getBytes data "UTF-8")]
(bytes->content data (alength data)))
(let [data (.getBytes ^String data "UTF-8")]
(bytes->content data (alength ^bytes data)))
(bytes? data)
(bytes->content data (or size (alength ^bytes data)))
@ -195,7 +195,7 @@
(defn calculate-hash
[resource]
(let [result (with-open [input (io/input-stream resource)]
(let [result (dm/with-open [input (io/input-stream resource)]
(-> (bh/blake2b-256 input)
(bc/bytes->hex)))]
(str "blake2b:" result)))

View file

@ -45,6 +45,7 @@
software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup
software.amazon.awssdk.regions.Region
software.amazon.awssdk.services.s3.S3AsyncClient
software.amazon.awssdk.services.s3.S3AsyncClientBuilder
software.amazon.awssdk.services.s3.S3Configuration
software.amazon.awssdk.services.s3.model.Delete
software.amazon.awssdk.services.s3.model.DeleteObjectRequest
@ -121,7 +122,7 @@
(defmethod impl/put-object :s3
[backend object content]
(us/assert! ::backend backend)
(put-object backend object content))
(p/await! (put-object backend object content)))
(defmethod impl/get-object-data :s3
[backend object]
@ -135,12 +136,13 @@
:cause cause))]
(-> (get-object-data backend object)
(p/catch no-such-key? handle-not-found))))
(p/catch no-such-key? handle-not-found)
(p/await!))))
(defmethod impl/get-object-bytes :s3
[backend object]
(us/assert! ::backend backend)
(get-object-bytes backend object))
(p/await! (get-object-bytes backend object)))
(defmethod impl/get-object-url :s3
[backend object options]
@ -150,12 +152,12 @@
(defmethod impl/del-object :s3
[backend object]
(us/assert! ::backend backend)
(del-object backend object))
(p/await! (del-object backend object)))
(defmethod impl/del-objects-in-bulk :s3
[backend ids]
(us/assert! ::backend backend)
(del-object-in-bulk backend ids))
(p/await! (del-object-in-bulk backend ids)))
;; --- HELPERS
@ -187,13 +189,17 @@
(.writeTimeout default-timeout)
(.build))
client (-> (S3AsyncClient/builder)
(.serviceConfiguration ^S3Configuration sconfig)
(.asyncConfiguration ^ClientAsyncConfiguration aconfig)
(.httpClient ^NettyNioAsyncHttpClient hclient)
(.region (lookup-region region))
(cond-> (some? endpoint) (.endpointOverride (URI. endpoint)))
(.build))]
client (let [builder (S3AsyncClient/builder)
builder (.serviceConfiguration ^S3AsyncClientBuilder builder ^S3Configuration sconfig)
builder (.asyncConfiguration ^S3AsyncClientBuilder builder ^ClientAsyncConfiguration aconfig)
builder (.httpClient ^S3AsyncClientBuilder builder ^NettyNioAsyncHttpClient hclient)
builder (.region ^S3AsyncClientBuilder builder (lookup-region region))
builder (cond-> ^S3AsyncClientBuilder builder
(some? endpoint)
(.endpointOverride (URI. endpoint)))]
(.build ^S3AsyncClientBuilder builder))
]
(reify
clojure.lang.IDeref
@ -288,6 +294,7 @@
^AsyncRequestBody rbody)
(p/fmap (constantly object)))))
;; FIXME: research how to avoid reflection on close method
(defn- path->stream
[path]
(proxy [FilterInputStream] [(io/input-stream path)]
@ -347,8 +354,7 @@
(getObjectRequest ^GetObjectRequest gor)
(build))
pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)]
(p/resolved
(u/uri (str (.url ^PresignedGetObjectRequest pgor))))))
(u/uri (str (.url ^PresignedGetObjectRequest pgor)))))
(defn- del-object
[{:keys [::bucket ::client ::prefix]} {:keys [id] :as obj}]