diff --git a/backend/deps.edn b/backend/deps.edn index 1ceba4b524..77e325ba4f 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -28,7 +28,8 @@ metosin/reitit-core {:mvn/version "0.5.18"} org.postgresql/postgresql {:mvn/version "42.4.0"} com.zaxxer/HikariCP {:mvn/version "5.0.1"} - funcool/datoteka {:mvn/version "2.0.0"} + + funcool/datoteka {:mvn/version "3.0.64"} buddy/buddy-hashers {:mvn/version "1.8.158"} buddy/buddy-sign {:mvn/version "3.4.333"} diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 9fe00155ed..7d547976aa 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -71,6 +71,10 @@ :app.tokens/tokens {:keys (ig/ref :app.setup/keys)} + :app.storage.tmp/cleaner + {:executor (ig/ref [::worker :app.worker/executor]) + :scheduler (ig/ref :app.worker/scheduler)} + :app.storage/gc-deleted-task {:pool (ig/ref :app.db/pool) :storage (ig/ref :app.storage/storage) @@ -336,23 +340,12 @@ :backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) - :assets-db (ig/ref [::assets :app.storage.db/backend]) :assets-fs (ig/ref [::assets :app.storage.fs/backend]) - :tmp (ig/ref [::tmp :app.storage.fs/backend]) - :fdata-s3 (ig/ref [::fdata :app.storage.s3/backend]) - ;; keep this for backward compatibility :s3 (ig/ref [::assets :app.storage.s3/backend]) :fs (ig/ref [::assets :app.storage.fs/backend])}} - [::fdata :app.storage.s3/backend] - {:region (cf/get :storage-fdata-s3-region) - :bucket (cf/get :storage-fdata-s3-bucket) - :endpoint (cf/get :storage-fdata-s3-endpoint) - :prefix (cf/get :storage-fdata-s3-prefix) - :executor (ig/ref [::default :app.worker/executor])} - [::assets :app.storage.s3/backend] {:region (cf/get :storage-assets-s3-region) :endpoint (cf/get :storage-assets-s3-endpoint) @@ -361,12 +354,7 @@ [::assets :app.storage.fs/backend] {:directory (cf/get :storage-assets-fs-directory)} - - [::tmp :app.storage.fs/backend] - {:directory "/tmp/penpot"} - - [::assets :app.storage.db/backend] - {:pool (ig/ref 
:app.db/pool)}}) + }) (def system nil) diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index e96cd7f7ff..1a54a08b76 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -14,7 +14,6 @@ [app.common.spec :as us] [app.common.uuid :as uuid] [app.db :as db] - [app.storage.db :as sdb] [app.storage.fs :as sfs] [app.storage.impl :as impl] [app.storage.s3 :as ss3] @@ -32,14 +31,12 @@ (s/def ::s3 ::ss3/backend) (s/def ::fs ::sfs/backend) -(s/def ::db ::sdb/backend) (s/def ::backends (s/map-of ::us/keyword (s/nilable (s/or :s3 ::ss3/backend - :fs ::sfs/backend - :db ::sdb/backend)))) + :fs ::sfs/backend)))) (defmethod ig/pre-init-spec ::storage [_] (s/keys :req-un [::db/pool ::wrk/executor ::backends])) @@ -109,7 +106,7 @@ result (or result (-> (db/insert! conn :storage-object {:id id - :size (count content) + :size (impl/get-size content) :backend (name backend) :metadata (db/tjson mdata) :deleted-at expired-at @@ -263,7 +260,8 @@ ;; A task responsible to permanently delete already marked as deleted ;; storage files. The storage objects are practically never marked to ;; be deleted directly by the api call. The touched-gc is responsible -;; of collecting the usage of the object and mark it as deleted. +;; of collecting the usage of the object and mark it as deleted. Only +;; the TMP files are created with an expiration date in the future. (declare sql:retrieve-deleted-objects-chunk) diff --git a/backend/src/app/storage/db.clj b/backend/src/app/storage/db.clj deleted file mode 100644 index 4ccbf74800..0000000000 --- a/backend/src/app/storage/db.clj +++ /dev/null @@ -1,67 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. 
-;; -;; Copyright (c) UXBOX Labs SL - -(ns app.storage.db - (:require - [app.common.spec :as us] - [app.db :as db] - [app.storage.impl :as impl] - [clojure.spec.alpha :as s] - [integrant.core :as ig] - [promesa.exec :as px]) - (:import - java.io.ByteArrayInputStream)) - -;; --- BACKEND INIT - -(defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt-un [::db/pool])) - -(defmethod ig/init-key ::backend - [_ cfg] - (assoc cfg :type :db)) - -(s/def ::type ::us/keyword) -(s/def ::backend - (s/keys :req-un [::type ::db/pool])) - -;; --- API IMPL - -(defmethod impl/put-object :db - [{:keys [conn executor] :as storage} {:keys [id] :as object} content] - (px/with-dispatch executor - (let [data (impl/slurp-bytes content)] - (db/insert! conn :storage-data {:id id :data data}) - object))) - -(defmethod impl/get-object-data :db - [{:keys [conn executor] :as backend} {:keys [id] :as object}] - (px/with-dispatch executor - (let [result (db/exec-one! conn ["select data from storage_data where id=?" id])] - (ByteArrayInputStream. (:data result))))) - -(defmethod impl/get-object-bytes :db - [{:keys [conn executor] :as backend} {:keys [id] :as object}] - (px/with-dispatch executor - (let [result (db/exec-one! conn ["select data from storage_data where id=?" id])] - (:data result)))) - -(defmethod impl/get-object-url :db - [_ _] - (throw (UnsupportedOperationException. "not supported"))) - -(defmethod impl/del-object :db - [_ _] - ;; NOOP: because deleting the row already deletes the file data from - ;; the database. - nil) - -(defmethod impl/del-objects-in-bulk :db - [_ _] - ;; NOOP: because deleting the row already deletes the file data from - ;; the database. 
- nil) - diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj index 2b56549a79..4feaaf6242 100644 --- a/backend/src/app/storage/fs.clj +++ b/backend/src/app/storage/fs.clj @@ -10,11 +10,13 @@ [app.common.spec :as us] [app.common.uri :as u] [app.storage.impl :as impl] + [app.util.bytes :as bs] [clojure.java.io :as io] [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.core :as fs] [integrant.core :as ig] + [promesa.core :as p] [promesa.exec :as px]) (:import java.io.InputStream @@ -72,9 +74,10 @@ (io/input-stream full)))) (defmethod impl/get-object-bytes :fs - [{:keys [executor] :as backend} object] - (px/with-dispatch executor - (fs/slurp-bytes (impl/get-object-data backend object)))) + [backend object] + (p/let [input (impl/get-object-data backend object)] + (ex/with-always (bs/close! input) + (bs/read-as-bytes input)))) (defmethod impl/get-object-url :fs [{:keys [uri executor] :as backend} {:keys [id] :as object} _] diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index c5623dd5a8..bca9b5e200 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -9,18 +9,15 @@ (:require [app.common.data.macros :as dm] [app.common.exceptions :as ex] - [app.common.uuid :as uuid] + [app.util.bytes :as bs] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] [clojure.java.io :as io]) (:import java.nio.ByteBuffer - java.util.UUID - java.io.ByteArrayInputStream - java.io.InputStream java.nio.file.Files - org.apache.commons.io.input.BoundedInputStream - )) + java.nio.file.Path + java.util.UUID)) ;; --- API Definition @@ -95,23 +92,23 @@ (defn coerce-id [id] (cond - (string? id) (uuid/uuid id) - (uuid? id) id - :else (ex/raise :type :internal - :code :invalid-id-type - :hint "id should be string or uuid"))) + (string? id) (parse-uuid id) + (uuid? 
id) id + :else (ex/raise :type :internal + :code :invalid-id-type + :hint "id should be string or uuid"))) (defprotocol IContentObject - (size [_] "get object size")) + (get-size [_] "get object size")) (defprotocol IContentHash (get-hash [_] "get precalculated hash")) -(defn- make-content - [^InputStream is ^long size] +(defn- path->content + [^Path path ^long size] (reify IContentObject - (size [_] size) + (get-size [_] size) io/IOFactory (make-reader [this opts] @@ -119,47 +116,53 @@ (make-writer [_ _] (throw (UnsupportedOperationException. "not implemented"))) (make-input-stream [_ _] - (doto (BoundedInputStream. is size) - (.setPropagateClose false))) + (-> (io/input-stream path) + (bs/bounded-input-stream size))) (make-output-stream [_ _] + (throw (UnsupportedOperationException. "not implemented"))))) + +(defn- bytes->content + [^bytes data ^long size] + (reify + IContentObject + (get-size [_] size) + + io/IOFactory + (make-reader [this opts] + (io/make-reader this opts)) + (make-writer [_ _] (throw (UnsupportedOperationException. "not implemented"))) - - clojure.lang.Counted - (count [_] size) - - java.lang.AutoCloseable - (close [_] - (.close is)))) + (make-input-stream [_ _] + (-> (bs/bytes-input-stream data) + (bs/bounded-input-stream size))) + (make-output-stream [_ _] + (throw (UnsupportedOperationException. "not implemented"))))) (defn content ([data] (content data nil)) ([data size] (cond (instance? java.nio.file.Path data) - (make-content (io/input-stream data) - (Files/size data)) + (path->content data (or size (Files/size data))) (instance? java.io.File data) - (content (.toPath ^java.io.File data) nil) + (content (.toPath ^java.io.File data) size) (instance? String data) - (let [data (.getBytes data "UTF-8") - bais (ByteArrayInputStream. ^bytes data)] - (make-content bais (alength data))) + (let [data (.getBytes data "UTF-8")] + (bytes->content data (alength data))) (bytes? data) - (let [size (alength ^bytes data) - bais (ByteArrayInputStream. 
^bytes data)] - (make-content bais size)) + (bytes->content data (or size (alength ^bytes data))) - (instance? InputStream data) - (do - (when-not size - (throw (UnsupportedOperationException. "size should be provided on InputStream"))) - (make-content data size)) + ;; (instance? InputStream data) + ;; (do + ;; (when-not size + ;; (throw (UnsupportedOperationException. "size should be provided on InputStream"))) + ;; (make-content data size)) :else - (throw (UnsupportedOperationException. "type not supported"))))) + (throw (IllegalArgumentException. "invalid argument type"))))) (defn wrap-with-hash [content ^String hash] @@ -171,7 +174,7 @@ (reify IContentObject - (size [_] (size content)) + (get-size [_] (get-size content)) IContentHash (get-hash [_] hash) @@ -184,43 +187,17 @@ (make-input-stream [_ opts] (io/make-input-stream content opts)) (make-output-stream [_ opts] - (io/make-output-stream content opts)) - - clojure.lang.Counted - (count [_] (count content)) - - java.lang.AutoCloseable - (close [_] - (.close ^java.lang.AutoCloseable content)))) + (io/make-output-stream content opts)))) (defn content? [v] (satisfies? IContentObject v)) -(defn slurp-bytes - [content] - (with-open [input (io/input-stream content) - output (java.io.ByteArrayOutputStream. (count content))] - (io/copy input output) - (.toByteArray output))) - (defn calculate-hash - [path-or-stream] - (let [result (cond - (instance? InputStream path-or-stream) - (let [result (-> (bh/blake2b-256 path-or-stream) - (bc/bytes->hex))] - (.reset path-or-stream) - result) - - (string? 
path-or-stream) - (-> (bh/blake2b-256 path-or-stream) - (bc/bytes->hex)) - - :else - (with-open [is (io/input-stream path-or-stream)] - (-> (bh/blake2b-256 is) - (bc/bytes->hex))))] + [resource] + (let [result (with-open [input (io/input-stream resource)] + (-> (bh/blake2b-256 input) + (bc/bytes->hex)))] (str "blake2b:" result))) (defn resolve-backend diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index c5c4a68196..72480dd539 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -12,14 +12,17 @@ [app.common.spec :as us] [app.common.uri :as u] [app.storage.impl :as impl] + [app.storage.tmp :as tmp] [app.util.time :as dt] [app.worker :as wrk] [clojure.java.io :as io] [clojure.spec.alpha :as s] + [datoteka.core :as fs] [integrant.core :as ig] [promesa.core :as p] [promesa.exec :as px]) (:import + java.io.FilterInputStream java.io.InputStream java.nio.ByteBuffer java.time.Duration @@ -30,6 +33,7 @@ org.reactivestreams.Subscription software.amazon.awssdk.core.ResponseBytes software.amazon.awssdk.core.async.AsyncRequestBody + software.amazon.awssdk.core.async.AsyncResponseTransformer software.amazon.awssdk.core.client.config.ClientAsyncConfiguration software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient @@ -107,7 +111,16 @@ (defmethod impl/get-object-data :s3 [backend object] - (get-object-data backend object)) + (letfn [(no-such-key? [cause] + (instance? software.amazon.awssdk.services.s3.model.NoSuchKeyException cause)) + (handle-not-found [cause] + (ex/raise :type :not-found + :code :object-not-found + :hint "s3 object not found" + :cause cause))] + + (-> (get-object-data backend object) + (p/catch no-such-key? 
handle-not-found)))) (defmethod impl/get-object-bytes :s3 [backend object] @@ -204,7 +217,7 @@ (reify AsyncRequestBody (contentLength [_] - (Optional/of (long (count content)))) + (Optional/of (long (impl/get-size content)))) (^void subscribe [_ ^Subscriber s] (let [thread (Thread. #(writer-fn s))] @@ -216,7 +229,6 @@ (cancel [_] (.interrupt thread) (.release sem 1)) - (request [_ n] (.release sem (int n)))))))))) @@ -238,16 +250,31 @@ ^AsyncRequestBody content)))) (defn get-object-data - [{:keys [client bucket prefix]} {:keys [id]}] - (p/let [gor (.. (GetObjectRequest/builder) - (bucket bucket) - (key (str prefix (impl/id->path id))) - (build)) - obj (.getObject ^S3AsyncClient client ^GetObjectRequest gor) - ;; rsp (.response ^ResponseInputStream obj) - ;; len (.contentLength ^GetObjectResponse rsp) - ] - (io/input-stream obj))) + [{:keys [client bucket prefix]} {:keys [id size]}] + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (str prefix (impl/id->path id))) + (build))] + + ;; If the file size is greater than 2MiB then stream the content + ;; to the filesystem and then read with buffered inputstream; if + ;; not, read the content into memory using byte arrays. 
+ (if (> size (* 1024 1024 2)) + (p/let [path (tmp/tempfile :prefix "penpot.storage.s3.") + rxf (AsyncResponseTransformer/toFile path) + _ (.getObject ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf)] + (proxy [FilterInputStream] [(io/input-stream path)] + (close [] + (fs/delete path) + (proxy-super close)))) + + (p/let [rxf (AsyncResponseTransformer/toBytes) + obj (.getObject ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf)] + (.asInputStream ^ResponseBytes obj))))) (defn get-object-bytes [{:keys [client bucket prefix]} {:keys [id]}] @@ -255,7 +282,10 @@ (bucket bucket) (key (str prefix (impl/id->path id))) (build)) - obj (.getObjectAsBytes ^S3AsyncClient client ^GetObjectRequest gor)] + rxf (AsyncResponseTransformer/toBytes) + obj (.getObjectAsBytes ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf)] (.asByteArray ^ResponseBytes obj))) (def default-max-age diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj new file mode 100644 index 0000000000..cdb1b0cc71 --- /dev/null +++ b/backend/src/app/storage/tmp.clj @@ -0,0 +1,83 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.storage.tmp + "Temporary files service; all created files will be cleaned after + 1 hour after creation. This is a best effort: if this process fails, + the operating system cleaning task should be responsible for + permanently deleting these files (look at systemd-tmpfiles)." 
+ (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.util.time :as dt] + [app.worker :as wrk] + [clojure.core.async :as a] + [clojure.spec.alpha :as s] + [datoteka.core :as fs] + [integrant.core :as ig] + [promesa.exec :as px])) + +(declare remove-temp-file) +(defonce queue (a/chan 128)) + +(s/def ::min-age ::dt/duration) + +(defmethod ig/pre-init-spec ::cleaner [_] + (s/keys :req-un [::min-age ::wrk/scheduler ::wrk/executor])) + +(defmethod ig/prep-key ::cleaner + [_ cfg] + (merge {:min-age (dt/duration {:minutes 30})} + (d/without-nils cfg))) + +(defmethod ig/init-key ::cleaner + [_ {:keys [scheduler executor min-age] :as cfg}] + (l/info :hint "starting tempfile cleaner service") + (let [cch (a/chan)] + (a/go-loop [] + (let [[path port] (a/alts! [queue cch])] + (when (not= port cch) + (l/trace :hint "schedule tempfile deletion" :path path + :expires-at (dt/plus (dt/now) min-age)) + (px/schedule! scheduler + (inst-ms min-age) + (partial remove-temp-file executor path)) + (recur)))) + cch)) + +(defmethod ig/halt-key! ::cleaner + [_ close-ch] + (l/info :hint "stoping tempfile cleaner service") + (some-> close-ch a/close!)) + +(defn- remove-temp-file + "Permanently delete tempfile" + [executor path] + (px/with-dispatch executor + (l/trace :hint "permanently delete tempfile" :path path) + (when (fs/exists? path) + (fs/delete path)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defn tempfile + "Returns a tmpfile candidate (without creating it)" + [& {:keys [suffix prefix] + :or {prefix "penpot." + suffix ".tmp"}}] + (let [candidate (fs/tempfile :suffix suffix :prefix prefix)] + (a/offer! queue candidate) + candidate)) + +(defn create-tempfile + [& {:keys [suffix prefix] + :or {prefix "penpot." + suffix ".tmp"}}] + (let [path (fs/create-tempfile :suffix suffix :prefix prefix)] + (a/offer! 
queue path) + path)) diff --git a/backend/src/app/util/bytes.clj b/backend/src/app/util/bytes.clj new file mode 100644 index 0000000000..5be58f4054 --- /dev/null +++ b/backend/src/app/util/bytes.clj @@ -0,0 +1,110 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.util.bytes + "Bytes & Byte Streams helpers" + (:require + [clojure.java.io :as io] + [datoteka.core :as fs] + [yetti.adapter :as yt]) + (:import + com.github.luben.zstd.ZstdInputStream + com.github.luben.zstd.ZstdOutputStream + java.io.ByteArrayInputStream + java.io.ByteArrayOutputStream + java.io.DataInputStream + java.io.DataOutputStream + java.io.OutputStream + java.io.InputStream + java.lang.AutoCloseable + org.apache.commons.io.IOUtils + org.apache.commons.io.input.BoundedInputStream)) + +(set! *warn-on-reflection* true) + +(def ^:const default-buffer-size + (:xnio/buffer-size yt/defaults)) + +(defn copy! + [src dst & {:keys [offset size buffer-size] + :or {offset 0 buffer-size default-buffer-size}}] + (let [^bytes buff (byte-array buffer-size)] + (if size + (IOUtils/copyLarge ^InputStream src ^OutputStream dst (long offset) (long size) buff) + (IOUtils/copyLarge ^InputStream src ^OutputStream dst buff)))) + +(defn write-to-file! + [src dst & {:keys [size]}] + (with-open [^OutputStream output (io/output-stream dst)] + (cond + (bytes? src) + (if size + (with-open [^InputStream input (ByteArrayInputStream. ^bytes src)] + (with-open [^InputStream input (BoundedInputStream. input (or size (alength ^bytes src)))] + (copy! input output :size size))) + + (do + (IOUtils/writeChunked ^bytes src output) + (.flush ^OutputStream output) + (alength ^bytes src))) + + (instance? InputStream src) + (copy! src output :size size) + + :else + (throw (IllegalArgumentException. 
"invalid arguments"))))) + +(defn read-as-bytes + "Read input stream as byte array." + [input & {:keys [size]}] + (cond + (instance? InputStream input) + (with-open [output (ByteArrayOutputStream. (or size (.available ^InputStream input)))] + (copy! input output :size size) + (.toByteArray output)) + + (fs/path? input) + (with-open [input (io/input-stream input) + output (ByteArrayOutputStream. (or size (.available input)))] + (copy! input output :size size) + (.toByteArray output)) + + :else + (throw (IllegalArgumentException. "invalid arguments")))) + +(defn bytes-input-stream + "Creates an instance of ByteArrayInputStream." + [^bytes data] + (ByteArrayInputStream. data)) + +(defn bounded-input-stream + [input size & {:keys [close?] :or {close? true}}] + (doto (BoundedInputStream. ^InputStream input ^long size) + (.setPropagateClose close?))) + +(defn zstd-input-stream + ^InputStream + [input] + (ZstdInputStream. ^InputStream input)) + +(defn zstd-output-stream + ^OutputStream + [output & {:keys [level] :or {level 0}}] + (ZstdOutputStream. ^OutputStream output (int level))) + +(defn data-input-stream + ^DataInputStream + [input] + (DataInputStream. ^InputStream input)) + +(defn data-output-stream + ^DataOutputStream + [output] + (DataOutputStream. ^OutputStream output)) + +(defn close! + [^AutoCloseable stream] + (.close stream))