🐛 Fix s3 client issues with s3 compatible services

This commit is contained in:
Andrey Antukh 2022-12-31 16:37:42 +01:00
parent 27451b9796
commit e372e8ba3e
2 changed files with 49 additions and 40 deletions

View file

@ -55,7 +55,8 @@
;; Pretty Print specs ;; Pretty Print specs
pretty-spec/pretty-spec {:mvn/version "0.1.4"} pretty-spec/pretty-spec {:mvn/version "0.1.4"}
software.amazon.awssdk/s3 {:mvn/version "2.19.5"}} software.amazon.awssdk/s3 {:mvn/version "2.19.8"}
}
:paths ["src" "resources" "target/classes"] :paths ["src" "resources" "target/classes"]
:aliases :aliases

View file

@ -24,7 +24,9 @@
(:import (:import
java.io.FilterInputStream java.io.FilterInputStream
java.io.InputStream java.io.InputStream
java.net.URI
java.nio.ByteBuffer java.nio.ByteBuffer
java.nio.file.Path
java.time.Duration java.time.Duration
java.util.Collection java.util.Collection
java.util.Optional java.util.Optional
@ -40,6 +42,7 @@
software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup
software.amazon.awssdk.regions.Region software.amazon.awssdk.regions.Region
software.amazon.awssdk.services.s3.S3AsyncClient software.amazon.awssdk.services.s3.S3AsyncClient
software.amazon.awssdk.services.s3.S3Configuration
software.amazon.awssdk.services.s3.model.Delete software.amazon.awssdk.services.s3.model.Delete
software.amazon.awssdk.services.s3.model.DeleteObjectRequest software.amazon.awssdk.services.s3.model.DeleteObjectRequest
software.amazon.awssdk.services.s3.model.DeleteObjectsRequest software.amazon.awssdk.services.s3.model.DeleteObjectsRequest
@ -151,46 +154,51 @@
(defn build-s3-client (defn build-s3-client
[{:keys [region endpoint executor]}] [{:keys [region endpoint executor]}]
(let [hclient (.. (NettyNioAsyncHttpClient/builder) (let [aconfig (-> (ClientAsyncConfiguration/builder)
(eventLoopGroupBuilder (.. (SdkEventLoopGroup/builder) (.advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR executor)
(numberOfThreads (int default-eventloop-threads)))) (.build))
(connectionAcquisitionTimeout default-timeout)
(connectionTimeout default-timeout)
(readTimeout default-timeout)
(writeTimeout default-timeout)
(build))
client (.. (S3AsyncClient/builder)
(asyncConfiguration (.. (ClientAsyncConfiguration/builder)
(advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR
executor)
(build)))
(httpClient hclient)
(region (lookup-region region)))]
(when-let [uri (some-> endpoint (java.net.URI.))] sconfig (-> (S3Configuration/builder)
(.endpointOverride client uri)) (cond-> (some? endpoint) (.pathStyleAccessEnabled true))
(.build))
hclient (-> (NettyNioAsyncHttpClient/builder)
(.eventLoopGroupBuilder (-> (SdkEventLoopGroup/builder)
(.numberOfThreads (int default-eventloop-threads))))
(.connectionAcquisitionTimeout default-timeout)
(.connectionTimeout default-timeout)
(.readTimeout default-timeout)
(.writeTimeout default-timeout)
(.build))
client (-> (S3AsyncClient/builder)
(.serviceConfiguration ^S3Configuration sconfig)
(.asyncConfiguration ^ClientAsyncConfiguration aconfig)
(.httpClient ^NettyNioAsyncHttpClient hclient)
(.region (lookup-region region))
(cond-> (some? endpoint) (.endpointOverride (URI. endpoint)))
(.build))]
(let [client (.build client)]
(reify (reify
clojure.lang.IDeref clojure.lang.IDeref
(deref [_] client) (deref [_] client)
java.lang.AutoCloseable java.lang.AutoCloseable
(close [_] (close [_]
(.close hclient) (.close ^NettyNioAsyncHttpClient hclient)
(.close client)))))) (.close ^S3AsyncClient client)))))
(defn build-s3-presigner (defn build-s3-presigner
[{:keys [region endpoint]}] [{:keys [region endpoint]}]
(if (string? endpoint) (let [config (-> (S3Configuration/builder)
(let [uri (java.net.URI. endpoint)] (cond-> (some? endpoint) (.pathStyleAccessEnabled true))
(.. (S3Presigner/builder) (.build))]
(endpointOverride uri)
(region (lookup-region region)) (-> (S3Presigner/builder)
(build))) (cond-> (some? endpoint) (.endpointOverride (URI. endpoint)))
(.. (S3Presigner/builder) (.region (lookup-region region))
(region (lookup-region region)) (.serviceConfiguration ^S3Configuration config)
(build)))) (.build))))
(defn- make-request-body (defn- make-request-body
[content] [content]
@ -198,7 +206,7 @@
buff-size (* 1024 64) buff-size (* 1024 64)
sem (Semaphore. 0) sem (Semaphore. 0)
writer-fn (fn [s] writer-fn (fn [^Subscriber s]
(try (try
(loop [] (loop []
(.acquire sem 1) (.acquire sem 1)
@ -261,7 +269,7 @@
;; not, read the contento into memory using bytearrays. ;; not, read the contento into memory using bytearrays.
(if (> size (* 1024 1024 2)) (if (> size (* 1024 1024 2))
(p/let [path (tmp/tempfile :prefix "penpot.storage.s3.") (p/let [path (tmp/tempfile :prefix "penpot.storage.s3.")
rxf (AsyncResponseTransformer/toFile path) rxf (AsyncResponseTransformer/toFile ^Path path)
_ (.getObject ^S3AsyncClient client _ (.getObject ^S3AsyncClient client
^GetObjectRequest gor ^GetObjectRequest gor
^AsyncResponseTransformer rxf)] ^AsyncResponseTransformer rxf)]
@ -283,7 +291,7 @@
(key (str prefix (impl/id->path id))) (key (str prefix (impl/id->path id)))
(build)) (build))
rxf (AsyncResponseTransformer/toBytes) rxf (AsyncResponseTransformer/toBytes)
obj (.getObjectAsBytes ^S3AsyncClient client obj (.getObject ^S3AsyncClient client
^GetObjectRequest gor ^GetObjectRequest gor
^AsyncResponseTransformer rxf)] ^AsyncResponseTransformer rxf)]
(.asByteArray ^ResponseBytes obj))) (.asByteArray ^ResponseBytes obj)))