🎉 Add zstd+nippy based blob storage format.

This commit is contained in:
Andrey Antukh 2021-02-07 16:33:10 +01:00 committed by Hirunatan
parent 68ed30ff35
commit a709c47f6f
4 changed files with 79 additions and 38 deletions

View file

@ -18,6 +18,8 @@
org.slf4j/slf4j-api {:mvn/version "1.7.30"} org.slf4j/slf4j-api {:mvn/version "1.7.30"}
org.graalvm.js/js {:mvn/version "20.3.0"} org.graalvm.js/js {:mvn/version "20.3.0"}
com.taoensso/nippy {:mvn/version "3.1.1"}
com.github.luben/zstd-jni {:mvn/version "1.4.8-3"}
io.prometheus/simpleclient {:mvn/version "0.9.0"} io.prometheus/simpleclient {:mvn/version "0.9.0"}
io.prometheus/simpleclient_hotspot {:mvn/version "0.9.0"} io.prometheus/simpleclient_hotspot {:mvn/version "0.9.0"}

View file

@ -14,6 +14,7 @@
[app.util.time :as dt] [app.util.time :as dt]
[app.util.transit :as t] [app.util.transit :as t]
[app.common.exceptions :as ex] [app.common.exceptions :as ex]
[taoensso.nippy :as nippy]
[clojure.data.json :as json] [clojure.data.json :as json]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.test :as test] [clojure.test :as test]

View file

