Fix all linter issues on backend code.

This commit is contained in:
Andrey Antukh 2021-01-31 19:25:26 +01:00 committed by Alonso Torres
parent 6dafc087e9
commit b80295a21c
48 changed files with 134 additions and 256 deletions

View file

@ -21,9 +21,6 @@
} }
:unresolved-symbol :unresolved-symbol
{:exclude ['(app.services.mutations/defmutation) {:exclude ['(app.util.services/defmethod)
'(app.services.queries/defquery)
'(app.util.dispatcher/defservice)
'(mount.core/defstate)
]}}} ]}}}

View file

@ -9,17 +9,14 @@
(ns app.cli.migrate-media (ns app.cli.migrate-media
(:require (:require
[app.common.pages :as cp]
[app.common.uuid :as uuid]
[app.common.media :as cm] [app.common.media :as cm]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db] [app.db :as db]
[datoteka.core :as fs]
[app.main :as main] [app.main :as main]
[app.util.blob :as blob]
[app.storage :as sto] [app.storage :as sto]
[cuerdas.core :as str]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare migrate-profiles) (declare migrate-profiles)

View file

@ -12,7 +12,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.geom.point :as gpt] [app.common.geom.point :as gpt]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg]
[app.db.sql :as sql] [app.db.sql :as sql]
[app.util.json :as json] [app.util.json :as json]
[app.util.migrations :as mg] [app.util.migrations :as mg]
@ -20,7 +19,6 @@
[app.util.transit :as t] [app.util.transit :as t]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.string :as str]
[integrant.core :as ig] [integrant.core :as ig]
[next.jdbc :as jdbc] [next.jdbc :as jdbc]
[next.jdbc.date-time :as jdbc-dt]) [next.jdbc.date-time :as jdbc-dt])
@ -186,17 +184,17 @@
(sql/insert table params opts) (sql/insert table params opts)
(assoc opts :return-keys true)))) (assoc opts :return-keys true))))
(defn- select-values [map ks] ;; (defn- select-values [map ks]
(reduce #(conj %1 (map %2)) [] ks)) ;; (reduce #(conj %1 (map %2)) [] ks))
(defn insert-multi! (defn insert-multi!
[ds table param-list] [ds table param-list]
(doseq [params param-list] (doseq [params param-list]
(insert! ds table params)) (insert! ds table params))
;; FIXME: Won't work ;; FIXME: Won't work
#_(let [keys (->> param-list first keys (into [])) #_(let [keys (->> param-list first keys (into []))
params (->> param-list (mapv #(->> keys (select-values %) (into []))) )] params (->> param-list (mapv #(->> keys (select-values %) (into []))) )]
(jdbc-sql/insert-multi! ds table keys params default-options))) (jdbc-sql/insert-multi! ds table keys params default-options)))
(defn update! (defn update!
([ds table params where] (update! ds table params where nil)) ([ds table params where] (update! ds table params where nil))

View file

@ -15,15 +15,14 @@
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.tasks :as tasks]
[app.worker :as wrk]
[app.util.json :as json]
[app.util.http :as http] [app.util.http :as http]
[app.util.json :as json]
[app.util.template :as tmpl] [app.util.template :as tmpl]
[app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[clojure.java.io :as io]
[cuerdas.core :as str] [cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px]) [promesa.exec :as px])
@ -47,7 +46,7 @@
:opt-un [::uri])) :opt-un [::uri]))
(defmethod ig/init-key ::reporter (defmethod ig/init-key ::reporter
[_ {:keys [executor uri] :as cfg}] [_ {:keys [executor] :as cfg}]
(log/info "Intializing error reporter.") (log/info "Intializing error reporter.")
(let [close-ch (a/chan 1)] (let [close-ch (a/chan 1)]
(a/go-loop [] (a/go-loop []
@ -91,10 +90,10 @@
[cfg {:keys [message host version id] :as cdata}] [cfg {:keys [message host version id] :as cdata}]
(try (try
(let [uri (:uri cfg) (let [uri (:uri cfg)
prefix (str/<< "Unhandled exception (@channel):\n" prefix (str "Unhandled exception (@channel):\n"
"- detail: ~(:public-uri cfg/config)/dbg/error-by-id/~{id}\n" "- detail: " (:public-uri cfg/config) "/dbg/error-by-id/" id "\n"
"- host: `~{host}`\n" "- host: `" host "`\n"
"- version: `~{version}`\n") "- version: `" version "`\n")
text (str prefix "```\n" message "\n```") text (str prefix "```\n" message "\n```")
rsp (http/send! {:uri uri rsp (http/send! {:uri uri
:method :post :method :post

View file

@ -10,11 +10,8 @@
(ns app.http (ns app.http
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cfg] [app.config :as cfg]
[app.http.assets :as assets]
[app.http.auth :as auth] [app.http.auth :as auth]
[app.http.errors :as errors] [app.http.errors :as errors]
[app.http.middleware :as middleware] [app.http.middleware :as middleware]
@ -26,7 +23,6 @@
[ring.adapter.jetty9 :as jetty]) [ring.adapter.jetty9 :as jetty])
(:import (:import
org.eclipse.jetty.server.Server org.eclipse.jetty.server.Server
org.eclipse.jetty.server.Handler
org.eclipse.jetty.server.handler.ErrorHandler org.eclipse.jetty.server.handler.ErrorHandler
org.eclipse.jetty.server.handler.StatisticsHandler)) org.eclipse.jetty.server.handler.StatisticsHandler))
@ -117,7 +113,7 @@
:body "internal server error"}))))))) :body "internal server error"})))))))
(defn- create-router (defn- create-router
[{:keys [session rpc google-auth gitlab-auth github-auth metrics ldap-auth storage svgparse assets] :as cfg}] [{:keys [session rpc google-auth gitlab-auth github-auth metrics ldap-auth svgparse assets] :as cfg}]
(rr/router (rr/router
[["/metrics" {:get (:handler metrics)}] [["/metrics" {:get (:handler metrics)}]

View file

@ -13,13 +13,12 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx]
[app.storage :as sto] [app.storage :as sto]
[app.util.time :as dt] [app.util.time :as dt]
[app.metrics :as mtx]
[cuerdas.core :as str]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[lambdaisland.uri :as u] [integrant.core :as ig]
[integrant.core :as ig])) [lambdaisland.uri :as u]))
(def ^:private cache-max-age (def ^:private cache-max-age
(dt/duration {:hours 24})) (dt/duration {:hours 24}))
@ -73,14 +72,14 @@
:body ""})))) :body ""}))))
(defn- generic-handler (defn- generic-handler
[{:keys [storage] :as cfg} request id] [{:keys [storage] :as cfg} _request id]
(let [obj (sto/get-object storage id)] (let [obj (sto/get-object storage id)]
(if obj (if obj
(serve-object cfg obj) (serve-object cfg obj)
{:status 404 :body ""}))) {:status 404 :body ""})))
(defn objects-handler (defn objects-handler
[{:keys [storage] :as cfg} request] [cfg request]
(let [id (get-in request [:path-params :id])] (let [id (get-in request [:path-params :id])]
(generic-handler cfg request (coerce-id id)))) (generic-handler cfg request (coerce-id id))))

