mirror of
https://github.com/penpot/penpot.git
synced 2025-05-05 14:45:53 +02:00
✨ Allow connect to read-only databases.
This commit is contained in:
parent
f72e140327
commit
bc2a0432b9
8 changed files with 216 additions and 135 deletions
|
@ -112,6 +112,7 @@
|
||||||
(s/def ::database-password (s/nilable ::us/string))
|
(s/def ::database-password (s/nilable ::us/string))
|
||||||
(s/def ::database-uri ::us/string)
|
(s/def ::database-uri ::us/string)
|
||||||
(s/def ::database-username (s/nilable ::us/string))
|
(s/def ::database-username (s/nilable ::us/string))
|
||||||
|
(s/def ::database-readonly ::us/boolean)
|
||||||
(s/def ::default-blob-version ::us/integer)
|
(s/def ::default-blob-version ::us/integer)
|
||||||
(s/def ::error-report-webhook ::us/string)
|
(s/def ::error-report-webhook ::us/string)
|
||||||
(s/def ::user-feedback-destination ::us/string)
|
(s/def ::user-feedback-destination ::us/string)
|
||||||
|
@ -201,6 +202,7 @@
|
||||||
::database-password
|
::database-password
|
||||||
::database-uri
|
::database-uri
|
||||||
::database-username
|
::database-username
|
||||||
|
::database-readonly
|
||||||
::default-blob-version
|
::default-blob-version
|
||||||
::error-report-webhook
|
::error-report-webhook
|
||||||
::file-change-snapshot-every
|
::file-change-snapshot-every
|
||||||
|
|
|
@ -62,12 +62,13 @@
|
||||||
:opt-un [::migrations ::mtx/metrics ::read-only]))
|
:opt-un [::migrations ::mtx/metrics ::read-only]))
|
||||||
|
|
||||||
(defmethod ig/init-key ::pool
|
(defmethod ig/init-key ::pool
|
||||||
[_ {:keys [migrations metrics name] :as cfg}]
|
[_ {:keys [migrations metrics name read-only] :as cfg}]
|
||||||
(l/info :action "initialize connection pool" :name (d/name name) :uri (:uri cfg))
|
(l/info :action "initialize connection pool" :name (d/name name) :uri (:uri cfg))
|
||||||
(some-> metrics :registry instrument-jdbc!)
|
(some-> metrics :registry instrument-jdbc!)
|
||||||
|
|
||||||
(let [pool (create-pool cfg)]
|
(let [pool (create-pool cfg)]
|
||||||
(some->> (seq migrations) (apply-migrations! pool))
|
(when-not read-only
|
||||||
|
(some->> (seq migrations) (apply-migrations! pool)))
|
||||||
pool))
|
pool))
|
||||||
|
|
||||||
(defmethod ig/halt-key! ::pool
|
(defmethod ig/halt-key! ::pool
|
||||||
|
@ -136,10 +137,14 @@
|
||||||
|
|
||||||
(s/def ::pool pool?)
|
(s/def ::pool pool?)
|
||||||
|
|
||||||
(defn pool-closed?
|
(defn closed?
|
||||||
[pool]
|
[pool]
|
||||||
(.isClosed ^HikariDataSource pool))
|
(.isClosed ^HikariDataSource pool))
|
||||||
|
|
||||||
|
(defn read-only?
|
||||||
|
[pool]
|
||||||
|
(.isReadOnly ^HikariDataSource pool))
|
||||||
|
|
||||||
(defn create-pool
|
(defn create-pool
|
||||||
[cfg]
|
[cfg]
|
||||||
(let [dsc (create-datasource-config cfg)]
|
(let [dsc (create-datasource-config cfg)]
|
||||||
|
|
|
@ -11,74 +11,114 @@
|
||||||
[app.common.logging :as l]
|
[app.common.logging :as l]
|
||||||
[app.config :as cfg]
|
[app.config :as cfg]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
|
[app.db.sql :as sql]
|
||||||
[app.metrics :as mtx]
|
[app.metrics :as mtx]
|
||||||
[app.util.async :as aa]
|
[app.util.async :as aa]
|
||||||
[app.util.time :as dt]
|
[app.util.time :as dt]
|
||||||
[app.worker :as wrk]
|
[app.worker :as wrk]
|
||||||
[clojure.core.async :as a]
|
[clojure.core.async :as a]
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[integrant.core :as ig]))
|
[integrant.core :as ig]
|
||||||
|
[ring.middleware.session.store :as rss]))
|
||||||
|
|
||||||
;; A default cookie name for storing the session. We don't allow
|
;; A default cookie name for storing the session. We don't allow
|
||||||
;; configure it.
|
;; configure it.
|
||||||
(def cookie-name "auth-token")
|
(def cookie-name "auth-token")
|
||||||
|
|
||||||
|
(deftype DatabaseStore [pool tokens]
|
||||||
|
rss/SessionStore
|
||||||
|
(read-session [_ token]
|
||||||
|
(db/exec-one! pool (sql/select :http-session {:id token})))
|
||||||
|
|
||||||
|
(write-session [_ _ data]
|
||||||
|
(let [profile-id (:profile-id data)
|
||||||
|
user-agent (:user-agent data)
|
||||||
|
token (tokens :generate {:iss "authentication"
|
||||||
|
:iat (dt/now)
|
||||||
|
:uid profile-id})
|
||||||
|
params {:user-agent user-agent
|
||||||
|
:profile-id profile-id
|
||||||
|
:id token}]
|
||||||
|
(db/insert! pool :http-session params)
|
||||||
|
token))
|
||||||
|
|
||||||
|
(delete-session [_ token]
|
||||||
|
(db/delete! pool :http-session {:id token})
|
||||||
|
nil))
|
||||||
|
|
||||||
|
(deftype MemoryStore [cache tokens]
|
||||||
|
rss/SessionStore
|
||||||
|
(read-session [_ token]
|
||||||
|
(get @cache token))
|
||||||
|
|
||||||
|
(write-session [_ _ data]
|
||||||
|
(let [profile-id (:profile-id data)
|
||||||
|
user-agent (:user-agent data)
|
||||||
|
token (tokens :generate {:iss "authentication"
|
||||||
|
:iat (dt/now)
|
||||||
|
:uid profile-id})
|
||||||
|
params {:user-agent user-agent
|
||||||
|
:profile-id profile-id
|
||||||
|
:id token}]
|
||||||
|
|
||||||
|
(swap! cache assoc token params)
|
||||||
|
token))
|
||||||
|
|
||||||
|
(delete-session [_ token]
|
||||||
|
(swap! cache dissoc token)
|
||||||
|
nil))
|
||||||
|
|
||||||
;; --- IMPL
|
;; --- IMPL
|
||||||
|
|
||||||
(defn- create-session
|
(defn- create-session
|
||||||
[{:keys [conn tokens] :as cfg} {:keys [profile-id headers] :as request}]
|
[store request profile-id]
|
||||||
(let [token (tokens :generate {:iss "authentication"
|
(let [params {:user-agent (get-in request [:headers "user-agent"])
|
||||||
:iat (dt/now)
|
:profile-id profile-id}]
|
||||||
:uid profile-id})
|
(rss/write-session store nil params)))
|
||||||
params {:user-agent (get headers "user-agent")
|
|
||||||
:profile-id profile-id
|
|
||||||
:id token}]
|
|
||||||
(db/insert! conn :http-session params)))
|
|
||||||
|
|
||||||
(defn- delete-session
|
(defn- delete-session
|
||||||
[{:keys [conn] :as cfg} {:keys [cookies] :as request}]
|
[store {:keys [cookies] :as request}]
|
||||||
(when-let [token (get-in cookies [cookie-name :value])]
|
(when-let [token (get-in cookies [cookie-name :value])]
|
||||||
(db/delete! conn :http-session {:id token}))
|
(rss/delete-session store token)))
|
||||||
nil)
|
|
||||||
|
|
||||||
(defn- retrieve-session
|
(defn- retrieve-session
|
||||||
[{:keys [conn] :as cfg} id]
|
[store token]
|
||||||
(when id
|
(when token
|
||||||
(db/exec-one! conn ["select id, profile_id from http_session where id = ?" id])))
|
(rss/read-session store token)))
|
||||||
|
|
||||||
(defn- retrieve-from-request
|
(defn- retrieve-from-request
|
||||||
[cfg {:keys [cookies] :as request}]
|
[store {:keys [cookies] :as request}]
|
||||||
(->> (get-in cookies [cookie-name :value])
|
(->> (get-in cookies [cookie-name :value])
|
||||||
(retrieve-session cfg)))
|
(retrieve-session store)))
|
||||||
|
|
||||||
(defn- add-cookies
|
(defn- add-cookies
|
||||||
[response {:keys [id] :as session}]
|
[response token]
|
||||||
(let [cors? (contains? cfg/flags :cors)
|
(let [cors? (contains? cfg/flags :cors)
|
||||||
secure? (contains? cfg/flags :secure-session-cookies)]
|
secure? (contains? cfg/flags :secure-session-cookies)]
|
||||||
(assoc response :cookies {cookie-name {:path "/"
|
(assoc response :cookies {cookie-name {:path "/"
|
||||||
:http-only true
|
:http-only true
|
||||||
:value id
|
:value token
|
||||||
:same-site (if cors? :none :lax)
|
:same-site (if cors? :none :lax)
|
||||||
:secure secure?}})))
|
:secure secure?}})))
|
||||||
|
|
||||||
(defn- clear-cookies
|
(defn- clear-cookies
|
||||||
[response]
|
[response]
|
||||||
(assoc response :cookies {cookie-name {:value "" :max-age -1}}))
|
(assoc response :cookies {cookie-name {:value "" :max-age -1}}))
|
||||||
|
|
||||||
(defn- middleware
|
(defn- middleware
|
||||||
[cfg handler]
|
[events-ch store handler]
|
||||||
(fn [request]
|
(fn [request]
|
||||||
(if-let [{:keys [id profile-id] :as session} (retrieve-from-request cfg request)]
|
(if-let [{:keys [id profile-id] :as session} (retrieve-from-request store request)]
|
||||||
(do
|
(do
|
||||||
(a/>!! (::events-ch cfg) id)
|
(a/>!! events-ch id)
|
||||||
(l/set-context! {:profile-id profile-id})
|
(l/set-context! {:profile-id profile-id})
|
||||||
(handler (assoc request :profile-id profile-id :session-id id)))
|
(handler (assoc request :profile-id profile-id :session-id id)))
|
||||||
(handler request))))
|
(handler request))))
|
||||||
|
|
||||||
;; --- STATE INIT: SESSION
|
;; --- STATE INIT: SESSION
|
||||||
|
|
||||||
|
(s/def ::tokens fn?)
|
||||||
(defmethod ig/pre-init-spec ::session [_]
|
(defmethod ig/pre-init-spec ::session [_]
|
||||||
(s/keys :req-un [::db/pool]))
|
(s/keys :req-un [::db/pool ::tokens]))
|
||||||
|
|
||||||
(defmethod ig/prep-key ::session
|
(defmethod ig/prep-key ::session
|
||||||
[_ cfg]
|
[_ cfg]
|
||||||
|
@ -86,20 +126,24 @@
|
||||||
(d/without-nils cfg)))
|
(d/without-nils cfg)))
|
||||||
|
|
||||||
(defmethod ig/init-key ::session
|
(defmethod ig/init-key ::session
|
||||||
[_ {:keys [pool] :as cfg}]
|
[_ {:keys [pool tokens] :as cfg}]
|
||||||
(let [events (a/chan (a/dropping-buffer (:buffer-size cfg)))
|
(let [events-ch (a/chan (a/dropping-buffer (:buffer-size cfg)))
|
||||||
cfg (-> cfg
|
store (if (db/read-only? pool)
|
||||||
(assoc :conn pool)
|
(->MemoryStore (atom {}) tokens)
|
||||||
(assoc ::events-ch events))]
|
(->DatabaseStore pool tokens))]
|
||||||
|
|
||||||
|
(when (db/read-only? pool)
|
||||||
|
(l/warn :hint "sessions module initialized with in-memory store"))
|
||||||
|
|
||||||
(-> cfg
|
(-> cfg
|
||||||
(assoc :middleware #(middleware cfg %))
|
(assoc ::events-ch events-ch)
|
||||||
|
(assoc :middleware #(middleware events-ch store %))
|
||||||
(assoc :create (fn [profile-id]
|
(assoc :create (fn [profile-id]
|
||||||
(fn [request response]
|
(fn [request response]
|
||||||
(let [request (assoc request :profile-id profile-id)
|
(let [token (create-session store request profile-id)]
|
||||||
session (create-session cfg request)]
|
(add-cookies response token)))))
|
||||||
(add-cookies response session)))))
|
|
||||||
(assoc :delete (fn [request response]
|
(assoc :delete (fn [request response]
|
||||||
(delete-session cfg request)
|
(delete-session store request)
|
||||||
(-> response
|
(-> response
|
||||||
(assoc :status 204)
|
(assoc :status 204)
|
||||||
(assoc :body "")
|
(assoc :body "")
|
||||||
|
|
|
@ -89,19 +89,24 @@
|
||||||
(s/def ::events (s/every ::event))
|
(s/def ::events (s/every ::event))
|
||||||
|
|
||||||
(defmethod ig/init-key ::http-handler
|
(defmethod ig/init-key ::http-handler
|
||||||
[_ {:keys [executor] :as cfg}]
|
[_ {:keys [executor pool] :as cfg}]
|
||||||
(fn [{:keys [params profile-id] :as request}]
|
(if (db/read-only? pool)
|
||||||
(when (contains? cf/flags :audit-log)
|
(do
|
||||||
(let [events (->> (:events params)
|
(l/warn :hint "audit log http handler disabled, db is read-only")
|
||||||
(remove #(not= profile-id (:profile-id %)))
|
(constantly {:status 204 :body ""}))
|
||||||
(us/conform ::events))
|
(fn [{:keys [params profile-id] :as request}]
|
||||||
ip-addr (parse-client-ip request)
|
(when (contains? cf/flags :audit-log)
|
||||||
cfg (-> cfg
|
(let [events (->> (:events params)
|
||||||
(assoc :source "frontend")
|
(remove #(not= profile-id (:profile-id %)))
|
||||||
(assoc :events events)
|
(us/conform ::events))
|
||||||
(assoc :ip-addr ip-addr))]
|
ip-addr (parse-client-ip request)
|
||||||
(px/run! executor #(persist-http-events cfg))))
|
cfg (-> cfg
|
||||||
{:status 204 :body ""}))
|
(assoc :source "frontend")
|
||||||
|
(assoc :events events)
|
||||||
|
(assoc :ip-addr ip-addr))]
|
||||||
|
|
||||||
|
(px/run! executor #(persist-http-events cfg))))
|
||||||
|
{:status 204 :body ""})))
|
||||||
|
|
||||||
(defn- persist-http-events
|
(defn- persist-http-events
|
||||||
[{:keys [pool events ip-addr source] :as cfg}]
|
[{:keys [pool events ip-addr source] :as cfg}]
|
||||||
|
@ -148,13 +153,25 @@
|
||||||
(map clean-props)))
|
(map clean-props)))
|
||||||
|
|
||||||
(defmethod ig/init-key ::collector
|
(defmethod ig/init-key ::collector
|
||||||
[_ cfg]
|
[_ {:keys [pool] :as cfg}]
|
||||||
(when (contains? cf/flags :audit-log)
|
(cond
|
||||||
(l/info :msg "initializing audit log collector")
|
(not (contains? cf/flags :audit-log))
|
||||||
|
(do
|
||||||
|
(l/info :hint "audit log collection disabled")
|
||||||
|
(constantly nil))
|
||||||
|
|
||||||
|
(db/read-only? pool)
|
||||||
|
(do
|
||||||
|
(l/warn :hint "audit log collection disabled, db is read-only")
|
||||||
|
(constantly nil))
|
||||||
|
|
||||||
|
:else
|
||||||
(let [input (a/chan 512 event-xform)
|
(let [input (a/chan 512 event-xform)
|
||||||
buffer (aa/batch input {:max-batch-size 100
|
buffer (aa/batch input {:max-batch-size 100
|
||||||
:max-batch-age (* 10 1000) ; 10s
|
:max-batch-age (* 10 1000) ; 10s
|
||||||
:init []})]
|
:init []})]
|
||||||
|
|
||||||
|
(l/info :hint "audit log collector initialized")
|
||||||
(a/go-loop []
|
(a/go-loop []
|
||||||
(when-let [[_type events] (a/<! buffer)]
|
(when-let [[_type events] (a/<! buffer)]
|
||||||
(let [res (a/<! (persist-events cfg events))]
|
(let [res (a/<! (persist-events cfg events))]
|
||||||
|
@ -216,6 +233,7 @@
|
||||||
(:enabled props false))
|
(:enabled props false))
|
||||||
uri (or uri (:uri props))
|
uri (or uri (:uri props))
|
||||||
cfg (assoc cfg :uri uri)]
|
cfg (assoc cfg :uri uri)]
|
||||||
|
|
||||||
(when (and enabled (not uri))
|
(when (and enabled (not uri))
|
||||||
(ex/raise :type :internal
|
(ex/raise :type :internal
|
||||||
:code :task-not-configured
|
:code :task-not-configured
|
||||||
|
|
|
@ -28,9 +28,10 @@
|
||||||
|
|
||||||
(defn- persist-on-database!
|
(defn- persist-on-database!
|
||||||
[{:keys [pool] :as cfg} {:keys [id] :as event}]
|
[{:keys [pool] :as cfg} {:keys [id] :as event}]
|
||||||
(db/with-atomic [conn pool]
|
(when-not (db/read-only? pool)
|
||||||
(db/insert! conn :server-error-report
|
(db/with-atomic [conn pool]
|
||||||
{:id id :content (db/tjson event)})))
|
(db/insert! conn :server-error-report
|
||||||
|
{:id id :content (db/tjson event)}))))
|
||||||
|
|
||||||
(defn- parse-event-data
|
(defn- parse-event-data
|
||||||
[event]
|
[event]
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
{:uri (cf/get :database-uri)
|
{:uri (cf/get :database-uri)
|
||||||
:username (cf/get :database-username)
|
:username (cf/get :database-username)
|
||||||
:password (cf/get :database-password)
|
:password (cf/get :database-password)
|
||||||
|
:read-only (cf/get :database-readonly false)
|
||||||
:metrics (ig/ref :app.metrics/metrics)
|
:metrics (ig/ref :app.metrics/metrics)
|
||||||
:migrations (ig/ref :app.migrations/all)
|
:migrations (ig/ref :app.migrations/all)
|
||||||
:name :main
|
:name :main
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
(ns app.setup
|
(ns app.setup
|
||||||
"Initial data setup of instance."
|
"Initial data setup of instance."
|
||||||
(:require
|
(:require
|
||||||
|
[app.common.logging :as l]
|
||||||
[app.common.uuid :as uuid]
|
[app.common.uuid :as uuid]
|
||||||
[app.db :as db]
|
[app.db :as db]
|
||||||
[buddy.core.codecs :as bc]
|
[buddy.core.codecs :as bc]
|
||||||
|
@ -14,55 +15,49 @@
|
||||||
[clojure.spec.alpha :as s]
|
[clojure.spec.alpha :as s]
|
||||||
[integrant.core :as ig]))
|
[integrant.core :as ig]))
|
||||||
|
|
||||||
(declare initialize-instance-id!)
|
(defn- generate-random-key
|
||||||
(declare initialize-secret-key!)
|
[]
|
||||||
(declare retrieve-all)
|
(-> (bn/random-bytes 64)
|
||||||
|
(bc/bytes->b64u)
|
||||||
|
(bc/bytes->str)))
|
||||||
|
|
||||||
|
(defn- retrieve-all
|
||||||
|
[conn]
|
||||||
|
(->> (db/query conn :server-prop {:preload true})
|
||||||
|
(filter #(not= "secret-key" (:id %)))
|
||||||
|
(map (fn [row]
|
||||||
|
[(keyword (:id row))
|
||||||
|
(db/decode-transit-pgobject (:content row))]))
|
||||||
|
(into {})))
|
||||||
|
|
||||||
|
(defn- handle-instance-id
|
||||||
|
[instance-id conn read-only?]
|
||||||
|
(or instance-id
|
||||||
|
(let [instance-id (uuid/random)]
|
||||||
|
(when-not read-only?
|
||||||
|
(try
|
||||||
|
(db/insert! conn :server-prop
|
||||||
|
{:id "instance-id"
|
||||||
|
:preload true
|
||||||
|
:content (db/tjson instance-id)})
|
||||||
|
(catch Throwable cause
|
||||||
|
(l/warn :hint "unable to persist instance-id"
|
||||||
|
:instance-id instance-id
|
||||||
|
:cause cause))))
|
||||||
|
instance-id)))
|
||||||
|
|
||||||
(defmethod ig/pre-init-spec ::props [_]
|
(defmethod ig/pre-init-spec ::props [_]
|
||||||
(s/keys :req-un [::db/pool]))
|
(s/keys :req-un [::db/pool]))
|
||||||
|
|
||||||
(defmethod ig/init-key ::props
|
(defmethod ig/init-key ::props
|
||||||
[_ {:keys [pool] :as cfg}]
|
[_ {:keys [pool key] :as cfg}]
|
||||||
(db/with-atomic [conn pool]
|
(db/with-atomic [conn pool]
|
||||||
(let [cfg (assoc cfg :conn conn)]
|
(db/xact-lock! conn 0)
|
||||||
(initialize-secret-key! cfg)
|
(when-not key
|
||||||
(initialize-instance-id! cfg)
|
(l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate "
|
||||||
(retrieve-all cfg))))
|
"all sessions on each restart, it is hightly recommeded setting up the "
|
||||||
|
"PENPOT_SECRET_KEY environment variable")))
|
||||||
|
|
||||||
(def sql:upsert-secret-key
|
(let [stored (-> (retrieve-all conn)
|
||||||
"insert into server_prop (id, preload, content)
|
(assoc :secret-key (or key (generate-random-key))))]
|
||||||
values ('secret-key', true, ?::jsonb)
|
(update stored :instance-id handle-instance-id conn (db/read-only? pool)))))
|
||||||
on conflict (id) do update set content = ?::jsonb")
|
|
||||||
|
|
||||||
(def sql:insert-secret-key
|
|
||||||
"insert into server_prop (id, preload, content)
|
|
||||||
values ('secret-key', true, ?::jsonb)
|
|
||||||
on conflict (id) do nothing")
|
|
||||||
|
|
||||||
(defn- initialize-secret-key!
|
|
||||||
[{:keys [conn key] :as cfg}]
|
|
||||||
(if key
|
|
||||||
(let [key (db/tjson key)]
|
|
||||||
(db/exec-one! conn [sql:upsert-secret-key key key]))
|
|
||||||
(let [key (-> (bn/random-bytes 64)
|
|
||||||
(bc/bytes->b64u)
|
|
||||||
(bc/bytes->str))
|
|
||||||
key (db/tjson key)]
|
|
||||||
(db/exec-one! conn [sql:insert-secret-key key]))))
|
|
||||||
|
|
||||||
(defn- initialize-instance-id!
|
|
||||||
[{:keys [conn] :as cfg}]
|
|
||||||
(let [iid (uuid/random)]
|
|
||||||
|
|
||||||
(db/insert! conn :server-prop
|
|
||||||
{:id "instance-id"
|
|
||||||
:preload true
|
|
||||||
:content (db/tjson iid)}
|
|
||||||
{:on-conflict-do-nothing true})))
|
|
||||||
|
|
||||||
(defn- retrieve-all
|
|
||||||
[{:keys [conn] :as cfg}]
|
|
||||||
(reduce (fn [acc row]
|
|
||||||
(assoc acc (keyword (:id row)) (db/decode-transit-pgobject (:content row))))
|
|
||||||
{}
|
|
||||||
(db/query conn :server-prop {:preload true})))
|
|
||||||
|
|
|
@ -59,6 +59,7 @@
|
||||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
(declare event-loop-fn)
|
(declare event-loop-fn)
|
||||||
|
(declare event-loop)
|
||||||
(declare instrument-tasks)
|
(declare instrument-tasks)
|
||||||
|
|
||||||
(s/def ::queue keyword?)
|
(s/def ::queue keyword?)
|
||||||
|
@ -85,13 +86,10 @@
|
||||||
:queue :default}
|
:queue :default}
|
||||||
(d/without-nils cfg)))
|
(d/without-nils cfg)))
|
||||||
|
|
||||||
(defmethod ig/init-key ::worker
|
(defn- event-loop
|
||||||
[_ {:keys [pool poll-interval name queue] :as cfg}]
|
"Main, worker eventloop"
|
||||||
(l/info :action "start worker"
|
[{:keys [pool poll-interval close-ch] :as cfg}]
|
||||||
:name (d/name name)
|
(let [poll-ms (inst-ms poll-interval)]
|
||||||
:queue (d/name queue))
|
|
||||||
(let [close-ch (a/chan 1)
|
|
||||||
poll-ms (inst-ms poll-interval)]
|
|
||||||
(a/go-loop []
|
(a/go-loop []
|
||||||
(let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)]
|
(let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)]
|
||||||
(cond
|
(cond
|
||||||
|
@ -100,7 +98,7 @@
|
||||||
(or (= port close-ch) (nil? val))
|
(or (= port close-ch) (nil? val))
|
||||||
(l/debug :hint "stop condition found")
|
(l/debug :hint "stop condition found")
|
||||||
|
|
||||||
(db/pool-closed? pool)
|
(db/closed? pool)
|
||||||
(do
|
(do
|
||||||
(l/debug :hint "eventloop aborted because pool is closed")
|
(l/debug :hint "eventloop aborted because pool is closed")
|
||||||
(a/close! close-ch))
|
(a/close! close-ch))
|
||||||
|
@ -132,14 +130,27 @@
|
||||||
(= ::empty val)
|
(= ::empty val)
|
||||||
(do
|
(do
|
||||||
(a/<! (a/timeout poll-ms))
|
(a/<! (a/timeout poll-ms))
|
||||||
(recur)))))
|
(recur)))))))
|
||||||
|
|
||||||
|
(defmethod ig/init-key ::worker
|
||||||
|
[_ {:keys [pool name queue] :as cfg}]
|
||||||
|
(let [close-ch (a/chan 1)
|
||||||
|
cfg (assoc cfg :close-ch close-ch)]
|
||||||
|
(if (db/read-only? pool)
|
||||||
|
(l/warn :hint "worker not started, db is read-only"
|
||||||
|
:name (d/name name)
|
||||||
|
:queue (d/name queue))
|
||||||
|
(do
|
||||||
|
(l/info :hint "worker started"
|
||||||
|
:name (d/name name)
|
||||||
|
:queue (d/name queue))
|
||||||
|
(event-loop cfg)))
|
||||||
|
|
||||||
(reify
|
(reify
|
||||||
java.lang.AutoCloseable
|
java.lang.AutoCloseable
|
||||||
(close [_]
|
(close [_]
|
||||||
(a/close! close-ch)))))
|
(a/close! close-ch)))))
|
||||||
|
|
||||||
|
|
||||||
(defmethod ig/halt-key! ::worker
|
(defmethod ig/halt-key! ::worker
|
||||||
[_ instance]
|
[_ instance]
|
||||||
(.close ^java.lang.AutoCloseable instance))
|
(.close ^java.lang.AutoCloseable instance))
|
||||||
|
@ -343,31 +354,35 @@
|
||||||
(s/keys :req-un [::executor ::db/pool ::schedule ::tasks]))
|
(s/keys :req-un [::executor ::db/pool ::schedule ::tasks]))
|
||||||
|
|
||||||
(defmethod ig/init-key ::scheduler
|
(defmethod ig/init-key ::scheduler
|
||||||
[_ {:keys [schedule tasks] :as cfg}]
|
[_ {:keys [schedule tasks pool] :as cfg}]
|
||||||
(let [scheduler (Executors/newScheduledThreadPool (int 1))
|
(let [scheduler (Executors/newScheduledThreadPool (int 1))]
|
||||||
schedule (->> schedule
|
(if (db/read-only? pool)
|
||||||
(filter some?)
|
(l/warn :hint "scheduler not started, db is read-only")
|
||||||
;; If id is not defined, use the task as id.
|
(let [schedule (->> schedule
|
||||||
(map (fn [{:keys [id task] :as item}]
|
(filter some?)
|
||||||
(if (some? id)
|
;; If id is not defined, use the task as id.
|
||||||
(assoc item :id (d/name id))
|
(map (fn [{:keys [id task] :as item}]
|
||||||
(assoc item :id (d/name task)))))
|
(if (some? id)
|
||||||
(map (fn [{:keys [task] :as item}]
|
(assoc item :id (d/name id))
|
||||||
(let [f (get tasks task)]
|
(assoc item :id (d/name task)))))
|
||||||
(when-not f
|
(map (fn [{:keys [task] :as item}]
|
||||||
(ex/raise :type :internal
|
(let [f (get tasks task)]
|
||||||
:code :task-not-found
|
(when-not f
|
||||||
:hint (str/fmt "task %s not configured" task)))
|
(ex/raise :type :internal
|
||||||
(-> item
|
:code :task-not-found
|
||||||
(dissoc :task)
|
:hint (str/fmt "task %s not configured" task)))
|
||||||
(assoc :fn f))))))
|
(-> item
|
||||||
cfg (assoc cfg
|
(dissoc :task)
|
||||||
:scheduler scheduler
|
(assoc :fn f))))))
|
||||||
:schedule schedule)]
|
cfg (assoc cfg
|
||||||
|
:scheduler scheduler
|
||||||
|
:schedule schedule)]
|
||||||
|
(l/info :hint "scheduler started"
|
||||||
|
:registred-tasks (count schedule))
|
||||||
|
|
||||||
(synchronize-schedule cfg)
|
(synchronize-schedule cfg)
|
||||||
(run! (partial schedule-task cfg)
|
(run! (partial schedule-task cfg)
|
||||||
(filter some? schedule))
|
(filter some? schedule))))
|
||||||
|
|
||||||
(reify
|
(reify
|
||||||
java.lang.AutoCloseable
|
java.lang.AutoCloseable
|
||||||
|
|
Loading…
Add table
Reference in a new issue