@ -24,6 +24,8 @@
:database-username "penpot" :database-username "penpot"
:database-password "penpot" :database-password "penpot"
:default-blob-version 1
:asserts-enabled false :asserts-enabled false
:public-uri "http://localhost:3449" :public-uri "http://localhost:3449"
@ -80,6 +82,7 @@
(s/def ::database-uri ::us/string) (s/def ::database-uri ::us/string)
(s/def ::redis-uri ::us/string) (s/def ::redis-uri ::us/string)
(s/def ::storage-backend ::us/keyword) (s/def ::storage-backend ::us/keyword)
(s/def ::storage-fs-directory ::us/string) (s/def ::storage-fs-directory ::us/string)
(s/def ::assets-path ::us/string) (s/def ::assets-path ::us/string)
@ -143,12 +146,15 @@
(s/def ::initial-data-file ::us/string) (s/def ::initial-data-file ::us/string)
(s/def ::initial-data-project-name ::us/string) (s/def ::initial-data-project-name ::us/string)
(s/def ::default-blob-version ::us/integer)
(s/def ::config (s/def ::config
(s/keys :opt-un [::allow-demo-users (s/keys :opt-un [::allow-demo-users
::asserts-enabled ::asserts-enabled
::database-password ::database-password
::database-uri ::database-uri
::database-username ::database-username
::default-blob-version
::error-report-webhook ::error-report-webhook
::github-client-id ::github-client-id
::github-client-secret ::github-client-secret

View file

@ -10,61 +10,93 @@
(ns app.util.blob (ns app.util.blob
"A generic blob storage encoding. Mainly used for "A generic blob storage encoding. Mainly used for
page data, page options and txlog payload storage." page data, page options and txlog payload storage."
(:require [app.util.transit :as t]) (:require
[app.config :as cfg]
[app.util.transit :as t]
[taoensso.nippy :as n])
(:import (:import
java.io.ByteArrayInputStream java.io.ByteArrayInputStream
java.io.ByteArrayOutputStream java.io.ByteArrayOutputStream
java.io.DataInputStream java.io.DataInputStream
java.io.DataOutputStream java.io.DataOutputStream
com.github.luben.zstd.Zstd
net.jpountz.lz4.LZ4Factory net.jpountz.lz4.LZ4Factory
net.jpountz.lz4.LZ4FastDecompressor net.jpountz.lz4.LZ4FastDecompressor
net.jpountz.lz4.LZ4Compressor)) net.jpountz.lz4.LZ4Compressor))
(defprotocol IDataToBytes
(->bytes [data] "convert data to bytes"))
(extend-protocol IDataToBytes
(Class/forName "[B")
(->bytes [data] data)
String
(->bytes [data] (.getBytes ^String data "UTF-8")))
(def lz4-factory (LZ4Factory/fastestInstance)) (def lz4-factory (LZ4Factory/fastestInstance))
(defn encode
[data]
(let [data (t/encode data {:type :json})
data-len (alength ^bytes data)
cp (.fastCompressor ^LZ4Factory lz4-factory)
max-len (.maxCompressedLength cp data-len)
cdata (byte-array max-len)
clen (.compress ^LZ4Compressor cp ^bytes data 0 data-len cdata 0 max-len)]
(with-open [^ByteArrayOutputStream baos (ByteArrayOutputStream. (+ (alength cdata) 2 4))
^DataOutputStream dos (DataOutputStream. baos)]
(.writeShort dos (short 1)) ;; version number
(.writeInt dos (int data-len))
(.write dos ^bytes cdata (int 0) clen)
(.toByteArray baos))))
(declare decode-v1) (declare decode-v1)
(declare decode-v2)
(declare encode-v1)
(declare encode-v2)
(def default-version
(:default-blob-version cfg/config 1))
(defn encode
([data] (encode data nil))
([data {:keys [version] :or {version default-version}}]
(case version
1 (encode-v1 data)
2 (encode-v2 data)
(throw (ex-info "unsupported version" {:version version})))))
(defn decode (defn decode
"A function used for decode persisted blobs in the database." "A function used for decode persisted blobs in the database."
[data] [^bytes data]
(let [data (->bytes data)]
(with-open [bais (ByteArrayInputStream. data) (with-open [bais (ByteArrayInputStream. data)
dis (DataInputStream. bais)] dis (DataInputStream. bais)]
(let [version (.readShort dis) (let [version (.readShort dis)
udata-len (.readInt dis)] ulen (.readInt dis)]
(case version (case version
1 (decode-v1 data udata-len) 1 (decode-v1 data ulen)
(throw (ex-info "unsupported version" {:version version}))))))) 2 (decode-v2 data ulen)
(throw (ex-info "unsupported version" {:version version}))))))
;; --- IMPL
(defn- encode-v1
[data]
(let [data (t/encode data {:type :json})
dlen (alength ^bytes data)
cp (.fastCompressor ^LZ4Factory lz4-factory)
mlen (.maxCompressedLength cp dlen)
cdata (byte-array mlen)
clen (.compress ^LZ4Compressor cp ^bytes data 0 dlen cdata 0 mlen)]
(with-open [^ByteArrayOutputStream baos (ByteArrayOutputStream. (+ (alength cdata) 2 4))
^DataOutputStream dos (DataOutputStream. baos)]
(.writeShort dos (short 1)) ;; version number
(.writeInt dos (int dlen))
(.write dos ^bytes cdata (int 0) clen)
(.toByteArray baos))))
(defn- decode-v1 (defn- decode-v1
[^bytes cdata ^long udata-len] [^bytes cdata ^long ulen]
(let [^LZ4FastDecompressor dcp (.fastDecompressor ^LZ4Factory lz4-factory) (let [dcp (.fastDecompressor ^LZ4Factory lz4-factory)
^bytes udata (byte-array udata-len)] udata (byte-array ulen)]
(.decompress dcp cdata 6 udata 0 udata-len) (.decompress ^LZ4FastDecompressor dcp cdata 6 ^bytes udata 0 ulen)
(t/decode udata {:type :json}))) (t/decode udata {:type :json})))
(defn- encode-v2
[data]
(let [data (n/fast-freeze data)
dlen (alength data)
mlen (Zstd/compressBound dlen)
cdata (byte-array mlen)
clen (Zstd/compressByteArray ^bytes cdata 0 mlen
^bytes data 0 dlen
8)]
(with-open [^ByteArrayOutputStream baos (ByteArrayOutputStream. (+ (alength cdata) 2 4))
^DataOutputStream dos (DataOutputStream. baos)]
(.writeShort dos (short 2)) ;; version number
(.writeInt dos (int dlen))
(.write dos ^bytes cdata (int 0) clen)
(.toByteArray baos))))
(defn- decode-v2
[^bytes cdata ^long ulen]
(let [udata (byte-array ulen)]
(Zstd/decompressByteArray ^bytes udata 0 ulen
^bytes cdata 6 (- (alength cdata) 6))
(n/fast-thaw udata)))