View file

@ -12,7 +12,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db]
[app.http.session :as session] [app.http.session :as session]
[app.util.http :as http] [app.util.http :as http]
[app.util.time :as dt] [app.util.time :as dt]
@ -46,7 +45,7 @@
(str (assoc public :path "/api/oauth/github/callback")))) (str (assoc public :path "/api/oauth/github/callback"))))
(defn- get-access-token (defn- get-access-token
[cfg code state] [cfg state code]
(let [params {:client_id (:client-id cfg) (let [params {:client_id (:client-id cfg)
:client_secret (:client-secret cfg) :client_secret (:client-secret cfg)
:code code :code code
@ -93,13 +92,13 @@
nil)))) nil))))
(defn auth (defn auth
[{:keys [tokens] :as cfg} request] [{:keys [tokens] :as cfg} _request]
(let [state (tokens :generate (let [state (tokens :generate
{:iss :github-oauth {:iss :github-oauth
:exp (dt/in-future "15m")}) :exp (dt/in-future "15m")})
params {:client_id (:client-id cfg/config) params {:client_id (:client-id cfg/config)
:redirect_uri (build-redirect-url) :redirect_uri (build-redirect-url cfg)
:state state :state state
:scope scope} :scope scope}
query (u/map->query-string params) query (u/map->query-string params)
@ -112,9 +111,9 @@
[{:keys [tokens rpc session] :as cfg} request] [{:keys [tokens rpc session] :as cfg} request]
(let [state (get-in request [:params :state]) (let [state (get-in request [:params :state])
_ (tokens :verify {:token state :iss :github-oauth}) _ (tokens :verify {:token state :iss :github-oauth})
info (some-> (get-in request [:params :code]) info (some->> (get-in request [:params :code])
(get-access-token state) (get-access-token cfg state)
(get-user-info))] (get-user-info))]
(when-not info (when-not info
(ex/raise :type :authentication (ex/raise :type :authentication
@ -158,7 +157,7 @@
::client-secret])) ::client-secret]))
(defn- default-handler (defn- default-handler
[req] [_]
(ex/raise :type :not-found)) (ex/raise :type :not-found))
(defmethod ig/init-key :app.http.auth/github (defmethod ig/init-key :app.http.auth/github

View file

@ -9,10 +9,9 @@
(ns app.http.auth.gitlab (ns app.http.auth.gitlab
(:require (:require
[app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.data :as d]
[app.config :as cfg]
[app.http.session :as session] [app.http.session :as session]
[app.util.http :as http] [app.util.http :as http]
[app.util.time :as dt] [app.util.time :as dt]
@ -75,8 +74,8 @@
(defn- get-user-info (defn- get-user-info
[token] [cfg token]
(let [req {:uri (build-user-info-url) (let [req {:uri (build-user-info-url cfg)
:headers {"Authorization" (str "Bearer " token)} :headers {"Authorization" (str "Bearer " token)}
:method :get} :method :get}
res (http/send! req)] res (http/send! req)]
@ -102,12 +101,12 @@
:exp (dt/in-future "15m")}) :exp (dt/in-future "15m")})
params {:client_id (:client-id cfg) params {:client_id (:client-id cfg)
:redirect_uri (build-redirect-url) :redirect_uri (build-redirect-url cfg)
:response_type "code" :response_type "code"
:state token :state token
:scope scope} :scope scope}
query (uri/map->query-string params) query (uri/map->query-string params)
uri (-> (build-oauth-uri) uri (-> (build-oauth-uri cfg)
(assoc :query query))] (assoc :query query))]
{:status 200 {:status 200
:body {:redirect-uri (str uri)}})) :body {:redirect-uri (str uri)}}))
@ -118,7 +117,7 @@
_ (tokens :verify {:token token :iss :gitlab-oauth}) _ (tokens :verify {:token token :iss :gitlab-oauth})
info (some->> (get-in request [:params :code]) info (some->> (get-in request [:params :code])
(get-access-token cfg) (get-access-token cfg)
(get-user-info))] (get-user-info cfg))]
(when-not info (when-not info
(ex/raise :type :authentication (ex/raise :type :authentication
@ -167,7 +166,7 @@
(d/without-nils cfg))) (d/without-nils cfg)))
(defn- default-handler (defn- default-handler
[req] [_]
(ex/raise :type :not-found)) (ex/raise :type :not-found))
(defmethod ig/init-key :app.http.auth/gitlab (defmethod ig/init-key :app.http.auth/gitlab

View file

@ -11,7 +11,6 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg]
[app.http.session :as session] [app.http.session :as session]
[app.util.http :as http] [app.util.http :as http]
[app.util.time :as dt] [app.util.time :as dt]
@ -142,7 +141,7 @@
::client-secret])) ::client-secret]))
(defn- default-handler (defn- default-handler
[req] [_]
(ex/raise :type :not-found)) (ex/raise :type :not-found))
(defmethod ig/init-key :app.http.auth/google (defmethod ig/init-key :app.http.auth/google

View file

@ -10,7 +10,6 @@
(ns app.http.errors (ns app.http.errors
"A errors handling for the http server." "A errors handling for the http server."
(:require (:require
[app.common.exceptions :as ex]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cfg] [app.config :as cfg]
[clojure.pprint :refer [pprint]] [clojure.pprint :refer [pprint]]

View file

@ -9,16 +9,14 @@
(ns app.http.middleware (ns app.http.middleware
(:require (:require
[app.common.exceptions :as ex]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.util.transit :as t]
[app.util.json :as json] [app.util.json :as json]
[app.util.transit :as t]
[clojure.java.io :as io] [clojure.java.io :as io]
[ring.middleware.cookies :refer [wrap-cookies]] [ring.middleware.cookies :refer [wrap-cookies]]
[ring.middleware.keyword-params :refer [wrap-keyword-params]] [ring.middleware.keyword-params :refer [wrap-keyword-params]]
[ring.middleware.multipart-params :refer [wrap-multipart-params]] [ring.middleware.multipart-params :refer [wrap-multipart-params]]
[ring.middleware.params :refer [wrap-params]] [ring.middleware.params :refer [wrap-params]]))
[ring.middleware.resource :refer [wrap-resource]]))
(defn wrap-server-timing (defn wrap-server-timing
[handler] [handler]
@ -46,15 +44,14 @@
:json (parse-json body) :json (parse-json body)
:transit (parse-transit body)) :transit (parse-transit body))
(catch Exception e (catch Exception e
(let [type :json-verbose (let [data {:type :parse
data {:type :parse
:hint "unable to parse request body" :hint "unable to parse request body"
:message (ex-message e)}] :message (ex-message e)}]
{:status 400 {:status 400
:headers {"content-type" "application/transit+json"} :headers {"content-type" "application/transit+json"}
:body (t/encode-str data {:type type})}))))] :body (t/encode-str data {:type :json-verbose})}))))]
(fn [{:keys [headers body request-method] :as request}] (fn [{:keys [headers body] :as request}]
(let [ctype (get headers "content-type")] (let [ctype (get headers "content-type")]
(handler (handler
(case ctype (case ctype

View file

@ -9,12 +9,12 @@
(ns app.http.session (ns app.http.session
(:require (:require
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[app.db :as db] [app.db :as db]
[app.http.errors :refer [update-thread-context!]] [app.http.errors :refer [update-thread-context!]]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.nonce :as bn])) [buddy.core.nonce :as bn]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(defn next-session-id (defn next-session-id
([] (next-session-id 96)) ([] (next-session-id 96))

View file

@ -9,8 +9,8 @@
(ns app.main (ns app.main
(:require (:require
[app.config :as cfg]
[app.common.data :as d] [app.common.data :as d]
[app.config :as cfg]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.pprint :as pprint] [clojure.pprint :as pprint]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]

View file

@ -14,11 +14,8 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.media :as cm] [app.common.media :as cm]
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg]
[app.rlimits :as rlm] [app.rlimits :as rlm]
[app.svgparse :as svg] [app.svgparse :as svg]
[app.util.http :as http]
[clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs]) [datoteka.core :as fs])

View file

@ -12,9 +12,8 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker] [app.worker]
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [clojure.tools.logging :as log]
[integrant.core :as ig] [integrant.core :as ig]
[next.jdbc :as jdbc]) [next.jdbc :as jdbc])
(:import (:import
@ -67,7 +66,7 @@
{::original origf})))})) {::original origf})))}))
(defn- handler (defn- handler
[registry request] [registry _request]
(let [samples (.metricFamilySamples ^CollectorRegistry registry) (let [samples (.metricFamilySamples ^CollectorRegistry registry)
writer (StringWriter.)] writer (StringWriter.)]
(TextFormat/write004 writer samples) (TextFormat/write004 writer samples)
@ -75,7 +74,7 @@
:body (.toString writer)})) :body (.toString writer)}))
(defmethod ig/init-key ::metrics (defmethod ig/init-key ::metrics
[_ opts] [_ _cfg]
(log/infof "Initializing prometheus registry and instrumentation.") (log/infof "Initializing prometheus registry and instrumentation.")
(let [registry (create-registry)] (let [registry (create-registry)]
(instrument-workers! registry) (instrument-workers! registry)
@ -180,7 +179,7 @@
(observe val)))))) (observe val))))))
(defn create (defn create
[{:keys [type name] :as props}] [{:keys [type] :as props}]
(case type (case type
:counter (make-counter props) :counter (make-counter props)
:gauge (make-gauge props) :gauge (make-gauge props)
@ -209,13 +208,13 @@
(with-meta (with-meta
(fn (fn
([a] ([a]
(mobj :inc) (mobj :inc labels)
(origf a)) (origf a))
([a b] ([a b]
(mobj :inc) (mobj :inc labels)
(origf a b)) (origf a b))
([a b & more] ([a b & more]
(mobj :inc) (mobj :inc labels)
(apply origf a b more))) (apply origf a b more)))
(assoc mdata ::original origf))))) (assoc mdata ::original origf)))))

