Enhance synchronization of nested shapes

This commit is contained in:
Andrés Moya 2023-07-25 11:32:49 +02:00 committed by Andrey Antukh
parent 2e33575f01
commit 8b801b65f6
7 changed files with 208 additions and 112 deletions

View file

@ -125,7 +125,7 @@
(dissoc file :data))))))
(def ^:private sql:retrieve-files-chunk
"SELECT id, name, created_at, revn, data FROM file
"SELECT id, name, features, created_at, revn, data FROM file
WHERE created_at < ? AND deleted_at is NULL
ORDER BY created_at desc LIMIT ?")
@ -147,7 +147,9 @@
:kf first
:initk (or start-at (dt/now)))
(take max-items)
(map #(update % :data blob/decode))))
(map #(-> %
(update :data blob/decode)
(update :features db/decode-pgarray #{})))))
(on-error* [cause file]
(println "unexpected exception happened on processing file: " (:id file))
@ -184,11 +186,13 @@
on-end
on-init]
:or {chunk-size 10
max-items Long/MAX_VALUE
workers 1}}]
(letfn [(get-chunk [conn cursor]
(let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])]
[(some->> rows peek :created-at) (seq rows)]))
[(some->> rows peek :created-at)
(map #(update % :features db/decode-pgarray #{}) rows)]))
(get-candidates [conn]
(->> (d/iteration (partial get-chunk conn)
@ -197,7 +201,6 @@
:initk (or start-at (dt/now)))
(take max-items)))
(on-error* [cause file]
(println! "unexpected exception happened on processing file: " (:id file))
(strace/print-stack-trace cause))