Add the ability to stream events on rpc methods

This commit is contained in:
Andrey Antukh 2023-12-01 16:18:39 +01:00 committed by Andrés Moya
parent f3e9efa6fe
commit 03518a8da1
21 changed files with 408 additions and 106 deletions

View file

@ -32,6 +32,7 @@
[app.common.types.shape-tree :as ctst]
[app.common.uuid :as uuid]
[app.db :as db]
[app.http.sse :as sse]
[app.media :as media]
[app.rpc.commands.files :as files]
[app.rpc.commands.files-snapshot :as fsnap]
@ -362,6 +363,8 @@
shapes from library components. Mark the file with
the :components-v2 option."
[file-data libraries]
(sse/tap {:type :migration-progress
:section :components})
(let [components (ctkl/components-seq file-data)]
(if (empty? components)
(assoc-in file-data [:options :components-v2] true)
@ -435,6 +438,9 @@
add-instance-grid
(fn [fdata frame-id grid assets]
(reduce (fn [result [component position]]
(sse/tap {:type :migration-progress
:section :components
:name (:name component)})
(add-main-instance result component frame-id (gpt/add position
(gpt/point grid-gap grid-gap))))
fdata
@ -701,6 +707,9 @@
(->> (d/zip media-group grid)
(map (fn [[mobj position]]
(l/trc :hint "submit graphic processing" :file-id (str (:id fdata)) :id (str (:id mobj)))
(sse/tap {:type :migration-progress
:section :graphics
:name (:name mobj)})
(px/submit! executor (partial process mobj position))))
(reduce (fn [fdata promise]
(if-let [changes (deref promise)]
@ -713,6 +722,8 @@
(defn- migrate-graphics
[fdata]
(sse/tap {:type :migration-progress
:section :graphics})
(if (empty? (:media fdata))
fdata
(let [[fdata page-id start-pos]
@ -812,7 +823,6 @@
(defn migrate-file!
[system file-id & {:keys [validate? throw-on-validate?]}]
(let [tpoint (dt/tpoint)
file-id (if (string? file-id)
(parse-uuid file-id)

View file

@ -232,3 +232,7 @@
(if (ex/error? cause)
(handle-error cause request nil)
(handle-exception cause request nil)))
(defn handle'
[cause request]
(::rres/body (handle cause request)))

View file

@ -0,0 +1,86 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.http.sse
"SSE (server sent events) helpers"
(:refer-clojure :exclude [tap])
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.transit :as t]
[app.http.errors :as errors]
[promesa.core :as p]
[promesa.exec :as px]
[promesa.exec.csp :as sp]
[promesa.util :as pu]
[ring.response :as rres])
(:import
java.io.OutputStream))
(def ^:dynamic *channel* nil)
(defn- write!
[^OutputStream output ^bytes data]
(l/trc :hint "writting data" :data data :length (alength data))
(.write output data)
(.flush output))
(defn- create-writer-loop
[^OutputStream output]
(try
(loop []
(when-let [event (sp/take! *channel*)]
(let [result (ex/try! (write! output event))]
(if (ex/exception? result)
(l/wrn :hint "unexpected exception on sse writer" :cause result)
(recur)))))
(finally
(pu/close! output))))
(defn- encode
[[name data]]
(try
(let [data (with-out-str
(println "event:" (d/name name))
(println "data:" (t/encode-str data {:type :json-verbose}))
(println))]
(.getBytes data "UTF-8"))
(catch Throwable cause
(l/err :hint "unexpected error on encoding value on sse stream"
:cause cause)
nil)))
;; ---- PUBLIC API
(def default-headers
{"Content-Type" "text/event-stream;charset=UTF-8"
"Cache-Control" "no-cache, no-store, max-age=0, must-revalidate"
"Pragma" "no-cache"})
(defn tap
([data] (tap "event" data))
([name data]
(when-let [channel *channel*]
(sp/put! channel [name data])
nil)))
(defn response
[handler & {:keys [buf] :or {buf 32} :as opts}]
(fn [request]
{::rres/headers default-headers
::rres/status 200
::rres/body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output]
(binding [*channel* (sp/chan :buf buf :xf (keep encode))]
(let [writer (px/run! :virtual (partial create-writer-loop output))]
(try
(tap "end" (handler))
(catch Throwable cause
(tap "error" (errors/handle' cause request)))
(finally
(sp/close! *channel*)
(p/await! writer)))))))}))

View file

@ -57,14 +57,16 @@
(defn- handle-response
[request result]
(if (fn? result)
(result request)
(let [mdata (meta result)]
(-> {::rres/status (::http/status mdata 200)
::rres/headers (::http/headers mdata {})
::rres/body (rph/unwrap result)}
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata)))))
(let [mdata (meta result)
response (if (fn? result)
(result request)
(let [result (rph/unwrap result)]
{::rres/status (::http/status mdata 200)
::rres/headers (::http/headers mdata {})
::rres/body result}))]
(-> response
(handle-response-transformation request mdata)
(handle-before-comple-hook mdata))))
(defn- rpc-handler
"Ring handler that dispatches cmd requests and convert between

View file

@ -15,6 +15,7 @@
[app.common.files.validate :as fval]
[app.common.fressian :as fres]
[app.common.logging :as l]
[app.common.schema :as sm]
[app.common.spec :as us]
[app.common.types.file :as ctf]
[app.common.uuid :as uuid]
@ -22,6 +23,7 @@
[app.db :as db]
[app.features.components-v2 :as features.components-v2]
[app.features.fdata :as features.fdata]
[app.http.sse :as sse]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
@ -30,7 +32,6 @@
[app.rpc.commands.projects :as projects]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.tasks.file-gc]
@ -38,11 +39,13 @@
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.set :as set]
[clojure.spec.alpha :as s]
[clojure.walk :as walk]
[cuerdas.core :as str]
[datoteka.io :as io]
[promesa.core :as p]
[promesa.util :as pu]
[ring.response :as rres]
[yetti.adapter :as yt])
@ -642,6 +645,9 @@
validate? (contains? cf/flags :file-validation)
features (cfeat/get-team-enabled-features cf/flags team)]
(sse/tap {:type :import-progress
:section :read-import})
;; Process all sections
(run! (fn [section]
(l/dbg :hint "reading section" :section section ::l/sync? true)
@ -651,6 +657,8 @@
(assoc ::section section)
(assoc ::input input))]
(binding [*options* options]
(sse/tap {:type :import-progress
:section section})
(read-section options))))
[:v1/metadata :v1/files :v1/rels :v1/sobjects])
@ -1056,54 +1064,71 @@
;; --- Command: export-binfile
(s/def ::file-id ::us/uuid)
(s/def ::include-libraries? ::us/boolean)
(s/def ::embed-assets? ::us/boolean)
(s/def ::export-binfile
(s/keys :req [::rpc/profile-id]
:req-un [::file-id ::include-libraries? ::embed-assets?]))
(def ^:private
schema:export-binfile
(sm/define
[:map {:title "export-binfile"}
[:file-id ::sm/uuid]
[:include-libraries? :boolean]
[:embed-assets? :boolean]]))
(sv/defmethod ::export-binfile
"Export a penpot file in a binary format."
{::doc/added "1.15"
::webhooks/event? true}
::webhooks/event? true
::sm/result schema:export-binfile}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}]
(files/check-read-permissions! pool profile-id file-id)
(let [body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(-> cfg
(assoc ::file-ids [file-id])
(assoc ::embed-assets? embed-assets?)
(assoc ::include-libraries? include-libraries?)
(export! output-stream))))]
(fn [_]
{::rres/status 200
::rres/headers {"content-type" "application/octet-stream"}
::rres/body (reify rres/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
(-> cfg
(assoc ::file-ids [file-id])
(assoc ::embed-assets? embed-assets?)
(assoc ::include-libraries? include-libraries?)
(export! output-stream))))}))
(fn [_]
{::rres/status 200
::rres/body body
::rres/headers {"content-type" "application/octet-stream"}})))
(s/def ::file ::media/upload)
(s/def ::import-binfile
(s/keys :req [::rpc/profile-id]
:req-un [::project-id ::file]))
;; --- Command: import-binfile
(def ^:private
schema:import-binfile
(sm/define
[:map {:title "import-binfile"}
[:project-id ::sm/uuid]
[:file ::media/upload]]))
(declare ^:private import-binfile)
(sv/defmethod ::import-binfile
"Import a penpot file in a binary format."
{::doc/added "1.15"
::webhooks/event? true}
::webhooks/event? true
::sse/stream? true
::sm/params schema:import-binfile}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id file] :as params}]
(db/with-atomic [conn pool]
(projects/check-read-permissions! conn profile-id project-id)
(let [ids (import! (assoc cfg
::input (:path file)
::project-id project-id
::profile-id profile-id
::ignore-index-errors? true))]
(projects/check-read-permissions! pool profile-id project-id)
(let [params (-> cfg
(assoc ::input (:path file))
(assoc ::project-id project-id)
(assoc ::profile-id profile-id)
(assoc ::ignore-index-errors? true))]
(with-meta
(sse/response #(import-binfile params))
{::audit/props {:file nil}})))
(db/update! conn :project
{:modified-at (dt/now)}
{:id project-id})
(rph/with-meta ids
{::audit/props {:file nil :file-ids ids}}))))
(defn- import-binfile
[{:keys [::wrk/executor ::project-id] :as params}]
(db/tx-run! params
(fn [{:keys [::db/conn] :as params}]
;; NOTE: the importation process performs some operations that
;; are not very friendly with virtual threads, and for avoid
;; unexpected blocking of other concurrent operations we
;; dispatch that operation to a dedicated executor.
(let [result (p/thread-call executor (partial import! params))]
(db/update! conn :project
{:modified-at (dt/now)}
{:id project-id})
(deref result)))))

