Improve iteration and add concat-all and fully lazy mapcat helper

This commit is contained in:
Andrey Antukh 2022-09-28 23:26:31 +02:00 committed by Andrés Moya
parent 058727a44b
commit 886ab0e152
6 changed files with 68 additions and 86 deletions

View file

@ -9,11 +9,13 @@
[app.common.data :as d] [app.common.data :as d]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[app.common.geom.matrix :as gmt] [app.common.geom.matrix :as gmt]
[app.common.logging :as l]
[app.common.perf :as perf] [app.common.perf :as perf]
[app.common.pprint :as pp] [app.common.pprint :as pp]
[app.common.transit :as t] [app.common.transit :as t]
[app.config :as cfg] [app.config :as cfg]
[app.main :as main] [app.main :as main]
[app.srepl.main :as srepl]
[app.util.blob :as blob] [app.util.blob :as blob]
[app.util.fressian :as fres] [app.util.fressian :as fres]
[app.util.json :as json] [app.util.json :as json]

View file

@ -89,8 +89,8 @@
The `on-file` parameter should be a function that receives the file The `on-file` parameter should be a function that receives the file
and the previous state and returns the new state." and the previous state and returns the new state."
[system & {:keys [chunk-size max-chunks start-at on-file on-error on-end] [system & {:keys [chunk-size max-items start-at on-file on-error on-end]
:or {chunk-size 10 max-chunks Long/MAX_VALUE}}] :or {chunk-size 10 max-items Long/MAX_VALUE}}]
(letfn [(get-chunk [conn cursor] (letfn [(get-chunk [conn cursor]
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])] (let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
[(some->> rows peek :created-at) (seq rows)])) [(some->> rows peek :created-at) (seq rows)]))
@ -100,8 +100,7 @@
:vf second :vf second
:kf first :kf first
:initk (or start-at (dt/now))) :initk (or start-at (dt/now)))
(take max-chunks) (take max-items)
(mapcat identity)
(map #(update % :data blob/decode)))) (map #(update % :data blob/decode))))
(on-error* [file cause] (on-error* [file cause]

View file

@ -284,11 +284,10 @@
(some->> (seq rows) (d/group-by #(-> % :backend keyword) :id #{}) seq)])) (some->> (seq rows) (d/group-by #(-> % :backend keyword) :id #{}) seq)]))
(retrieve-deleted-objects [conn min-age] (retrieve-deleted-objects [conn min-age]
(->> (d/iteration (partial retrieve-deleted-objects-chunk conn min-age) (d/iteration (partial retrieve-deleted-objects-chunk conn min-age)
:initk (dt/now) :initk (dt/now)
:vf second :vf second
:kf first) :kf first))
(sequence cat)))
(delete-in-bulk [conn backend-name ids] (delete-in-bulk [conn backend-name ids]
(let [backend (impl/resolve-backend storage backend-name) (let [backend (impl/resolve-backend storage backend-name)
@ -397,12 +396,10 @@
(d/group-by get-bucket :id #{} rows)]))) (d/group-by get-bucket :id #{} rows)])))
(retrieve-touched [conn] (retrieve-touched [conn]
(->> (d/iteration (fn [cursor] (d/iteration (partial retrieve-touched-chunk conn)
(retrieve-touched-chunk conn cursor))
:initk (dt/now) :initk (dt/now)
:vf second :vf second
:kf first) :kf first))
(sequence cat)))
(process-objects! [conn get-fn ids bucket] (process-objects! [conn get-fn ids bucket]
(loop [to-freeze #{} (loop [to-freeze #{}

View file

@ -90,11 +90,10 @@
get-chunk (fn [cursor] get-chunk (fn [cursor]
(let [rows (db/exec! conn [sql:retrieve-candidates-chunk interval cursor])] (let [rows (db/exec! conn [sql:retrieve-candidates-chunk interval cursor])]
[(some->> rows peek :modified-at) (seq rows)]))] [(some->> rows peek :modified-at) (seq rows)]))]
(d/iteration get-chunk
(sequence cat (d/iteration get-chunk
:vf second :vf second
:kf first :kf first
:initk (dt/now)))))) :initk (dt/now)))))
(defn collect-used-media (defn collect-used-media
[data] [data]

View file

@ -7,7 +7,7 @@
(ns app.common.data (ns app.common.data
"Data manipulation and query helper functions." "Data manipulation and query helper functions."
(:refer-clojure :exclude [read-string hash-map merge name update-vals (:refer-clojure :exclude [read-string hash-map merge name update-vals
parse-double group-by iteration]) parse-double group-by iteration concat mapcat])
#?(:cljs #?(:cljs
(:require-macros [app.common.data])) (:require-macros [app.common.data]))
@ -17,8 +17,8 @@
[cuerdas.core :as str] [cuerdas.core :as str]
#?(:cljs [cljs.reader :as r] #?(:cljs [cljs.reader :as r]
:clj [clojure.edn :as r]) :clj [clojure.edn :as r])
#?(:cljs [cljs.core :as core] #?(:cljs [cljs.core :as c]
:clj [clojure.core :as core]) :clj [clojure.core :as c])
[linked.set :as lks]) [linked.set :as lks])
#?(:clj #?(:clj
@ -60,7 +60,7 @@
(defn editable-collection? (defn editable-collection?
[m] [m]
#?(:clj (instance? clojure.lang.IEditableCollection m) #?(:clj (instance? clojure.lang.IEditableCollection m)
:cljs (implements? core/IEditableCollection m))) :cljs (implements? c/IEditableCollection m)))
(defn deep-merge (defn deep-merge
([a b] ([a b]
@ -81,6 +81,24 @@
m) m)
(dissoc m k))) (dissoc m k)))
(defn concat-all
"A totally lazy implementation of concat with different call
signature. It works like a flatten with a single level of neesting."
[colls]
(lazy-seq
(let [c (seq colls)
o (first c)
r (rest c)]
(if-let [o (seq o)]
(cons (first o) (concat-all (cons (rest o) r)))
(some-> (seq r) concat-all)))))
(defn mapcat
"A fully lazy version of mapcat."
([f] (c/mapcat f))
([f & colls]
(concat-all (apply map f colls))))
(defn- transient-concat (defn- transient-concat
[c1 colls] [c1 colls]
(loop [result (transient c1) (loop [result (transient c1)
@ -214,21 +232,12 @@
([mfn coll] ([mfn coll]
(into {} (mapm mfn) coll))) (into {} (mapm mfn) coll)))
;; TEMPORARY COPY of clojure.core/update-vals until we migrate to clojure 1.11
(defn update-vals (defn update-vals
"m f => {k (f v) ...} "m f => {k (f v) ...}
Given a map m and a function f of 1-argument, returns a new map where the keys of m Given a map m and a function f of 1-argument, returns a new map where the keys of m
are mapped to result of applying f to the corresponding values of m." are mapped to result of applying f to the corresponding values of m."
[m f] [m f]
(with-meta (c/update-vals m f))
(persistent!
(reduce-kv (fn [acc k v] (assoc! acc k (f v)))
(if (editable-collection? m)
(transient m)
(transient {}))
m))
(meta m)))
(defn removev (defn removev
"Returns a vector of the items in coll for which (fn item) returns logical false" "Returns a vector of the items in coll for which (fn item) returns logical false"
@ -294,7 +303,7 @@
(empty? col2) acc (empty? col2) acc
:else (recur (rest col1) col2 join-fn :else (recur (rest col1) col2 join-fn
(let [other (mapv (partial join-fn (first col1)) col2)] (let [other (mapv (partial join-fn (first col1)) col2)]
(concat acc other)))))) (c/concat acc other))))))
(def sentinel (def sentinel
#?(:clj (Object.) #?(:clj (Object.)
@ -478,7 +487,7 @@
([maybe-keyword default-value] ([maybe-keyword default-value]
(cond (cond
(keyword? maybe-keyword) (keyword? maybe-keyword)
(core/name maybe-keyword) (c/name maybe-keyword)
(string? maybe-keyword) (string? maybe-keyword)
maybe-keyword maybe-keyword
@ -496,7 +505,7 @@
[coll] [coll]
(map vector (map vector
coll coll
(concat (rest coll) [nil]))) (c/concat (rest coll) [nil])))
(defn with-prev (defn with-prev
"Given a collection will return a new collection where each element "Given a collection will return a new collection where each element
@ -505,7 +514,7 @@
[coll] [coll]
(map vector (map vector
coll coll
(concat [nil] coll))) (c/concat [nil] coll)))
(defn with-prev-next (defn with-prev-next
"Given a collection will return a new collection where every item is paired "Given a collection will return a new collection where every item is paired
@ -514,8 +523,8 @@
[coll] [coll]
(map vector (map vector
coll coll
(concat [nil] coll) (c/concat [nil] coll)
(concat (rest coll) [nil]))) (c/concat (rest coll) [nil])))
(defn prefix-keyword (defn prefix-keyword
"Given a keyword and a prefix will return a new keyword with the prefix attached "Given a keyword and a prefix will return a new keyword with the prefix attached
@ -653,52 +662,28 @@
{} {}
coll)))) coll))))
;; TEMPORAL COPY of clojure-1.11 iteration function, should be
;; replaced with the builtin on when stable version is released.
#?(:clj
(defn iteration (defn iteration
"Creates a seqable/reducible via repeated calls to step, "Creates a toally lazy seqable via repeated calls to step, a
a function of some (continuation token) 'k'. The first call to step function of some (continuation token) 'k'. The first call to step
will be passed initk, returning 'ret'. Iff (somef ret) is true, will be passed initk, returning 'ret'. If (somef ret) is true, (vf
(vf ret) will be included in the iteration, else iteration will ret) will be included in the iteration, else iteration will
terminate and vf/kf will not be called. If (kf ret) is non-nil it terminate and vf/kf will not be called. If (kf ret) is non-nil it
will be passed to the next step call, else iteration will terminate. will be passed to the next step call, else iteration will terminate.
This can be used e.g. to consume APIs that return paginated or batched data. This can be used e.g. to consume APIs that return paginated or batched data.
step - (possibly impure) fn of 'k' -> 'ret' step - (possibly impure) fn of 'k' -> 'ret'
:somef - fn of 'ret' -> logical true/false, default 'some?' :somef - fn of 'ret' -> logical true/false, default 'some?'
:vf - fn of 'ret' -> 'v', a value produced by the iteration, default 'identity' :vf - fn of 'ret' -> 'v', a value produced by the iteration, default 'identity'
:kf - fn of 'ret' -> 'next-k' or nil (signaling 'do not continue'), default 'identity' :kf - fn of 'ret' -> 'next-k' or nil (signaling 'do not continue'), default 'identity'
:initk - the first value passed to step, default 'nil' :initk - the first value passed to step, default 'nil'
It is presumed that step with non-initk is unreproducible/non-idempotent.
If step with initk is unreproducible it is on the consumer to not consume twice." It is presumed that step with non-initk is
{:added "1.11"} unreproducible/non-idempotent. If step with initk is unreproducible
[step & {:keys [somef vf kf initk] it is on the consumer to not consume twice."
:or {vf identity [& args]
kf identity (->> (apply c/iteration args)
somef some? (concat-all)))
initk nil}}]
(reify
clojure.lang.Seqable
(seq [_]
((fn next [ret]
(when (somef ret)
(cons (vf ret)
(when-some [k (kf ret)]
(lazy-seq (next (step k)))))))
(step initk)))
clojure.lang.IReduceInit
(reduce [_ rf init]
(loop [acc init
ret (step initk)]
(if (somef ret)
(let [acc (rf acc (vf ret))]
(if (reduced? acc)
@acc
(if-some [k (kf ret)]
(recur acc (step k))
acc)))
acc))))))
(defn toggle-selection (defn toggle-selection
([set value] ([set value]

View file

@ -3,7 +3,7 @@
{penpot/common {penpot/common
{:local/root "../common"} {:local/root "../common"}
org.clojure/clojure {:mvn/version "1.10.3"} org.clojure/clojure {:mvn/version "1.11.1"}
binaryage/devtools {:mvn/version "RELEASE"} binaryage/devtools {:mvn/version "RELEASE"}
metosin/reitit-core {:mvn/version "0.5.18"} metosin/reitit-core {:mvn/version "0.5.18"}