♻️ Refactor persistence flow

This commit is contained in:
Andrey Antukh 2022-04-21 18:11:24 +02:00 committed by Andrés Moya
parent c01e4e52f8
commit 049f4ce784
2 changed files with 81 additions and 86 deletions

View file

@ -163,7 +163,7 @@
(defn finalize-file (defn finalize-file
[_project-id file-id] [_project-id file-id]
(ptk/reify ::finalize (ptk/reify ::finalize-file
ptk/UpdateEvent ptk/UpdateEvent
(update [_ state] (update [_ state]
(dissoc state (dissoc state

View file

@ -6,8 +6,8 @@
(ns app.main.data.workspace.persistence (ns app.main.data.workspace.persistence
(:require (:require
[app.common.logging :as log]
[app.common.pages :as cp] [app.common.pages :as cp]
[app.common.pages.helpers :as cph]
[app.common.spec :as us] [app.common.spec :as us]
[app.common.spec.change :as spec.change] [app.common.spec.change :as spec.change]
[app.common.spec.file :as spec.file] [app.common.spec.file :as spec.file]
@ -16,19 +16,19 @@
[app.main.data.dashboard :as dd] [app.main.data.dashboard :as dd]
[app.main.data.fonts :as df] [app.main.data.fonts :as df]
[app.main.data.workspace.changes :as dch] [app.main.data.workspace.changes :as dch]
[app.main.data.workspace.common :as dwc]
[app.main.data.workspace.selection :as dws]
[app.main.data.workspace.state-helpers :as wsh] [app.main.data.workspace.state-helpers :as wsh]
[app.main.refs :as refs]
[app.main.repo :as rp] [app.main.repo :as rp]
[app.main.store :as st] [app.main.store :as st]
[app.util.http :as http] [app.util.http :as http]
[app.util.router :as rt]
[app.util.time :as dt] [app.util.time :as dt]
[beicon.core :as rx] [beicon.core :as rx]
[cljs.spec.alpha :as s] [cljs.spec.alpha :as s]
[clojure.set :as set] [okulary.core :as l]
[potok.core :as ptk])) [potok.core :as ptk]))
(log/set-level! :info)
(declare persist-changes) (declare persist-changes)
(declare persist-synchronous-changes) (declare persist-synchronous-changes)
(declare shapes-changes-persisted) (declare shapes-changes-persisted)
@ -39,18 +39,17 @@
(defn initialize-file-persistence (defn initialize-file-persistence
[file-id] [file-id]
(ptk/reify ::initialize-persistence (ptk/reify ::initialize-persistence
ptk/EffectEvent ptk/WatchEvent
(effect [_ _ stream] (watch [_ _ stream]
(log/debug :hint "initialize persistence")
(let [stoper (rx/filter #(= ::finalize %) stream) (let [stoper (rx/filter #(= ::finalize %) stream)
forcer (rx/filter #(= ::force-persist %) stream) commits (l/atom [])
notifier (->> stream
(rx/filter dch/commit-changes?)
(rx/debounce 2000)
(rx/merge stoper forcer))
local-file? local-file?
#(as-> (:file-id %) event-file-id #(as-> (:file-id %) event-file-id
(or (nil? event-file-id) (or (nil? event-file-id)
(= event-file-id file-id))) (= event-file-id file-id)))
library-file? library-file?
#(as-> (:file-id %) event-file-id #(as-> (:file-id %) event-file-id
(and (some? event-file-id) (and (some? event-file-id)
@ -71,93 +70,89 @@
;; Disable reload stoper ;; Disable reload stoper
(swap! st/ongoing-tasks disj :workspace-change) (swap! st/ongoing-tasks disj :workspace-change)
(st/emit! (update-persistence-status {:status :saved})))] (st/emit! (update-persistence-status {:status :saved})))]
(->> (rx/merge
(->> stream (rx/merge
(rx/filter dch/commit-changes?) (->> stream
(rx/map deref) (rx/filter dch/commit-changes?)
(rx/filter local-file?) (rx/map deref)
(rx/tap on-dirty) (rx/filter local-file?)
(rx/buffer-until notifier) (rx/tap on-dirty)
(rx/filter (complement empty?)) (rx/filter (complement empty?))
(rx/map (fn [buf] (rx/map (fn [commit]
(->> (into [] (comp (map #(assoc % :id (uuid/next))) (-> commit
(map #(assoc % :file-id file-id))) (assoc :id (uuid/next))
buf) (assoc :file-id file-id))))
(persist-changes file-id)))) (rx/observe-on :async)
(rx/tap on-saving) (rx/tap #(swap! commits conj %))
(rx/take-until (rx/delay 100 stoper))) (rx/take-until (rx/delay 100 stoper))
(->> stream (rx/finalize (fn []
(rx/filter dch/commit-changes?) (log/debug :hint "finalize persistence: changes watcher"))))
(rx/map deref)
(rx/filter library-file?) (->> (rx/from-atom commits)
(rx/filter (complement #(empty? (:changes %)))) (rx/filter (complement empty?))
(rx/map persist-synchronous-changes) (rx/sample-when (rx/merge
(rx/take-until (rx/delay 100 stoper))) (rx/interval 5000)
(->> stream (rx/filter #(= ::force-persist %) stream)
(rx/filter (ptk/type? ::changes-persisted)) (->> (rx/from-atom commits)
(rx/tap on-saved) (rx/filter (complement empty?))
(rx/ignore) (rx/debounce 2000))))
(rx/take-until stoper))) (rx/tap #(reset! commits []))
(rx/subs #(st/emit! %) (rx/tap on-saving)
(constantly nil) (rx/mapcat (fn [changes]
(fn [] ;; NOTE: this is needed for don't start the
(on-saved)))))))) ;; next persistence before this one is
;; finished.
(rx/merge
(rx/of (persist-changes file-id changes))
(->> stream
(rx/filter (ptk/type? ::changes-persisted))
(rx/take 1)
(rx/tap on-saved)
(rx/ignore)))))
(rx/take-until (rx/delay 100 stoper))
(rx/finalize (fn []
(log/debug :hint "finalize persistence: save loop"))))
;; Synchronous changes
(->> stream
(rx/filter dch/commit-changes?)
(rx/map deref)
(rx/filter library-file?)
(rx/filter (complement #(empty? (:changes %))))
(rx/map persist-synchronous-changes)
(rx/take-until (rx/delay 100 stoper))
(rx/finalize (fn []
(log/debug :hint "finalize persistence: synchronous save loop"))))
)))))
(defn persist-changes (defn persist-changes
[file-id changes] [file-id changes]
(log/debug :hint "persist changes" :changes (count changes))
(us/verify ::us/uuid file-id) (us/verify ::us/uuid file-id)
(ptk/reify ::persist-changes (ptk/reify ::persist-changes
ptk/UpdateEvent
(update [_ state]
(let [into* (fnil into [])]
(update-in state [:workspace-persistence :queue] into* changes)))
ptk/WatchEvent ptk/WatchEvent
(watch [_ state _] (watch [_ state _]
(let [sid (:session-id state) (let [sid (:session-id state)
file (get state :workspace-file) file (get state :workspace-file)
queue (get-in state [:workspace-persistence :queue] [])
params {:id (:id file) params {:id (:id file)
:revn (:revn file) :revn (:revn file)
:session-id sid :session-id sid
:changes-with-metadata (into [] queue)} :changes-with-metadata (into [] changes)}]
ids (into #{} (map :id) queue)
update-persistence-queue
(fn [state]
(update-in state [:workspace-persistence :queue]
(fn [items] (into [] (remove #(ids (:id %))) items))))
handle-response
(fn [lagged]
(let [lagged (cond->> lagged
(= #{sid} (into #{} (map :session-id) lagged))
(map #(assoc % :changes [])))]
(rx/concat
(rx/of update-persistence-queue)
(->> (rx/of lagged)
(rx/mapcat seq)
(rx/map #(shapes-changes-persisted file-id %))))))
on-error
(fn [{:keys [type] :as error}]
(if (or (= :bad-gateway type)
(= :service-unavailable type))
(rx/of (update-persistence-status {:status :error :reason type}))
(rx/concat
(rx/of update-persistence-queue)
(rx/of (update-persistence-status {:status :error :reason type}))
(rx/of (dws/deselect-all))
(->> (rx/of nil)
(rx/delay 200)
(rx/mapcat #(rx/throw error))))))]
(when (= file-id (:id params)) (when (= file-id (:id params))
(->> (rp/mutation :update-file params) (->> (rp/mutation :update-file params)
(rx/mapcat handle-response) (rx/mapcat (fn [lagged]
(rx/catch on-error))))))) (log/debug :hint "changes persisted" :lagged (count lagged))
(let [lagged (cond->> lagged
(= #{sid} (into #{} (map :session-id) lagged))
(map #(assoc % :changes [])))]
(->> (rx/of lagged)
(rx/mapcat seq)
(rx/map #(shapes-changes-persisted file-id %))))))
(rx/catch (fn [cause]
(rx/concat
(rx/of (rt/assign-exception cause))
(rx/throw cause))))))))))
(defn persist-synchronous-changes (defn persist-synchronous-changes
[{:keys [file-id changes]}] [{:keys [file-id changes]}]