View file

@ -14,6 +14,7 @@
[app.common.schema :as sm]
[app.common.uuid :as uuid]
[app.db :as db]
[app.http.sse :as sse]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc :as-alias rpc]
[app.rpc.commands.binfile :as binfile]
@ -27,7 +28,9 @@
[app.util.pointer-map :as pmap]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.walk :as walk]
[promesa.core :as p]
[promesa.exec :as px]))
;; --- COMMAND: Duplicate File
@ -405,29 +408,6 @@
;; --- COMMAND: Clone Template
(defn- clone-template!
[{:keys [::db/conn] :as cfg} {:keys [profile-id template-id project-id]}]
(let [template (tmpl/get-template-stream cfg template-id)
project (db/get-by-id conn :project project-id {:columns [:id :team-id]})]
(when-not template
(ex/raise :type :not-found
:code :template-not-found
:hint "template not found"))
(teams/check-edition-permissions! conn profile-id (:team-id project))
(-> cfg
;; FIXME: maybe reuse the conn instead of creating more
;; connections in the import process?
(dissoc ::db/conn)
(assoc ::binfile/input template)
(assoc ::binfile/project-id (:id project))
(assoc ::binfile/profile-id profile-id)
(assoc ::binfile/ignore-index-errors? true)
(assoc ::binfile/migrate? true)
(binfile/import!))))
(def ^:private
schema:clone-template
(sm/define
@ -435,15 +415,46 @@
[:project-id ::sm/uuid]
[:template-id ::sm/word-string]]))
(declare ^:private clone-template)
(sv/defmethod ::clone-template
"Clone into the specified project the template by its id."
{::doc/added "1.16"
::sse/stream? true
::webhooks/event? true
::sm/params schema:clone-template}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}]
(db/with-atomic [conn pool]
(-> (assoc cfg ::db/conn conn)
(clone-template! (assoc params :profile-id profile-id)))))
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id template-id] :as params}]
(let [project (db/get-by-id pool :project project-id {:columns [:id :team-id]})
_ (teams/check-edition-permissions! pool profile-id (:team-id project))
template (tmpl/get-template-stream cfg template-id)
params (-> cfg
(assoc ::binfile/input template)
(assoc ::binfile/project-id (:id project))
(assoc ::binfile/profile-id profile-id)
(assoc ::binfile/ignore-index-errors? true)
(assoc ::binfile/migrate? true))]
(when-not template
(ex/raise :type :not-found
:code :template-not-found
:hint "template not found"))
(sse/response #(clone-template params))))
(defn- clone-template
[{:keys [::wrk/executor ::binfile/project-id] :as params}]
(db/tx-run! params
(fn [{:keys [::db/conn] :as params}]
;; NOTE: the importation process performs some operations that
;; are not very friendly with virtual threads, and for avoid
;; unexpected blocking of other concurrent operations we
;; dispatch that operation to a dedicated executor.
(let [result (p/thread-call executor (partial binfile/import! params))]
(db/update! conn :project
{:modified-at (dt/now)}
{:id project-id})
(deref result)))))
;; --- COMMAND: Get list of builtin templates

View file

@ -16,6 +16,7 @@
[app.common.schema.openapi :as oapi]
[app.common.schema.registry :as sr]
[app.config :as cf]
[app.http.sse :as-alias sse]
[app.loggers.webhooks :as-alias webhooks]
[app.rpc :as-alias rpc]
[app.util.json :as json]
@ -55,6 +56,7 @@
:module (or (some-> (::module mdata) d/name)
(-> (:ns mdata) (str/split ".") last))
:auth (::rpc/auth mdata true)
:sse (::sse/stream? mdata false)
:webhook (::webhooks/event? mdata false)
:docs (::sv/docstring mdata)
:deprecated (::deprecated mdata)