diff --git a/pkg/grpc/databroker/sync_cache.go b/pkg/synccache/sync_cache.go similarity index 87% rename from pkg/grpc/databroker/sync_cache.go rename to pkg/synccache/sync_cache.go index 40c87d1e7..fb081352a 100644 --- a/pkg/grpc/databroker/sync_cache.go +++ b/pkg/synccache/sync_cache.go @@ -1,4 +1,4 @@ -package databroker +package synccache import ( "context" @@ -15,6 +15,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/wrapperspb" + "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/pebbleutil" ) @@ -44,9 +45,9 @@ type SyncCache interface { // Clear deletes all the data for the given record type in the sync cache. Clear(recordType string) error // Records yields the databroker records stored in the cache. - Records(recordType string) iter.Seq2[*Record, error] + Records(recordType string) iter.Seq2[*databroker.Record, error] // Sync syncs the cache with the databroker. - Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error + Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error } type syncCache struct { @@ -57,8 +58,8 @@ type syncCache struct { writeOptions *pebble.WriteOptions } -// NewSyncCache creates a new SyncCache. -func NewSyncCache(db *pebble.DB, prefix []byte) SyncCache { +// New creates a new SyncCache. +func New(db *pebble.DB, prefix []byte) SyncCache { return &syncCache{ db: db, prefix: prefix, @@ -75,7 +76,7 @@ func (c *syncCache) Clear(recordType string) error { return nil } -func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] { +func (c *syncCache) Records(recordType string) iter.Seq2[*databroker.Record, error] { prefix := c.recordPrefix(recordType) iterOptions := new(pebble.IterOptions) if c.iterOptions != nil { @@ -83,7 +84,7 @@ func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] { } iterOptions.LowerBound = prefix iterOptions.UpperBound = pebbleutil.PrefixToUpperBound(prefix) - return func(yield func(*Record, error) bool) { + return func(yield func(*databroker.Record, error) bool) { for record, err := range pebbleutil.Iterate(c.db, iterOptions, pebbleIteratorToRecord) { if err != nil { yield(nil, fmt.Errorf("sync-cache: error iterating over cached records (record-type=%s): %w", recordType, err)) @@ -97,7 +98,7 @@ func (c *syncCache) Records(recordType string) iter.Seq2[*Record, error] { } } -func (c *syncCache) Sync(ctx context.Context, client DataBrokerServiceClient, recordType string) error { +func (c *syncCache) Sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error { serverVersion, recordVersion := wrapperspb.UInt64(0), wrapperspb.UInt64(0) err := errors.Join( c.pebbleGetProto(c.db, c.serverVersionKey(recordType), serverVersion), @@ -134,11 +135,11 @@ func (c *syncCache) serverVersionKey(recordType string) []byte { return append(c.recordTypePrefix(recordType), fieldServerVersion) } -func (c *syncCache) sync(ctx context.Context, client DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error { +func (c *syncCache) sync(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string, serverVersion, recordVersion uint64) error { streamCtx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := client.Sync(streamCtx, &SyncRequest{ + stream, err := client.Sync(streamCtx, &databroker.SyncRequest{ Type: recordType, ServerVersion: serverVersion, RecordVersion: recordVersion, @@ -190,11 +191,11 @@ func (c *syncCache) sync(ctx context.Context, client DataBrokerServiceClient, re return nil } -func (c *syncCache) syncLatest(ctx context.Context, client DataBrokerServiceClient, recordType string) error { +func (c *syncCache) syncLatest(ctx context.Context, client databroker.DataBrokerServiceClient, recordType string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := client.SyncLatest(ctx, &SyncLatestRequest{ + stream, err := client.SyncLatest(ctx, &databroker.SyncLatestRequest{ Type: recordType, }) if err != nil { @@ -220,13 +221,13 @@ func (c *syncCache) syncLatest(ctx context.Context, client DataBrokerServiceClie } switch res := res.Response.(type) { - case *SyncLatestResponse_Record: + case *databroker.SyncLatestResponse_Record: // add the record err = c.pebbleSetProto(batch, c.recordKey(recordType, res.Record.Id), res.Record) if err != nil { return fmt.Errorf("sync-cache: error saving record to cache (record-type=%s): %w", recordType, err) } - case *SyncLatestResponse_Versions: + case *databroker.SyncLatestResponse_Versions: // update the versions err = errors.Join( c.pebbleSetProto(batch, c.serverVersionKey(recordType), wrapperspb.UInt64(res.Versions.ServerVersion)), @@ -278,13 +279,13 @@ func (c *syncCache) pebbleSetProto(dst pebble.Writer, key []byte, msg proto.Mess return c.pebbleSet(dst, key, value) } -func pebbleIteratorToRecord(it *pebble.Iterator) (*Record, error) { +func pebbleIteratorToRecord(it *pebble.Iterator) (*databroker.Record, error) { value, err := it.ValueAndErr() if err != nil { return nil, err } - record := new(Record) + record := new(databroker.Record) err = unmarshalOptions.Unmarshal(value, record) if err != nil { return nil, err diff --git a/pkg/grpc/databroker/sync_cache_test.go b/pkg/synccache/sync_cache_test.go similarity index 97% rename from pkg/grpc/databroker/sync_cache_test.go rename to pkg/synccache/sync_cache_test.go index 6d0b4283f..875c7f7d1 100644 --- a/pkg/grpc/databroker/sync_cache_test.go +++ b/pkg/synccache/sync_cache_test.go @@ -1,4 +1,4 @@ -package databroker_test +package synccache_test import ( "context" @@ -19,6 +19,7 @@ import ( "github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/pebbleutil" "github.com/pomerium/pomerium/pkg/protoutil" + "github.com/pomerium/pomerium/pkg/synccache" ) func TestSyncCache(t *testing.T) { @@ -50,7 +51,7 @@ func TestSyncCache(t *testing.T) { db := pebbleutil.MustOpenMemory(nil) require.NoError(t, db.Set([]byte("OTHER"), []byte("VALUE"), nil)) - c := databrokerpb.NewSyncCache(db, prefix) + c := synccache.New(db, prefix) assert.NoError(t, c.Sync(ctx, client1, protoutil.GetTypeURL(new(user.User)))) actual := collect(t, c.Records(protoutil.GetTypeURL(new(user.User))))