View file

@ -9,10 +9,9 @@
(ns app.migrations (ns app.migrations
(:require (:require
[integrant.core :as ig] [app.migrations.migration-0023 :as mg0023]
[app.db :as db]
[app.util.migrations :as mg] [app.util.migrations :as mg]
[app.migrations.migration-0023 :as mg0023])) [integrant.core :as ig]))
(def migrations (def migrations
[{:name "0001-add-extensions" [{:name "0001-add-extensions"

View file

@ -123,7 +123,7 @@
(when (jetty/connected? conn) (when (jetty/connected? conn)
(jetty/send! conn data) (jetty/send! conn data)
true) true)
(catch java.lang.NullPointerException e (catch java.lang.NullPointerException _e
false))) false)))
(defn websocket (defn websocket

View file

@ -8,7 +8,6 @@
(:refer-clojure :exclude [run!]) (:refer-clojure :exclude [run!])
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.config :as cfg]
[app.util.redis :as redis] [app.util.redis :as redis]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig]) [integrant.core :as ig])

View file

@ -10,7 +10,6 @@
(ns app.rlimits (ns app.rlimits
"Resource usage limits (in other words: semaphores)." "Resource usage limits (in other words: semaphores)."
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[integrant.core :as ig]) [integrant.core :as ig])

