diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 6d5fc3d5f5..97171825c1 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -349,6 +349,8 @@ :audit-log-archive (ig/ref :app.loggers.audit.archive-task/handler) :audit-log-gc (ig/ref :app.loggers.audit.gc-task/handler) + :object-update + (ig/ref :app.tasks.object-update/handler) :process-webhook-event (ig/ref ::webhooks/process-event-handler) :run-webhook @@ -376,7 +378,10 @@ ::sto/storage (ig/ref ::sto/storage)} :app.tasks.orphan-teams-gc/handler - {::db/pool (ig/ref ::db/pool)} + {::db/pool (ig/ref ::db/pool)} + + :app.tasks.object-update/handler + {::db/pool (ig/ref ::db/pool)} :app.tasks.file-gc/handler {::db/pool (ig/ref ::db/pool) diff --git a/backend/src/app/rpc/commands/files_thumbnails.clj b/backend/src/app/rpc/commands/files_thumbnails.clj index d766acd3c5..bd982ce171 100644 --- a/backend/src/app/rpc/commands/files_thumbnails.clj +++ b/backend/src/app/rpc/commands/files_thumbnails.clj @@ -271,7 +271,7 @@ (when (and (some? th1) (not= (:media-id th1) (:media-id th2))) - (sto/touch-object! storage (:media-id th1))) + (sto/touch-object! storage (:media-id th1) :async true)) th2)) diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index f6924aedb4..070c53f3fb 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -16,6 +16,7 @@ [app.storage.impl :as impl] [app.storage.s3 :as ss3] [app.util.time :as dt] + [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] @@ -170,15 +171,28 @@ (impl/put-object object content)) object))) +(def ^:private default-touch-delay + "A default delay for the asynchronous touch operation" + (dt/duration "5m")) + (defn touch-object! "Mark object as touched." - [{:keys [::db/pool-or-conn] :as storage} object-or-id] + [{:keys [::db/pool-or-conn] :as storage} object-or-id & {:keys [async]}] (us/assert! ::storage storage) - (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) - rs (db/update! pool-or-conn :storage-object - {:touched-at (dt/now)} - {:id id})] - (pos? (db/get-update-count rs)))) + (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)] + (if async + (wrk/submit! ::wrk/conn pool-or-conn + ::wrk/task :object-update + ::wrk/delay default-touch-delay + :object :storage-object + :id id + :key :touched-at + :val (dt/now)) + (-> (db/update! pool-or-conn :storage-object + {:touched-at (dt/now)} + {:id id}) + (db/get-update-count) + (pos?))))) (defn get-object-data "Return an input stream instance of the object content." diff --git a/backend/src/app/tasks/object_update.clj b/backend/src/app/tasks/object_update.clj new file mode 100644 index 0000000000..cfe5fda445 --- /dev/null +++ b/backend/src/app/tasks/object_update.clj @@ -0,0 +1,32 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.object-update + "A task used for perform simple object properties update + in an asynchronous flow." + (:require + [app.common.data :as d] + [app.common.logging :as l] + [app.db :as db] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(defn- update-object + [{:keys [::db/conn] :as cfg} {:keys [id object key val] :as props}] + (l/trc :hint "update object prop" + :id (str id) + :object (d/name object) + :key (d/name key) + :val val) + (db/update! conn object {key val} {:id id} {::db/return-keys false})) + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req [::db/pool])) + +(defmethod ig/init-key ::handler + [_ cfg] + (fn [{:keys [props] :as params}] + (db/tx-run! cfg update-object props))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index a648080f3b..1da2e8de08 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -119,11 +119,13 @@ :next.jdbc/update-count))] (l/trc :hint "submit task" :name task + :task-id (str id) :queue queue :label label :dedupe (boolean dedupe) - :deleted (or deleted 0) - :in (dt/format-duration duration)) + :delay (dt/format-duration duration) + :replace (or deleted 0)) + (db/exec-one! conn [sql:insert-new-task id task props queue label priority max-retries interval]) diff --git a/backend/src/app/worker/cron.clj b/backend/src/app/worker/cron.clj index 72a66a22b3..cb5a69d882 100644 --- a/backend/src/app/worker/cron.clj +++ b/backend/src/app/worker/cron.clj @@ -92,7 +92,7 @@ (let [ts (ms-until-valid cron) ft (px/schedule! ts (partial execute-cron-task cfg task))] - (l/dbg :hint "schedule task" :id id + (l/dbg :hint "schedule" :id id :ts (dt/format-duration ts) :at (dt/format-instant (dt/in-future ts))) diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index 11ca3d972a..e68fa484db 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -139,7 +139,7 @@ :else (try - (l/trc :hint "start task" + (l/trc :hint "start" :name (:name task) :task-id (str task-id) :queue queue @@ -149,7 +149,7 @@ result (handle-task task) elapsed (dt/format-duration (tpoint))] - (l/trc :hint "end task" + (l/trc :hint "end" :name (:name task) :task-id (str task-id) :queue queue diff --git a/backend/test/backend_tests/rpc_file_thumbnails_test.clj b/backend/test/backend_tests/rpc_file_thumbnails_test.clj index f0cfc96375..11ed4f352e 100644 --- a/backend/test/backend_tests/rpc_file_thumbnails_test.clj +++ b/backend/test/backend_tests/rpc_file_thumbnails_test.clj @@ -277,8 +277,6 @@ (t/is (thrown? org.postgresql.util.PSQLException (th/db-delete! :storage-object {:id (:media-id row1)})))))) - - (t/deftest get-file-object-thumbnail (let [storage (::sto/storage th/*system*) profile (th/create-profile* 1) @@ -317,3 +315,44 @@ (let [result (:result out)] (t/is (contains? result "test-key-2")))))) + +(t/deftest create-file-object-thumbnail + (th/db-delete! :task {:name "object-update"}) + (let [storage (::sto/storage th/*system*) + profile (th/create-profile* 1) + file (th/create-file* 1 {:profile-id (:id profile) + :project-id (:default-project-id profile) + :is-shared false}) + data {::th/type :create-file-object-thumbnail + ::rpc/profile-id (:id profile) + :file-id (:id file) + :object-id "test-key-2" + :media {:filename "sample.jpg" + :mtype "image/jpeg"}}] + + (let [data (update data :media + (fn [media] + (-> media + (assoc :path (th/tempfile "backend_tests/test_files/sample2.jpg")) + (assoc :size 7923)))) + out (th/command! data)] + (t/is (nil? (:error out))) + (t/is (map? (:result out)))) + + (let [data (update data :media + (fn [media] + (-> media + (assoc :path (th/tempfile "backend_tests/test_files/sample.jpg")) + (assoc :size 312043)))) + out (th/command! data)] + (t/is (nil? (:error out))) + (t/is (map? (:result out)))) + + (let [[row1 :as rows] + (->> (th/db-query :task {:name "object-update"}) + (map #(update % :props db/decode-transit-pgobject)))] + + ;; (app.common.pprint/pprint rows) + (t/is (= 1 (count rows))) + (t/is (> (inst-ms (dt/diff (:created-at row1) (:scheduled-at row1))) + (inst-ms (dt/duration "4m")))))))