Merge remote-tracking branch 'origin/main' into develop

This commit is contained in:
Andrey Antukh 2022-02-12 17:33:28 +01:00
commit 741d2b3f3c
15 changed files with 133 additions and 88 deletions

View file

@ -140,7 +140,6 @@
indicating the action the program should take and the options provided." indicating the action the program should take and the options provided."
[args] [args]
(let [{:keys [options arguments errors summary] :as opts} (parse-opts args cli-options)] (let [{:keys [options arguments errors summary] :as opts} (parse-opts args cli-options)]
;; (pp/pprint opts)
(cond (cond
(:help options) ; help => exit OK with usage summary (:help options) ; help => exit OK with usage summary
{:exit-message (usage summary) :ok? true} {:exit-message (usage summary) :ok? true}

View file

@ -50,16 +50,26 @@
(declare instrument-jdbc!) (declare instrument-jdbc!)
(declare apply-migrations!) (declare apply-migrations!)
(s/def ::name keyword?) (s/def ::connection-timeout ::us/integer)
(s/def ::uri ::us/not-empty-string)
(s/def ::min-pool-size ::us/integer)
(s/def ::max-pool-size ::us/integer) (s/def ::max-pool-size ::us/integer)
(s/def ::migrations map?) (s/def ::migrations map?)
(s/def ::min-pool-size ::us/integer)
(s/def ::name keyword?)
(s/def ::password ::us/string)
(s/def ::read-only ::us/boolean) (s/def ::read-only ::us/boolean)
(s/def ::uri ::us/not-empty-string)
(s/def ::username ::us/string)
(s/def ::validation-timeout ::us/integer)
(defmethod ig/pre-init-spec ::pool [_] (defmethod ig/pre-init-spec ::pool [_]
(s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size] (s/keys :req-un [::uri ::name ::username ::password]
:opt-un [::migrations ::mtx/metrics ::read-only])) :opt-un [::min-pool-size
::max-pool-size
::connection-timeout
::validation-timeout
::migrations
::mtx/metrics
::read-only]))
(defmethod ig/init-key ::pool (defmethod ig/init-key ::pool
[_ {:keys [migrations metrics name read-only] :as cfg}] [_ {:keys [migrations metrics name read-only] :as cfg}]
@ -111,11 +121,11 @@
(.setPoolName (d/name (:name cfg))) (.setPoolName (d/name (:name cfg)))
(.setAutoCommit true) (.setAutoCommit true)
(.setReadOnly read-only) (.setReadOnly read-only)
(.setConnectionTimeout 10000) ;; 10seg (.setConnectionTimeout (:connection-timeout cfg 10000)) ;; 10seg
(.setValidationTimeout 10000) ;; 10seg (.setValidationTimeout (:validation-timeout cfg 10000)) ;; 10seg
(.setIdleTimeout 120000) ;; 2min (.setIdleTimeout 120000) ;; 2min
(.setMaxLifetime 1800000) ;; 30min (.setMaxLifetime 1800000) ;; 30min
(.setMinimumIdle (:min-pool-size cfg 0)) (.setMinimumIdle (:min-pool-size cfg 0))
(.setMaximumPoolSize (:max-pool-size cfg 50)) (.setMaximumPoolSize (:max-pool-size cfg 50))
(.setConnectionInitSql initsql) (.setConnectionInitSql initsql)
(.setInitializationFailTimeout -1)) (.setInitializationFailTimeout -1))

View file

@ -76,11 +76,11 @@
(try (try
(handler request) (handler request)
(catch Throwable e (catch Throwable e
(l/with-context (errors/get-error-context request e) (l/error :hint "unexpected error processing request"
(l/error :hint "unexpected error processing request" ::l/context (errors/get-error-context request e)
:query-string (:query-string request) :query-string (:query-string request)
:cause e) :cause e)
{:status 500 :body "internal server error"})))))) {:status 500 :body "internal server error"})))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Http Router ;; Http Router
@ -150,8 +150,8 @@
[middleware/errors errors/handle] [middleware/errors errors/handle]
[middleware/cookies]]} [middleware/cookies]]}
["/health" {:get (:health-check debug)}]
["/_doc" {:get (doc/handler rpc)}] ["/_doc" {:get (doc/handler rpc)}]
["/feedback" {:middleware [(:middleware session)] ["/feedback" {:middleware [(:middleware session)]
:post feedback}] :post feedback}]
["/auth/oauth/:provider" {:post (:handler oauth)}] ["/auth/oauth/:provider" {:post (:handler oauth)}]