View file

@ -22,7 +22,7 @@
[integrant.core :as ig])) [integrant.core :as ig]))
(defn- default-handler (defn- default-handler
[req] [_]
(ex/raise :type :not-found)) (ex/raise :type :not-found))
(defn- rpc-query-handler (defn- rpc-query-handler

View file

@ -15,8 +15,8 @@
[app.rpc.queries.comments :as comments] [app.rpc.queries.comments :as comments]
[app.rpc.queries.files :as files] [app.rpc.queries.files :as files]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s])) [clojure.spec.alpha :as s]))
;; --- Mutation: Create Comment Thread ;; --- Mutation: Create Comment Thread

View file

@ -10,8 +10,8 @@
(ns app.rpc.mutations.demo (ns app.rpc.mutations.demo
"A demo specific mutations." "A demo specific mutations."
(:require (:require
[app.common.uuid :as uuid]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.config :as cfg] [app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.db.profile-initial-data :refer [create-profile-initial-data]] [app.db.profile-initial-data :refer [create-profile-initial-data]]

View file

@ -20,7 +20,6 @@
[app.util.http :as http] [app.util.http :as http]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs])) [datoteka.core :as fs]))
@ -84,7 +83,7 @@
(defn create-file-media-object (defn create-file-media-object
[{:keys [conn storage svgc] :as cfg} {:keys [id file-id is-local name content] :as params}] [{:keys [conn storage svgc] :as cfg} {:keys [file-id is-local name content] :as params}]
(media/validate-media-type (:content-type content)) (media/validate-media-type (:content-type content))
(let [storage (assoc storage :conn conn) (let [storage (assoc storage :conn conn)
source-path (fs/path (:tempfile content)) source-path (fs/path (:tempfile content))

View file

@ -11,7 +11,6 @@
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.media :as cm]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.db :as db] [app.db :as db]
@ -21,13 +20,9 @@
[app.rpc.queries.profile :as profile] [app.rpc.queries.profile :as profile]
[app.rpc.queries.teams :as teams] [app.rpc.queries.teams :as teams]
[app.storage :as sto] [app.storage :as sto]
[app.tasks :as tasks]
[app.util.services :as sv] [app.util.services :as sv]
[app.util.time :as dt] [app.util.time :as dt]
[buddy.core.codecs :as bc]
[buddy.core.nonce :as bn]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.core :as fs])) [datoteka.core :as fs]))
;; --- Helpers & Specs ;; --- Helpers & Specs
@ -266,10 +261,7 @@
(defn upload-photo (defn upload-photo
[{:keys [storage] :as cfg} {:keys [file]}] [{:keys [storage] :as cfg} {:keys [file]}]
(let [prefix (-> (bn/random-bytes 8) (let [thumb (media/run cfg
(bc/bytes->b64u)
(bc/bytes->str))
thumb (media/run cfg
{:cmd :profile-thumbnail {:cmd :profile-thumbnail
:format :jpeg :format :jpeg
:quality 85 :quality 85

View file

@ -10,9 +10,8 @@
(ns app.rpc.permissions (ns app.rpc.permissions
"A permission checking helper factories." "A permission checking helper factories."
(:require (:require
[app.common.spec :as us]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[clojure.spec.alpha :as s])) [app.common.spec :as us]))
(defn make-edition-check-fn (defn make-edition-check-fn
"A simple factory for edition permission check functions." "A simple factory for edition permission check functions."

View file

@ -15,8 +15,8 @@
[app.db :as db] [app.db :as db]
[app.rpc.permissions :as perms] [app.rpc.permissions :as perms]
[app.rpc.queries.projects :as projects] [app.rpc.queries.projects :as projects]
[app.util.services :as sv]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.services :as sv]
[clojure.spec.alpha :as s])) [clojure.spec.alpha :as s]))
(declare decode-row) (declare decode-row)

View file

@ -9,7 +9,6 @@
(ns app.rpc.queries.projects (ns app.rpc.queries.projects
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.rpc.permissions :as perms] [app.rpc.permissions :as perms]

View file

@ -10,16 +10,11 @@
(ns app.sprops (ns app.sprops
"Server props module." "Server props module."
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.util.time :as dt]
[buddy.core.codecs :as bc] [buddy.core.codecs :as bc]
[buddy.core.nonce :as bn] [buddy.core.nonce :as bn]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare initialize) (declare initialize)

View file

@ -10,12 +10,12 @@
(ns app.srepl (ns app.srepl
"Server Repl." "Server Repl."
(:require (:require
[integrant.core :as ig]
[app.srepl.main]
[app.common.spec :as us] [app.common.spec :as us]
[app.srepl.main]
[clojure.core.server :as ccs] [clojure.core.server :as ccs]
[clojure.main :as cm]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.main :as cm])) [integrant.core :as ig]))
(defn- repl-init (defn- repl-init
[] []

View file

@ -14,7 +14,6 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.storage.db :as sdb] [app.storage.db :as sdb]
[app.storage.fs :as sfs] [app.storage.fs :as sfs]
@ -27,7 +26,6 @@
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs] [datoteka.core :as fs]
[integrant.core :as ig] [integrant.core :as ig]
[lambdaisland.uri :as u]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
java.io.InputStream)) java.io.InputStream))
@ -198,7 +196,7 @@
(defn clone-object (defn clone-object
"Creates a clone of the provided object using backend basded efficient "Creates a clone of the provided object using backend basded efficient
method. Always clones objects to the configured default." method. Always clones objects to the configured default."
[{:keys [pool conn executor] :as storage} object] [{:keys [pool conn] :as storage} object]
(us/assert ::storage storage) (us/assert ::storage storage)
(let [storage (assoc storage :conn (or conn pool)) (let [storage (assoc storage :conn (or conn pool))
object* (create-database-object storage object)] object* (create-database-object storage object)]
@ -242,7 +240,7 @@
(defn get-object-path (defn get-object-path
"Get the Path to the object. Only works with `:fs` type of "Get the Path to the object. Only works with `:fs` type of
storages." storages."
[{:keys [backend conn path] :as storage} object] [storage object]
(let [backend (resolve-backend storage (:backend object))] (let [backend (resolve-backend storage (:backend object))]
(when (not= :fs (:type backend)) (when (not= :fs (:type backend))
(ex/raise :type :internal (ex/raise :type :internal
@ -304,7 +302,7 @@
backend (assoc backend :conn conn)] backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))] (impl/del-objects-in-bulk backend ids)))]
(fn [task] (fn [_]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(loop [n 0] (loop [n 0]
(if-let [[groups total] (retrieve-deleted-objects conn)] (if-let [[groups total] (retrieve-deleted-objects conn)]
@ -373,7 +371,7 @@
(db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)" (db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)"
(db/create-array conn "uuid" (into-array java.util.UUID ids))]))] (db/create-array conn "uuid" (into-array java.util.UUID ids))]))]
(fn [task] (fn [_]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(loop [cntf 0 (loop [cntf 0
cntd 0] cntd 0]
@ -424,7 +422,7 @@
[_ {:keys [pool storage] :as cfg}] [_ {:keys [pool storage] :as cfg}]
(letfn [(group-results [rows] (letfn [(group-results [rows]
(let [conj (fnil conj [])] (let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id backend exist] :as row}] (reduce (fn [acc {:keys [id exist] :as row}]
(cond-> (update acc :all conj id) (cond-> (update acc :all conj id)
(false? exist) (false? exist)
(update :to-delete conj (dissoc row :exist)))) (update :to-delete conj (dissoc row :exist))))
@ -451,7 +449,7 @@
(let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))] (let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))]
(db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))] (db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))]
(fn [task] (fn [_]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(loop [n 0 d 0] (loop [n 0 d 0]
(if-let [{:keys [all to-delete]} (retrieve-pending conn)] (if-let [{:keys [all to-delete]} (retrieve-pending conn)]

View file

@ -9,21 +9,13 @@
(ns app.storage.db (ns app.storage.db
(:require (:require
[app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.storage.impl :as impl] [app.storage.impl :as impl]
[clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs]
[lambdaisland.uri :as u]
[integrant.core :as ig]) [integrant.core :as ig])
(:import (:import
org.postgresql.largeobject.LargeObject java.io.ByteArrayInputStream))
java.io.ByteArrayInputStream
java.io.ByteArrayOutputStream
java.io.InputStream
java.io.OutputStream))
;; --- BACKEND INIT ;; --- BACKEND INIT
@ -58,11 +50,11 @@
(ByteArrayInputStream. (:data result)))) (ByteArrayInputStream. (:data result))))
(defmethod impl/get-object-url :db (defmethod impl/get-object-url :db
[backend {:keys [id] :as object}] [_ _]
(throw (UnsupportedOperationException. "not supported"))) (throw (UnsupportedOperationException. "not supported")))
(defmethod impl/del-objects-in-bulk :db (defmethod impl/del-objects-in-bulk :db
[backend ids] [_ _]
;; NOOP: because delting the row already deletes the file data from ;; NOOP: because deleting the row already deletes the file data from
;; the database. ;; the database.
nil) nil)

View file

@ -9,17 +9,15 @@
(ns app.storage.fs (ns app.storage.fs
(:require (:require
[app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db]
[app.storage.impl :as impl] [app.storage.impl :as impl]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[datoteka.core :as fs]
[cuerdas.core :as str] [cuerdas.core :as str]
[lambdaisland.uri :as u] [datoteka.core :as fs]
[integrant.core :as ig]) [integrant.core :as ig]
[lambdaisland.uri :as u])
(:import (:import
java.io.InputStream java.io.InputStream
java.io.OutputStream java.io.OutputStream
@ -34,7 +32,7 @@
(s/keys :opt-un [::directory])) (s/keys :opt-un [::directory]))
(defmethod ig/init-key ::backend (defmethod ig/init-key ::backend
[key cfg] [_ cfg]
;; Return a valid backend data structure only if all optional ;; Return a valid backend data structure only if all optional
;; parameters are provided. ;; parameters are provided.
(when (string? (:directory cfg)) (when (string? (:directory cfg))

View file

@ -13,14 +13,13 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[clojure.java.io :as io] [buddy.core.codecs :as bc]
[buddy.core.codecs :as bc]) [clojure.java.io :as io])
(:import (:import
java.nio.ByteBuffer java.nio.ByteBuffer
java.util.UUID java.util.UUID
java.io.ByteArrayInputStream java.io.ByteArrayInputStream
java.io.InputStream java.io.InputStream
java.nio.file.Path
java.nio.file.Files)) java.nio.file.Files))
;; --- API Definition ;; --- API Definition

View file

@ -13,23 +13,18 @@
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db]
[app.storage.impl :as impl] [app.storage.impl :as impl]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[lambdaisland.uri :as u] [integrant.core :as ig]
[integrant.core :as ig]) [lambdaisland.uri :as u])
(:import (:import
java.io.InputStream
java.io.OutputStream
java.nio.file.Path
java.time.Duration java.time.Duration
java.util.Collection java.util.Collection
software.amazon.awssdk.core.sync.RequestBody software.amazon.awssdk.core.sync.RequestBody
software.amazon.awssdk.regions.Region software.amazon.awssdk.regions.Region
software.amazon.awssdk.services.s3.S3Client software.amazon.awssdk.services.s3.S3Client
software.amazon.awssdk.services.s3.S3ClientBuilder
software.amazon.awssdk.services.s3.model.Delete software.amazon.awssdk.services.s3.model.Delete
software.amazon.awssdk.services.s3.model.CopyObjectRequest software.amazon.awssdk.services.s3.model.CopyObjectRequest
software.amazon.awssdk.services.s3.model.DeleteObjectsRequest software.amazon.awssdk.services.s3.model.DeleteObjectsRequest
@ -113,7 +108,7 @@
:eu-central-1 Region/EU_CENTRAL_1)) :eu-central-1 Region/EU_CENTRAL_1))
(defn- build-s3-client (defn- build-s3-client
[{:keys [region bucket]}] [{:keys [region]}]
(.. (S3Client/builder) (.. (S3Client/builder)
(region (lookup-region region)) (region (lookup-region region))
(build))) (build)))

View file

@ -10,18 +10,14 @@
(ns app.svgparse (ns app.svgparse
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us]
[cuerdas.core :as str]
[app.metrics :as mtx] [app.metrics :as mtx]
[clojure.java.io :as io]
[clojure.java.shell :as shell]
[clojure.spec.alpha :as s]
[clojure.xml :as xml]
[app.util.graal :as graal] [app.util.graal :as graal]
[app.util.pool :as pool] [app.util.pool :as pool]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[clojure.xml :as xml]
[integrant.core :as ig]) [integrant.core :as ig])
(:import (:import
java.io.InputStream
java.util.function.Consumer java.util.function.Consumer
org.apache.commons.io.IOUtils)) org.apache.commons.io.IOUtils))
@ -36,7 +32,7 @@
(s/keys :req-un [::mtx/metrics])) (s/keys :req-un [::mtx/metrics]))
(defmethod ig/init-key ::svgc (defmethod ig/init-key ::svgc
[_ {:keys [metrics] :as cfg}] [_ _]
(let [ctx-pool (prepare-context-pool)] (let [ctx-pool (prepare-context-pool)]
(with-meta (with-meta
(fn [data] (fn [data]
@ -69,7 +65,6 @@
(defn- do-svg-clean (defn- do-svg-clean
[ctx data] [ctx data]
(let [res (promise) (let [res (promise)
bindings (graal/get-bindings ctx "js")
optimize (-> (graal/get-bindings ctx "js") optimize (-> (graal/get-bindings ctx "js")
(graal/get-member "svgc") (graal/get-member "svgc")
(graal/get-member "optimize")) (graal/get-member "optimize"))

View file

@ -11,11 +11,11 @@
"Generic task for permanent deletion of objects." "Generic task for permanent deletion of objects."
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[integrant.core :as ig]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx] [app.metrics :as mtx]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log])) [clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare handler) (declare handler)
(declare handle-deletion) (declare handle-deletion)

View file

@ -13,11 +13,8 @@
after some period of inactivity (the default threshold is 72h)." after some period of inactivity (the default threshold is 72h)."
(:require (:require
[app.common.pages.migrations :as pmg] [app.common.pages.migrations :as pmg]
[app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.storage :as sto]
[app.tasks :as tasks]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
@ -25,8 +22,8 @@
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare handler)
(declare retrieve-candidates)
(declare process-file) (declare process-file)
(declare retrieve-candidates)
(s/def ::storage some?) (s/def ::storage some?)
@ -52,11 +49,6 @@
(run! (partial process-file cfg) files) (run! (partial process-file cfg) files)
(recur))))))) (recur)))))))
(defn- decode-row
[{:keys [data] :as row}]
(cond-> row
(bytes? data) (assoc :data (blob/decode data))))
(def ^:private (def ^:private
sql:retrieve-candidates-chunk sql:retrieve-candidates-chunk
"select f.id, "select f.id,
@ -94,14 +86,14 @@
(into (keys (:media data))))) (into (keys (:media data)))))
(defn- process-file (defn- process-file
[{:keys [conn storage] :as cfg} {:keys [id data age] :as file}] [{:keys [conn] :as cfg} {:keys [id data age] :as file}]
(let [data (-> (blob/decode data) (let [data (-> (blob/decode data)
(assoc :id id) (assoc :id id)
(pmg/migrate-data)) (pmg/migrate-data))
used (collect-used-media data) used (collect-used-media data)
unused (->> (db/query conn :file-media-object {:file-id id}) unused (->> (db/query conn :file-media-object {:file-id id})
(remove #(contains? used (:id %))))] (remove #(contains? used (:id %))))]
(log/infof "processing file: id='%s' age='%s' to-delete=%s" id age (count unused)) (log/infof "processing file: id='%s' age='%s' to-delete=%s" id age (count unused))
@ -114,9 +106,7 @@
(log/debugf "deleting media object: id='%s' media-id='%s' thumb-id='%s'" (log/debugf "deleting media object: id='%s' media-id='%s' thumb-id='%s'"
(:id mobj) (:media-id mobj) (:thumbnail-id mobj)) (:id mobj) (:media-id mobj) (:thumbnail-id mobj))
;; NOTE: deleting the file-media-object in the database ;; NOTE: deleting the file-media-object in the database
;; automatically marks to be deleted the associated storage ;; automatically marks as toched the referenced storage objects.
;; objects with the specialized trigger attached
;; to :file-media-object table.
(db/delete! conn :file-media-object {:id (:id mobj)})) (db/delete! conn :file-media-object {:id (:id mobj)}))
nil)) nil))

View file

@ -11,17 +11,17 @@
"A maintenance task that performs a garbage collection of the file "A maintenance task that performs a garbage collection of the file
change (transaction) log." change (transaction) log."
(:require (:require
[app.common.spec :as us]
[integrant.core :as ig]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log])) [clojure.tools.logging :as log]
[integrant.core :as ig]))
(declare handler) (declare handler)
(s/def ::max-age ::dt/duration) (s/def ::max-age ::dt/duration)
(defmethod ig/pre-init-spec ::handler [_] (defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::mtx/metrics ::max-age])) (s/keys :req-un [::db/pool ::mtx/metrics ::max-age]))

