Improve storage recheck task and add more specs.

This commit is contained in:
Andrey Antukh 2021-01-25 16:14:54 +01:00 committed by Hirunatan
parent 340d1d43be
commit a69a35a0b6
3 changed files with 36 additions and 5 deletions

View file

@ -193,11 +193,11 @@
:fn (ig/ref :app.tasks.file-xlog-gc/handler)} :fn (ig/ref :app.tasks.file-xlog-gc/handler)}
{:id "storage-gc" {:id "storage-gc"
:cron #app/cron "0 0 0 */1 * ?" ;; daily :cron #app/cron "0 0 */6 * * ?" ;; every 6 hours
:fn (ig/ref :app.storage/gc-task)} :fn (ig/ref :app.storage/gc-task)}
{:id "storage-recheck" {:id "storage-recheck"
:cron #app/cron "0 0 0 */1 * ?" ;; daily :cron #app/cron "0 0 */6 * * ?" ;; every 6 hours
:fn (ig/ref :app.storage/recheck-task)} :fn (ig/ref :app.storage/recheck-task)}
{:id "tasks-gc" {:id "tasks-gc"

View file

@ -24,6 +24,7 @@
[app.worker :as wrk] [app.worker :as wrk]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[lambdaisland.uri :as u] [lambdaisland.uri :as u]
[promesa.exec :as px]) [promesa.exec :as px])
@ -56,6 +57,9 @@
[_ cfg] [_ cfg]
cfg) cfg)
(s/def ::storage
(s/keys :req-un [::backends ::wrk/executor ::db/pool ::backend]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Database Objects ;; Database Objects
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -153,11 +157,13 @@
(defn get-object (defn get-object
[{:keys [conn pool] :as storage} id] [{:keys [conn pool] :as storage} id]
(us/assert ::storage storage)
(-> (assoc storage :conn (or conn pool)) (-> (assoc storage :conn (or conn pool))
(retrieve-database-object id))) (retrieve-database-object id)))
(defn put-object (defn put-object
[{:keys [pool conn backend executor] :as storage} {:keys [content] :as params}] [{:keys [pool conn backend executor] :as storage} {:keys [content] :as params}]
(us/assert ::storage storage)
(us/assert impl/content? content) (us/assert impl/content? content)
(let [storage (assoc storage :conn (or conn pool)) (let [storage (assoc storage :conn (or conn pool))
object (create-database-object storage params)] object (create-database-object storage params)]
@ -175,6 +181,7 @@
(defn clone-object (defn clone-object
[{:keys [pool conn executor] :as storage} object] [{:keys [pool conn executor] :as storage} object]
(us/assert ::storage storage)
(let [storage (assoc storage :conn (or conn pool)) (let [storage (assoc storage :conn (or conn pool))
object* (create-database-object storage object)] object* (create-database-object storage object)]
(if (= (:backend object) (:backend storage)) (if (= (:backend object) (:backend storage))
@ -196,6 +203,7 @@
(defn get-object-data (defn get-object-data
[{:keys [pool conn] :as storage} object] [{:keys [pool conn] :as storage} object]
(us/assert ::storage storage)
(-> (assoc storage :conn (or conn pool)) (-> (assoc storage :conn (or conn pool))
(resolve-backend (:backend object)) (resolve-backend (:backend object))
(impl/get-object-data object))) (impl/get-object-data object)))
@ -204,16 +212,22 @@
([storage object] ([storage object]
(get-object-url storage object nil)) (get-object-url storage object nil))
([{:keys [conn pool] :as storage} object options] ([{:keys [conn pool] :as storage} object options]
(us/assert ::storage storage)
(-> (assoc storage :conn (or conn pool)) (-> (assoc storage :conn (or conn pool))
(resolve-backend (:backend object)) (resolve-backend (:backend object))
(impl/get-object-url object options)))) (impl/get-object-url object options))))
(defn object->path (defn object->relative-path
[{:keys [id] :as obj}] [{:keys [id] :as obj}]
(impl/id->path id)) (impl/id->path id))
(defn file-url->path
[url]
(fs/path (java.net.URI. (str url))))
(defn del-object (defn del-object
[{:keys [conn pool] :as storage} id-or-obj] [{:keys [conn pool] :as storage} id-or-obj]
(us/assert ::storage storage)
(-> (assoc storage :conn (or conn pool)) (-> (assoc storage :conn (or conn pool))
(delete-database-object (if (uuid? id-or-obj) id-or-obj (:id id-or-obj))))) (delete-database-object (if (uuid? id-or-obj) id-or-obj (:id id-or-obj)))))
@ -223,7 +237,7 @@
[storage params] [storage params]
(let [storage (assoc storage :backend :fs) (let [storage (assoc storage :backend :fs)
params (assoc params params (assoc params
:expired-at (dt/in-future {:hours 2}) :expired-at (dt/in-future {:minutes 30})
:temporal true)] :temporal true)]
(put-object storage params))) (put-object storage params)))
@ -292,6 +306,18 @@
;; Recheck Stalled Task ;; Recheck Stalled Task
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Because the physical storage (filesystem, s3, ... except db) is not
;; transactional, in some situations we can find physical object
;; leakage. That situation happens when the transaction that writes
;; the file aborts, leaving the file written to the underlying storage
;; but the reference on the database is lost with the rollback.
;;
;; For these situations we need to write a "log" of inserted files that
;; are checked at some time in the future. If the physical file exists but
;; the database reference does not exist, it means that a leaked file has
;; been found and it is immediately deleted. The responsibility of this
;; task is to check that write log for possible leaked files.
(declare sql:retrieve-pending) (declare sql:retrieve-pending)
(declare sql:exists-storage-object) (declare sql:exists-storage-object)
@ -324,7 +350,9 @@
(def sql:retrieve-pending (def sql:retrieve-pending
"with items_part as ( "with items_part as (
select s.id from storage_pending as s select s.id
from storage_pending as s
where s.created_at < now() - '1 hour'::interval
order by s.created_at order by s.created_at
limit 100 limit 100
) )

View file

@ -165,6 +165,9 @@
(instance? String data) (instance? String data)
(string->content data) (string->content data)
(bytes? data)
(input-stream->content (ByteArrayInputStream. ^bytes data) (alength data))
(instance? InputStream data) (instance? InputStream data)
(do (do
(when-not size (when-not size