View file

@ -18,9 +18,9 @@
[app.util.template :as tmpl] [app.util.template :as tmpl]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.pprint :as ppr]
[cuerdas.core :as str] [cuerdas.core :as str]
[datoteka.core :as fs] [datoteka.core :as fs]
[fipp.edn :as fpp]
[integrant.core :as ig])) [integrant.core :as ig]))
;; (selmer.parser/cache-off!) ;; (selmer.parser/cache-off!)
@ -147,21 +147,20 @@
(some-> (db/get-by-id pool :server-error-report id) :content db/decode-transit-pgobject))) (some-> (db/get-by-id pool :server-error-report id) :content db/decode-transit-pgobject)))
(render-template [report] (render-template [report]
(binding [ppr/*print-right-margin* 300] (let [context (dissoc report
(let [context (dissoc report :trace :cause :params :data :spec-problems
:trace :cause :params :data :spec-problems :spec-explain :spec-value :error :explain :hint)
:spec-explain :spec-value :error :explain :hint) params {:context (with-out-str (fpp/pprint context {:width 300}))
params {:context (with-out-str (ppr/pprint context)) :hint (:hint report)
:hint (:hint report) :spec-explain (:spec-explain report)
:spec-explain (:spec-explain report) :spec-problems (:spec-problems report)
:spec-problems (:spec-problems report) :spec-value (:spec-value report)
:spec-value (:spec-value report) :data (:data report)
:data (:data report) :trace (or (:trace report)
:trace (or (:trace report) (some-> report :error :trace))
(some-> report :error :trace)) :params (:params report)}]
:params (:params report)}] (-> (io/resource "templates/error-report.tmpl")
(-> (io/resource "templates/error-report.tmpl") (tmpl/render params))))
(tmpl/render params)))))
] ]
(when-not (authorized? pool request) (when-not (authorized? pool request)
@ -195,9 +194,17 @@
:body (-> (io/resource "templates/error-list.tmpl") :body (-> (io/resource "templates/error-list.tmpl")
(tmpl/render {:items items}))})) (tmpl/render {:items items}))}))
(defn health-check
"Mainly a task that performs a health check."
[{:keys [pool]} _]
(db/with-atomic [conn pool]
(db/exec-one! conn ["select count(*) as count from server_prop;"])
{:status 200 :body "Ok"}))
(defmethod ig/init-key ::handlers (defmethod ig/init-key ::handlers
[_ cfg] [_ cfg]
{:index (partial index cfg) {:index (partial index cfg)
:health-check (partial health-check cfg)
:retrieve-file-data (partial retrieve-file-data cfg) :retrieve-file-data (partial retrieve-file-data cfg)
:retrieve-file-changes (partial retrieve-file-changes cfg) :retrieve-file-changes (partial retrieve-file-changes cfg)
:retrieve-error (partial retrieve-error cfg) :retrieve-error (partial retrieve-error cfg)

View file

@ -11,7 +11,6 @@
[app.common.logging :as l] [app.common.logging :as l]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.uuid :as uuid] [app.common.uuid :as uuid]
[clojure.pprint]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str])) [cuerdas.core :as str]))
@ -36,6 +35,7 @@
:data (some-> data (dissoc ::s/problems ::s/value ::s/spec)) :data (some-> data (dissoc ::s/problems ::s/value ::s/spec))
:ip-addr (parse-client-ip request) :ip-addr (parse-client-ip request)
:profile-id (:profile-id request)} :profile-id (:profile-id request)}
(let [headers (:headers request)] (let [headers (:headers request)]
{:user-agent (get headers "user-agent") {:user-agent (get headers "user-agent")
:frontend-version (get headers "x-frontend-version" "unknown")}) :frontend-version (get headers "x-frontend-version" "unknown")})
@ -70,8 +70,10 @@
(defmethod handle-exception :assertion (defmethod handle-exception :assertion
[error request] [error request]
(let [edata (ex-data error)] (let [edata (ex-data error)]
(l/with-context (get-error-context request error) (l/error ::l/raw (ex-message error)
(l/error ::l/raw (ex-message error) :cause error)) ::l/context (get-error-context request error)
:cause error)
{:status 500 {:status 500
:body {:type :server-error :body {:type :server-error
:code :assertion :code :assertion
@ -93,9 +95,9 @@
(ex/exception? (:handling edata))) (ex/exception? (:handling edata)))
(handle-exception (:handling edata) request) (handle-exception (:handling edata) request)
(do (do
(l/with-context (get-error-context request error) (l/error ::l/raw (ex-message error)
(l/error ::l/raw (ex-message error) :cause error)) ::l/context (get-error-context request error)
:cause error)
{:status 500 {:status 500
:body {:type :server-error :body {:type :server-error
:code :unexpected :code :unexpected
@ -105,10 +107,9 @@
(defmethod handle-exception org.postgresql.util.PSQLException (defmethod handle-exception org.postgresql.util.PSQLException
[error request] [error request]
(let [state (.getSQLState ^java.sql.SQLException error)] (let [state (.getSQLState ^java.sql.SQLException error)]
(l/error ::l/raw (ex-message error)
(l/with-context (get-error-context request error) ::l/context (get-error-context request error)
(l/error ::l/raw (ex-message error) :cause error)) :cause error)
(cond (cond
(= state "57014") (= state "57014")
{:status 504 {:status 504

View file

@ -193,20 +193,21 @@
(defn- persist-events (defn- persist-events
[{:keys [pool executor] :as cfg} events] [{:keys [pool executor] :as cfg} events]
(letfn [(event->row [event] (letfn [(event->row [event]
[(uuid/next) (when (:profile-id event)
(:name event) [(uuid/next)
(:type event) (:name event)
(:profile-id event) (:type event)
(:tracked-at event) (:profile-id event)
(some-> (:ip-addr event) db/inet) (:tracked-at event)
(db/tjson (:props event)) (some-> (:ip-addr event) db/inet)
"backend"])] (db/tjson (:props event))
"backend"]))]
(aa/with-thread executor (aa/with-thread executor
(when (seq events) (when (seq events)
(db/with-atomic [conn pool] (db/with-atomic [conn pool]
(db/insert-multi! conn :audit-log (db/insert-multi! conn :audit-log
[:id :name :type :profile-id :tracked-at :ip-addr :props :source] [:id :name :type :profile-id :tracked-at :ip-addr :props :source]
(sequence (map event->row) events))))))) (sequence (keep event->row) events)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Archive Task ;; Archive Task

View file

@ -70,12 +70,15 @@
(defmethod ig/pre-init-spec ::reporter [_] (defmethod ig/pre-init-spec ::reporter [_]
(s/keys :req-un [::wrk/executor ::db/pool ::receiver])) (s/keys :req-un [::wrk/executor ::db/pool ::receiver]))
(defn error-event?
[event]
(= "error" (:logger/level event)))
(defmethod ig/init-key ::reporter (defmethod ig/init-key ::reporter
[_ {:keys [receiver] :as cfg}] [_ {:keys [receiver] :as cfg}]
(l/info :msg "initializing database error persistence") (l/info :msg "initializing database error persistence")
(let [output (a/chan (a/sliding-buffer 128) (let [output (a/chan (a/sliding-buffer 5)
(filter (fn [event] (filter error-event?))]
(= (:logger/level event) "error"))))]
(receiver :sub output) (receiver :sub output)
(a/go-loop [] (a/go-loop []
(let [msg (a/<! output)] (let [msg (a/<! output)]

View file

@ -120,8 +120,6 @@
(.captureMessage ^IHub shub msg) (.captureMessage ^IHub shub msg)
)) ))
] ]
;; (clojure.pprint/pprint event)
(when @enabled (when @enabled
(.withScope ^IHub shub (reify ScopeCallback (.withScope ^IHub shub (reify ScopeCallback
(run [_ scope] (run [_ scope]

View file

@ -22,7 +22,7 @@
:migrations (ig/ref :app.migrations/all) :migrations (ig/ref :app.migrations/all)
:name :main :name :main
:min-pool-size 0 :min-pool-size 0
:max-pool-size 30} :max-pool-size 60}
:app.migrations/migrations :app.migrations/migrations
{} {}

View file

@ -17,7 +17,7 @@
[app.srepl.dev :as dev] [app.srepl.dev :as dev]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.time :as dt] [app.util.time :as dt]
[clojure.pprint :refer [pprint]] [fipp.edn :refer [pprint]]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str] [cuerdas.core :as str]
[expound.alpha :as expound])) [expound.alpha :as expound]))

View file

@ -8,7 +8,8 @@
(:require (:require
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[clojure.spec.alpha :as s] [clojure.spec.alpha :as s]
[cuerdas.core :as str]) [cuerdas.core :as str]
[fipp.ednize :as fez])
(:import (:import
java.time.Duration java.time.Duration
java.time.Instant java.time.Instant
@ -111,6 +112,11 @@
(defmethod print-dup Duration [o w] (defmethod print-dup Duration [o w]
(print-method o w)) (print-method o w))
(extend-protocol fez/IEdn
Duration
(-edn [o] (pr-str o)))
;; --- INSTANT ;; --- INSTANT
(defn instant (defn instant
@ -175,6 +181,10 @@
(defmethod print-dup Instant [o w] (defmethod print-dup Instant [o w]
(print-method o w)) (print-method o w))
(extend-protocol fez/IEdn
Instant
(-edn [o] (pr-str o)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Cron Expression ;; Cron Expression
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -15,7 +15,9 @@
[clojure.core.async :as a] [clojure.core.async :as a]
[yetti.websocket :as yws]) [yetti.websocket :as yws])
(:import (:import
java.nio.ByteBuffer)) java.nio.ByteBuffer
org.eclipse.jetty.io.EofException))
(declare decode-beat) (declare decode-beat)
(declare encode-beat) (declare encode-beat)
@ -132,17 +134,25 @@
(defn- ws-send! (defn- ws-send!
[conn s] [conn s]
(let [ch (a/chan 1)] (let [ch (a/chan 1)]
(yws/send! conn s (fn [e] (try
(when e (a/offer! ch e)) (yws/send! conn s (fn [e]
(a/close! ch))) (when e (a/offer! ch e))
(a/close! ch)))
(catch EofException cause
(a/offer! ch cause)
(a/close! ch)))
ch)) ch))
(defn- ws-ping! (defn- ws-ping!
[conn s] [conn s]
(let [ch (a/chan 1)] (let [ch (a/chan 1)]
(yws/ping! conn s (fn [e] (try
(when e (a/offer! ch e)) (yws/ping! conn s (fn [e]
(a/close! ch))) (when e (a/offer! ch e))
(a/close! ch)))
(catch EofException cause
(a/offer! ch cause)
(a/close! ch)))
ch)) ch))
(defn- encode-beat (defn- encode-beat

View file

@ -260,10 +260,16 @@
(defn get-error-context (defn get-error-context
[error item] [error item]
(let [edata (ex-data error)] (let [data (ex-data error)]
{:id (uuid/next) (merge
:data edata {:id (uuid/next)
:params item})) :hint (ex-message error)
:spec-problems (some->> data ::s/problems (take 10) seq vec)
:spec-value (some->> data ::s/value)
:data (some-> data (dissoc ::s/problems ::s/value ::s/spec))
:params item}
(when (and data (::s/problems data))
{:spec-explain (us/pretty-explain data)}))))
(defn- handle-exception (defn- handle-exception
[error item] [error item]
@ -277,8 +283,10 @@
(= ::noop (:strategy edata)) (= ::noop (:strategy edata))
(assoc :inc-by 0)) (assoc :inc-by 0))
(l/with-context (get-error-context error item) (do
(l/error :cause error :hint "unhandled exception on task") (l/error :hint "unhandled exception on task"
::l/context (get-error-context error item)
:cause error)
(if (>= (:retry-num item) (:max-retries item)) (if (>= (:retry-num item) (:max-retries item))
{:status :failed :task item :error error} {:status :failed :task item :error error}
{:status :retry :task item :error error}))))) {:status :retry :task item :error error})))))

View file

@ -9,6 +9,7 @@
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[clojure.pprint :refer [pprint]] [clojure.pprint :refer [pprint]]
[cuerdas.core :as str] [cuerdas.core :as str]
[fipp.edn :as fpp]
#?(:clj [io.aviso.exception :as ie]) #?(:clj [io.aviso.exception :as ie])
#?(:cljs [goog.log :as glog])) #?(:cljs [goog.log :as glog]))
#?(:cljs (:require-macros [app.common.logging]) #?(:cljs (:require-macros [app.common.logging])
@ -52,22 +53,16 @@
(defn stringify-data (defn stringify-data
[val] [val]
(cond (cond
(instance? clojure.lang.Named val)
(name val)
(instance? Throwable val)
(binding [ie/*app-frame-names* [#"app.*"]
ie/*fonts* nil
ie/*traditional* true]
(ie/format-exception val nil))
(string? val) (string? val)
val val
(instance? clojure.lang.Named val)
(name val)
(coll? val) (coll? val)
(binding [clojure.pprint/*print-right-margin* 200] (binding [*print-level* 5
(-> (with-out-str (pprint val)) *print-length* 20]
(simple-prune (* 1024 1024 3)))) (with-out-str (fpp/pprint val {:width 200})))
:else :else
(str val)))) (str val))))
@ -163,13 +158,13 @@
(.isEnabled ^Logger logger ^Level level))) (.isEnabled ^Logger logger ^Level level)))
(defmacro log (defmacro log
[& {:keys [level cause ::logger ::async ::raw] :or {async true} :as props}] [& {:keys [level cause ::logger ::async ::raw ::context] :or {async true} :as props}]
(if (:ns &env) ; CLJS (if (:ns &env) ; CLJS
`(write-log! ~(or logger (str *ns*)) `(write-log! ~(or logger (str *ns*))
~level ~level
~cause ~cause
(or ~raw ~(dissoc props :level :cause ::logger ::raw))) (or ~raw ~(dissoc props :level :cause ::logger ::raw ::context)))
(let [props (dissoc props :level :cause ::logger ::async ::raw) (let [props (dissoc props :level :cause ::logger ::async ::raw ::context)
logger (or logger (str *ns*)) logger (or logger (str *ns*))
logger-sym (gensym "log") logger-sym (gensym "log")
level-sym (gensym "log")] level-sym (gensym "log")]
@ -180,7 +175,7 @@
`(->> (ThreadContext/getImmutableContext) `(->> (ThreadContext/getImmutableContext)
(send-off logging-agent (send-off logging-agent
(fn [_# cdata#] (fn [_# cdata#]
(with-context (into {} cdata#) (with-context (-> {} (into cdata#) (into ~context))
(->> (or ~raw (build-map-message ~props)) (->> (or ~raw (build-map-message ~props))
(write-log! ~logger-sym ~level-sym ~cause)))))) (write-log! ~logger-sym ~level-sym ~cause))))))

View file

@ -340,7 +340,10 @@
pre-process-images pre-process-images
(->> (rx/from nodes) (->> (rx/from nodes)
(rx/filter media-node?) (rx/filter media-node?)
(rx/merge-map ;; TODO: this should be merge-map, but we disable the
;; parallel upload until we resolve resource usage issues
;; on backend.
(rx/mapcat
(fn [node] (fn [node]
(->> (resolve-media context file-id node) (->> (resolve-media context file-id node)
(rx/map (fn [result] [node result]))))) (rx/map (fn [result] [node result])))))