mirror of
https://github.com/penpot/penpot.git
synced 2025-06-05 06:01:38 +02:00
🎉 Add media-object lifecycle handling.
This commit is contained in:
parent
93aaa52e73
commit
c0cd0d4a23
39 changed files with 975 additions and 323 deletions
|
@ -1,3 +1,3 @@
|
|||
{uxbox/instant uxbox.util.time/from-string
|
||||
uxbox/cron uxbox.util.time/cron
|
||||
uxbox/duration uxbox.util.time/parse-duration}
|
||||
uxbox/duration uxbox.util.time/duration}
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
[mount.core :refer [defstate]]
|
||||
[uxbox.common.exceptions :as ex]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.util.time :as tm]))
|
||||
[uxbox.util.time :as dt]))
|
||||
|
||||
(def defaults
|
||||
{:http-server-port 6060
|
||||
|
@ -45,6 +45,12 @@
|
|||
:registration-domain-whitelist ""
|
||||
:debug-humanize-transit true
|
||||
|
||||
;; This is the time should transcurr after the last page
|
||||
;; modification in order to make the file ellegible for
|
||||
;; trimming. The value only supports s(econds) m(inutes) and
|
||||
;; h(ours) as time unit.
|
||||
:file-trimming-max-age "72h"
|
||||
|
||||
;; LDAP auth disabled by default. Set ldap-auth-host to enable
|
||||
;:ldap-auth-host "ldap.mysupercompany.com"
|
||||
;:ldap-auth-port 389
|
||||
|
@ -53,6 +59,7 @@
|
|||
;:ldap-auth-ssl false
|
||||
;:ldap-auth-starttls false
|
||||
;:ldap-auth-base-dn "ou=People,dc=ldap,dc=mysupercompany,dc=com"
|
||||
|
||||
:ldap-auth-user-query "(|(uid=$username)(mail=$username))"
|
||||
:ldap-auth-username-attribute "uid"
|
||||
:ldap-auth-email-attribute "mail"
|
||||
|
@ -103,6 +110,7 @@
|
|||
(s/def ::ldap-auth-email-attribute ::us/string)
|
||||
(s/def ::ldap-auth-fullname-attribute ::us/string)
|
||||
(s/def ::ldap-auth-avatar-attribute ::us/string)
|
||||
(s/def ::file-trimming-threshold ::dt/duration)
|
||||
|
||||
(s/def ::config
|
||||
(s/keys :opt-un [::http-server-cors
|
||||
|
@ -128,6 +136,7 @@
|
|||
::smtp-password
|
||||
::smtp-tls
|
||||
::smtp-ssl
|
||||
::file-trimming-max-age
|
||||
::debug-humanize-transit
|
||||
::allow-demo-users
|
||||
::registration-enabled
|
||||
|
@ -148,12 +157,13 @@
|
|||
|
||||
(defn env->config
|
||||
[env]
|
||||
(reduce-kv (fn [acc k v]
|
||||
(cond-> acc
|
||||
(str/starts-with? (name k) "uxbox-")
|
||||
(assoc (keyword (subs (name k) 6)) v)))
|
||||
{}
|
||||
env))
|
||||
(reduce-kv
|
||||
(fn [acc k v]
|
||||
(cond-> acc
|
||||
(str/starts-with? (name k) "uxbox-")
|
||||
(assoc (keyword (subs (name k) 6)) v)))
|
||||
{}
|
||||
env))
|
||||
|
||||
(defn read-config
|
||||
[env]
|
||||
|
@ -174,4 +184,4 @@
|
|||
:start (read-config env))
|
||||
|
||||
(def default-deletion-delay
|
||||
(tm/duration {:hours 48}))
|
||||
(dt/duration {:hours 48}))
|
||||
|
|
|
@ -14,12 +14,12 @@
|
|||
[reitit.ring :as rring]
|
||||
[ring.adapter.jetty9 :as jetty]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.http.debug :as debug]
|
||||
[uxbox.http.errors :as errors]
|
||||
[uxbox.http.handlers :as handlers]
|
||||
[uxbox.http.auth :as auth]
|
||||
[uxbox.http.auth.google :as google]
|
||||
[uxbox.http.auth.ldap :as ldap]
|
||||
[uxbox.http.debug :as debug]
|
||||
[uxbox.http.errors :as errors]
|
||||
[uxbox.http.handlers :as handlers]
|
||||
[uxbox.http.middleware :as middleware]
|
||||
[uxbox.http.session :as session]
|
||||
[uxbox.http.ws :as ws]
|
||||
|
@ -52,28 +52,26 @@
|
|||
["/login-ldap" {:handler ldap/auth
|
||||
:method :post}]
|
||||
|
||||
["/w" {:middleware [session/auth]}
|
||||
["/w" {:middleware [session/middleware]}
|
||||
["/query/:type" {:get handlers/query-handler}]
|
||||
["/mutation/:type" {:post handlers/mutation-handler}]]]]))
|
||||
|
||||
(defstate app
|
||||
:start (rring/ring-handler
|
||||
(create-router)
|
||||
(constantly {:status 404, :body ""})
|
||||
{:middleware [[middleware/development-resources]
|
||||
[middleware/development-cors]
|
||||
[middleware/metrics]]}))
|
||||
|
||||
(defn start-server
|
||||
[cfg app]
|
||||
[]
|
||||
(let [wsockets {"/ws/notifications" ws/handler}
|
||||
options {:port (:http-server-port cfg)
|
||||
options {:port (:http-server-port cfg/config)
|
||||
:h2c? true
|
||||
:join? false
|
||||
:allow-null-path-info true
|
||||
:websockets wsockets}]
|
||||
(jetty/run-jetty app options)))
|
||||
:websockets wsockets}
|
||||
handler (rring/ring-handler
|
||||
(create-router)
|
||||
(constantly {:status 404, :body ""})
|
||||
{:middleware [[middleware/development-resources]
|
||||
[middleware/development-cors]
|
||||
[middleware/metrics]]})]
|
||||
(jetty/run-jetty handler options)))
|
||||
|
||||
(defstate server
|
||||
:start (start-server cfg/config app)
|
||||
:start (start-server)
|
||||
:stop (.stop server))
|
||||
|
|
|
@ -10,17 +10,23 @@
|
|||
(ns uxbox.http.session
|
||||
(:require
|
||||
[uxbox.db :as db]
|
||||
[uxbox.services.tokens :as tokens]
|
||||
[uxbox.common.uuid :as uuid]))
|
||||
[uxbox.services.tokens :as tokens]))
|
||||
|
||||
(defn extract-auth-token
|
||||
[request]
|
||||
(get-in request [:cookies "auth-token" :value]))
|
||||
|
||||
(defn retrieve
|
||||
"Retrieves a user id associated with the provided auth token."
|
||||
[token]
|
||||
[conn token]
|
||||
(when token
|
||||
(-> (db/query db/pool :http-session {:id token})
|
||||
(first)
|
||||
(-> (db/exec-one! conn ["select profile_id from http_session where id = ?" token])
|
||||
(:profile-id))))
|
||||
|
||||
(defn retrieve-from-request
|
||||
[conn request]
|
||||
(->> (extract-auth-token request)
|
||||
(retrieve conn)))
|
||||
|
||||
(defn create
|
||||
[profile-id user-agent]
|
||||
(let [id (tokens/next-token)]
|
||||
|
@ -39,21 +45,13 @@
|
|||
([id opts]
|
||||
{"auth-token" (merge opts {:value id :path "/" :http-only true})}))
|
||||
|
||||
(defn extract-auth-token
|
||||
[req]
|
||||
(get-in req [:cookies "auth-token" :value]))
|
||||
|
||||
(defn wrap-auth
|
||||
(defn wrap-session
|
||||
[handler]
|
||||
(fn [request]
|
||||
(let [token (get-in request [:cookies "auth-token" :value])
|
||||
profile-id (retrieve token)]
|
||||
(if profile-id
|
||||
(handler (assoc request :profile-id profile-id))
|
||||
(handler request)))))
|
||||
(if-let [profile-id (retrieve-from-request db/pool request)]
|
||||
(handler (assoc request :profile-id profile-id))
|
||||
(handler request))))
|
||||
|
||||
;; TODO: maybe rename to wrap-session?
|
||||
|
||||
(def auth
|
||||
{:nane ::auth
|
||||
:compile (constantly wrap-auth)})
|
||||
(def middleware
|
||||
{:nane ::middleware
|
||||
:compile (constantly wrap-session)})
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
[ring.middleware.params :refer [wrap-params]]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.http.session :refer [wrap-auth]]
|
||||
[uxbox.http.session :refer [wrap-session]]
|
||||
[uxbox.services.notifications :as nf]))
|
||||
|
||||
(s/def ::file-id ::us/uuid)
|
||||
|
@ -47,7 +47,7 @@
|
|||
|
||||
(def handler
|
||||
(-> websocket
|
||||
(wrap-auth)
|
||||
(wrap-session)
|
||||
(wrap-keyword-params)
|
||||
(wrap-cookies)
|
||||
(wrap-params)))
|
||||
|
|
|
@ -83,7 +83,24 @@
|
|||
|
||||
{:desc "Link files to libraries"
|
||||
:name "0017-link-files-to-libraries"
|
||||
:fn (mg/resource "migrations/0017-link-files-to-libraries.sql")}]})
|
||||
:fn (mg/resource "migrations/0017-link-files-to-libraries.sql")}
|
||||
|
||||
{:desc "Add file triming triggers"
|
||||
:name "0018-add-file-trimming-triggers"
|
||||
:fn (mg/resource "migrations/0018-add-file-trimming-triggers.sql")}
|
||||
|
||||
{:desc "Improve scheduled task tables"
|
||||
:name "0019-add-improved-scheduled-tasks"
|
||||
:fn (mg/resource "migrations/0019-add-improved-scheduled-tasks.sql")}
|
||||
|
||||
{:desc "Minor fixes to media object"
|
||||
:name "0020-minor-fixes-to-media-object"
|
||||
:fn (mg/resource "migrations/0020-minor-fixes-to-media-object.sql")}
|
||||
|
||||
{:desc "Improve http session tables"
|
||||
:name "0021-http-session-improvements"
|
||||
:fn (mg/resource "migrations/0021-http-session-improvements.sql")}
|
||||
]})
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Entry point
|
||||
|
|
|
@ -93,37 +93,35 @@
|
|||
:mtype (:content-type content)}})
|
||||
path (persist-media-object-on-fs content)
|
||||
opts (assoc thumbnail-options
|
||||
:input {:mtype (:mtype info)
|
||||
:path path})
|
||||
:input {:mtype (:mtype info)
|
||||
:path path})
|
||||
thumb (if-not (= (:mtype info) "image/svg+xml")
|
||||
(persist-media-thumbnail-on-fs opts)
|
||||
(assoc info
|
||||
:path path
|
||||
:quality 0))
|
||||
|
||||
media-object-id (or id (uuid/next))
|
||||
id (or id (uuid/next))
|
||||
|
||||
media-object (-> (db/insert! conn :media-object
|
||||
{:id media-object-id
|
||||
:file-id file-id
|
||||
:is-local is-local
|
||||
:name name
|
||||
:path (str path)
|
||||
:width (:width info)
|
||||
:height (:height info)
|
||||
:mtype (:mtype info)})
|
||||
(media/resolve-urls :path :uri))
|
||||
media-object (db/insert! conn :media-object
|
||||
{:id id
|
||||
:file-id file-id
|
||||
:is-local is-local
|
||||
:name name
|
||||
:path (str path)
|
||||
:width (:width info)
|
||||
:height (:height info)
|
||||
:mtype (:mtype info)})
|
||||
|
||||
media-thumbnail (-> (db/insert! conn :media-thumbnail
|
||||
{:id (uuid/next)
|
||||
:media-object-id media-object-id
|
||||
:path (str (:path thumb))
|
||||
:width (:width thumb)
|
||||
:height (:height thumb)
|
||||
:quality (:quality thumb)
|
||||
:mtype (:mtype thumb)})
|
||||
(media/resolve-urls :path :uri))]
|
||||
(assoc media-object :thumb-uri (:uri media-thumbnail))))
|
||||
media-thumbnail (db/insert! conn :media-thumbnail
|
||||
{:id (uuid/next)
|
||||
:media-object-id id
|
||||
:path (str (:path thumb))
|
||||
:width (:width thumb)
|
||||
:height (:height thumb)
|
||||
:quality (:quality thumb)
|
||||
:mtype (:mtype thumb)})]
|
||||
(assoc media-object :thumb-path (:path media-thumbnail))))
|
||||
|
||||
(def ^:private sql:select-file-for-update
|
||||
"select file.*,
|
||||
|
|
|
@ -19,9 +19,9 @@
|
|||
[uxbox.db :as db]))
|
||||
|
||||
(defn next-token
|
||||
([] (next-token 64))
|
||||
([] (next-token 96))
|
||||
([n]
|
||||
(-> (sodi.prng/random-bytes n)
|
||||
(-> (sodi.prng/random-nonce n)
|
||||
(sodi.util/bytes->b64s))))
|
||||
|
||||
(def default-duration
|
||||
|
|
|
@ -5,54 +5,47 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks
|
||||
"Async tasks abstraction (impl)."
|
||||
(:require
|
||||
[cuerdas.core :as str]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[mount.core :as mount :refer [defstate]]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.common.uuid :as uuid]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.metrics :as mtx]
|
||||
[uxbox.tasks.sendmail]
|
||||
[uxbox.tasks.gc]
|
||||
[uxbox.tasks.remove-media]
|
||||
[uxbox.tasks.delete-profile]
|
||||
[uxbox.tasks.delete-object]
|
||||
[uxbox.tasks.impl :as impl]
|
||||
[uxbox.util.time :as dt]))
|
||||
[uxbox.util.time :as dt]
|
||||
[uxbox.metrics :as mtx]))
|
||||
|
||||
;; --- State initialization
|
||||
(s/def ::name ::us/string)
|
||||
(s/def ::delay
|
||||
(s/or :int ::us/integer
|
||||
:duration dt/duration?))
|
||||
(s/def ::queue ::us/string)
|
||||
|
||||
(def ^:private tasks
|
||||
{"delete-profile" #'uxbox.tasks.delete-profile/handler
|
||||
"delete-object" #'uxbox.tasks.delete-object/handler
|
||||
"remove-media" #'uxbox.tasks.remove-media/handler
|
||||
"sendmail" #'uxbox.tasks.sendmail/handler})
|
||||
(s/def ::task-options
|
||||
(s/keys :req-un [::name]
|
||||
:opt-un [::delay ::props ::queue]))
|
||||
|
||||
(def ^:private schedule
|
||||
[{:id "remove-deleted-media"
|
||||
:cron (dt/cron "1 1 */1 * * ? *")
|
||||
:fn #'uxbox.tasks.gc/remove-media}])
|
||||
|
||||
(defstate worker
|
||||
:start (impl/start-worker! {:tasks tasks :name "worker1"})
|
||||
:stop (impl/stop! worker))
|
||||
|
||||
(defstate scheduler-worker
|
||||
:start (impl/start-scheduler-worker! {:schedule schedule})
|
||||
:stop (impl/stop! scheduler-worker))
|
||||
|
||||
;; --- Public API
|
||||
(def ^:private sql:insert-new-task
|
||||
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
|
||||
values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?)
|
||||
returning id")
|
||||
|
||||
(defn submit!
|
||||
([opts] (submit! db/pool opts))
|
||||
([conn opts]
|
||||
(s/assert ::impl/task-options opts)
|
||||
(impl/submit! conn opts)))
|
||||
([conn {:keys [name delay props queue priority max-retries]
|
||||
:or {delay 0 props {} queue "default" priority 100 max-retries 3}
|
||||
:as options}]
|
||||
(us/verify ::task-options options)
|
||||
(let [duration (dt/duration delay)
|
||||
interval (db/interval duration)
|
||||
props (db/tjson props)
|
||||
id (uuid/next)]
|
||||
(log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration)))
|
||||
(db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval])
|
||||
id)))
|
||||
|
||||
(mtx/instrument-with-counter!
|
||||
{:var #'submit!
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks.delete-object
|
||||
"Generic task for permanent deletion of objects."
|
||||
|
@ -41,44 +41,27 @@
|
|||
:id "tasks__delete_object"
|
||||
:help "Timing of remove-object task."})
|
||||
|
||||
(defmethod handle-deletion :image
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from image where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :image-collection
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from image_collection
|
||||
where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :icon
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from icon where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :icon-collection
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from icon_collection
|
||||
where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :file
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from file where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :file-image
|
||||
(defmethod handle-deletion :project
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from file_image where id=? and deleted_at is not null"]
|
||||
(let [sql "delete from project where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :media-object
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from media_object where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :color
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from color where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :page
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from page where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
||||
(defmethod handle-deletion :page-version
|
||||
[conn {:keys [id] :as props}]
|
||||
(let [sql "delete from page_version where id=? and deleted_at is not null"]
|
||||
(db/exec-one! conn [sql id])))
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks.delete-profile
|
||||
"Task for permanent deletion of profiles."
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2016-2020 Andrey Antukh <niwi@niwi.nz>
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks.gc
|
||||
(:require
|
||||
|
@ -18,11 +18,30 @@
|
|||
[uxbox.common.spec :as us]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.tasks :as tasks]
|
||||
[uxbox.media-storage :as mst]
|
||||
[uxbox.util.blob :as blob]
|
||||
[uxbox.util.storage :as ust]))
|
||||
|
||||
(def ^:private sql:delete-items
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Remove deleted media
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; The main purpose of this task is analize the `pending_to_delete`
|
||||
;; table. This table stores the references to the physical files on
|
||||
;; the file system thanks to `handle_delete()` trigger.
|
||||
|
||||
;; Example:
|
||||
;; (1) You delete an media-object. (2) This media object is marked as
|
||||
;; deleted. (3) A task (`delete-object`) is scheduled for permanent
|
||||
;; delete the object. - If that object stores media, the database
|
||||
;; will execute the `handle_delete()` trigger which will place
|
||||
;; filesystem paths into the `pendint_to_delete` table. (4) This
|
||||
;; task (`remove-deleted-media`) permanently delete the file from the
|
||||
;; filesystem when is executed (by scheduler).
|
||||
|
||||
(def ^:private
|
||||
sql:retrieve-peding-to-delete
|
||||
"with items_part as (
|
||||
select i.id
|
||||
from pending_to_delete as i
|
||||
|
@ -34,31 +53,24 @@
|
|||
where id in (select id from items_part)
|
||||
returning *")
|
||||
|
||||
(defn- impl-remove-media
|
||||
[result]
|
||||
(run! (fn [item]
|
||||
(let [path1 (get item "path")
|
||||
path2 (get item "thumb_path")]
|
||||
(ust/delete! mst/media-storage path1)
|
||||
(ust/delete! mst/media-storage path2)))
|
||||
result))
|
||||
|
||||
(defn- decode-row
|
||||
[{:keys [data] :as row}]
|
||||
(cond-> row
|
||||
(db/pgobject? data) (assoc :data (db/decode-pgobject data))))
|
||||
|
||||
(defn- get-items
|
||||
[conn]
|
||||
(->> (db/exec! conn [sql:delete-items 10])
|
||||
(map decode-row)
|
||||
(map :data)))
|
||||
|
||||
(defn remove-media
|
||||
(defn remove-deleted-media
|
||||
[{:keys [props] :as task}]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(loop [result (get-items conn)]
|
||||
(when-not (empty? result)
|
||||
(impl-remove-media result)
|
||||
(recur (get-items conn))))))
|
||||
(letfn [(decode-row [{:keys [data] :as row}]
|
||||
(cond-> row
|
||||
(db/pgobject? data) (assoc :data (db/decode-pgobject data))))
|
||||
(retrieve-items [conn]
|
||||
(->> (db/exec! conn [sql:retrieve-peding-to-delete 10])
|
||||
(map decode-row)
|
||||
(map :data)))
|
||||
(remove-media [rows]
|
||||
(run! (fn [item]
|
||||
(let [path (get item "path")]
|
||||
(ust/delete! mst/media-storage path)))
|
||||
rows))]
|
||||
(loop []
|
||||
(let [rows (retrieve-items db/pool)]
|
||||
(when-not (empty? rows)
|
||||
(remove-media rows)
|
||||
(recur))))))
|
||||
|
||||
|
||||
|
|
|
@ -302,31 +302,3 @@
|
|||
(.close ^java.lang.AutoCloseable worker))
|
||||
|
||||
;; --- Submit API
|
||||
|
||||
(s/def ::name ::us/string)
|
||||
(s/def ::delay
|
||||
(s/or :int ::us/integer
|
||||
:duration dt/duration?))
|
||||
(s/def ::queue ::us/string)
|
||||
|
||||
(s/def ::task-options
|
||||
(s/keys :req-un [::name]
|
||||
:opt-un [::delay ::props ::queue]))
|
||||
|
||||
(def ^:private sql:insert-new-task
|
||||
"insert into task (id, name, props, queue, priority, max_retries, scheduled_at)
|
||||
values (?, ?, ?, ?, ?, ?, clock_timestamp() + ?)
|
||||
returning id")
|
||||
|
||||
(defn submit!
|
||||
[conn {:keys [name delay props queue priority max-retries key]
|
||||
:or {delay 0 props {} queue "default" priority 100 max-retries 3}
|
||||
:as options}]
|
||||
(us/verify ::task-options options)
|
||||
(let [duration (dt/duration delay)
|
||||
interval (db/interval duration)
|
||||
props (db/tjson props)
|
||||
id (uuid/next)]
|
||||
(log/info (str/format "Submit task '%s' to be executed in '%s'." name (str duration)))
|
||||
(db/exec-one! conn [sql:insert-new-task id name props queue priority max-retries interval])
|
||||
id))
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks.remove-media
|
||||
"Demo accounts garbage collector."
|
||||
|
|
95
backend/src/uxbox/tasks/trim_file.clj
Normal file
95
backend/src/uxbox/tasks/trim_file.clj
Normal file
|
@ -0,0 +1,95 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.tasks.trim-file
|
||||
(:require
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[uxbox.common.exceptions :as ex]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.tasks :as tasks]
|
||||
[uxbox.util.blob :as blob]
|
||||
[uxbox.util.time :as dt]))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Task: Trim File
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; This is the task responsible of removing unnecesary media-objects
|
||||
;; associated with file but not used by any page.
|
||||
|
||||
(defn decode-row
|
||||
[{:keys [data metadata changes] :as row}]
|
||||
(cond-> row
|
||||
(bytes? data) (assoc :data (blob/decode data))))
|
||||
|
||||
(def sql:retrieve-files-to-trim
|
||||
"select id from file as f
|
||||
where f.has_media_trimmed is false
|
||||
and f.modified_at < now() - ?::interval
|
||||
order by f.modified_at asc
|
||||
limit 10")
|
||||
|
||||
(defn retrieve-candidates
|
||||
[conn]
|
||||
(let [interval (:file-trimming-max-age cfg/config)]
|
||||
(->> (db/exec! conn [sql:retrieve-files-to-trim interval])
|
||||
(map :id))))
|
||||
|
||||
(defn collect-used-media
|
||||
[pages]
|
||||
(let [xf (comp (filter #(= :image (:type %)))
|
||||
(map :metadata)
|
||||
(map :id))]
|
||||
(reduce conj #{} (->> pages
|
||||
(map :data)
|
||||
(map :objects)
|
||||
(mapcat vals)
|
||||
(filter #(= :image (:type %)))
|
||||
(map :metadata)
|
||||
(map :id)))))
|
||||
|
||||
(defn process-file
|
||||
[file-id]
|
||||
(log/debugf "Processing file: '%s'." file-id)
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [mobjs (db/query conn :media-object {:file-id file-id})
|
||||
pages (->> (db/query conn :page {:file-id file-id})
|
||||
(map decode-row))
|
||||
used (collect-used-media pages)
|
||||
unused (into #{} (comp (map :id)
|
||||
(remove #(contains? used %))) mobjs)]
|
||||
(log/debugf "Collected media ids: '%s'." (pr-str used))
|
||||
(log/debugf "Unused media ids: '%s'." (pr-str unused))
|
||||
|
||||
(db/update! conn :file
|
||||
{:has-media-trimmed true}
|
||||
{:id file-id})
|
||||
|
||||
(doseq [id unused]
|
||||
(tasks/submit! conn {:name "delete-object"
|
||||
;; :delay cfg/default-deletion-delay
|
||||
:delay 10000
|
||||
:props {:id id :type :media-object}})
|
||||
|
||||
(db/update! conn :media-object
|
||||
{:deleted-at (dt/now)}
|
||||
{:id id}))
|
||||
nil)))
|
||||
|
||||
(defn handler
|
||||
[{:keys [props] :as task}]
|
||||
(log/debug "Running 'trim-file' task.")
|
||||
(loop []
|
||||
(let [files (retrieve-candidates db/pool)]
|
||||
(when (seq files)
|
||||
(run! process-file files)
|
||||
(recur)))))
|
|
@ -5,19 +5,24 @@
|
|||
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
|
||||
|
||||
(ns uxbox.util.async
|
||||
(:require [clojure.core.async :as a]))
|
||||
(:require
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[clojure.core.async :as a])
|
||||
(:import
|
||||
java.util.concurrent.Executor))
|
||||
|
||||
(defmacro go-try
|
||||
[& body]
|
||||
`(a/go
|
||||
(try
|
||||
~@body
|
||||
(catch Throwable e# e#))))
|
||||
(catch Exception e# e#))))
|
||||
|
||||
(defmacro <?
|
||||
[ch]
|
||||
`(let [r# (a/<! ~ch)]
|
||||
(if (instance? Throwable r#)
|
||||
(if (instance? Exception r#)
|
||||
(throw r#)
|
||||
r#)))
|
||||
|
||||
|
@ -26,6 +31,26 @@
|
|||
`(a/thread
|
||||
(try
|
||||
~@body
|
||||
(catch Throwable e#
|
||||
(catch Exception e#
|
||||
e#))))
|
||||
|
||||
|
||||
(s/def ::executor #(instance? Executor %))
|
||||
|
||||
(defn thread-call
|
||||
[^Executor executor f]
|
||||
(let [c (a/chan 1)]
|
||||
(try
|
||||
(.execute executor
|
||||
(fn []
|
||||
(try
|
||||
(let [ret (try (f) (catch Exception e e))]
|
||||
(when-not (nil? ret)
|
||||
(a/>!! c ret)))
|
||||
(finally
|
||||
(a/close! c)))))
|
||||
c
|
||||
(catch java.util.concurrent.RejectedExecutionException e
|
||||
(a/offer! c e)
|
||||
(a/close! c)
|
||||
c))))
|
||||
|
|
|
@ -71,8 +71,7 @@
|
|||
|
||||
(defn parse-duration
|
||||
[s]
|
||||
(assert (string? s))
|
||||
(Duration/parse s))
|
||||
(Duration/parse (str "PT" s)))
|
||||
|
||||
(extend-protocol clojure.core/Inst
|
||||
java.time.Duration
|
||||
|
@ -85,6 +84,22 @@
|
|||
(defmethod print-dup Duration [o w]
|
||||
(print-method o w))
|
||||
|
||||
(letfn [(conformer [v]
|
||||
(cond
|
||||
(duration? v) v
|
||||
(string? v)
|
||||
(try
|
||||
(parse-duration v)
|
||||
(catch java.time.format.DateTimeParseException e
|
||||
::s/invalid))
|
||||
|
||||
:else
|
||||
::s/invalid))
|
||||
(unformer [v]
|
||||
(subs (str v) 2))]
|
||||
(s/def ::duration (s/conformer conformer unformer)))
|
||||
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Cron Expression
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
|
63
backend/src/uxbox/worker.clj
Normal file
63
backend/src/uxbox/worker.clj
Normal file
|
@ -0,0 +1,63 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.worker
|
||||
(:require
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[mount.core :as mount :refer [defstate]]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.metrics :as mtx]
|
||||
[uxbox.tasks.delete-object]
|
||||
[uxbox.tasks.delete-profile]
|
||||
[uxbox.tasks.gc]
|
||||
[uxbox.tasks.remove-media]
|
||||
[uxbox.tasks.sendmail]
|
||||
[uxbox.tasks.trim-file]
|
||||
[uxbox.util.time :as dt]
|
||||
[uxbox.worker-impl :as impl]))
|
||||
|
||||
;; --- State initialization
|
||||
|
||||
(def ^:private tasks
|
||||
{"delete-profile" #'uxbox.tasks.delete-profile/handler
|
||||
"delete-object" #'uxbox.tasks.delete-object/handler
|
||||
"remove-media" #'uxbox.tasks.remove-media/handler
|
||||
"sendmail" #'uxbox.tasks.sendmail/handler})
|
||||
|
||||
(def ^:private schedule
|
||||
[{:id "remove-deleted-media"
|
||||
:cron (dt/cron "0 0 0 */1 * ? *") ;; daily
|
||||
:fn #'uxbox.tasks.gc/remove-deleted-media}
|
||||
{:id "trim-file"
|
||||
:cron (dt/cron "0 0 0 */1 * ? *") ;; daily
|
||||
:fn #'uxbox.tasks.trim-file/handler}
|
||||
])
|
||||
|
||||
|
||||
(defstate executor
|
||||
:start (impl/thread-pool {:idle-timeout 10000
|
||||
:min-threads 0
|
||||
:max-threads 256})
|
||||
:stop (impl/stop! executor))
|
||||
|
||||
(defstate worker
|
||||
:start (impl/start-worker!
|
||||
{:tasks tasks
|
||||
:name "worker1"
|
||||
:batch-size 1
|
||||
:executor executor})
|
||||
:stop (impl/stop! worker))
|
||||
|
||||
(defstate scheduler-worker
|
||||
:start (impl/start-scheduler-worker! {:schedule schedule
|
||||
:executor executor})
|
||||
:stop (impl/stop! scheduler-worker))
|
357
backend/src/uxbox/worker_impl.clj
Normal file
357
backend/src/uxbox/worker_impl.clj
Normal file
|
@ -0,0 +1,357 @@
|
|||
;; This Source Code Form is subject to the terms of the Mozilla Public
|
||||
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
;;
|
||||
;; This Source Code Form is "Incompatible With Secondary Licenses", as
|
||||
;; defined by the Mozilla Public License, v. 2.0.
|
||||
;;
|
||||
;; Copyright (c) 2020 UXBOX Labs SL
|
||||
|
||||
(ns uxbox.worker-impl
|
||||
(:require
|
||||
[cuerdas.core :as str]
|
||||
[clojure.core.async :as a]
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[promesa.exec :as px]
|
||||
[uxbox.common.exceptions :as ex]
|
||||
[uxbox.common.spec :as us]
|
||||
[uxbox.common.uuid :as uuid]
|
||||
[uxbox.config :as cfg]
|
||||
[uxbox.db :as db]
|
||||
[uxbox.util.async :as aa]
|
||||
[uxbox.util.blob :as blob]
|
||||
[uxbox.util.time :as dt])
|
||||
(:import
|
||||
org.eclipse.jetty.util.thread.QueuedThreadPool
|
||||
java.util.concurrent.ExecutorService
|
||||
java.util.concurrent.Executors
|
||||
java.util.concurrent.Executor
|
||||
java.time.Duration
|
||||
java.time.Instant
|
||||
java.util.Date))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private
|
||||
sql:mark-as-retry
|
||||
"update task
|
||||
set scheduled_at = clock_timestamp() + '10 seconds'::interval,
|
||||
modified_at = clock_timestamp(),
|
||||
error = ?,
|
||||
status = 'retry',
|
||||
retry_num = retry_num + ?
|
||||
where id = ?")
|
||||
|
||||
(defn- mark-as-retry
|
||||
[conn {:keys [task error inc-by]
|
||||
:or {inc-by 1}}]
|
||||
(let [explain (ex-message error)
|
||||
sqlv [sql:mark-as-retry explain inc-by (:id task)]]
|
||||
(db/exec-one! conn sqlv)
|
||||
nil))
|
||||
|
||||
(defn- mark-as-failed
|
||||
[conn {:keys [task error]}]
|
||||
(let [explain (ex-message error)]
|
||||
(db/update! conn :task
|
||||
{:error explain
|
||||
:modified-at (dt/now)
|
||||
:status "failed"}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
(defn- mark-as-completed
|
||||
[conn {:keys [task] :as opts}]
|
||||
(let [now (dt/now)]
|
||||
(db/update! conn :task
|
||||
{:completed-at now
|
||||
:modified-at now
|
||||
:status "completed"}
|
||||
{:id (:id task)})
|
||||
nil))
|
||||
|
||||
(defn- decode-task-row
|
||||
[{:keys [props] :as row}]
|
||||
(when row
|
||||
(cond-> row
|
||||
(db/pgobject? props) (assoc :props (db/decode-transit-pgobject props)))))
|
||||
|
||||
|
||||
(defn- log-task-error
|
||||
[item err]
|
||||
(log/error (str/format "Unhandled exception on task '%s' (retry: %s)\n" (:name item) (:retry-num item))
|
||||
(str/format "Props: %s\n" (pr-str (:props item)))
|
||||
(with-out-str
|
||||
(.printStackTrace ^Throwable err (java.io.PrintWriter. *out*)))))
|
||||
|
||||
(defn- handle-task
|
||||
[tasks {:keys [name] :as item}]
|
||||
(let [task-fn (get tasks name)]
|
||||
(if task-fn
|
||||
(task-fn item)
|
||||
(do
|
||||
(log/warn "no task handler found for" (pr-str name))
|
||||
nil))))
|
||||
|
||||
(defn- run-task
|
||||
[{:keys [tasks conn]} item]
|
||||
(try
|
||||
(log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))
|
||||
(handle-task tasks item)
|
||||
{:status :completed :task item}
|
||||
(catch Exception e
|
||||
(let [data (ex-data e)]
|
||||
(cond
|
||||
(and (= ::retry (:type data))
|
||||
(= ::noop (:strategy data)))
|
||||
{:status :retry :task item :error e :inc-by 0}
|
||||
|
||||
(and (< (:retry-num item)
|
||||
(:max-retries item))
|
||||
(= ::retry (:type data)))
|
||||
{:status :retry :task item :error e}
|
||||
|
||||
:else
|
||||
(do
|
||||
(log/errorf e "Unhandled exception on task '%s' (retry: %s)\nProps: %s"
|
||||
(:name item) (:retry-num item) (pr-str (:props item)))
|
||||
(if (>= (:retry-num item) (:max-retries item))
|
||||
{:status :failed :task item :error e}
|
||||
{:status :retry :task item :error e})))))
|
||||
(finally
|
||||
(log/debugf "Finished task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)))))
|
||||
|
||||
(def ^:private
|
||||
sql:select-next-tasks
|
||||
"select * from task as t
|
||||
where t.scheduled_at <= now()
|
||||
and t.queue = ?
|
||||
and (t.status = 'new' or t.status = 'retry')
|
||||
order by t.priority desc, t.scheduled_at
|
||||
limit ?
|
||||
for update skip locked")
|
||||
|
||||
(defn- event-loop-fn*
|
||||
[{:keys [tasks executor batch-size] :as opts}]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(let [queue (:queue opts "default")
|
||||
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
|
||||
(map decode-task-row)
|
||||
(seq))
|
||||
opts (assoc opts :conn conn)]
|
||||
|
||||
(if (nil? items)
|
||||
::empty
|
||||
(let [results (->> items
|
||||
(map #(partial run-task opts %))
|
||||
(map #(px/submit! executor %)))]
|
||||
(doseq [res results]
|
||||
(let [res (deref res)]
|
||||
(case (:status res)
|
||||
:retry (mark-as-retry conn res)
|
||||
:failed (mark-as-failed conn res)
|
||||
:completed (mark-as-completed conn res))))
|
||||
::handled)))))
|
||||
|
||||
(defn- event-loop-fn
|
||||
[{:keys [executor] :as opts}]
|
||||
(aa/thread-call executor #(event-loop-fn* opts)))
|
||||
|
||||
(s/def ::batch-size ::us/integer)
|
||||
(s/def ::poll-interval ::us/integer)
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::tasks (s/map-of string? ::fn))
|
||||
|
||||
(s/def ::start-worker-params
|
||||
(s/keys :req-un [::tasks ::aa/executor ::batch-size]
|
||||
:opt-un [::poll-interval]))
|
||||
|
||||
(defn start-worker!
|
||||
[{:keys [poll-interval executor]
|
||||
:or {poll-interval 5000}
|
||||
:as opts}]
|
||||
(us/assert ::start-worker-params opts)
|
||||
(log/infof "Starting worker '%s' on queue '%s'."
|
||||
(:name opts "anonymous")
|
||||
(:queue opts "default"))
|
||||
(let [cch (a/chan 1)]
|
||||
(a/go-loop []
|
||||
(let [[val port] (a/alts! [cch (event-loop-fn opts)] :priority true)]
|
||||
(cond
|
||||
;; Terminate the loop if close channel is closed or
|
||||
;; event-loop-fn returns nil.
|
||||
(or (= port cch) (nil? val))
|
||||
(log/infof "Stop condition found. Shutdown worker: '%s'"
|
||||
(:name opts "anonymous"))
|
||||
|
||||
(db/pool-closed? db/pool)
|
||||
(do
|
||||
(log/info "Worker eventloop is aborted because pool is closed.")
|
||||
(a/close! cch))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(contains? #{"08003" "08006" "08001" "08004"} (.getSQLState val)))
|
||||
(do
|
||||
(log/error "Connection error, trying resume in some instants.")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(and (instance? java.sql.SQLException val)
|
||||
(= "40001" (.getSQLState ^java.sql.SQLException val)))
|
||||
(do
|
||||
(log/debug "Serialization failure (retrying in some instants).")
|
||||
(a/<! (a/timeout 1000))
|
||||
(recur))
|
||||
|
||||
(instance? Exception val)
|
||||
(do
|
||||
(log/errorf val "Unexpected error ocurried on polling the database (will resume operations in some instants). ")
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur))
|
||||
|
||||
(= ::handled val)
|
||||
(recur)
|
||||
|
||||
(= ::empty val)
|
||||
(do
|
||||
(a/<! (a/timeout poll-interval))
|
||||
(recur)))))
|
||||
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(a/close! cch)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Scheduled Tasks
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(def ^:private
|
||||
sql:upsert-scheduled-task
|
||||
"insert into scheduled_task (id, cron_expr)
|
||||
values (?, ?)
|
||||
on conflict (id)
|
||||
do update set cron_expr=?")
|
||||
|
||||
(defn- synchronize-schedule-item
|
||||
[conn {:keys [id cron] :as item}]
|
||||
(let [cron (str cron)]
|
||||
(log/debug (str/format "Initialize scheduled task '%s' (cron: '%s')." id cron))
|
||||
(db/exec-one! conn [sql:upsert-scheduled-task id cron cron])))
|
||||
|
||||
(defn- synchronize-schedule!
|
||||
[schedule]
|
||||
(db/with-atomic [conn db/pool]
|
||||
(run! (partial synchronize-schedule-item conn) schedule)))
|
||||
|
||||
(def ^:private sql:lock-scheduled-task
|
||||
"select id from scheduled_task where id=? for update skip locked")
|
||||
|
||||
(declare schedule-task!)
|
||||
|
||||
(defn exception->string
|
||||
[error]
|
||||
(with-out-str
|
||||
(.printStackTrace ^Throwable error (java.io.PrintWriter. *out*))))
|
||||
|
||||
(defn- execute-scheduled-task
|
||||
[{:keys [scheduler executor] :as opts} {:keys [id cron] :as task}]
|
||||
(letfn [(run-task [conn]
|
||||
(try
|
||||
(when (db/exec-one! conn [sql:lock-scheduled-task id])
|
||||
(log/info "Executing scheduled task" id)
|
||||
((:fn task) task))
|
||||
(catch Exception e
|
||||
e)))
|
||||
|
||||
(handle-task* [conn]
|
||||
(let [result (run-task conn)]
|
||||
(if (instance? Throwable result)
|
||||
(do
|
||||
(log/warnf result "Unhandled exception on scheduled task '%s'." id)
|
||||
(db/insert! conn :scheduled-task-history
|
||||
{:id (uuid/next)
|
||||
:task-id id
|
||||
:is-error true
|
||||
:reason (exception->string result)}))
|
||||
(db/insert! conn :scheduled-task-history
|
||||
{:id (uuid/next)
|
||||
:task-id id}))))
|
||||
(handle-task []
|
||||
(db/with-atomic [conn db/pool]
|
||||
(handle-task* conn)))]
|
||||
|
||||
(try
|
||||
(px/run! executor handle-task)
|
||||
(finally
|
||||
(schedule-task! opts task)))))
|
||||
|
||||
(defn ms-until-valid
|
||||
[cron]
|
||||
(s/assert dt/cron? cron)
|
||||
(let [^Instant now (dt/now)
|
||||
^Instant next (dt/next-valid-instant-from cron now)]
|
||||
(inst-ms (dt/duration-between now next))))
|
||||
|
||||
(defn- schedule-task!
|
||||
[{:keys [scheduler] :as opts} {:keys [cron] :as task}]
|
||||
(let [ms (ms-until-valid cron)]
|
||||
(px/schedule! scheduler ms (partial execute-scheduled-task opts task))))
|
||||
|
||||
(s/def ::fn (s/or :var var? :fn fn?))
|
||||
(s/def ::id string?)
|
||||
(s/def ::cron dt/cron?)
|
||||
(s/def ::props (s/nilable map?))
|
||||
(s/def ::scheduled-task
|
||||
(s/keys :req-un [::id ::cron ::fn]
|
||||
:opt-un [::props]))
|
||||
|
||||
(s/def ::schedule (s/coll-of ::scheduled-task))
|
||||
(s/def ::start-scheduler-worker-params
|
||||
(s/keys :req-un [::schedule]))
|
||||
|
||||
(defn start-scheduler-worker!
|
||||
[{:keys [schedule] :as opts}]
|
||||
(us/assert ::start-scheduler-worker-params opts)
|
||||
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
||||
opts (assoc opts :scheduler scheduler)]
|
||||
(synchronize-schedule! schedule)
|
||||
(run! (partial schedule-task! opts) schedule)
|
||||
(reify
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(.shutdownNow ^ExecutorService scheduler)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Thread Pool
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn thread-pool
|
||||
([] (thread-pool {}))
|
||||
([{:keys [min-threads max-threads idle-timeout name]
|
||||
:or {min-threads 0 max-threads 128 idle-timeout 60000}}]
|
||||
(let [executor (QueuedThreadPool. max-threads min-threads)]
|
||||
(.setName executor (or name "default-tp"))
|
||||
(.start executor)
|
||||
executor)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Helpers
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn stop!
|
||||
[o]
|
||||
(cond
|
||||
(instance? java.lang.AutoCloseable o)
|
||||
(.close ^java.lang.AutoCloseable o)
|
||||
|
||||
(instance? org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||
(.stop ^org.eclipse.jetty.util.component.ContainerLifeCycle o)
|
||||
|
||||
:else
|
||||
(ex/raise :type :not-implemented)))
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue