From ff607fa01807b453bd24e183db5e73422de10bf6 Mon Sep 17 00:00:00 2001 From: Caleb Doxsey Date: Mon, 2 Jun 2025 13:42:30 -0600 Subject: [PATCH] databroker: add sync-cache (#5639) ## Summary Add a new `SyncCache`: ```go type SyncCache interface { // Clear deletes all the data for the given record type in the sync cache. Clear(recordType string) error // Records yields the databroker records stored in the cache. Records(recordType string) iter.Seq2[*Record, error] // Sync syncs the cache with the databroker. Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error } ``` The cache maintains databroker records in a local pebble database (which could be on-disk or in-memory). The way it's used is you first call `.Sync(ctx, client, recordType)` and then `.Records(recordType)`, which returns an iterator over all the records. Internally we store the databroker records in a pebble key-value database. Pebble was chosen because it's fast and well-tested, but any ordered key-value store would work. The first time, we call `SyncLatest` to retrieve all the records. Each subsequent time we call `Sync` with the current server and record versions to retrieve only the changes. This is significantly more efficient than calling `SyncLatest` every time. The primary use for this is in the enterprise-console as part of directory sync to improve performance with large datasets. 
## Related issues - [ENG-2401](https://linear.app/pomerium/issue/ENG-2401/enterprise-console-improve-performance-of-directory-sync-using-cached) ## Checklist - [x] reference any related issues - [x] updated unit tests - [x] add appropriate label (`enhancement`, `bug`, `breaking`, `dependencies`, `ci`) - [x] ready for review --------- Co-authored-by: Denis Mishin --- go.mod | 15 +- go.sum | 38 +++- pkg/grpc/databroker/sync_cache.go | 294 +++++++++++++++++++++++++ pkg/grpc/databroker/sync_cache_test.go | 109 +++++++++ pkg/pebbleutil/pebbleutil.go | 115 ++++++++++ 5 files changed, 569 insertions(+), 2 deletions(-) create mode 100644 pkg/grpc/databroker/sync_cache.go create mode 100644 pkg/grpc/databroker/sync_cache_test.go create mode 100644 pkg/pebbleutil/pebbleutil.go diff --git a/go.mod b/go.mod index 92bac985c..34c1ea8ea 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/caddyserver/certmagic v0.23.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/cloudflare/circl v1.6.1 + github.com/cockroachdb/pebble/v2 v2.0.4 github.com/coreos/go-oidc/v3 v3.14.1 github.com/docker/docker v28.1.1+incompatible github.com/envoyproxy/go-control-plane/envoy v1.32.4 @@ -116,6 +117,7 @@ require ( cloud.google.com/go/monitoring v1.24.0 // indirect dario.cat/mergo v1.0.1 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect + github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect @@ -143,6 +145,13 @@ require ( github.com/caddyserver/zerossl v0.1.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 // indirect + github.com/cockroachdb/crlib 
v0.0.0-20241015224233-894974b3ad94 // indirect + github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect + github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect + github.com/cockroachdb/redact v1.1.5 // indirect + github.com/cockroachdb/swiss v0.0.0-20250304010804-34a2c6a59016 // indirect + github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/cpuguy83/dockercfg v0.3.2 // indirect @@ -154,6 +163,7 @@ require ( github.com/ebitengine/purego v0.8.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fxamacker/cbor/v2 v2.6.0 // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-ini/ini v1.67.0 // indirect github.com/go-jose/go-jose/v4 v4.0.5 // indirect github.com/go-kit/log v0.2.1 // indirect @@ -166,7 +176,7 @@ require ( github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect github.com/google/cel-go v0.25.0 // indirect github.com/google/go-tpm v0.9.0 // indirect github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect @@ -181,6 +191,8 @@ require ( github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/kr/text v0.2.0 // indirect github.com/kralicky/go-adaptive-radix-tree v0.0.0-20240624235931-330eb762e74c // indirect github.com/libdns/libdns v1.0.0-beta.1 // indirect github.com/lufia/plan9stats v0.0.0-20240513124658-fba389f38bae // indirect @@ -212,6 +224,7 @@ require ( 
github.com/prometheus/statsd_exporter v0.22.7 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/rs/xid v1.6.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect github.com/shirou/gopsutil/v4 v4.25.1 // indirect diff --git a/go.sum b/go.sum index d9a6b198c..394f12a58 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/CAFxX/httpcompression v0.0.9 h1:0ue2X8dOLEpxTm8tt+OdHcgA+gbDge0OqFQWGKSqgrg= github.com/CAFxX/httpcompression v0.0.9/go.mod h1:XX8oPZA+4IDcfZ0A71Hz0mZsv/YJOgYygkFhizVPilM= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE= +github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 h1:ErKg/3iS1AKcTkf3yixlZ54f9U1rljCkQyEXWUnIUxc= github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0/go.mod h1:yAZHSGnqScoU556rBOVkwLze6WP5N+U11RHuWaGVxwY= github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 h1:fYE9p3esPxA/C0rQ0AHhP0drtPXDRhaWiwg1DPqO7IU= @@ -80,6 +82,8 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= +github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f h1:JjxwchlOepwsUWcQwD2mLUAGE9aCp0/ehy6yCHFBOvo= 
+github.com/aclements/go-perfevent v0.0.0-20240301234650-f7843625020f/go.mod h1:tMDTce/yLLN/SK8gMOxQfnyeMeCg8KGzp0D1cbECEeo= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -166,6 +170,26 @@ github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42 h1:Om6kYQYDUk5wWbT0t0q6pvyM49i9XZAv9dDrkDA7gjk= github.com/cncf/xds/go v0.0.0-20250121191232-2f005788dc42/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= +github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94 h1:bvJv505UUfjzbaIPdNS4AEkHreDqQk6yuNpsdRHpwFA= +github.com/cockroachdb/crlib v0.0.0-20241015224233-894974b3ad94/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056 h1:slXychO2uDM6hYRu4c0pD0udNI8uObfeKN6UInWViS8= +github.com/cockroachdb/datadriven v1.0.3-0.20240530155848-7682d40af056/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= +github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= +github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= +github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= 
+github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895 h1:XANOgPYtvELQ/h4IrmPAohXqe2pWA8Bwhejr3VQoZsA= +github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895/go.mod h1:aPd7gM9ov9M8v32Yy5NJrDyOcD8z642dqs+F0CeNXfA= +github.com/cockroachdb/pebble/v2 v2.0.4 h1:1bnNm9TxObCKYYV8gxp17ja8XK+tcsvyhl9FALenocA= +github.com/cockroachdb/pebble/v2 v2.0.4/go.mod h1:VqRjBA5foHAE0sW2N/E8lg+y0zHWV1xKLZNGoReg16c= +github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= +github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/swiss v0.0.0-20250304010804-34a2c6a59016 h1:d0dxi6Q8yGtRBupybhtBC4GJk8WegtDAFr+Xk+/3sSE= +github.com/cockroachdb/swiss v0.0.0-20250304010804-34a2c6a59016/go.mod h1:yBRu/cnL4ks9bgy4vAASdjIW+/xMlFwuHKqtmh3GZQg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= @@ -176,6 +200,7 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/dockercfg v0.3.2 h1:DlJTyZGBDlXqUZ2Dk2Q3xHs/FtnooJJVaad2S9GKorA= github.com/cpuguy83/dockercfg v0.3.2/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew 
v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -230,8 +255,14 @@ github.com/fxamacker/cbor/v2 v2.6.0 h1:sU6J2usfADwWlYDAFhZBQ6TnLFBHxgesMrQfQgk1t github.com/fxamacker/cbor/v2 v2.6.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/gaissmai/bart v0.20.4 h1:Ik47r1fy3jRVU+1eYzKSW3ho2UgBVTVnUS8O993584U= github.com/gaissmai/bart v0.20.4/go.mod h1:cEed+ge8dalcbpi8wtS9x9m2hn/fNJH5suhdGQOHnYk= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9 h1:r5GgOLGbza2wVHRzK7aAj6lWZjfbAwiu/RDCVOKjRyM= +github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9/go.mod h1:106OIgooyS7OzLDOpUGgm9fA3bQENb/cFSyyBmMoJDs= github.com/go-chi/chi/v5 v5.2.1 h1:KOIHODQj58PmL80G2Eak4WdvUzjSJSm0vG72crDCqb8= github.com/go-chi/chi/v5 v5.2.1/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -307,8 +338,9 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= 
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e h1:4bw4WeyTYPp0smaXiJZCNnLrvVBqirQVreixayXezGc= +github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f h1:jopqB+UTSdJGEJT8tEqYyE29zN91fi2827oLET8tl7k= github.com/google/brotli/go/cbrotli v0.0.0-20230829110029-ed738e842d2f/go.mod h1:nOPhAkwVliJdNTkj3gXpljmWhjc4wCaVqbMJcPKWP4s= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -521,8 +553,11 @@ github.com/peterbourgon/ff/v3 v3.4.0/go.mod h1:zjJVUhx+twciwfDl0zBcFzl4dW8axCRyX github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pires/go-proxyproto v0.8.0 h1:5unRmEAPbHXHuLjDg01CxJWf91cw3lKHc/0xzKpXEe0= github.com/pires/go-proxyproto v0.8.0/go.mod h1:iknsfgnH8EkjrMeMyvfKByp9TiBZCKZM0jx2xmKqnVY= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -586,6 +621,7 @@ github.com/quic-go/quic-go v0.51.0/go.mod h1:MFlGGpcpJqRAfmYi6NC2cptDPSxRWTOGNuP github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics 
v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= diff --git a/pkg/grpc/databroker/sync_cache.go b/pkg/grpc/databroker/sync_cache.go new file mode 100644 index 000000000..40c87d1e7 --- /dev/null +++ b/pkg/grpc/databroker/sync_cache.go @@ -0,0 +1,294 @@ +package databroker + +import ( + "context" + "errors" + "fmt" + "io" + "iter" + "slices" + + pebble "github.com/cockroachdb/pebble/v2" + "github.com/google/uuid" + "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/wrapperspb" + + "github.com/pomerium/pomerium/pkg/pebbleutil" +) + +const ( + fieldServerVersion byte = 1 + fieldRecordVersion byte = 2 + fieldRecord byte = 3 +) + +var ( + syncCacheUUIDNamespace = uuid.MustParse("c9acb8d4-f10a-4e3c-9308-c285e1ebfb58") + marshalOptions = &proto.MarshalOptions{AllowPartial: true, Deterministic: true} + unmarshalOptions = &proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true} +) + +// A SyncCache uses the databroker Sync and SyncLatest methods to populate a cache of records. +// To use the SyncCache call Sync followed by Records. +// +// Data is stored in a pebble database in this format: +// +// {{prefix}}{{uuidv5(recordType)}}0x01: the server version +// {{prefix}}{{uuidv5(recordType)}}0x02: the latest record version +// {{prefix}}{{uuidv5(recordType)}}0x03{{recordID}}: a databroker record +// +// Values are protobuf encoded. 
+type SyncCache interface { + // Clear deletes all the data for the given record type in the sync cache. + Clear(recordType string) error + // Records yields the databroker records stored in the cache. + Records(recordType string) iter.Seq2[*Record, error] + // Sync syncs the cache with the databroker. + Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error +} + +type syncCache struct { + db *pebble.DB + prefix []byte + + iterOptions *pebble.IterOptions + writeOptions *pebble.WriteOptions +} + +// NewSyncCache creates a new SyncCache. +func NewSyncCache(db *pebble.DB, prefix []byte) SyncCache { + return &syncCache{ + db: db, + prefix: prefix, + } +} + +func (c *syncCache) Clear(recordType string) error { + // delete all the existing data + err := c.pebbleDeletePrefix(c.db, c.recordTypePrefix(recordType)) + if err != nil { + return fmt.Errorf("sync-cache: error clearing data from cache (record-type=%s): %w", recordType, err) + } + + return nil +} + +func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] { + prefix := c.recordPrefix(recordType) + iterOptions := new(pebble.IterOptions) + if c.iterOptions != nil { + *iterOptions = *c.iterOptions + } + iterOptions.LowerBound = prefix + iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix) + return func(yield func(*Record, error) bool) { + for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) { + if err != nil { + yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err)) + return + } + + if !yield(record, nil) { + return + } + } + } +} + +func (c *syncCache) Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error { + serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0) + err := errors.Join( + c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion), + c.pebbleGetProto(c.db, c.recordVersionKey(recordType), 
recordVersion), + ) + if errors.Is(err, pebble.ErrNotFound) { + // if we've never synced, use sync latest + return c.syncLatest(ctx, client, recordType) + } else if err != nil { + return fmt.Errorf("sync-cache: error retrieving server and record version from cache (record-type=%s): %w", recordType, err) + } + + return c.sync(ctx, client, recordType, serverVersion.Value, recordVersion.Value) +} + +func (c *syncCache) recordKey(recordType, recordID string) []byte { + return slices.Concat(c.recordPrefix(recordType), []byte(recordID)) +} + +func (c *syncCache) recordPrefix(recordType string) []byte { + return append(c.recordTypePrefix(recordType), fieldRecord) +} + +func (c *syncCache) recordTypePrefix(recordType string) []byte { + id := uuid.NewSHA1(syncCacheUUIDNamespace, []byte(recordType)) + return slices.Concat(c.prefix, id[:]) +} + +func (c *syncCache) recordVersionKey(recordType string) []byte { + return append(c.recordTypePrefix(recordType), fieldRecordVersion) +} + +func (c *syncCache) serverVersionKey(recordType string) []byte { + return append(c.recordTypePrefix(recordType), fieldServerVersion) +} + +func (c *syncCache) sync(ctx context.Context, client DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error { + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + + stream, err := client.Sync(streamCtx, &SyncRequest{ + Type: recordType, + ServerVersion: serverVersion, + RecordVersion: recordVersion, + Wait: proto.Bool(false), + }) + if err != nil { + return fmt.Errorf("sync-cache: error starting sync stream (record-type=%s): %w", recordType, err) + } + + // batch the updates together + batch := c.db.NewBatch() + defer batch.Close() + + for { + res, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } else if status.Code(err) == codes.Aborted { + cancel() + // the server version changed, so use sync latest + return c.syncLatest(ctx, client, recordType) + } else if err != nil { + return fmt.Errorf("sync-cache: 
error receiving message from sync stream (record-type=%s): %w", recordType, err) + } + + // either delete or update the record + if res.Record.DeletedAt != nil { + err = c.pebbleDelete(batch, c.recordKey(recordType, res.Record.Id)) + } else { + err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record) + } + if err != nil { + return fmt.Errorf("sync-cache: error updating record in cache (record-type=%s): %w", recordType, err) + } + + // update the record version + recordVersion = max(recordVersion, res.Record.Version) + err = c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(recordVersion)) + if err != nil { + return fmt.Errorf("sync-cache: error updating record version in cache (record-type=%s): %w", recordType, err) + } + } + + err = batch.Commit(c.writeOptions) + if err != nil { + return fmt.Errorf("sync-cache: error committing changes to cache (record-type=%s): %w", recordType, err) + } + + return nil +} + +func (c *syncCache) syncLatest(ctx context.Context, client DataBrokerServiceClient, recordType string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + stream, err := client.SyncLatest(ctx, &SyncLatestRequest{ + Type: recordType, + }) + if err != nil { + return fmt.Errorf("sync-cache: error starting sync latest stream (record-type=%s): %w", recordType, err) + } + + // batch the updates together + batch := c.db.NewBatch() + defer batch.Close() + + // delete all the existing data + err = c.pebbleDeletePrefix(batch, c.recordTypePrefix(recordType)) + if err != nil { + return fmt.Errorf("sync-cache: error deleting existing data from cache (record-type=%s): %w", recordType, err) + } + + for { + res, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } else if err != nil { + return fmt.Errorf("sync-cache: error receiving message from sync latest stream (record-type=%s): %w", recordType, err) + } + + switch res := res.Response.(type) { + case *SyncLatestResponse_Record: + // add the record 
+ err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record) + if err != nil { + return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err) + } + case *SyncLatestResponse_Versions: + // update the versions + err = errors.Join( + c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)), + c.pebbleSetProto(batch, c.recordVersionKey(recordType), wrapperspb.UInt64(res.Versions.LatestRecordVersion)), + ) + if err != nil { + return fmt.Errorf("sync-cache: error saving versions to cache (record-type=%s): %w", recordType, err) + } + default: + return fmt.Errorf("sync-cache: unknown message type from sync latest stream (record-type=%s): %T", recordType, res) + } + } + + err = batch.Commit(c.writeOptions) + if err != nil { + return fmt.Errorf("sync-cache: error committing changes to cache (record-type=%s): %w", recordType, err) + } + + return nil +} + +func (c *syncCache) pebbleDelete(dst pebble.Writer, key []byte) error { + return dst.Delete(key, c.writeOptions) +} + +func (c *syncCache) pebbleDeletePrefix(dst pebble.Writer, prefix []byte) error { + return dst.DeleteRange(prefix, pebbleutil.PrefixToUpperBound(prefix), c.writeOptions) +} + +func (c *syncCache) pebbleGetProto(src pebble.Reader, key []byte, msg proto.Message) error { + value, closer, err := src.Get(key) + if err != nil { + return err + } + err = unmarshalOptions.Unmarshal(value, msg) + _ = closer.Close() + return err +} + +func (c *syncCache) pebbleSet(dst pebble.Writer, key, value []byte) error { + return dst.Set(key, value, c.writeOptions) +} + +func (c *syncCache) pebbleSetProto(dst pebble.Writer, key []byte, msg proto.Message) error { + value, err := marshalOptions.Marshal(msg) + if err != nil { + return err + } + return c.pebbleSet(dst, key, value) +} + +func pebbleIteratorToRecord(it *pebble.Iterator) (*Record, error) { + value, err := it.ValueAndErr() + if err != nil { + return nil, err + } + + 
record := new(Record) + err = unmarshalOptions.Unmarshal(value, record) + if err != nil { + return nil, err + } + + return record, nil +} diff --git a/pkg/grpc/databroker/sync_cache_test.go b/pkg/grpc/databroker/sync_cache_test.go new file mode 100644 index 000000000..ac6eec6e5 --- /dev/null +++ b/pkg/grpc/databroker/sync_cache_test.go @@ -0,0 +1,109 @@ +package databroker_test + +import ( + "context" + "iter" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace/noop" + grpc "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/pomerium/pomerium/internal/databroker" + "github.com/pomerium/pomerium/internal/testutil" + databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpc/user" + "github.com/pomerium/pomerium/pkg/pebbleutil" + "github.com/pomerium/pomerium/pkg/protoutil" +) + +func TestSyncCache(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + defer cancel() + + prefix := []byte{1, 2, 3} + typeUUID := uuid.MustParse("e86635f2-f7ad-594d-a37e-a447aca46801") + + cc1 := testutil.NewGRPCServer(t, func(s *grpc.Server) { + databrokerpb.RegisterDataBrokerServiceServer(s, databroker.New(ctx, noop.NewTracerProvider())) + }) + cc2 := testutil.NewGRPCServer(t, func(s *grpc.Server) { + databrokerpb.RegisterDataBrokerServiceServer(s, databroker.New(ctx, noop.NewTracerProvider())) + }) + client1 := databrokerpb.NewDataBrokerServiceClient(cc1) + client2 := databrokerpb.NewDataBrokerServiceClient(cc2) + + expected := []*databrokerpb.Record{ + databrokerpb.NewRecord(&user.User{Id: "u1"}), + databrokerpb.NewRecord(&user.User{Id: "u2"}), + databrokerpb.NewRecord(&user.User{Id: "u3"}), + } + res, err := client1.Put(ctx, &databrokerpb.PutRequest{Records: expected}) + require.NoError(t, err) + expected = res.Records + + db := 
pebbleutil.MustOpenMemory(nil) + require.NoError(t, db.Set([]byte("OTHER"), []byte("VALUE"), nil)) + c := databrokerpb.NewSyncCache(db, prefix) + + assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User)))) + actual := collect(t, c.Records(protoutil.GetTypeURL(new(user.User)))) + testutil.AssertProtoEqual(t, expected, actual) + + assert.Equal(t, [][]byte{ + append(append(prefix, typeUUID[:]...), 1), // server version + append(append(prefix, typeUUID[:]...), 2), // record version + append(append(append(prefix, typeUUID[:]...), 3), "u1"...), // record u1 + append(append(append(prefix, typeUUID[:]...), 3), "u2"...), // record u2 + append(append(append(prefix, typeUUID[:]...), 3), "u3"...), // record u3 + []byte("OTHER"), + }, collect(t, pebbleutil.IterateKeys(db, nil))) + + // test adding a new record + + u4 := databrokerpb.NewRecord(&user.User{Id: "u4"}) + res, err = client1.Put(ctx, &databrokerpb.PutRequest{Records: []*databrokerpb.Record{u4}}) + require.NoError(t, err) + expected = append(expected, res.Records...) 
+ + assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User)))) + actual = collect(t, c.Records(protoutil.GetTypeURL(new(user.User)))) + testutil.AssertProtoEqual(t, expected, actual) + + // test deleting a record + + u4.DeletedAt = timestamppb.Now() + _, err = client1.Put(ctx, &databrokerpb.PutRequest{Records: []*databrokerpb.Record{u4}}) + require.NoError(t, err) + expected = expected[:len(expected)-1] + + assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User)))) + actual = collect(t, c.Records(protoutil.GetTypeURL(new(user.User)))) + testutil.AssertProtoEqual(t, expected, actual) + + // test server version change + + assert.NoError(t, c.Sync(ctx, client2, protoutil.GetTypeURL(new(user.User)))) + actual = collect(t, c.Records(protoutil.GetTypeURL(new(user.User)))) + assert.Empty(t, actual) + + // make sure clear works but other keys are left untouched + + assert.NoError(t, c.Clear(protoutil.GetTypeURL(new(user.User)))) + assert.Equal(t, [][]byte{[]byte("OTHER")}, collect(t, pebbleutil.IterateKeys(db, nil))) +} + +func collect[T any](tb testing.TB, seq iter.Seq2[T, error]) []T { + var records []T + for record, err := range seq { + require.NoError(tb, err) + records = append(records, record) + } + return records +} diff --git a/pkg/pebbleutil/pebbleutil.go b/pkg/pebbleutil/pebbleutil.go new file mode 100644 index 000000000..635e392e7 --- /dev/null +++ b/pkg/pebbleutil/pebbleutil.go @@ -0,0 +1,115 @@ +package pebbleutil + +import ( + "context" + "iter" + "slices" + + "github.com/cockroachdb/pebble/v2" + "github.com/cockroachdb/pebble/v2/vfs" +) + +// Iterate iterates over a pebble reader. 
+func Iterate[T any](src pebble.Reader, iterOptions *pebble.IterOptions, f func(it *pebble.Iterator) (T, error)) iter.Seq2[T, error] { + var zero T + return func(yield func(T, error) bool) { + it, err := src.NewIter(iterOptions) + if err != nil { + yield(zero, err) + return + } + + for it.First(); it.Valid(); it.Next() { + value, err := f(it) + if err != nil { + _ = it.Close() + yield(zero, err) + return + } + + if !yield(value, nil) { + _ = it.Close() + return + } + } + + err = it.Error() + if err != nil { + _ = it.Close() + yield(zero, err) + return + } + + err = it.Close() + if err != nil { + yield(zero, err) + return + } + } +} + +// IterateKeys yields the keys in a pebble reader. +func IterateKeys(src pebble.Reader, iterOptions *pebble.IterOptions) iter.Seq2[[]byte, error] { + return Iterate(src, iterOptions, func(it *pebble.Iterator) ([]byte, error) { + return slices.Clone(it.Key()), nil + }) +} + +// IterateValues yields the values in a pebble reader. +func IterateValues(src pebble.Reader, iterOptions *pebble.IterOptions) iter.Seq2[[]byte, error] { + return Iterate(src, iterOptions, func(it *pebble.Iterator) ([]byte, error) { + value, err := it.ValueAndErr() + if err != nil { + return nil, err + } + return slices.Clone(value), nil + }) +} + +// MustOpen opens a pebble database. It sets options useful for pomerium and panics if there is an error. +func MustOpen(dirname string, options *pebble.Options) *pebble.DB { + db, err := Open(dirname, options) + if err != nil { + panic(err) + } + return db +} + +// MustOpenMemory opens an in-memory pebble database. It panics if there is an error. +func MustOpenMemory(options *pebble.Options) *pebble.DB { + if options == nil { + options = new(pebble.Options) + } + options.FS = vfs.NewMem() + return MustOpen("", options) +} + +// Open opens a pebble database. It sets options useful for pomerium. 
+func Open(dirname string, options *pebble.Options) (*pebble.DB, error) { + if options == nil { + options = new(pebble.Options) + } + options.LoggerAndTracer = pebbleLogger{} + return pebble.Open(dirname, options) +} + +// PrefixToUpperBound returns an upper bound for the given prefix. +func PrefixToUpperBound(prefix []byte) []byte { + upperBound := make([]byte, len(prefix)) + copy(upperBound, prefix) + for i := len(upperBound) - 1; i >= 0; i-- { + upperBound[i] = upperBound[i] + 1 + if upperBound[i] != 0 { + return upperBound[:i+1] + } + } + return nil // no upper-bound +} + +type pebbleLogger struct{} + +func (pebbleLogger) Infof(_ string, _ ...any) {} +func (pebbleLogger) Errorf(_ string, _ ...any) {} +func (pebbleLogger) Fatalf(_ string, _ ...any) {} +func (pebbleLogger) Eventf(_ context.Context, _ string, _ ...any) {} +func (pebbleLogger) IsTracingEnabled(_ context.Context) bool { return false }