View file

@ -9,12 +9,11 @@
(ns app.tasks.sendmail (ns app.tasks.sendmail
(:require (:require
[app.common.spec :as us]
[app.config :as cfg] [app.config :as cfg]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.util.emails :as emails] [app.util.emails :as emails]
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare handler)

View file

@ -11,7 +11,6 @@
"A maintenance task that performs a cleanup of already executed tasks "A maintenance task that performs a cleanup of already executed tasks
from the database table." from the database table."
(:require (:require
[app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.metrics :as mtx] [app.metrics :as mtx]
[app.util.time :as dt] [app.util.time :as dt]

View file

@ -12,15 +12,13 @@
information about the current instance and send it to the telemetry information about the current instance and send it to the telemetry
server." server."
(:require (:require
[app.config :as cfg]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.util.http :as http] [app.util.http :as http]
[app.util.json :as json] [app.util.json :as json]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[integrant.core :as ig])) [integrant.core :as ig]))
(declare handler) (declare handler)
@ -60,16 +58,16 @@
[{:keys [sprops] :as cfg}] [{:keys [sprops] :as cfg}]
(let [instance-id (:instance-id sprops) (let [instance-id (:instance-id sprops)
data (retrieve-stats cfg) data (retrieve-stats cfg)
data (assoc data :instance-id instance-id)] data (assoc data :instance-id instance-id)
(let [response (http/send! {:method :post response (http/send! {:method :post
:uri (:uri cfg) :uri (:uri cfg)
:headers {"content-type" "application/json"} :headers {"content-type" "application/json"}
:body (json/encode-str data)})] :body (json/encode-str data)})]
(when (not= 200 (:status response)) (when (not= 200 (:status response))
(ex/raise :type :internal (ex/raise :type :internal
:code :invalid-response-from-google :code :invalid-response-from-google
:context {:status (:status response) :context {:status (:status response)
:body (:body response)}))))) :body (:body response)}))))
(defn retrieve-num-teams (defn retrieve-num-teams
[conn] [conn]

View file

@ -11,7 +11,7 @@
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.db :as db] [app.db :as db]
[app.http.middleware :refer [wrap-parse-request-body wrap-errors]] [app.http.middleware :refer [wrap-parse-request-body]]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[integrant.core :as ig] [integrant.core :as ig]

View file

@ -12,14 +12,12 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.spec :as us] [app.common.spec :as us]
[integrant.core :as ig]
[app.config :as cfg]
[app.util.time :as dt] [app.util.time :as dt]
[app.util.transit :as t] [app.util.transit :as t]
[buddy.core.kdf :as bk] [buddy.core.kdf :as bk]
[buddy.sign.jwe :as jwe] [buddy.sign.jwe :as jwe]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log])) [integrant.core :as ig]))
(defn- derive-tokens-secret (defn- derive-tokens-secret
[key] [key]

View file

@ -6,49 +6,16 @@
(ns app.util.async (ns app.util.async
(:require (:require
[clojure.spec.alpha :as s]
[clojure.core.async :as a] [clojure.core.async :as a]
[cuerdas.core :as str]) [clojure.spec.alpha :as s])
(:import (:import
java.util.concurrent.Executor java.util.concurrent.Executor))
java.util.concurrent.ThreadFactory
java.util.concurrent.ForkJoinPool
java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory
java.util.concurrent.ExecutorService
java.util.concurrent.atomic.AtomicLong))
(s/def ::executor #(instance? Executor %)) (s/def ::executor #(instance? Executor %))
(defonce processors (defonce processors
(delay (.availableProcessors (Runtime/getRuntime)))) (delay (.availableProcessors (Runtime/getRuntime))))
;; (defn forkjoin-thread-factory
;; [f]
;; (reify ForkJoinPool$ForkJoinWorkerThreadFactory
;; (newThread [this pool]
;; (let [wth (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)]
;; (f wth)))))
;; (defn forkjoin-named-thread-factory
;; [name]
;; (reify ForkJoinPool$ForkJoinWorkerThreadFactory
;; (newThread [this pool]
;; (let [wth (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)]
;; (.setName wth (str name ":" (.getPoolIndex wth)))
;; wth))))
;; (defn forkjoin-pool
;; [{:keys [factory async? parallelism]
;; :or {async? true}
;; :as opts}]
;; (let [parallelism (or parallelism @processors)
;; factory (cond
;; (fn? factory) (forkjoin-thread-factory factory)
;; (instance? ForkJoinPool$ForkJoinWorkerThreadFactory factory) factory
;; (nil? factory) ForkJoinPool/defaultForkJoinWorkerThreadFactory
;; :else (throw (ex-info "Unexpected thread factory" {:factory factory})))]
;; (ForkJoinPool. (or parallelism @processors) factory nil async?)))
(defmacro go-try (defmacro go-try
[& body] [& body]
`(a/go `(a/go
@ -84,7 +51,7 @@
(finally (finally
(a/close! c))))) (a/close! c)))))
c c
(catch java.util.concurrent.RejectedExecutionException e (catch java.util.concurrent.RejectedExecutionException _e
(a/close! c) (a/close! c)
c)))) c))))

