mirror of
https://github.com/penpot/penpot.git
synced 2025-05-10 23:57:36 +02:00
🎉 Add maintenance tasks.
This commit is contained in:
parent
8dc3165e54
commit
a1b709a9fd
8 changed files with 161 additions and 479 deletions
|
@ -9,14 +9,14 @@
|
|||
|
||||
(ns app.tasks
|
||||
(:require
|
||||
[cuerdas.core :as str]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[app.common.spec :as us]
|
||||
[app.common.uuid :as uuid]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.metrics :as mtx]
|
||||
[app.util.time :as dt]
|
||||
[app.metrics :as mtx]))
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]))
|
||||
|
||||
(s/def ::name ::us/string)
|
||||
(s/def ::delay
|
||||
|
@ -43,7 +43,7 @@
|
|||
interval (db/interval duration)
|
||||
props (db/tjson props)
|
||||
id (uuid/next)]
|
||||
(log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration)))
|
||||
(log/infof "Submit task '%s' to be executed in '%s'." name (str duration))
|
||||
(db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval])
|
||||
id)))
|
||||
|
||||
|
|
|
@ -1,76 +0,0 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns app.tasks.gc
|
||||
(:require
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[cuerdas.core :as str]
|
||||
[postal.core :as postal]
|
||||
[promesa.core :as p]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.spec :as us]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.tasks :as tasks]
|
||||
[app.media-storage :as mst]
|
||||
[app.util.blob :as blob]
|
||||
[app.util.storage :as ust]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Remove deleted media
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; The main purpose of this task is analize the `pending_to_delete`
|
||||
;; table. This table stores the references to the physical files on
|
||||
;; the file system thanks to `handle_delete()` trigger.
|
||||
|
||||
;; Example:
|
||||
;; (1) You delete an media-object. (2) This media object is marked as
|
||||
;; deleted. (3) A task (`delete-object`) is scheduled for permanent
|
||||
;; delete the object. - If that object stores media, the database
|
||||
;; will execute the `handle_delete()` trigger which will place
|
||||
;; filesystem paths into the `pendint_to_delete` table. (4) This
|
||||
;; task (`remove-deleted-media`) permanently delete the file from the
|
||||
;; filesystem when is executed (by scheduler).
|
||||
|
||||
(def ^:private
|
||||
sql:retrieve-peding-to-delete
|
||||
"with items_part as (
|
||||
select i.id
|
||||
from pending_to_delete as i
|
||||
order by i.created_at
|
||||
limit ?
|
||||
for update skip locked
|
||||
)
|
||||
delete from pending_to_delete
|
||||
where id in (select id from items_part)
|
||||
returning *")
|
||||
|
||||
(defn remove-deleted-media
|
||||
[{:keys [props] :as task}]
|
||||
(letfn [(decode-row [{:keys [data] :as row}]
|
||||
(cond-> row
|
||||
(db/pgobject? data) (assoc :data (db/decode-pgobject data))))
|
||||
(retrieve-items [conn]
|
||||
(->> (db/exec! conn [sql:retrieve-peding-to-delete 10])
|
||||
(map decode-row)
|
||||
(map :data)))
|
||||
(remove-media [rows]
|
||||
(run! (fn [item]
|
||||
(let [path (get item "path")]
|
||||
(ust/delete! mst/media-storage path)))
|
||||
rows))]
|
||||
(loop []
|
||||
(let [rows (retrieve-items db/pool)]
|
||||
(when-not (empty? rows)
|
||||
(remove-media rows)
|
||||
(recur))))))
|
||||
|
||||
|
|
@ -1,304 +0,0 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
|
||||
(ns app.tasks.impl
|
||||
"Async tasks implementation."
|
||||
(:require
|
||||
[cuerdas.core :as str]
|
||||
[clojure.core.async :as a]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[promesa.exec :as px]
|
||||
[app.common.spec :as us]
|
||||
[app.common.uuid :as uuid]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.util.async :as aa]
|
||||
[app.util.blob :as blob]
|
||||
[app.util.time :as dt])
|
||||
(:import
|
||||
java.util.concurrent.ScheduledExecutorService
|
||||
java.util.concurrent.Executors
|
||||
java.time.Duration
|
||||
java.time.Instant
|
||||
java.util.Date))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn- string-strack-trace
|
||||
[^Throwable err]
|
||||
(with-out-str
|
||||
(.printStackTrace err (java.io.PrintWriter. *out*))))
|
||||
|
||||
(def ^:private
|
||||
sql:mark-as-retry
|
||||
"update task
|
||||
set scheduled_at = clock_timestamp() + '5 seconds'::interval,
|
||||
modified_at = clock_timestamp(),
|
||||
error = ?,
|
||||
status = 'retry',
|
||||
retry_num = retry_num + 1
|
||||
where id = ?")
|
||||
|
||||
(defn- mark-as-retry
|
||||
[conn task error]
|
||||
(let [explain (ex-message error)
|
||||
sqlv [sql:mark-as-retry explain (:id task)]]
|
||||
(db/exec-one! conn sqlv)
|
||||
nil))
|
||||
|
||||
(defn- mark-as-failed
|
||||
[conn task error]
|
||||
(let [explain (ex-message error)]
|
||||
(db/update! conn :task
|
||||
{:error explain
|
||||
:modified-at (dt/now)
|
||||
:status "failed"}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
(defn- mark-as-completed
|
||||
[conn task]
|
||||
(let [now (dt/now)]
|
||||
(db/update! conn :task
|
||||
{:completed-at now
|
||||
:modified-at now
|
||||
:status "completed"}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
|
||||
(def ^:private
|
||||
sql:select-next-task
|
||||
"select * from task as t
|
||||
where t.scheduled_at <= now()
|
||||
and t.queue = ?
|
||||
and (t.status = 'new' or t.status = 'retry')
|
||||
order by t.priority desc, t.scheduled_at
|
||||
limit 1
|
||||
for update skip locked")
|
||||
|
||||
(defn- decode-task-row
|
||||
[{:keys [props] :as row}]
|
||||
(when row
|
||||
(cond-> row
|
||||
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
|
||||
|
||||
|
||||
(defn- log-task-error
|
||||
[item err]
|
||||
(log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item))
|
||||
(str/format "Props: %s\n" (pr-str (:props item)))
|
||||
(with-out-str
|
||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
|
||||
(defn- handle-task
|
||||
[tasks {:keys [name] :as item}]
|
||||
(let [task-fn (get tasks name)]
|
||||
(if task-fn
|
||||
(task-fn item)
|
||||
(do
|
||||
(log/warn "no task handler found for" (pr-str name))
|
||||
nil))))
|
||||
|
||||
(defn- run-task
|
||||
[{:keys [tasks conn]} item]
|
||||
(try
|
||||
(log/debug (str/format "Started task '%s/%s'." (:name item) (:id item)))
|
||||
(handle-task tasks item)
|
||||
(log/debug (str/format "Finished task '%s/%s'." (:name item) (:id item)))
|
||||
(mark-as-completed conn item)
|
||||
(catch Exception e
|
||||
(log-task-error item e)
|
||||
(if (>= (:retry-num item) (:max-retries item))
|
||||
(mark-as-failed conn item e)
|
||||
(mark-as-retry conn item e)))))
|
||||
|
||||
(defn- event-loop-fn
|
||||
[{:keys [tasks] :as opts}]
|
||||
(aa/thread-try
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [queue (:queue opts "default")
|
||||
item (-> (db/exec-one! conn [sql:select-next-task queue])
|
||||
(decode-task-row))
|
||||
opts (assoc opts :conn conn)]
|
||||
|
||||
(cond
|
||||
(nil? item)
|
||||
::empty
|
||||
|
||||
(or (= "new" (:status item))
|
||||
(= "retry" (:status item)))
|
||||
(do
|
||||
(run-task opts item)
|
||||
::handled)
|
||||
|
||||
:else
|
||||
(do
|
||||
(log/warn "Unexpected condition on worker event loop:" (pr-str item))
|
||||
::handled))))))
|
||||
|
||||
(s/def ::poll-interval ::us/integer)
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::tasks (s/map-of string? ::fn))
|
||||
(s/def ::start-worker-params
|
||||
(s/keys :req-un [::tasks]
|
||||
:opt-un [::poll-interval]))
|
||||
|
||||
(defn start-worker!
|
||||
[{:keys [poll-interval]
|
||||
:or {poll-interval 5000}
|
||||
:as opts}]
|
||||
(us/assert ::start-worker-params opts)
|
||||
(log/info (str/format "Starting worker '%s' on queue '%s'."
|
||||
(:name opts "anonymous")
|
||||
(:queue opts "default")))
|
||||
(let [cch (a/chan 1)]
|
||||
(a/go-loop []
|
||||
(let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
|
||||
(cond
|
||||
;; Terminate the loop if close channel is closed or
|
||||
;; event-loop-fn returns nil.
|
||||
(or (= port cch) (nil? val))
|
||||
(log/info (str/format "Stop condition found. Shutdown worker: '%s'"
|
||||
(:name opts "anonymous")))
|
||||
|
||||
(db/pool-closed? db/pool)
|
||||
(do
|
||||
(log/info "Worker eventloop is aborted because pool is closed.")
|
||||
(a/close! cch))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
|
||||
(do
|
||||
(log/error "Connection error, trying resume in some instants.")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
||||
(do
|
||||
(log/debug "Serialization failure (retrying in some instants).")
|
||||
(a/<! (a/timeout 1000))
|
||||
(recur))
|
||||
|
||||
(instance? Exception val)
|
||||
(do
|
||||
(log/error "Unexpected error ocurried on polling the database." val)
|
||||
(log/info "Trying resume operations in some instants.")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(= ::handled val)
|
||||
(recur)
|
||||
|
||||
(= ::empty val)
|
||||
(do
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur)))))
|
||||
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(a/close! cch)))))
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Scheduled Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private
|
||||
sql:upsert-scheduled-task
|
||||
"insert into scheduled_task (id, cron_expr)
|
||||
values (?, ?)
|
||||
on conflict (id)
|
||||
do update set cron_expr=?")
|
||||
|
||||
(defn- synchronize-schedule-item
|
||||
[conn {:keys [id cron] :as item}]
|
||||
(let [cron (str cron)]
|
||||
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
||||
|
||||
(defn- synchronize-schedule!
|
||||
[schedule]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(run! (partial synchronize-schedule-item conn) schedule)))
|
||||
|
||||
(def ^:private sql:lock-scheduled-task
|
||||
"select id from scheduled_task where id=? for update skip locked")
|
||||
|
||||
(declare schedule-task!)
|
||||
|
||||
(defn- log-scheduled-task-error
|
||||
[item err]
|
||||
(log/error "Unhandled exception on scheduled task '" (:id item) "' \n"
|
||||
(with-out-str
|
||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
|
||||
(defn- execute-scheduled-task
|
||||
[{:keys [id cron ::xtor] :as task}]
|
||||
(try
|
||||
(db/with-atomic [conn db/pool]
|
||||
;; First we try to lock the task in the database, if locking is
|
||||
;; successful, then we execute the scheduled task; if locking is
|
||||
;; not possible (because other instance is already locked id) we
|
||||
;; just skip it and schedule to be executed in the next slot.
|
||||
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
||||
(log/info "Executing scheduled task" id)
|
||||
((:fn task) task)))
|
||||
|
||||
(catch Exception e
|
||||
(log-scheduled-task-error task e))
|
||||
(finally
|
||||
(schedule-task! xtor task))))
|
||||
|
||||
(defn ms-until-valid
|
||||
[cron]
|
||||
(s/assert dt/cron? cron)
|
||||
(let [^Instant now (dt/now)
|
||||
^Instant next (dt/next-valid-instant-from cron now)]
|
||||
(inst-ms (dt/duration-between now next))))
|
||||
|
||||
(defn- schedule-task!
|
||||
[xtor {:keys [cron] :as task}]
|
||||
(let [ms (ms-until-valid cron)
|
||||
task (assoc task ::xtor xtor)]
|
||||
(px/schedule! xtor ms (partial execute-scheduled-task task))))
|
||||
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::id string?)
|
||||
(s/def ::cron dt/cron?)
|
||||
;; (s/def ::xtor #(instance? ScheduledExecutorService %))
|
||||
(s/def ::props (s/nilable map?))
|
||||
(s/def ::scheduled-task
|
||||
(s/keys :req-un [::id ::cron ::fn]
|
||||
:opt-un [::props]))
|
||||
|
||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
(s/def ::start-scheduler-worker-params
|
||||
(s/keys :req-un [::schedule]))
|
||||
|
||||
(defn start-scheduler-worker!
|
||||
[{:keys [schedule] :as opts}]
|
||||
(us/assert ::start-scheduler-worker-params opts)
|
||||
(let [xtor (Executors/newScheduledThreadPool (int 1))]
|
||||
(synchronize-schedule! schedule)
|
||||
(run! (partial schedule-task! xtor) schedule)
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(.shutdownNow ^ScheduledExecutorService xtor)))))
|
||||
|
||||
(defn stop!
|
||||
[worker]
|
||||
(.close ^java.lang.AutoCloseable worker))
|
||||
|
||||
;; --- Submit API
|
70
backend/src/app/tasks/maintenance.clj
Normal file
70
backend/src/app/tasks/maintenance.clj
Normal file
|
@ -0,0 +1,70 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns app.tasks.maintenance
|
||||
(:require
|
||||
[app.common.spec :as us]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.config :as cfg]
|
||||
[app.db :as db]
|
||||
[app.metrics :as mtx]
|
||||
[app.util.time :as dt]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Delete Executed Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(s/def ::max-age ::dt/duration)
|
||||
(s/def ::delete-completed-tasks
|
||||
(s/keys :req-un [::max-age]))
|
||||
|
||||
(def sql:delete-completed-tasks
|
||||
"delete from task_completed
|
||||
where scheduled_at < now() - ?::interval")
|
||||
|
||||
(defn delete-executed-tasks
|
||||
[{:keys [props] :as task}]
|
||||
(us/verify ::delete-completed-tasks props)
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [max-age (:max-age props)
|
||||
result (db/exec-one! conn [sql:delete-completed-tasks (db/interval max-age)])]
|
||||
(log/infof "Removed %s rows from tasks_completed table." (:next.jdbc/update-count result))
|
||||
nil)))
|
||||
|
||||
(mtx/instrument-with-summary!
|
||||
{:var #'delete-executed-tasks
|
||||
:id "tasks__maintenance__delete_executed_tasks"
|
||||
:help "Timing of mainentance task function: delete-remove-tasks."})
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Delete old files xlog
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(s/def ::delete-old-file-xlog
|
||||
(s/keys :req-un [::max-age]))
|
||||
|
||||
(def sql:delete-files-xlog
|
||||
"delete from task_completed
|
||||
where scheduled_at < now() - ?::interval")
|
||||
|
||||
(defn delete-old-files-xlog
|
||||
[{:keys [props] :as task}]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [max-age (:max-age props)
|
||||
result (db/exec-one! conn [sql:delete-files-xlog (db/interval max-age)])]
|
||||
(log/infof "Removed %s rows from file_changes table." (:next.jdbc/update-count result))
|
||||
nil)))
|
||||
|
||||
(mtx/instrument-with-summary!
|
||||
{:var #'delete-old-files-xlog
|
||||
:id "tasks__maintenance__delete_old_files_xlog"
|
||||
:help "Timing of mainentance task function: delete-old-files-xlog."})
|
|
@ -10,13 +10,17 @@
|
|||
(ns app.tasks.remove-media
|
||||
"Demo accounts garbage collector."
|
||||
(:require
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[app.common.exceptions :as ex]
|
||||
[app.common.spec :as us]
|
||||
[app.db :as db]
|
||||
[app.media-storage :as mst]
|
||||
[app.metrics :as mtx]
|
||||
[app.util.storage :as ust]))
|
||||
[app.util.storage :as ust]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Remove Media
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(s/def ::path ::us/not-empty-string)
|
||||
(s/def ::props
|
||||
|
@ -33,3 +37,55 @@
|
|||
{:var #'handler
|
||||
:id "tasks__remove_media"
|
||||
:help "Timing of remove-media task."})
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Trim Media Storage
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; The main purpose of this task is analize the `pending_to_delete`
|
||||
;; table. This table stores the references to the physical files on
|
||||
;; the file system thanks to `handle_delete()` trigger.
|
||||
|
||||
;; Example:
|
||||
;; (1) You delete an media-object. (2) This media object is marked as
|
||||
;; deleted. (3) A task (`delete-object`) is scheduled for permanent
|
||||
;; delete the object. - If that object stores media, the database
|
||||
;; will execute the `handle_delete()` trigger which will place
|
||||
;; filesystem paths into the `pendint_to_delete` table. (4) This
|
||||
;; task (`remove-deleted-media`) permanently delete the file from the
|
||||
;; filesystem when is executed (by scheduler).
|
||||
|
||||
(def ^:private
|
||||
sql:retrieve-peding-to-delete
|
||||
"with items_part as (
|
||||
select i.id
|
||||
from pending_to_delete as i
|
||||
order by i.created_at
|
||||
limit ?
|
||||
for update skip locked
|
||||
)
|
||||
delete from pending_to_delete
|
||||
where id in (select id from items_part)
|
||||
returning *")
|
||||
|
||||
(defn trim-media-storage
|
||||
[{:keys [props] :as task}]
|
||||
(letfn [(decode-row [{:keys [data] :as row}]
|
||||
(cond-> row
|
||||
(db/pgobject? data) (assoc :data (db/decode-pgobject data))))
|
||||
(retrieve-items [conn]
|
||||
(->> (db/exec! conn [sql:retrieve-peding-to-delete 10])
|
||||
(map decode-row)
|
||||
(map :data)))
|
||||
(remove-media [rows]
|
||||
(run! (fn [item]
|
||||
(let [path (get item "path")]
|
||||
(ust/delete! mst/media-storage path)))
|
||||
rows))]
|
||||
(loop []
|
||||
(let [rows (retrieve-items db/pool)]
|
||||
(when-not (empty? rows)
|
||||
(remove-media rows)
|
||||
(recur))))))
|
||||
|
||||
|
||||
|
|
|
@ -53,15 +53,15 @@
|
|||
(defn duration
|
||||
[ms-or-obj]
|
||||
(cond
|
||||
(string? ms-or-obj)
|
||||
(Duration/parse (str "PT" ms-or-obj))
|
||||
|
||||
(duration? ms-or-obj)
|
||||
ms-or-obj
|
||||
|
||||
(integer? ms-or-obj)
|
||||
(Duration/ofMillis ms-or-obj)
|
||||
|
||||
(string? ms-or-obj)
|
||||
(Duration/parse ms-or-obj)
|
||||
|
||||
:else
|
||||
(obj->duration ms-or-obj)))
|
||||
|
||||
|
@ -71,7 +71,7 @@
|
|||
|
||||
(defn parse-duration
|
||||
[s]
|
||||
(Duration/parse (str "PT" s)))
|
||||
(Duration/parse s))
|
||||
|
||||
(extend-protocol clojure.core/Inst
|
||||
java.time.Duration
|
||||
|
@ -79,7 +79,7 @@
|
|||
|
||||
(defmethod print-method Duration
|
||||
[mv ^java.io.Writer writer]
|
||||
(.write writer (str "#app/duration \"" (.toString ^Duration mv) "\"")))
|
||||
(.write writer (str "#app/duration \"" (subs (str mv) 2) "\"")))
|
||||
|
||||
(defmethod print-dup Duration [o w]
|
||||
(print-method o w))
|
||||
|
|
|
@ -1,74 +0,0 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; Copyright (c) 2016 Andrey Antukh <niwi@niwi.nz>
|
||||
|
||||
(ns app.util.workers
|
||||
"A distributed asynchronous tasks queue implementation on top
|
||||
of PostgreSQL reliable advirsory locking mechanism."
|
||||
#_(:require
|
||||
[suricatta.core :as sc]
|
||||
[app.db :as db]))
|
||||
|
||||
;; (defn- poll-for-task
|
||||
;; [conn queue]
|
||||
;; (let [sql (sql/acquire-task {:queue queue})]
|
||||
;; (sc/fetch-one conn sql)))
|
||||
|
||||
;; (defn- mark-task-done
|
||||
;; [conn {:keys [id]}]
|
||||
;; (let [sql (sql/mark-task-done {:id id})]
|
||||
;; (sc/execute conn sql)))
|
||||
|
||||
;; (defn- mark-task-failed
|
||||
;; [conn {:keys [id]} error]
|
||||
;; (let [sql (sql/mark-task-done {:id id :error (.getMessage error)})]
|
||||
;; (sc/execute conn sql)))
|
||||
|
||||
;; (defn- watch-unit
|
||||
;; [conn queue callback]
|
||||
;; (let [task (poll-for-task conn queue)]
|
||||
;; (if (nil? task)
|
||||
;; (Thread/sleep 1000)
|
||||
;; (try
|
||||
;; (sc/atomic conn
|
||||
;; (callback conn task)
|
||||
;; (mark-task-done conn task))
|
||||
;; (catch Exception e
|
||||
;; (mark-task-failed conn task e))))))
|
||||
|
||||
;; (defn- watch-loop
|
||||
;; "Watch tasks on the specified queue and executes a
|
||||
;; callback for each task is received.
|
||||
;; NOTE: This function blocks the current thread."
|
||||
;; [queue callback]
|
||||
;; (try
|
||||
;; (loop []
|
||||
;; (with-open [conn (db/connection)]
|
||||
;; (sc/atomic conn (watch-unit conn queue callback)))
|
||||
;; (recur))
|
||||
;; (catch InterruptedException e
|
||||
;; ;; just ignoring
|
||||
;; )))
|
||||
|
||||
;; (defn watch!
|
||||
;; [queue callback]
|
||||
;; (let [runnable #(watch-loop queue callback)
|
||||
;; thread (Thread. ^Runnable runnable)]
|
||||
;; (.setDaemon thread true)
|
||||
;; (.start thread)
|
||||
;; (reify
|
||||
;; java.lang.AutoCloseable
|
||||
;; (close [_]
|
||||
;; (.interrupt thread)
|
||||
;; (.join thread 2000))
|
||||
|
||||
;; clojure.lang.IDeref
|
||||
;; (deref [_]
|
||||
;; (.join thread))
|
||||
|
||||
;; clojure.lang.IBlockingDeref
|
||||
;; (deref [_ ms default]
|
||||
;; (.join thread ms)
|
||||
;; default))))
|
|
@ -15,8 +15,8 @@
|
|||
[app.db :as db]
|
||||
[app.tasks.delete-object]
|
||||
[app.tasks.delete-profile]
|
||||
[app.tasks.gc]
|
||||
[app.tasks.remove-media]
|
||||
[app.tasks.maintenance]
|
||||
[app.tasks.sendmail]
|
||||
[app.tasks.trim-file]
|
||||
[app.util.async :as aa]
|
||||
|
@ -53,13 +53,23 @@
|
|||
|
||||
(def ^:private schedule
|
||||
[{:id "remove-deleted-media"
|
||||
:cron (dt/cron "0 0 0 */1 * ? *") ;; daily
|
||||
:fn #'app.tasks.gc/remove-deleted-media}
|
||||
{:id "trim-file"
|
||||
:cron (dt/cron "0 0 0 */1 * ? *") ;; daily
|
||||
:fn #'app.tasks.trim-file/handler}
|
||||
])
|
||||
:cron #app/cron "0 0 0 */1 * ? *" ;; daily
|
||||
:fn #'app.tasks.remove-media/trim-media-storage}
|
||||
|
||||
{:id "trim-file"
|
||||
:cron #app/cron "0 0 0 */1 * ? *" ;; daily
|
||||
:fn #'app.tasks.trim-file/handler}
|
||||
|
||||
{:id "maintenance/delete-executed-tasks"
|
||||
:cron #app/cron "0 0 */1 * * ?" ;; hourly
|
||||
:fn #'app.tasks.maintenance/delete-executed-tasks
|
||||
:props {:max-age #app/duration "48h"}}
|
||||
|
||||
{:id "maintenance/delete-old-files-xlog"
|
||||
:cron #app/cron "0 0 */1 * * ?" ;; hourly
|
||||
:fn #'app.tasks.maintenance/delete-old-files-xlog
|
||||
:props {:max-age #app/duration "8h"}}
|
||||
])
|
||||
|
||||
(defstate executor
|
||||
:start (thread-pool {:idle-timeout 10000
|
||||
|
@ -82,7 +92,7 @@
|
|||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Worker Impl
|
||||
;; Tasks Worker Impl
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue