diff --git a/backend/src/uxbox/services/mutations/demo.clj b/backend/src/uxbox/services/mutations/demo.clj index f11f64b3d..d91a2984d 100644 --- a/backend/src/uxbox/services/mutations/demo.clj +++ b/backend/src/uxbox/services/mutations/demo.clj @@ -28,8 +28,10 @@ [uxbox.services.mutations :as sm] [uxbox.services.util :as su] [uxbox.services.mutations.profile :as profile] + [uxbox.tasks :as tasks] [uxbox.util.blob :as blob] [uxbox.util.uuid :as uuid] + [uxbox.util.time :as tm] [vertx.core :as vc])) (def sql:insert-user @@ -41,7 +43,7 @@ values ($1, $2, true)") (sm/defmutation ::create-demo-profile - [params] + [_] (let [id (uuid/next) sem (System/currentTimeMillis) email (str "demo-" sem ".demo@nodomain.com") @@ -52,5 +54,10 @@ (db/with-atomic [conn db/pool] (db/query-one conn [sql:insert-user id fullname email password']) (db/query-one conn [sql:insert-email id email]) + + ;; Schedule deletion of the demo profile + (tasks/schedule! conn {:name "remove-demo-profile" + :delay (tm/duration {:hours 48}) + :props {:id id}}) {:email email :password password}))) diff --git a/backend/src/uxbox/services/mutations/profile.clj b/backend/src/uxbox/services/mutations/profile.clj index 4bd22d82c..919223bc4 100644 --- a/backend/src/uxbox/services/mutations/profile.clj +++ b/backend/src/uxbox/services/mutations/profile.clj @@ -22,6 +22,7 @@ [uxbox.db :as db] [uxbox.emails :as emails] [uxbox.images :as images] + [uxbox.tasks :as tasks] [uxbox.media :as media] [uxbox.services.mutations :as sm] [uxbox.services.mutations.images :as imgs] @@ -46,7 +47,7 @@ ;; --- Mutation: Login -(declare retrieve-user) +(declare retrieve-user-by-email) (s/def ::email ::us/email) (s/def ::scope ::us/string) @@ -70,7 +71,7 @@ :code ::wrong-credentials)) {:id (:id user)})] - (-> (retrieve-user db/pool email) + (-> (retrieve-user-by-email db/pool email) (p/then' check-user)))) (def sql:user-by-email @@ -79,7 +80,7 @@ where u.email=$1 and u.deleted_at is null") -(defn- retrieve-user +(defn- retrieve-user-by-email [conn email] (db/query-one conn [sql:user-by-email email])) @@ -163,9 +164,14 @@ (sm/defmutation ::update-profile-photo [{:keys [user file] :as params}] (db/with-atomic [conn db/pool] - ;; TODO: send task for delete old photo - (-> (upload-photo conn params) - (p/then (partial update-profile-photo conn user))))) + (p/let [profile (profile/retrieve-profile conn user) + photo (upload-photo conn params)] + + ;; Schedule deletion of old photo + (tasks/schedule! conn {:name "remove-media" + :props {:path (:photo profile)}}) + ;; Save new photo + (update-profile-photo conn user photo)))) (defn- upload-photo [conn {:keys [file user]}] @@ -281,7 +287,7 @@ :token (:token user) :name (:fullname user)}))] (db/with-atomic [conn db/pool] - (-> (retrieve-user conn email) + (-> (retrieve-user-by-email conn email) (p/then' su/raise-not-found-if-nil) (p/then #(create-recovery-token conn %)) (p/then #(send-email-notification conn %)) diff --git a/backend/src/uxbox/services/queries/profile.clj b/backend/src/uxbox/services/queries/profile.clj index f5bb486ff..d8ae24fbc 100644 --- a/backend/src/uxbox/services/queries/profile.clj +++ b/backend/src/uxbox/services/queries/profile.clj @@ -9,6 +9,7 @@ [clojure.spec.alpha :as s] [promesa.core :as p] [promesa.exec :as px] + [uxbox.common.exceptions :as ex] [uxbox.common.spec :as us] [uxbox.db :as db] [uxbox.images :as images] diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj index b1c346748..3d188c5c1 100644 --- a/backend/src/uxbox/tasks.clj +++ b/backend/src/uxbox/tasks.clj @@ -18,8 +18,9 @@ [uxbox.config :as cfg] [uxbox.core :refer [system]] [uxbox.db :as db] - [uxbox.tasks.demo-gc] [uxbox.tasks.sendmail] + [uxbox.tasks.remove-media] + [uxbox.tasks.remove-demo-profile] [uxbox.tasks.impl :as impl] [uxbox.util.time :as dt] [vertx.core :as vc] @@ -28,9 +29,10 @@ ;; --- Public API (defn schedule! - ([task] (schedule! db/pool task)) - ([conn task] - (impl/schedule! conn task))) + ([opts] (schedule! db/pool opts)) + ([conn opts] + (s/assert ::impl/task-options opts) + (impl/schedule! conn opts))) ;; --- State initialization @@ -40,7 +42,8 @@ ;; need to perform a maintenance and delete some old tasks. (def ^:private tasks - {"demo-gc" #'uxbox.tasks.demo-gc/handler + {"remove-demo-profile" #'uxbox.tasks.remove-demo-profile/handler + "remove-media" #'uxbox.tasks.remove-media/handler "sendmail" #'uxbox.tasks.sendmail/handler}) (defstate tasks-worker @@ -48,13 +51,13 @@ (vc/deploy! system $$ {:instances 1}) (deref $$))) -(def ^:private schedule - [{:id "every 1 hour" - :cron (dt/cron "1 1 */1 * * ? *") - :fn #'uxbox.tasks.demo-gc/handler - :props {:foo 1}}]) +;; (def ^:private schedule +;; [{:id "every 1 hour" +;; :cron (dt/cron "1 1 */1 * * ? *") +;; :fn #'uxbox.tasks.demo-gc/handler +;; :props {:foo 1}}]) -(defstate scheduler - :start (as-> (impl/scheduler-verticle {:schedule schedule}) $$ - (vc/deploy! system $$ {:instances 1 :worker true}) - (deref $$))) +;; (defstate scheduler +;; :start (as-> (impl/scheduler-verticle {:schedule schedule}) $$ +;; (vc/deploy! system $$ {:instances 1 :worker true}) +;; (deref $$))) diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj index 337525bb0..c5b403e78 100644 --- a/backend/src/uxbox/tasks/impl.clj +++ b/backend/src/uxbox/tasks/impl.clj @@ -21,17 +21,16 @@ [uxbox.util.blob :as blob] [uxbox.util.time :as tm] [vertx.core :as vc] + [vertx.util :as vu] [vertx.timers :as vt]) (:import java.time.Duration java.time.Instant java.util.Date)) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Implementation -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; - -;; --- Task Execution +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Tasks +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn- string-strack-trace [^Throwable err] @@ -102,7 +101,7 @@ (cond-> row props (assoc :props (blob/decode props))))) -(defn- log-error +(defn- log-task-error [item err] (log/error "Unhandled exception on task '" (:name item) "' (retry:" (:retry-num item) ") \n" @@ -118,11 +117,12 @@ (p/then decode-task-row) (p/then (fn [item] (when item + (log/debug "Execute task " (:name item)) (-> (p/do! (handle-task tasks item)) (p/handle (fn [v e] (if e (do - (log-error item e) + (log-task-error item e) (if (>= (:retry-num item) max-retries) (mark-as-failed conn item e) (reschedule conn item e))) @@ -156,8 +156,9 @@ ::vt/delay 3000 ::vt/repeat true))) - -;; --- Task Scheduling +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Scheduled Tasks +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (def ^:privatr sql:upsert-scheduled-task "insert into scheduled_tasks (id, cron_expr) @@ -180,24 +181,29 @@ (declare schedule-task) -(defn thr-name - [] - (.getName (Thread/currentThread))) +(defn- log-scheduled-task-error + [item err] + (log/error "Unhandled exception on scheduled task '" (:id item) "' \n" + (with-out-str + (.printStackTrace ^Throwable err (java.io.PrintWriter. *out*))))) (defn- execute-scheduled-task [{:keys [id cron] :as stask}] (db/with-atomic [conn db/pool] + ;; First we try to lock the task in the database, if locking us + ;; successful, then we execute the scheduled task; if locking is + ;; not possible (because other instance is already locked id) we + ;; just skip it and schedule to be executed in the next slot. (-> (db/query-one conn [sql:lock-scheduled-task id]) (p/then (fn [result] (when result (-> (p/do! ((:fn stask) stask)) (p/catch (fn [e] - (log/warn "Excepton happens on executing scheduled task" e) + (log-scheduled-task-error stask e) nil)))))) (p/finally (fn [v e] - (-> (vc/current-context) + (-> (vu/current-context) (schedule-task stask))))))) - (defn ms-until-valid [cron] (s/assert tm/cron? cron) @@ -221,9 +227,9 @@ (p/then' (fn [_] (run! #(schedule-task ctx %) schedule))))) -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Public API -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; --- Worker Verticle @@ -270,14 +276,15 @@ (s/def ::delay ::us/integer) (s/def ::queue ::us/string) (s/def ::task-options - (s/keys :req-un [::name ::delay] - :opt-un [::props ::queue])) + (s/keys :req-un [::name] + :opt-un [::delay ::props ::queue])) (defn schedule! - [conn {:keys [name delay props queue key] :as options}] + [conn {:keys [name delay props queue key] + :or {delay 0 props {} queue "default"} + :as options}] (us/verify ::task-options options) - (let [queue (if (string? queue) queue "default") - duration (-> (tm/duration delay) + (let [duration (-> (tm/duration delay) (duration->pginterval)) props (blob/encode props)] (-> (db/query-one conn [sql:insert-new-task name props queue duration]) diff --git a/backend/src/uxbox/tasks/remove_demo_profile.clj b/backend/src/uxbox/tasks/remove_demo_profile.clj new file mode 100644 index 000000000..a2135162e --- /dev/null +++ b/backend/src/uxbox/tasks/remove_demo_profile.clj @@ -0,0 +1,93 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.tasks.remove-demo-profile + "Demo accounts garbage collector." + (:require + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [promesa.core :as p] + [uxbox.common.exceptions :as ex] + [uxbox.common.spec :as us] + [uxbox.db :as db] + [uxbox.media :as media] + [uxbox.util.storage :as ust] + [vertx.util :as vu])) + +(declare remove-file-images) +(declare remove-images) +(declare remove-profile) + +(s/def ::id ::us/uuid) +(s/def ::props + (s/keys :req-un [::id])) + +(defn handler + [{:keys [props] :as task}] + (us/verify ::props props) + (prn "handler" props (.getName (Thread/currentThread))) + (db/with-atomic [conn db/pool] + (remove-file-images conn (:id props)) + (remove-images conn (:id props)) + (remove-profile conn (:id props)) + (prn "finished" (.getName (Thread/currentThread))))) + +(def ^:private sql:file-images-to-delete + "select pfi.id, pfi.path, pfi.thumb_path + from project_file_images as pfi + inner join project_files as pf on (pf.id = pfi.file_id) + inner join projects as p on (p.id = pf.project_id) + where p.user_id = $1 + limit 2") + +(defn remove-file-images + [conn id] + (p/loop [] + (p/let [files (db/query conn [sql:file-images-to-delete id])] + (prn "remove-file-images" files) + (when-not (empty? files) + (-> (vu/blocking + (doseq [item files] + (ust/delete! media/media-storage (:path item)) + (ust/delete! media/media-storage (:thumb-path item)))) + (p/then' #(p/recur))))))) + +(def ^:private sql:images + "select img.id, img.path, img.thumb_path + from images as img + where img.user_id = $1 + limit 5") + +(defn remove-files + [files] + (prn "remove-files" (.getName (Thread/currentThread))) + (doseq [item files] + (ust/delete! media/media-storage (:path item)) + (ust/delete! media/media-storage (:thumb-path item))) + files) + +(defn remove-images + [conn id] + (prn "remove-images" (.getName (Thread/currentThread))) + (vu/loop [i 0] + (prn "remove-images loop" i (.getName (Thread/currentThread))) + (-> (db/query conn [sql:images id]) + (p/then (vu/wrap-blocking remove-files)) + (p/then (fn [images] + (prn "ending" (.getName (Thread/currentThread))) + (when (and (not (empty? images)) + (< i 1000)) + (p/recur (inc i)))))))) + +(defn remove-profile + [conn id] + (let [sql "delete from users where id=$1"] + (db/query conn [sql id]))) + + diff --git a/backend/src/uxbox/tasks/demo_gc.clj b/backend/src/uxbox/tasks/remove_media.clj similarity index 50% rename from backend/src/uxbox/tasks/demo_gc.clj rename to backend/src/uxbox/tasks/remove_media.clj index 3d3948e73..7e93adf70 100644 --- a/backend/src/uxbox/tasks/demo_gc.clj +++ b/backend/src/uxbox/tasks/remove_media.clj @@ -7,17 +7,26 @@ ;; ;; Copyright (c) 2020 Andrey Antukh -(ns uxbox.tasks.demo-gc +(ns uxbox.tasks.remove-media "Demo accounts garbage collector." (:require + [clojure.spec.alpha :as s] [clojure.tools.logging :as log] - [uxbox.common.exceptions :as ex])) + [uxbox.common.exceptions :as ex] + [uxbox.common.spec :as us] + [uxbox.media :as media] + [uxbox.util.storage :as ust] + [vertx.util :as vu])) + +(s/def ::path ::us/string) +(s/def ::props + (s/keys :req-un [::path])) (defn handler - {:uxbox.tasks/name "demo-gc"} [{:keys [props] :as task}] - (try - (Thread/sleep 100) - (prn (.getName (Thread/currentThread)) "demo-gc" (:id task) (:props task)) - (catch Throwable e - nil))) + (us/verify ::props props) + (vu/blocking + (when (ust/exists? media/media-storage (:path props)) + (ust/delete! media/media-storage (:path props)) + (log/debug "Media " (:path props) " removed.")))) + diff --git a/backend/tests/user.clj b/backend/tests/user.clj index 0f03107c1..fab2a01dd 100644 --- a/backend/tests/user.clj +++ b/backend/tests/user.clj @@ -18,7 +18,7 @@ [clojure.repl :refer :all] [criterium.core :refer [quick-bench bench with-progress-reporting]] [promesa.core :as p] - [promesa.exec :as pe] + [promesa.exec :as px] [uxbox.migrations] [uxbox.util.storage :as st] [mount.core :as mount]))