View file

@ -10,7 +10,6 @@
(ns app.util.graal (ns app.util.graal
"Graal Polyglot integration layer." "Graal Polyglot integration layer."
(:import (:import
java.util.function.Consumer
org.graalvm.polyglot.Context org.graalvm.polyglot.Context
org.graalvm.polyglot.Source org.graalvm.polyglot.Source
org.graalvm.polyglot.Value)) org.graalvm.polyglot.Value))

View file

@ -39,13 +39,10 @@
(defn- impl-migrate-single (defn- impl-migrate-single
[pool modname {:keys [name] :as migration}] [pool modname {:keys [name] :as migration}]
(letfn [(execute [] (when-not (registered? pool modname (:name migration))
(register! pool modname name) (log/info (str/format "applying migration %s/%s" modname name))
((:fn migration) pool))] (register! pool modname name)
(when-not (registered? pool modname (:name migration)) ((:fn migration) pool)))
(log/info (str/format "applying migration %s/%s" modname name))
(register! pool modname name)
((:fn migration) pool))))
(defn- impl-migrate (defn- impl-migrate
[conn migrations _opts] [conn migrations _opts]

View file

@ -128,9 +128,12 @@
(decode))) (decode)))
(defn encode-str (defn encode-str
[message] ([message]
(->> (encode message) (->> (encode message)
(bytes->str))) (bytes->str)))
([message opts]
(->> (encode message opts)
(bytes->str))))
(defn encode-verbose-str (defn encode-verbose-str
[message] [message]

View file

@ -12,7 +12,6 @@
(:require (:require
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[app.config :as cfg]
[app.db :as db] [app.db :as db]
[app.util.async :as aa] [app.util.async :as aa]
[app.util.time :as dt] [app.util.time :as dt]
@ -20,17 +19,13 @@
[clojure.pprint :refer [pprint]] [clojure.pprint :refer [pprint]]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[clojure.tools.logging :as log] [clojure.tools.logging :as log]
[cuerdas.core :as str]
[integrant.core :as ig] [integrant.core :as ig]
[promesa.exec :as px]) [promesa.exec :as px])
(:import (:import
org.eclipse.jetty.util.thread.QueuedThreadPool org.eclipse.jetty.util.thread.QueuedThreadPool
java.util.concurrent.ExecutorService java.util.concurrent.ExecutorService
java.util.concurrent.Executors java.util.concurrent.Executors
java.util.concurrent.Executor java.util.concurrent.Executor))
java.time.Duration
java.time.Instant
java.util.Date))
(s/def ::executor #(instance? Executor %)) (s/def ::executor #(instance? Executor %))
@ -236,7 +231,7 @@
{:status :retry :task item :error error}))))) {:status :retry :task item :error error})))))
(defn- run-task (defn- run-task
[{:keys [tasks conn]} item] [{:keys [tasks]} item]
(try (try
(log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item)) (log/debugf "Started task '%s/%s/%s'." (:name item) (:id item) (:retry-num item))
(handle-task tasks item) (handle-task tasks item)
@ -255,7 +250,7 @@
for update skip locked") for update skip locked")
(defn- event-loop-fn* (defn- event-loop-fn*
[{:keys [tasks pool executor batch-size] :as cfg}] [{:keys [pool executor batch-size] :as cfg}]
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(let [queue (:queue cfg) (let [queue (:queue cfg)
items (->> (db/exec! conn [sql:select-next-tasks queue batch-size]) items (->> (db/exec! conn [sql:select-next-tasks queue batch-size])
@ -304,7 +299,7 @@
(s/keys :req-un [::executor ::db/pool ::schedule])) (s/keys :req-un [::executor ::db/pool ::schedule]))
(defmethod ig/init-key ::scheduler (defmethod ig/init-key ::scheduler
[_ {:keys [executor schedule] :as cfg}] [_ {:keys [schedule] :as cfg}]
(let [scheduler (Executors/newScheduledThreadPool (int 1)) (let [scheduler (Executors/newScheduledThreadPool (int 1))
schedule (filter some? schedule) schedule (filter some? schedule)
cfg (assoc cfg cfg (assoc cfg