♻️ Make the namespacing independent of the branding.

This commit is contained in:
Andrey Antukh 2020-08-18 19:26:37 +02:00
parent aaf8b71837
commit 6c67c3c71b
305 changed files with 2399 additions and 2580 deletions

View file

@ -0,0 +1,56 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
(ns app.util.async
(:require
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[clojure.core.async :as a])
(:import
java.util.concurrent.Executor))
(defmacro go-try
[& body]
`(a/go
(try
~@body
(catch Exception e# e#))))
(defmacro <?
[ch]
`(let [r# (a/<! ~ch)]
(if (instance? Exception r#)
(throw r#)
r#)))
(defmacro thread-try
[& body]
`(a/thread
(try
~@body
(catch Exception e#
e#))))
(s/def ::executor #(instance? Executor %))
(defn thread-call
[^Executor executor f]
(let [c (a/chan 1)]
(try
(.execute executor
(fn []
(try
(let [ret (try (f) (catch Exception e e))]
(when-not (nil? ret)
(a/>!! c ret)))
(finally
(a/close! c)))))
c
(catch java.util.concurrent.RejectedExecutionException e
(a/offer! c e)
(a/close! c)
c))))

View file

@ -0,0 +1,70 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2016-2020 Andrey Antukh <niwi@niwi.nz>
(ns app.util.blob
"A generic blob storage encoding. Mainly used for
page data, page options and txlog payload storage."
(:require [app.util.transit :as t])
(:import
java.io.ByteArrayInputStream
java.io.ByteArrayOutputStream
java.io.DataInputStream
java.io.DataOutputStream
net.jpountz.lz4.LZ4Factory
net.jpountz.lz4.LZ4FastDecompressor
net.jpountz.lz4.LZ4Compressor))
(defprotocol IDataToBytes
(->bytes [data] "convert data to bytes"))
(extend-protocol IDataToBytes
(Class/forName "[B")
(->bytes [data] data)
String
(->bytes [data] (.getBytes ^String data "UTF-8")))
(def lz4-factory (LZ4Factory/fastestInstance))
(defn encode
[data]
(let [data (t/encode data {:type :json})
data-len (alength ^bytes data)
cp (.fastCompressor ^LZ4Factory lz4-factory)
max-len (.maxCompressedLength cp data-len)
cdata (byte-array max-len)
clen (.compress ^LZ4Compressor cp ^bytes data 0 data-len cdata 0 max-len)]
(with-open [^ByteArrayOutputStream baos (ByteArrayOutputStream. (+ (alength cdata) 2 4))
^DataOutputStream dos (DataOutputStream. baos)]
(.writeShort dos (short 1)) ;; version number
(.writeInt dos (int data-len))
(.write dos ^bytes cdata (int 0) clen)
(.toByteArray baos))))
(declare decode-v1)
(defn decode
"A function used for decode persisted blobs in the database."
[data]
(let [data (->bytes data)]
(with-open [bais (ByteArrayInputStream. data)
dis (DataInputStream. bais)]
(let [version (.readShort dis)
udata-len (.readInt dis)]
(case version
1 (decode-v1 data udata-len)
(throw (ex-info "unsupported version" {:version version})))))))
(defn- decode-v1
[^bytes cdata ^long udata-len]
(let [^LZ4FastDecompressor dcp (.fastDecompressor ^LZ4Factory lz4-factory)
^bytes udata (byte-array udata-len)]
(.decompress dcp cdata 6 udata 0 udata-len)
(t/decode udata {:type :json})))

View file

@ -0,0 +1,12 @@
(ns app.util.cli
"Command line interface helpers.")
(defn exit!
([] (exit! 0))
([code]
(System/exit code)))
(defmacro print-err!
[& args]
`(binding [*out* *err*]
(println ~@args)))

View file

@ -0,0 +1,31 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016 Andrey Antukh <niwi@niwi.nz>
(ns app.util.closeable
"A closeable abstraction. A drop in replacement for
clojure builtin `with-open` syntax abstraction."
(:refer-clojure :exclude [with-open]))
(defprotocol ICloseable
(-close [_] "Close the resource."))
(defmacro with-open
[bindings & body]
{:pre [(vector? bindings)
(even? (count bindings))
(pos? (count bindings))]}
(reduce (fn [acc bindings]
`(let ~(vec bindings)
(try
~acc
(finally
(-close ~(first bindings))))))
`(do ~@body)
(reverse (partition 2 bindings))))
(extend-protocol ICloseable
java.lang.AutoCloseable
(-close [this] (.close this)))

View file

@ -0,0 +1,54 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016 Andrey Antukh <niwi@niwi.nz>
(ns app.util.data
"Data transformations utils."
(:require [clojure.walk :as walk]
[cuerdas.core :as str]))
;; TODO: move to app.common.helpers
(defn dissoc-in
[m [k & ks :as keys]]
(if ks
(if-let [nextmap (get m k)]
(let [newmap (dissoc-in nextmap ks)]
(if (seq newmap)
(assoc m k newmap)
(dissoc m k)))
m)
(dissoc m k)))
(defn normalize-attrs
"Recursively transforms all map keys from strings to keywords."
[m]
(letfn [(tf [[k v]]
(let [ks (-> (name k)
(str/replace "_" "-"))]
[(keyword ks) v]))
(walker [x]
(if (map? x)
(into {} (map tf) x)
x))]
(walk/postwalk walker m)))
(defn strip-delete-attrs
[m]
(dissoc m :deleted-at))
(defn normalize
"Perform a common normalization transformation
for a entity (database retrieved) data structure."
[m]
(-> m normalize-attrs strip-delete-attrs))
(defn deep-merge
[& maps]
(letfn [(merge' [& maps]
(if (every? map? maps)
(apply merge-with merge' maps)
(last maps)))]
(apply merge' (remove nil? maps))))

View file

@ -0,0 +1,98 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.dispatcher
"A generic service dispatcher implementation."
(:refer-clojure :exclude [defmethod])
(:require
[clojure.spec.alpha :as s]
[expound.alpha :as expound]
[app.common.exceptions :as ex])
(:import
clojure.lang.IDeref
clojure.lang.MapEntry
java.util.Map
java.util.HashMap))
(definterface IDispatcher
(^void add [key f]))
(deftype Dispatcher [reg attr wrap]
IDispatcher
(add [this key f]
(.put ^Map reg key (wrap f))
this)
clojure.lang.IDeref
(deref [_]
{:registry reg
:attr attr
:wrap wrap})
clojure.lang.IFn
(invoke [_ params]
(let [key (get params attr)
f (.get ^Map reg key)]
(when (nil? f)
(ex/raise :type :method-not-found
:hint "No method found for the current request."
:context {:key key}))
(f params))))
(defn dispatcher?
[v]
(instance? IDispatcher v))
(defmacro defservice
[sname & {:keys [dispatch-by wrap]}]
`(def ~sname (Dispatcher. (HashMap.) ~dispatch-by ~wrap)))
(defn parse-defmethod
[args]
(loop [r {}
s 0
v (first args)
n (rest args)]
(case s
0 (if (symbol? v)
(recur (assoc r :sym v) 1 (first n) (rest n))
(throw (ex-info "first arg to `defmethod` should be a symbol" {})))
1 (if (qualified-keyword? v)
(recur (-> r
(assoc :key (keyword (name v)))
(assoc :meta {:spec v :doc nil}))
3 (first n) (rest n))
(recur r (inc s) v n))
2 (if (simple-keyword? v)
(recur (-> r
(assoc :key v)
(assoc :meta {:doc nil}))
3 (first n) (rest n))
(throw (ex-info "second arg to `defmethod` should be a keyword" {})))
3 (if (string? v)
(recur (update r :meta assoc :doc v) (inc s) (first n) (rest n))
(recur r 4 v n))
4 (if (map? v)
(recur (update r :meta merge v) (inc s) (first n) (rest n))
(recur r 5 v n))
5 (if (vector? v)
(assoc r :args v :body n)
(throw (ex-info "missing arguments vector" {}))))))
(defn add-method
[^Dispatcher dsp key f meta]
(let [f (with-meta f meta)]
(.add dsp key f)
dsp))
(defmacro defmethod
[& args]
(let [{:keys [key meta sym args body]} (parse-defmethod args)
f `(fn ~args ~@body)]
`(do
(s/assert dispatcher? ~sym)
(add-method ~sym ~key ~f ~meta))))

View file

@ -0,0 +1,91 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.emails
(:require
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[app.common.spec :as us]
[app.common.exceptions :as ex]
[app.util.template :as tmpl]))
;; --- Impl.
(def ^:private email-path "emails/%(id)s/%(lang)s.%(type)s")
(defn- build-base-email
[data context]
(when-not (s/valid? ::parsed-email data)
(ex/raise :type :internal
:code :template-parse-error
:hint "Seems like the email template has invalid data."
:contex data))
{:subject (:subject data)
:content (cond-> []
(:body-text data) (conj {:type "text/plain"
:value (:body-text data)})
(:body-html data) (conj {:type "text/html"
:value (:body-html data)}))})
(defn- render-email-part
[type id context]
(let [lang (:lang context :en)
path (str/format email-path {:id (name id)
:lang (name lang)
:type (name type)})]
(some-> (io/resource path)
(tmpl/render context))))
(defn- impl-build-email
[id context]
(let [lang (:lang context :en)
subj (render-email-part :subj id context)
html (render-email-part :html id context)
text (render-email-part :txt id context)]
{:subject subj
:content (cond-> []
text (conj {:type "text/plain"
:value text})
html (conj {:type "text/html"
:value html}))}))
;; --- Public API
(s/def ::priority #{:high :low})
(s/def ::to ::us/email)
(s/def ::from ::us/email)
(s/def ::reply-to ::us/email)
(s/def ::lang string?)
(s/def ::context
(s/keys :req-un [::to]
:opt-un [::reply-to ::from ::lang ::priority]))
(defn build
([id] (build id {}))
([id extra-context]
(s/assert keyword? id)
(fn [context]
(us/verify ::context context)
(when-let [spec (s/get-spec id)]
(s/assert spec context))
(let [context (merge (if (fn? extra-context)
(extra-context)
extra-context)
context)
email (impl-build-email id context)]
(when-not email
(ex/raise :type :internal
:code :email-template-does-not-exists
:hint "seems like the template is wrong or does not exists."
::id id))
(cond-> (assoc email :id (name id))
(:to context) (assoc :to [(:to context)])
(:from context) (assoc :from (:from context))
(:reply-to context) (assoc :reply-to (:reply-to context)))))))

View file

@ -0,0 +1,26 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.http
"Http client abstraction layer."
(:require
[promesa.core :as p]
[promesa.exec :as px]
[java-http-clj.core :as http]))
(def default-client
(delay (http/build-client {:executor @px/default-executor})))
(defn get!
[url opts]
(let [opts' (merge {:client @default-client :as :string} opts)]
(http/get url nil opts')))
(defn send!
([req]
(http/send req {:client @default-client :as :string}))
([req opts]
(http/send req (merge {:client @default-client :as :string} opts))))

View file

@ -0,0 +1,86 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.migrations
(:require
[clojure.tools.logging :as log]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[next.jdbc :as jdbc]))
(s/def ::name string?)
(s/def ::step (s/keys :req-un [::name ::desc ::fn]))
(s/def ::steps (s/every ::step :kind vector?))
(s/def ::migrations
(s/keys :req-un [::name ::steps]))
;; --- Implementation
(defn- registered?
"Check if concrete migration is already registred."
[pool modname stepname]
(let [sql "select * from migrations where module=? and step=?"
rows (jdbc/execute! pool [sql modname stepname])]
(pos? (count rows))))
(defn- register!
"Register a concrete migration into local migrations database."
[pool modname stepname]
(let [sql "insert into migrations (module, step) values (?, ?)"]
(jdbc/execute! pool [sql modname stepname])
nil))
(defn- impl-migrate-single
[pool modname {:keys [name] :as migration}]
(letfn [(execute []
(register! pool modname name)
((:fn migration) pool))]
(when-not (registered? pool modname (:name migration))
(log/info (str/format "applying migration %s/%s" modname name))
(register! pool modname name)
((:fn migration) pool))))
(defn- impl-migrate
[conn migrations {:keys [fake] :or {fake false}}]
(s/assert ::migrations migrations)
(let [mname (:name migrations)
steps (:steps migrations)]
(jdbc/with-transaction [conn conn]
(run! #(impl-migrate-single conn mname %) steps))))
(defprotocol IMigrationContext
(-migrate [_ migration options]))
;; --- Public Api
(defn setup!
"Initialize the database if it is not initialized."
[conn]
(let [sql (str "create table if not exists migrations ("
" module text,"
" step text,"
" created_at timestamp DEFAULT current_timestamp,"
" unique(module, step)"
");")]
(jdbc/execute! conn [sql])
nil))
(defn migrate!
"Main entry point for apply a migration."
([conn migrations]
(impl-migrate conn migrations nil))
([conn migrations options]
(impl-migrate conn migrations options)))
(defn resource
"Helper for setup migration functions
just using a simple path to sql file
located in the class path."
[path]
(fn [pool]
(let [sql (slurp (io/resource path))]
(jdbc/execute! pool [sql])
true)))

View file

@ -0,0 +1,155 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.redis
"Asynchronous posgresql client."
(:refer-clojure :exclude [run!])
(:require
[promesa.core :as p]
[clojure.core.async :as a])
(:import
io.lettuce.core.RedisClient
io.lettuce.core.RedisURI
io.lettuce.core.codec.StringCodec
io.lettuce.core.api.async.RedisAsyncCommands
io.lettuce.core.api.StatefulRedisConnection
io.lettuce.core.pubsub.RedisPubSubListener
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
))
(defrecord Client [client uri]
java.lang.AutoCloseable
(close [_]
(.shutdown ^RedisClient client)))
(defrecord Connection [^RedisAsyncCommands cmd]
java.lang.AutoCloseable
(close [_]
(let [conn (.getStatefulConnection cmd)]
(.close ^StatefulRedisConnection conn))))
(defn client
[uri]
(->Client (RedisClient/create) (RedisURI/create uri)))
(defn connect
[client]
(let [^RedisURI uri (:uri client)
^RedisClient client (:client client)
^StatefulRedisConnection conn (.connect client StringCodec/UTF8 uri)]
(->Connection (.async conn))))
(defn- impl-subscribe
[^String topic xf ^StatefulRedisPubSubConnection conn]
(let [cmd (.sync conn)
output (a/chan 1 (comp (filter string?) xf))
buffer (a/chan (a/sliding-buffer 64))
sub (reify RedisPubSubListener
(message [it pattern channel message])
(message [it channel message]
;; There are no back pressure, so we use a slidding
;; buffer for cases when the pubsub broker sends
;; more messages that we can process.
(a/put! buffer message))
(psubscribed [it pattern count])
(punsubscribed [it pattern count])
(subscribed [it channel count])
(unsubscribed [it channel count]))]
(.addListener conn sub)
(a/go-loop []
(let [[val port] (a/alts! [buffer (a/timeout 5000)])
message (if (= port buffer) val ::keepalive)]
(if (a/>! output message)
(recur)
(do
(a/close! buffer)
(.removeListener conn sub)
(when (.isOpen conn)
(.close conn))))))
(.subscribe ^RedisPubSubCommands cmd (into-array String [topic]))
output))
(defn subscribe
([client topic]
(subscribe client topic (map identity)))
([client topic xf]
(let [^RedisURI uri (:uri client)
^RedisClient client (:client client)]
(->> (.connectPubSub client StringCodec/UTF8 uri)
(impl-subscribe topic xf)))))
(defn- resolve-to-bool
[v]
(if (= v 1)
true
false))
(defmulti impl-run (fn [conn cmd parmas] cmd))
(defn run!
[conn cmd params]
(let [^RedisAsyncCommands conn (:cmd conn)]
(impl-run conn cmd params)))
(defn run
[conn cmd params]
(let [res (a/chan 1)]
(if (instance? Connection conn)
(-> (run! conn cmd params)
(p/finally (fn [v e]
(if e
(a/offer! res e)
(a/offer! res v)))))
(a/close! res))
res))
(defmethod impl-run :get
[conn _ {:keys [key]}]
(.get ^RedisAsyncCommands conn ^String key))
(defmethod impl-run :set
[conn _ {:keys [key val]}]
(.set ^RedisAsyncCommands conn ^String key ^String val))
(defmethod impl-run :smembers
[conn _ {:keys [key]}]
(-> (.smembers ^RedisAsyncCommands conn ^String key)
(p/then' #(into #{} %))))
(defmethod impl-run :sadd
[conn _ {:keys [key val]}]
(let [keys (into-array String [val])]
(-> (.sadd ^RedisAsyncCommands conn ^String key ^"[S;" keys)
(p/then resolve-to-bool))))
(defmethod impl-run :srem
[conn _ {:keys [key val]}]
(let [keys (into-array String [val])]
(-> (.srem ^RedisAsyncCommands conn ^String key ^"[S;" keys)
(p/then resolve-to-bool))))
(defmethod impl-run :publish
[conn _ {:keys [channel message]}]
(-> (.publish ^RedisAsyncCommands conn ^String channel ^String message)
(p/then resolve-to-bool)))
(defmethod impl-run :hset
[^RedisAsyncCommands conn _ {:keys [key field value]}]
(.hset conn key field value))
(defmethod impl-run :hgetall
[^RedisAsyncCommands conn _ {:keys [key]}]
(.hgetall conn key))
(defmethod impl-run :hdel
[^RedisAsyncCommands conn _ {:keys [key field]}]
(let [fields (into-array String [field])]
(.hdel conn key fields)))

View file

@ -0,0 +1,198 @@
;; Copyright (c) 2019 Andrey Antukh <niwi@niwi.nz>
;; All rights reserved.
;;
;; Redistribution and use in source and binary forms, with or without
;; modification, are permitted provided that the following conditions are met:
;;
;; * Redistributions of source code must retain the above copyright notice, this
;; list of conditions and the following disclaimer.
;;
;; * Redistributions in binary form must reproduce the above copyright notice,
;; this list of conditions and the following disclaimer in the documentation
;; and/or other materials provided with the distribution.
;;
;; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
;; AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
;; IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
;; DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
;; FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
;; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
;; CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
;; OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
;; OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
(ns app.util.sql
"A composable sql helpers."
(:refer-clojure :exclude [test update set format])
(:require [clojure.core :as c]
[cuerdas.core :as str]))
;; --- Low Level Helpers
(defn raw-expr
[m]
(cond
(string? m)
{::type :raw-expr
:sql m
:params []}
(vector? m)
{::type :raw-expr
:sql (first m)
:params (vec (rest m))}
(and (map? m)
(= :raw-expr (::type m)))
m
:else
(throw (ex-info "unexpected input" {:m m}))))
(defn alias-expr
[m]
(cond
(string? m)
{::type :alias-expr
:sql m
:alias nil
:params []}
(vector? m)
{::type :alias-expr
:sql (first m)
:alias (second m)
:params (vec (drop 2 m))}
:else
(throw (ex-info "unexpected input" {:m m}))))
;; --- SQL API (Select only)
(defn from
[name]
{::type :query
::from [(alias-expr name)]
::order []
::select []
::join []
::where []})
(defn select
[m & fields]
(c/update m ::select into (map alias-expr fields)))
(defn limit
[m n]
(assoc m ::limit [(raw-expr ["LIMIT ?" n])]))
(defn offset
[m n]
(assoc m ::offset [(raw-expr ["OFFSET ?" n])]))
(defn order
[m e]
(c/update m ::order conj (raw-expr e)))
(defn- join*
[m type table condition]
(c/update m ::join conj
{::type :join-expr
:type type
:table (alias-expr table)
:condition (raw-expr condition)}))
(defn join
[m table condition]
(join* m :inner table condition))
(defn ljoin
[m table condition]
(join* m :left table condition))
(defn rjoin
[m table condition]
(join* m :right table condition))
(defn where
[m & conditions]
(->> (filter identity conditions)
(reduce #(c/update %1 ::where conj (raw-expr %2)) m)))
;; --- Formating
(defmulti format-expr ::type)
(defmethod format-expr :raw-expr
[{:keys [sql params]}]
[sql params])
(defmethod format-expr :alias-expr
[{:keys [sql alias params]}]
(if alias
[(str sql " AS " alias) params]
[sql params]))
(defmethod format-expr :join-expr
[{:keys [table type condition]}]
(let [[csql cparams] (format-expr condition)
[tsql tparams] (format-expr table)
prefix (str/upper (name type))]
[(str prefix " JOIN " tsql " ON (" csql ")") (into cparams tparams)]))
(defn- format-exprs
([items] (format-exprs items {}))
([items {:keys [prefix suffix join-with]
:or {prefix ""
suffix ""
join-with ","}}]
(loop [rs []
rp []
v (first items)
n (rest items)]
(if v
(let [[s p] (format-expr v)]
(recur (conj rs s)
(into rp p)
(first n)
(rest n)))
(if (empty? rs)
["" []]
[(str prefix (str/join join-with rs) suffix) rp])))))
(defn- process-param-tokens
[sql]
(let [cnt (java.util.concurrent.atomic.AtomicInteger. 1)]
(str/replace sql #"\?" (fn [& args]
(str "$" (.getAndIncrement cnt))))))
(def ^:private select-formatters
[#(format-exprs (::select %) {:prefix "SELECT "})
#(format-exprs (::from %) {:prefix "FROM "})
#(format-exprs (::join %) {:join-with " "})
#(format-exprs (::where %) {:prefix "WHERE ("
:join-with ") AND ("
:suffix ")"})
#(format-exprs (::order %) {:prefix "ORDER BY "} )
#(format-exprs (::limit %))
#(format-exprs (::offset %))])
(defn- collect
[formatters qdata]
(loop [sqls []
params []
f (first formatters)
r (rest formatters)]
(if (fn? f)
(let [[s p] (f qdata)]
(recur (conj sqls s)
(into params p)
(first r)
(rest r)))
[(str/join " " sqls) params])))
(defn fmt
[qdata]
(let [[sql params] (collect select-formatters qdata)]
(into [(process-param-tokens sql)] params)))

View file

@ -0,0 +1,194 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 Andrey Antukh <niwi@niwi.nz>
(ns app.util.storage
"A local filesystem storage implementation."
(:require
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.core :as fs]
[datoteka.proto :as fp]
[sodi.prng :as sodi.prng]
[sodi.util :as sodi.util]
[app.common.exceptions :as ex])
(:import
java.io.ByteArrayInputStream
java.io.InputStream
java.io.OutputStream
java.net.URI
java.nio.file.Files
java.nio.file.NoSuchFileException
java.nio.file.Path
java.security.MessageDigest))
(defn uri
[v]
(cond
(instance? URI v) v
(string? v) (URI. v)
:else (throw (IllegalArgumentException. "unexpected input"))))
(defn- normalize-path
[^Path base ^Path path]
(when (fs/absolute? path)
(ex/raise :type :filesystem-error
:code :suspicious-operation
:hint "Suspicios operation: absolute path not allowed."
:contex {:path path :base base}))
(let [^Path fullpath (.resolve base path)
^Path fullpath (.normalize fullpath)]
(when-not (.startsWith fullpath base)
(ex/raise :type :filesystem-error
:code :suspicious-operation
:hint "Suspicios operation: go to parent dir is not allowed."
:contex {:path path :base base}))
fullpath))
(defn- transform-path
[storage ^Path path]
(if-let [xf (::xf storage)]
((xf (fn [a b] b)) nil path)
path))
(defn blob
[^String v]
(let [data (.getBytes v "UTF-8")]
(ByteArrayInputStream. ^bytes data)))
(defn save!
[storage path content]
(s/assert ::storage storage)
(let [^Path base (::base-path storage)
^Path path (->> (fs/path path)
(transform-path storage))
^Path fullpath (normalize-path base path)]
(when-not (fs/exists? (.getParent fullpath))
(fs/create-dir (.getParent fullpath)))
(loop [iteration nil]
(let [[basepath ext] (fs/split-ext fullpath)
candidate (fs/path (str basepath iteration ext))]
(if (fs/exists? candidate)
(recur (if (nil? iteration) 1 (inc iteration)))
(with-open [^InputStream src (io/input-stream content)
^OutputStream dst (io/output-stream candidate)]
(io/copy src dst)
(fs/relativize candidate base)))))))
(defn delete!
[storage path]
(s/assert ::storage storage)
(try
(->> (fs/path path)
(normalize-path (::base-path storage))
(fs/delete))
true
(catch java.nio.file.NoSuchFileException e
false)))
(defn clear!
[storage]
(s/assert ::storage storage)
(fs/delete (::base-path storage))
(fs/create-dir (::base-path storage))
nil)
(defn exists?
[storage path]
(s/assert ::storage storage)
(->> (fs/path path)
(normalize-path (::base-path storage))
(fs/exists?)))
(defn lookup
[storage path]
(s/assert ::storage storage)
(->> (fs/path path)
(normalize-path (::base-path storage))))
(defn public-uri
[storage path]
(s/assert ::storage storage)
(let [^URI base (::base-uri storage)
^String path (str path)]
(.resolve base path)))
(s/def ::base-path (s/or :path fs/path? :str string?))
(s/def ::base-uri (s/or :uri #(instance? URI %) :str string?))
(s/def ::xf fn?)
(s/def ::storage
(s/keys :req [::base-path] :opt [::xf ::base-uri]))
(s/def ::create-options
(s/keys :req-un [::base-path] :opt-un [::xf ::base-uri]))
(defn create
"Create an instance of local FileSystem storage providing an
absolute base path.
If that path does not exists it will be automatically created,
if it exists but is not a directory, an exception will be
raised.
This function expects a map with the following options:
- `:base-path`: a fisical directory on your local machine
- `:base-uri`: a base uri used for resolve the files
"
[{:keys [base-path base-uri xf] :as options}]
(s/assert ::create-options options)
(let [^Path base-path (fs/path base-path)]
(when (and (fs/exists? base-path)
(not (fs/directory? base-path)))
(ex/raise :type :filesystem-error
:code :file-already-exists
:hint "File already exists, expects directory."))
(when-not (fs/exists? base-path)
(fs/create-dir base-path))
(cond-> {::base-path base-path}
base-uri (assoc ::base-uri (uri base-uri))
xf (assoc ::xf xf))))
;; This is don't need to be secure and we dont need to reseed it; the
;; security guarranties of this prng instance are very low (we only
;; use it for generate a random path where store the file).
(def ^:private prng
(delay
(doto (java.security.SecureRandom/getInstance "SHA1PRNG")
(.setSeed ^bytes (sodi.prng/random-bytes 64)))))
(defn with-xf
[storage xfm]
(let [xf (::xf storage)]
(if (nil? xf)
(assoc storage ::xf xfm)
(assoc storage ::xf (comp xf xfm)))))
(def random-path
(map (fn [^Path path]
(let [name (str (.getFileName path))
hash (-> (sodi.prng/random-bytes @prng 10)
(sodi.util/bytes->b64s))
tokens (re-seq #"[\w\d\-\_]{2}" hash)
path-tokens (take 3 tokens)
rest-tokens (drop 3 tokens)
path (fs/path path-tokens)
frest (apply str rest-tokens)]
(fs/path (list path frest name))))))
(def slugify-filename
(map (fn [path]
(let [parent (or (fs/parent path) "")
[name ext] (fs/split-ext (fs/name path))]
(fs/path parent (str (str/uslug name) ext))))))
(defn prefix-path
[prefix]
(map (fn [^Path path] (fs/join (fs/path prefix) path))))

View file

@ -0,0 +1,99 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016-2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.svg
"Icons SVG parsing helpers."
(:require
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[app.common.spec :as us]
[app.common.exceptions :as ex])
(:import
org.jsoup.Jsoup
org.jsoup.nodes.Attribute
org.jsoup.nodes.Element
org.jsoup.nodes.Document
java.io.InputStream))
(s/def ::content string?)
(s/def ::width ::us/number)
(s/def ::height ::us/number)
(s/def ::name string?)
(s/def ::view-box (s/coll-of ::us/number :min-count 4 :max-count 4))
(s/def ::svg-entity
(s/keys :req-un [::content ::width ::height ::view-box]
:opt-un [::name]))
;; --- Implementation
(defn- parse-double
[data]
(s/assert ::us/string data)
(Double/parseDouble data))
(defn- parse-viewbox
[data]
(s/assert ::us/string data)
(mapv parse-double (str/split data #"\s+")))
(defn- parse-attrs
[^Element element]
(persistent!
(reduce (fn [acc ^Attribute attr]
(let [key (.getKey attr)
val (.getValue attr)]
(case key
"width" (assoc! acc :width (parse-double val))
"height" (assoc! acc :height (parse-double val))
"viewbox" (assoc! acc :view-box (parse-viewbox val))
"sodipodi:docname" (assoc! acc :name val)
acc)))
(transient {})
(.attributes element))))
(defn- impl-parse
[data]
(try
(let [document (Jsoup/parse ^String data)
element (some-> (.body ^Document document)
(.getElementsByTag "svg")
(first))
content (.html element)
attrs (parse-attrs element)]
(assoc attrs :content content))
(catch java.lang.IllegalArgumentException e
(ex/raise :type :validation
:code ::invalid-input
:message "Input does not seems to be a valid svg."))
(catch java.lang.NullPointerException e
(ex/raise :type :validation
:code ::invalid-input
:message "Input does not seems to be a valid svg."))
(catch org.jsoup.UncheckedIOException e
(ex/raise :type :validation
:code ::invalid-input
:message "Input does not seems to be a valid svg."))
(catch Exception e
(ex/raise :type :internal
:code ::unexpected))))
;; --- Public Api
(defn parse-string
"Parse SVG from a string."
[data]
(s/assert ::us/string data)
(let [result (impl-parse data)]
(if (s/valid? ::svg-entity result)
result
(ex/raise :type :validation
:code ::invalid-result
:message "The result does not conform valid svg entity."))))
(defn parse
[data]
(parse-string (slurp data)))

View file

@ -0,0 +1,37 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016-2019 Andrey Antukh <niwi@niwi.nz>
(ns app.util.template
"A lightweight abstraction over mustache.java template engine.
The documentation can be found: http://mustache.github.io/mustache.5.html"
(:require
[clojure.tools.logging :as log]
[clojure.walk :as walk]
[clojure.java.io :as io]
[cuerdas.core :as str]
[selmer.parser :as sp]
[app.common.exceptions :as ex]))
;; (sp/cache-off!)
(defn render
[path context]
(try
(sp/render-file path context)
(catch Exception cause
(ex/raise :type :internal
:code :template-render-error
:cause cause))))
(defn render-string
[content context]
(try
(sp/render content context)
(catch Exception cause
(ex/raise :type :internal
:code :template-render-error
:cause cause))))

View file

@ -0,0 +1,256 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2016-2020 Andrey Antukh <niwi@niwi.nz>
(ns app.util.time
(:require
[clojure.spec.alpha :as s]
[app.common.exceptions :as ex]
[cognitect.transit :as t])
(:import
java.time.Instant
java.time.OffsetDateTime
java.time.Duration
java.util.Date
java.time.temporal.TemporalAmount
org.apache.logging.log4j.core.util.CronExpression))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Instant & Duration
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn from-string
[s]
{:pre [(string? s)]}
(Instant/parse s))
(defn now
[]
(Instant/now))
(defn plus
[d ta]
(.plus d ^TemporalAmount ta))
(defn- obj->duration
[{:keys [days minutes seconds hours nanos millis]}]
(cond-> (Duration/ofMillis (if (int? millis) ^long millis 0))
(int? days) (.plusDays ^long days)
(int? hours) (.plusHours ^long hours)
(int? minutes) (.plusMinutes ^long minutes)
(int? seconds) (.plusSeconds ^long seconds)
(int? nanos) (.plusNanos ^long nanos)))
(defn duration?
[v]
(instance? Duration v))
(defn duration
[ms-or-obj]
(cond
(duration? ms-or-obj)
ms-or-obj
(integer? ms-or-obj)
(Duration/ofMillis ms-or-obj)
(string? ms-or-obj)
(Duration/parse ms-or-obj)
:else
(obj->duration ms-or-obj)))
(defn duration-between
[t1 t2]
(Duration/between t1 t2))
(defn parse-duration
[s]
(Duration/parse (str "PT" s)))
(extend-protocol clojure.core/Inst
java.time.Duration
(inst-ms* [v] (.toMillis ^Duration v)))
(defmethod print-method Duration
[mv ^java.io.Writer writer]
(.write writer (str "#app/duration \"" (.toString ^Duration mv) "\"")))
(defmethod print-dup Duration [o w]
(print-method o w))
(letfn [(conformer [v]
(cond
(duration? v) v
(string? v)
(try
(parse-duration v)
(catch java.time.format.DateTimeParseException e
::s/invalid))
:else
::s/invalid))
(unformer [v]
(subs (str v) 2))]
(s/def ::duration (s/conformer conformer unformer)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Cron Expression
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Cron expressions are comprised of 6 required fields and one
;; optional field separated by white space. The fields respectively
;; are described as follows:
;;
;; Field Name Allowed Values Allowed Special Characters
;; Seconds 0-59 , - * /
;; Minutes 0-59 , - * /
;; Hours 0-23 , - * /
;; Day-of-month 1-31 , - * ? / L W
;; Month 0-11 or JAN-DEC , - * /
;; Day-of-Week 1-7 or SUN-SAT , - * ? / L #
;; Year (Optional) empty, 1970-2199 , - * /
;;
;; The '*' character is used to specify all values. For example, "*"
;; in the minute field means "every minute".
;;
;; The '?' character is allowed for the day-of-month and day-of-week
;; fields. It is used to specify 'no specific value'. This is useful
;; when you need to specify something in one of the two fields, but
;; not the other.
;;
;; The '-' character is used to specify ranges For example "10-12" in
;; the hour field means "the hours 10, 11 and 12".
;;
;; The ',' character is used to specify additional values. For
;; example "MON,WED,FRI" in the day-of-week field means "the days
;; Monday, Wednesday, and Friday".
;;
;; The '/' character is used to specify increments. For example "0/15"
;; in the seconds field means "the seconds 0, 15, 30, and
;; 45". And "5/15" in the seconds field means "the seconds 5, 20, 35,
;; and 50". Specifying '*' before the '/' is equivalent to specifying
;; 0 is the value to start with. Essentially, for each field in the
;; expression, there is a set of numbers that can be turned on or
;; off. For seconds and minutes, the numbers range from 0 to 59. For
;; hours 0 to 23, for days of the month 0 to 31, and for months 0 to
;; 11 (JAN to DEC). The "/" character simply helps you turn on
;; every "nth" value in the given set. Thus "7/6" in the month field
;; only turns on month "7", it does NOT mean every 6th month, please
;; note that subtlety.
;;
;; The 'L' character is allowed for the day-of-month and day-of-week
;; fields. This character is short-hand for "last", but it has
;; different meaning in each of the two fields. For example, the
;; value "L" in the day-of-month field means "the last day of the
;; month" - day 31 for January, day 28 for February on non-leap
;; years. If used in the day-of-week field by itself, it simply
;; means "7" or "SAT". But if used in the day-of-week field after
;; another value, it means "the last xxx day of the month" - for
;; example "6L" means "the last friday of the month". You can also
;; specify an offset from the last day of the month, such as "L-3"
;; which would mean the third-to-last day of the calendar month. When
;; using the 'L' option, it is important not to specify lists, or
;; ranges of values, as you'll get confusing/unexpected results.
;;
;; The 'W' character is allowed for the day-of-month field. This
;; character is used to specify the weekday (Monday-Friday) nearest
;; the given day. As an example, if you were to specify "15W" as the
;; value for the day-of-month field, the meaning is: "the nearest
;; weekday to the 15th of the month". So if the 15th is a Saturday,
;; the trigger will fire on Friday the 14th. If the 15th is a Sunday,
;; the trigger will fire on Monday the 16th. If the 15th is a Tuesday,
;; then it will fire on Tuesday the 15th. However if you specify "1W"
;; as the value for day-of-month, and the 1st is a Saturday, the
;; trigger will fire on Monday the 3rd, as it will not 'jump' over the
;; boundary of a month's days. The 'W' character can only be specified
;; when the day-of-month is a single day, not a range or list of days.
;;
;; The 'L' and 'W' characters can also be combined for the
;; day-of-month expression to yield 'LW', which translates to "last
;; weekday of the month".
;;
;; The '#' character is allowed for the day-of-week field. This
;; character is used to specify "the nth" XXX day of the month. For
;; example, the value of "6#3" in the day-of-week field means the
;; third Friday of the month (day 6 = Friday and "#3" = the 3rd one in
;; the month). Other examples: "2#1" = the first Monday of the month
;; and "4#5" = the fifth Wednesday of the month. Note that if you
;; specify "#5" and there is not 5 of the given day-of-week in the
;; month, then no firing will occur that month. If the '#' character
;; is used, there can only be one expression in the day-of-week
;; field ("3#1,6#3" is not valid, since there are two expressions).
;;
;; The legal characters and the names of months and days of the week
;; are not case sensitive.
(defn cron
"Creates an instance of CronExpression from string."
[s]
(try
(CronExpression. s)
(catch java.text.ParseException e
(ex/raise :type :parse
:code :invalid-cron-expression
:cause e
:context {:expr s}))))
(defn cron?
[v]
(instance? CronExpression v))
(defn next-valid-instant-from
[^CronExpression cron ^Instant now]
(s/assert cron? cron)
(.toInstant (.getNextValidTimeAfter cron (Date/from now))))
(defmethod print-method CronExpression
[mv ^java.io.Writer writer]
(.write writer (str "#app/cron \"" (.toString ^CronExpression mv) "\"")))
(defmethod print-dup CronExpression
[o w]
(print-ctor o (fn [o w] (print-dup (.toString ^CronExpression o) w)) w))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Serialization
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare from-string)
(def ^:private instant-write-handler
(t/write-handler
(constantly "m")
(fn [v] (str (.toEpochMilli ^Instant v)))))
(def ^:private offset-datetime-write-handler
(t/write-handler
(constantly "m")
(fn [v] (str (.toEpochMilli (.toInstant ^OffsetDateTime v))))))
(def ^:private read-handler
(t/read-handler
(fn [v] (-> (Long/parseLong v)
(Instant/ofEpochMilli)))))
(def +read-handlers+
{"m" read-handler})
(def +write-handlers+
{Instant instant-write-handler
OffsetDateTime offset-datetime-write-handler})
(defmethod print-method Instant
[mv ^java.io.Writer writer]
(.write writer (str "#app/instant \"" (.toString ^Instant mv) "\"")))
(defmethod print-dup Instant [o w]
(print-method o w))

View file

@ -0,0 +1,144 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; This Source Code Form is "Incompatible With Secondary Licenses", as
;; defined by the Mozilla Public License, v. 2.0.
;;
;; Copyright (c) 2020 UXBOX Labs SL
(ns app.util.transit
(:require
[cognitect.transit :as t]
[clojure.java.io :as io]
[linked.core :as lk]
[app.util.time :as dt]
[app.util.data :as data]
[app.common.geom.point :as gpt]
[app.common.geom.matrix :as gmt])
(:import
linked.set.LinkedSet
java.io.ByteArrayInputStream
java.io.ByteArrayOutputStream
java.io.File
app.common.geom.point.Point
app.common.geom.matrix.Matrix))
;; --- Handlers
(def ^:private file-write-handler
(t/write-handler
(constantly "file")
(fn [v] (str v))))
(def point-write-handler
(t/write-handler
(constantly "point")
(fn [v] (into {} v))))
(def point-read-handler
(t/read-handler gpt/map->Point))
(def matrix-write-handler
(t/write-handler
(constantly "matrix")
(fn [v] (into {} v))))
(def matrix-read-handler
(t/read-handler gmt/map->Matrix))
(def ordered-set-write-handler
(t/write-handler
(constantly "ordered-set")
(fn [v] (vec v))))
(def ordered-set-read-handler
(t/read-handler #(into (lk/set) %)))
(def +read-handlers+
(assoc dt/+read-handlers+
"matrix" matrix-read-handler
"ordered-set" ordered-set-read-handler
"point" point-read-handler))
(def +write-handlers+
(assoc dt/+write-handlers+
File file-write-handler
LinkedSet ordered-set-write-handler
Matrix matrix-write-handler
Point point-write-handler))
;; --- Low-Level Api
(defn reader
([istream]
(reader istream nil))
([istream {:keys [type] :or {type :json}}]
(t/reader istream type {:handlers +read-handlers+})))
(defn read!
"Read value from streamed transit reader."
[reader]
(t/read reader))
(defn writer
([ostream]
(writer ostream nil))
([ostream {:keys [type] :or {type :json}}]
(t/writer ostream type {:handlers +write-handlers+})))
(defn write!
[writer data]
(t/write writer data))
;; --- High-Level Api
(declare str->bytes)
(declare bytes->str)
(defn decode
([data]
(decode data nil))
([data opts]
(with-open [input (ByteArrayInputStream. ^bytes data)]
(read! (reader input opts)))))
(defn encode
([data]
(encode data nil))
([data opts]
(with-open [out (ByteArrayOutputStream.)]
(let [w (writer out opts)]
(write! w data)
(.toByteArray out)))))
(defn decode-str
[message]
(->> (str->bytes message)
(decode)))
(defn encode-str
[message]
(->> (encode message)
(bytes->str)))
(defn encode-verbose-str
[message]
(->> (encode message {:type :json-verbose})
(bytes->str)))
;; --- Helpers
(defn str->bytes
"Convert string to byte array."
([^String s]
(str->bytes s "UTF-8"))
([^String s, ^String encoding]
(.getBytes s encoding)))
(defn bytes->str
"Convert byte array to String."
([^bytes data]
(bytes->str data "UTF-8"))
([^bytes data, ^String encoding]
(String. data encoding)))

View file

@ -0,0 +1,74 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) 2016 Andrey Antukh <niwi@niwi.nz>
(ns app.util.workers
"A distributed asynchronous tasks queue implementation on top
of PostgreSQL reliable advirsory locking mechanism."
#_(:require
[suricatta.core :as sc]
[app.db :as db]))
;; (defn- poll-for-task
;; [conn queue]
;; (let [sql (sql/acquire-task {:queue queue})]
;; (sc/fetch-one conn sql)))
;; (defn- mark-task-done
;; [conn {:keys [id]}]
;; (let [sql (sql/mark-task-done {:id id})]
;; (sc/execute conn sql)))
;; (defn- mark-task-failed
;; [conn {:keys [id]} error]
;; (let [sql (sql/mark-task-done {:id id :error (.getMessage error)})]
;; (sc/execute conn sql)))
;; (defn- watch-unit
;; [conn queue callback]
;; (let [task (poll-for-task conn queue)]
;; (if (nil? task)
;; (Thread/sleep 1000)
;; (try
;; (sc/atomic conn
;; (callback conn task)
;; (mark-task-done conn task))
;; (catch Exception e
;; (mark-task-failed conn task e))))))
;; (defn- watch-loop
;; "Watch tasks on the specified queue and executes a
;; callback for each task is received.
;; NOTE: This function blocks the current thread."
;; [queue callback]
;; (try
;; (loop []
;; (with-open [conn (db/connection)]
;; (sc/atomic conn (watch-unit conn queue callback)))
;; (recur))
;; (catch InterruptedException e
;; ;; just ignoring
;; )))
;; (defn watch!
;; [queue callback]
;; (let [runnable #(watch-loop queue callback)
;; thread (Thread. ^Runnable runnable)]
;; (.setDaemon thread true)
;; (.start thread)
;; (reify
;; java.lang.AutoCloseable
;; (close [_]
;; (.interrupt thread)
;; (.join thread 2000))
;; clojure.lang.IDeref
;; (deref [_]
;; (.join thread))
;; clojure.lang.IBlockingDeref
;; (deref [_ ms default]
;; (.join thread ms)
;; default))))