move SyncCache to a new pkg/synccache package (#5725)

Move SyncCache from pkg/grpc/databroker to a new pkg/synccache package.

This will allow pomerium-cli to avoid the pebble dependency.
This commit is contained in:
Kenneth Jenkins 2025-07-15 15:59:11 -07:00 committed by GitHub
parent c2115dc1e4
commit e5e799a868
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 20 additions and 18 deletions

View file

@ -1,4 +1,4 @@
package databroker package synccache
import ( import (
"context" "context"
@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb" "google.golang.org/protobuf/types/known/wrapperspb"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/pebbleutil" "github.com/pomerium/pomerium/pkg/pebbleutil"
) )
@ -44,9 +45,9 @@ type SyncCache interface {
// Clear deletes all the data for the given record type in the sync cache. // Clear deletes all the data for the given record type in the sync cache.
Clear(recordType string) error Clear(recordType string) error
// Records yields the databroker records stored in the cache. // Records yields the databroker records stored in the cache.
Records(recordType string) iter.Seq2[*Record, error] Records(recordType string) iter.Seq2[*databroker.Record, error]
// Sync syncs the cache with the databroker. // Sync syncs the cache with the databroker.
Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error
} }
type syncCache struct { type syncCache struct {
@ -57,8 +58,8 @@ type syncCache struct {
writeOptions *pebble.WriteOptions writeOptions *pebble.WriteOptions
} }
// NewSyncCache creates a new SyncCache. // New creates a new SyncCache.
func NewSyncCache(db *pebble.DB, prefix []byte) SyncCache { func New(db *pebble.DB, prefix []byte) SyncCache {
return &syncCache{ return &syncCache{
db: db, db: db,
prefix: prefix, prefix: prefix,
@ -75,7 +76,7 @@ func (c *syncCache) Clear(recordType string) error {
return nil return nil
} }
func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] { func (c *syncCache) Records(recordType string) iter.Seq2[*databroker.Record, error] {
prefix := c.recordPrefix(recordType) prefix := c.recordPrefix(recordType)
iterOptions := new(pebble.IterOptions) iterOptions := new(pebble.IterOptions)
if c.iterOptions != nil { if c.iterOptions != nil {
@ -83,7 +84,7 @@ func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] {
} }
iterOptions.LowerBound = prefix iterOptions.LowerBound = prefix
iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix) iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix)
return func(yield func(*Record, error) bool) { return func(yield func(*databroker.Record, error) bool) {
for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) { for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) {
if err != nil { if err != nil {
yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err)) yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err))
@ -97,7 +98,7 @@ func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] {
} }
} }
func (c *syncCache) Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error { func (c *syncCache) Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0) serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0)
err := errors.Join( err := errors.Join(
c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion), c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion),
@ -134,11 +135,11 @@ func (c *syncCache) serverVersionKey(recordType string) []byte {
return append(c.recordTypePrefix(recordType), fieldServerVersion) return append(c.recordTypePrefix(recordType), fieldServerVersion)
} }
func (c *syncCache) sync(ctx context.Context, client DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error { func (c *syncCache) sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error {
streamCtx, cancel := context.WithCancel(ctx) streamCtx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
stream, err := client.Sync(streamCtx, &SyncRequest{ stream, err := client.Sync(streamCtx, &databroker.SyncRequest{
Type: recordType, Type: recordType,
ServerVersion: serverVersion, ServerVersion: serverVersion,
RecordVersion: recordVersion, RecordVersion: recordVersion,
@ -190,11 +191,11 @@ func (c *syncCache) sync(ctx context.Context, client DataBrokerServiceClient, re
return nil return nil
} }
func (c *syncCache) syncLatest(ctx context.Context, client DataBrokerServiceClient, recordType string) error { func (c *syncCache) syncLatest(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
stream, err := client.SyncLatest(ctx, &SyncLatestRequest{ stream, err := client.SyncLatest(ctx, &databroker.SyncLatestRequest{
Type: recordType, Type: recordType,
}) })
if err != nil { if err != nil {
@ -220,13 +221,13 @@ func (c *syncCache) syncLatest(ctx context.Context, client DataBrokerServiceClie
} }
switch res := res.Response.(type) { switch res := res.Response.(type) {
case *SyncLatestResponse_Record: case *databroker.SyncLatestResponse_Record:
// add the record // add the record
err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record) err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record)
if err != nil { if err != nil {
return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err) return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err)
} }
case *SyncLatestResponse_Versions: case *databroker.SyncLatestResponse_Versions:
// update the versions // update the versions
err = errors.Join( err = errors.Join(
c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)), c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)),
@ -278,13 +279,13 @@ func (c *syncCache) pebbleSetProto(dst pebble.Writer, key []byte, msg proto.Mess
return c.pebbleSet(dst, key, value) return c.pebbleSet(dst, key, value)
} }
func pebbleIteratorToRecord(it *pebble.Iterator) (*Record, error) { func pebbleIteratorToRecord(it *pebble.Iterator) (*databroker.Record, error) {
value, err := it.ValueAndErr() value, err := it.ValueAndErr()
if err != nil { if err != nil {
return nil, err return nil, err
} }
record := new(Record) record := new(databroker.Record)
err = unmarshalOptions.Unmarshal(value, record) err = unmarshalOptions.Unmarshal(value, record)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -1,4 +1,4 @@
package databroker_test package synccache_test
import ( import (
"context" "context"
@ -19,6 +19,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/pebbleutil" "github.com/pomerium/pomerium/pkg/pebbleutil"
"github.com/pomerium/pomerium/pkg/protoutil" "github.com/pomerium/pomerium/pkg/protoutil"
"github.com/pomerium/pomerium/pkg/synccache"
) )
func TestSyncCache(t *testing.T) { func TestSyncCache(t *testing.T) {
@ -50,7 +51,7 @@ func TestSyncCache(t *testing.T) {
db := pebbleutil.MustOpenMemory(nil) db := pebbleutil.MustOpenMemory(nil)
require.NoError(t, db.Set([]byte("OTHER"), []byte("VALUE"), nil)) require.NoError(t, db.Set([]byte("OTHER"), []byte("VALUE"), nil))
c := databrokerpb.NewSyncCache(db, prefix) c := synccache.New(db, prefix)
assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User)))) assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User))))
actual := collect(t, c.Records(protoutil.GetTypeURL(new(user.User)))) actual := collect(t, c.Records(protoutil.GetTypeURL(new(user.User))))