diff --git a/internal/databroker/server.go b/internal/databroker/server.go index cd39f001f..0b3542dfa 100644 --- a/internal/databroker/server.go +++ b/internal/databroker/server.go @@ -12,8 +12,6 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/wrapperspb" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/log" @@ -26,17 +24,11 @@ import ( "github.com/pomerium/pomerium/pkg/storage/redis" ) -const ( - recordTypeServerVersion = "server_version" - serverVersionKey = "version" -) - // Server implements the databroker service using an in memory database. type Server struct { cfg *serverConfig mu sync.RWMutex - version uint64 backend storage.Backend } @@ -47,40 +39,6 @@ func New(options ...ServerOption) *Server { return srv } -func (srv *Server) initVersion(ctx context.Context) { - db, _, err := srv.getBackendLocked() - if err != nil { - log.Error(ctx).Err(err).Msg("failed to init server version") - return - } - - // Get version from storage first. - r, err := db.Get(ctx, recordTypeServerVersion, serverVersionKey) - switch { - case err == nil: - var sv wrapperspb.UInt64Value - if err := r.GetData().UnmarshalTo(&sv); err == nil { - log.Debug(ctx).Uint64("server_version", sv.Value).Msg("got db version from Backend") - srv.version = sv.Value - } - return - case errors.Is(err, storage.ErrNotFound): // no server version, so we'll create a new one - case err != nil: - log.Error(ctx).Err(err).Msg("failed to retrieve server version") - return - } - - srv.version = cryptutil.NewRandomUInt64() - data, _ := anypb.New(wrapperspb.UInt64(srv.version)) - if err := db.Put(context.Background(), &databroker.Record{ - Type: recordTypeServerVersion, - Id: serverVersionKey, - Data: data, - }); err != nil { - log.Warn(ctx).Err(err).Msg("failed to save server version.") - } -} - // UpdateConfig updates the server with the new options. func (srv *Server) UpdateConfig(options ...ServerOption) { srv.mu.Lock() @@ -102,8 +60,6 @@ func (srv *Server) UpdateConfig(options ...ServerOption) { } srv.backend = nil } - - srv.initVersion(ctx) } // Get gets a record from the in-memory list. @@ -116,7 +72,7 @@ func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databr Str("id", req.GetId()). Msg("get") - db, version, err := srv.getBackend() + db, err := srv.getBackend() if err != nil { return nil, err } @@ -130,8 +86,7 @@ func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databr return nil, status.Error(codes.NotFound, "record not found") } return &databroker.GetResponse{ - Record: record, - ServerVersion: version, + Record: record, }, nil } @@ -149,7 +104,7 @@ func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*da query := strings.ToLower(req.GetQuery()) - db, _, err := srv.getBackend() + db, err := srv.getBackend() if err != nil { return nil, err } @@ -189,15 +144,17 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr Str("id", record.GetId()). Msg("put") - db, version, err := srv.getBackend() + db, err := srv.getBackend() if err != nil { return nil, err } - if err := db.Put(ctx, record); err != nil { + + serverVersion, err := db.Put(ctx, record) + if err != nil { return nil, err } return &databroker.PutResponse{ - ServerVersion: version, + ServerVersion: serverVersion, Record: record, }, nil } @@ -207,7 +164,7 @@ func (srv *Server) SetOptions(ctx context.Context, req *databroker.SetOptionsReq _, span := trace.StartSpan(ctx, "databroker.grpc.SetOptions") defer span.End() - backend, _, err := srv.getBackend() + backend, err := srv.getBackend() if err != nil { return nil, err } @@ -226,29 +183,25 @@ func (srv *Server) SetOptions(ctx context.Context, req *databroker.SetOptionsReq // Sync streams updates for the given record type. func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBrokerService_SyncServer) error { - _, span := trace.StartSpan(stream.Context(), "databroker.grpc.Sync") + ctx := stream.Context() + ctx, span := trace.StartSpan(ctx, "databroker.grpc.Sync") defer span.End() - log.Info(stream.Context()). + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + log.Info(ctx). Str("peer", grpcutil.GetPeerAddr(stream.Context())). Uint64("server_version", req.GetServerVersion()). Uint64("record_version", req.GetRecordVersion()). Msg("sync") - backend, serverVersion, err := srv.getBackend() + backend, err := srv.getBackend() if err != nil { return err } - // reset record version if the server versions don't match - if req.GetServerVersion() != serverVersion { - return status.Errorf(codes.Aborted, "invalid server version, got %d, expected: %d", req.GetServerVersion(), serverVersion) - } - - ctx := stream.Context() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - recordStream, err := backend.Sync(ctx, req.GetRecordVersion()) + recordStream, err := backend.Sync(ctx, req.GetServerVersion(), req.GetRecordVersion()) if err != nil { return err } @@ -256,8 +209,7 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke for recordStream.Next(true) { err = stream.Send(&databroker.SyncResponse{ - ServerVersion: serverVersion, - Record: recordStream.Record(), + Record: recordStream.Record(), }) if err != nil { return err @@ -269,23 +221,24 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke // SyncLatest returns the latest value of every record in the databroker as a stream of records. func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databroker.DataBrokerService_SyncLatestServer) error { - _, span := trace.StartSpan(stream.Context(), "databroker.grpc.SyncLatest") + ctx := stream.Context() + ctx, span := trace.StartSpan(ctx, "databroker.grpc.SyncLatest") defer span.End() - log.Info(stream.Context()). + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + log.Info(ctx). Str("peer", grpcutil.GetPeerAddr(stream.Context())). Str("type", req.GetType()). Msg("sync latest") - backend, serverVersion, err := srv.getBackend() + backend, err := srv.getBackend() if err != nil { return err } - ctx := stream.Context() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - records, latestRecordVersion, err := backend.GetAll(ctx) + records, versions, err := backend.GetAll(ctx) if err != nil { return err } @@ -306,25 +259,20 @@ func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databrok // always send the server version last in case there are no records return stream.Send(&databroker.SyncLatestResponse{ Response: &databroker.SyncLatestResponse_Versions{ - Versions: &databroker.Versions{ - ServerVersion: serverVersion, - LatestRecordVersion: latestRecordVersion, - }, + Versions: versions, }, }) } -func (srv *Server) getBackend() (backend storage.Backend, version uint64, err error) { +func (srv *Server) getBackend() (backend storage.Backend, err error) { // double-checked locking: // first try the read lock, then re-try with the write lock, and finally create a new backend if nil srv.mu.RLock() backend = srv.backend - version = srv.version srv.mu.RUnlock() if backend == nil { srv.mu.Lock() backend = srv.backend - version = srv.version var err error if backend == nil { backend, err = srv.newBackendLocked() @@ -332,24 +280,10 @@ func (srv *Server) getBackend() (backend storage.Backend, version uint64, err er } srv.mu.Unlock() if err != nil { - return nil, 0, err + return nil, err } } - return backend, version, nil -} - -func (srv *Server) getBackendLocked() (backend storage.Backend, version uint64, err error) { - backend = srv.backend - version = srv.version - if backend == nil { - var err error - backend, err = srv.newBackendLocked() - srv.backend = backend - if err != nil { - return nil, 0, err - } - } - return backend, version, nil + return backend, nil } func (srv *Server) newBackendLocked() (backend storage.Backend, err error) { diff --git a/internal/databroker/server_test.go b/internal/databroker/server_test.go index 235645941..6f59fdcdb 100644 --- a/internal/databroker/server_test.go +++ b/internal/databroker/server_test.go @@ -16,8 +16,7 @@ import ( func newServer(cfg *serverConfig) *Server { return &Server{ - version: 11, - cfg: cfg, + cfg: cfg, } } diff --git a/pkg/grpc/databroker/databroker.pb.go b/pkg/grpc/databroker/databroker.pb.go index 9c7719a49..069be7386 100644 --- a/pkg/grpc/databroker/databroker.pb.go +++ b/pkg/grpc/databroker/databroker.pb.go @@ -279,8 +279,7 @@ type GetResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` - ServerVersion uint64 `protobuf:"varint,2,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"` + Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` } func (x *GetResponse) Reset() { @@ -322,13 +321,6 @@ func (x *GetResponse) GetRecord() *Record { return nil } -func (x *GetResponse) GetServerVersion() uint64 { - if x != nil { - return x.ServerVersion - } - return 0 -} - type QueryRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -719,8 +711,7 @@ type SyncResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ServerVersion uint64 `protobuf:"varint,1,opt,name=server_version,json=serverVersion,proto3" json:"server_version,omitempty"` - Record *Record `protobuf:"bytes,2,opt,name=record,proto3" json:"record,omitempty"` + Record *Record `protobuf:"bytes,1,opt,name=record,proto3" json:"record,omitempty"` } func (x *SyncResponse) Reset() { @@ -755,13 +746,6 @@ func (*SyncResponse) Descriptor() ([]byte, []int) { return file_databroker_proto_rawDescGZIP(), []int{12} } -func (x *SyncResponse) GetServerVersion() uint64 { - if x != nil { - return x.ServerVersion - } - return 0 -} - func (x *SyncResponse) GetRecord() *Record { if x != nil { return x.Record @@ -932,98 +916,93 @@ var file_databroker_proto_rawDesc = []byte{ 0x61, 0x63, 0x69, 0x74, 0x79, 0x22, 0x30, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x60, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x39, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, - 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x66, 0x0a, 0x0c, 0x51, 0x75, 0x65, - 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, - 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, - 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, - 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, - 0x74, 0x22, 0x5e, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, - 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, - 0x74, 0x22, 0x38, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, - 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x60, 0x0a, 0x0b, 0x50, - 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, - 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, - 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x56, 0x0a, - 0x11, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x72, 0x64, 0x22, 0x66, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, 0x0a, 0x12, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x07, 0x6f, - 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x5b, 0x0a, 0x0b, 0x53, 0x79, - 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, - 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, - 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x61, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, - 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, - 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, - 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x27, 0x0a, 0x11, 0x53, 0x79, - 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, - 0x79, 0x70, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x12, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, - 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x06, 0x72, 0x65, - 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x48, 0x00, - 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x32, 0x0a, 0x08, 0x76, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, - 0x48, 0x00, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x0a, 0x0a, 0x08, - 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x9a, 0x03, 0x0a, 0x11, 0x44, 0x61, 0x74, - 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x36, - 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, - 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, - 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, - 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, - 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, 0x0a, 0x0a, - 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, 0x74, 0x61, - 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, 0x79, 0x6e, - 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, - 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, 0x61, 0x74, - 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x4d, 0x0a, 0x0a, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, - 0x74, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, - 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, - 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, - 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, 0x66, + 0x66, 0x73, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x03, 0x52, 0x05, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x22, 0x5e, 0x0a, 0x0d, 0x51, 0x75, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x07, 0x72, + 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x52, 0x07, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x6f, 0x74, + 0x61, 0x6c, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, + 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x38, 0x0a, 0x0a, 0x50, 0x75, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, + 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, + 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x22, 0x60, 0x0a, 0x0b, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, + 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, + 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x56, 0x0a, 0x11, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, + 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x4f, 0x70, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, + 0x0a, 0x12, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2d, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x22, 0x5b, 0x0a, 0x0b, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x22, 0x3a, 0x0a, 0x0c, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, + 0x63, 0x6f, 0x72, 0x64, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x22, 0x27, 0x0a, 0x11, + 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x82, 0x01, 0x0a, 0x12, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, + 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2c, 0x0a, 0x06, + 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, + 0x48, 0x00, 0x52, 0x06, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x32, 0x0a, 0x08, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x73, 0x48, 0x00, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x0a, + 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x9a, 0x03, 0x0a, 0x11, 0x44, + 0x61, 0x74, 0x61, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x47, 0x65, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, + 0x16, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x3c, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x18, 0x2e, 0x64, 0x61, 0x74, 0x61, + 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4b, + 0x0a, 0x0a, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1d, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, + 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x65, 0x74, 0x4f, 0x70, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x04, 0x53, + 0x79, 0x6e, 0x63, 0x12, 0x17, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, + 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, + 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x4d, 0x0a, 0x0a, 0x53, 0x79, 0x6e, 0x63, + 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, + 0x6b, 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x2e, 0x53, 0x79, 0x6e, 0x63, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x32, 0x5a, 0x30, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, + 0x6f, 0x6d, 0x65, 0x72, 0x69, 0x75, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x67, 0x72, 0x70, 0x63, + 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/grpc/databroker/databroker.proto b/pkg/grpc/databroker/databroker.proto index 34f2ff125..9e2db884e 100644 --- a/pkg/grpc/databroker/databroker.proto +++ b/pkg/grpc/databroker/databroker.proto @@ -33,7 +33,6 @@ message GetRequest { } message GetResponse { Record record = 1; - uint64 server_version = 2; } message QueryRequest { @@ -66,8 +65,7 @@ message SyncRequest { uint64 record_version = 2; } message SyncResponse { - uint64 server_version = 1; - Record record = 2; + Record record = 1; } message SyncLatestRequest { string type = 1; } diff --git a/pkg/grpc/databroker/syncer_test.go b/pkg/grpc/databroker/syncer_test.go index e6ce522ad..7e51561e5 100644 --- a/pkg/grpc/databroker/syncer_test.go +++ b/pkg/grpc/databroker/syncer_test.go @@ -75,12 +75,10 @@ func TestSyncer(t *testing.T) { return status.Error(codes.Aborted, "ABORTED") case 3: _ = server.Send(&SyncResponse{ - ServerVersion: 2001, - Record: r3, + Record: r3, }) _ = server.Send(&SyncResponse{ - ServerVersion: 2001, - Record: r5, + Record: r5, }) case 4: select {} // block forever diff --git a/pkg/storage/encrypted.go b/pkg/storage/encrypted.go index e81411736..61ea829c5 100644 --- a/pkg/storage/encrypted.go +++ b/pkg/storage/encrypted.go @@ -79,49 +79,50 @@ func (e *encryptedBackend) Get(ctx context.Context, recordType, id string) (*dat return record, nil } -func (e *encryptedBackend) GetAll(ctx context.Context) ([]*databroker.Record, uint64, error) { - records, version, err := e.underlying.GetAll(ctx) +func (e *encryptedBackend) GetAll(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { + records, versions, err := e.underlying.GetAll(ctx) if err != nil { - return nil, 0, err + return nil, versions, err } for i := range records { records[i], err = e.decryptRecord(records[i]) if err != nil { - return nil, 0, err + return nil, versions, err } } - return records, version, nil + return records, versions, nil } func (e *encryptedBackend) GetOptions(ctx context.Context, recordType string) (*databroker.Options, error) { return e.underlying.GetOptions(ctx, recordType) } -func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record) error { +func (e *encryptedBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { encrypted, err := e.encrypt(record.GetData()) if err != nil { - return err + return 0, err } newRecord := proto.Clone(record).(*databroker.Record) newRecord.Data = encrypted - if err = e.underlying.Put(ctx, newRecord); err != nil { - return err + serverVersion, err := e.underlying.Put(ctx, newRecord) + if err != nil { + return 0, err } record.ModifiedAt = newRecord.ModifiedAt record.Version = newRecord.Version - return nil + return serverVersion, nil } func (e *encryptedBackend) SetOptions(ctx context.Context, recordType string, options *databroker.Options) error { return e.underlying.SetOptions(ctx, recordType, options) } -func (e *encryptedBackend) Sync(ctx context.Context, version uint64) (RecordStream, error) { - stream, err := e.underlying.Sync(ctx, version) +func (e *encryptedBackend) Sync(ctx context.Context, serverVersion, recordVersion uint64) (RecordStream, error) { + stream, err := e.underlying.Sync(ctx, serverVersion, recordVersion) if err != nil { return nil, err } diff --git a/pkg/storage/encrypted_test.go b/pkg/storage/encrypted_test.go index cfa5bf3b5..2cac64279 100644 --- a/pkg/storage/encrypted_test.go +++ b/pkg/storage/encrypted_test.go @@ -19,11 +19,11 @@ func TestEncryptedBackend(t *testing.T) { m := map[string]*anypb.Any{} backend := &mockBackend{ - put: func(ctx context.Context, record *databroker.Record) error { + put: func(ctx context.Context, record *databroker.Record) (uint64, error) { record.ModifiedAt = timestamppb.Now() record.Version++ m[record.GetId()] = record.GetData() - return nil + return 0, nil }, get: func(ctx context.Context, recordType, id string) (*databroker.Record, error) { data, ok := m[id] @@ -37,7 +37,7 @@ func TestEncryptedBackend(t *testing.T) { ModifiedAt: timestamppb.Now(), }, nil }, - getAll: func(ctx context.Context) ([]*databroker.Record, uint64, error) { + getAll: func(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { var records []*databroker.Record for id, data := range m { records = append(records, &databroker.Record{ @@ -47,7 +47,7 @@ func TestEncryptedBackend(t *testing.T) { ModifiedAt: timestamppb.Now(), }) } - return records, 0, nil + return records, &databroker.Versions{}, nil }, } @@ -63,7 +63,7 @@ func TestEncryptedBackend(t *testing.T) { Id: "TEST-1", Data: any, } - err = e.Put(ctx, rec) + _, err = e.Put(ctx, rec) if !assert.NoError(t, err) { return } diff --git a/pkg/storage/inmemory/backend.go b/pkg/storage/inmemory/backend.go index 5583b528c..3e2163a12 100644 --- a/pkg/storage/inmemory/backend.go +++ b/pkg/storage/inmemory/backend.go @@ -15,6 +15,7 @@ import ( "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/signal" + "github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) @@ -34,8 +35,9 @@ func (change recordChange) Less(item btree.Item) bool { // A Backend stores data in-memory. type Backend struct { - cfg *config - onChange *signal.Signal + cfg *config + onChange *signal.Signal + serverVersion uint64 lastVersion uint64 closeOnce sync.Once @@ -51,12 +53,13 @@ type Backend struct { func New(options ...Option) *Backend { cfg := getConfig(options...) backend := &Backend{ - cfg: cfg, - onChange: signal.New(), - closed: make(chan struct{}), - lookup: make(map[string]*RecordCollection), - capacity: map[string]*uint64{}, - changes: btree.New(cfg.degree), + cfg: cfg, + onChange: signal.New(), + serverVersion: cryptutil.NewRandomUInt64(), + closed: make(chan struct{}), + lookup: make(map[string]*RecordCollection), + capacity: map[string]*uint64{}, + changes: btree.New(cfg.degree), } if cfg.expiry != 0 { go func() { @@ -133,7 +136,7 @@ func (backend *Backend) Get(_ context.Context, recordType, id string) (*databrok } // GetAll gets all the records from the in-memory store. -func (backend *Backend) GetAll(_ context.Context) ([]*databroker.Record, uint64, error) { +func (backend *Backend) GetAll(_ context.Context) ([]*databroker.Record, *databroker.Versions, error) { backend.mu.RLock() defer backend.mu.RUnlock() @@ -143,7 +146,10 @@ func (backend *Backend) GetAll(_ context.Context) ([]*databroker.Record, uint64, all = append(all, dup(r)) } } - return all, backend.lastVersion, nil + return all, &databroker.Versions{ + ServerVersion: backend.serverVersion, + LatestRecordVersion: backend.lastVersion, + }, nil } // GetOptions returns the options for a type in the in-memory store. @@ -160,9 +166,9 @@ func (backend *Backend) GetOptions(_ context.Context, recordType string) (*datab } // Put puts a record into the in-memory store. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) error { +func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { if record == nil { - return fmt.Errorf("records cannot be nil") + return backend.serverVersion, fmt.Errorf("records cannot be nil") } ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { @@ -191,7 +197,7 @@ func (backend *Backend) Put(ctx context.Context, record *databroker.Record) erro backend.enforceCapacity(record.GetType()) - return nil + return backend.serverVersion, nil } // SetOptions sets the options for a type in the in-memory store. @@ -209,9 +215,12 @@ func (backend *Backend) SetOptions(_ context.Context, recordType string, options return nil } -// Sync returns a record stream for any changes after version. -func (backend *Backend) Sync(ctx context.Context, version uint64) (storage.RecordStream, error) { - return newRecordStream(ctx, backend, version), nil +// Sync returns a record stream for any changes after recordVersion. +func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion uint64) (storage.RecordStream, error) { + if serverVersion != backend.serverVersion { + return nil, storage.ErrInvalidServerVersion + } + return newRecordStream(ctx, backend, recordVersion), nil } func (backend *Backend) recordChange(record *databroker.Record) { diff --git a/pkg/storage/inmemory/backend_test.go b/pkg/storage/inmemory/backend_test.go index 0720ee3e2..5ed2f3fb8 100644 --- a/pkg/storage/inmemory/backend_test.go +++ b/pkg/storage/inmemory/backend_test.go @@ -27,11 +27,13 @@ func TestBackend(t *testing.T) { }) t.Run("get record", func(t *testing.T) { data := new(anypb.Any) - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "abcd", Data: data, - })) + }) + assert.NoError(t, err) + assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") require.NoError(t, err) if assert.NotNil(t, record) { @@ -44,26 +46,30 @@ func TestBackend(t *testing.T) { } }) t.Run("delete record", func(t *testing.T) { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - })) + }) + assert.NoError(t, err) + assert.Equal(t, backend.serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") assert.Error(t, err) assert.Nil(t, record) }) t.Run("get all records", func(t *testing.T) { for i := 0; i < 1000; i++ { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - })) + }) + assert.NoError(t, err) + assert.Equal(t, backend.serverVersion, sv) } - records, version, err := backend.GetAll(ctx) + records, versions, err := backend.GetAll(ctx) assert.NoError(t, err) assert.Len(t, records, 1000) - assert.Equal(t, uint64(1002), version) + assert.Equal(t, uint64(1002), versions.LatestRecordVersion) }) } @@ -73,12 +79,14 @@ func TestExpiry(t *testing.T) { defer func() { _ = backend.Close() }() for i := 0; i < 1000; i++ { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - })) + }) + assert.NoError(t, err) + assert.Equal(t, backend.serverVersion, sv) } - stream, err := backend.Sync(ctx, 0) + stream, err := backend.Sync(ctx, backend.serverVersion, 0) require.NoError(t, err) var records []*databroker.Record for stream.Next(false) { @@ -89,7 +97,7 @@ func TestExpiry(t *testing.T) { backend.removeChangesBefore(time.Now().Add(time.Second)) - stream, err = backend.Sync(ctx, 0) + stream, err = backend.Sync(ctx, backend.serverVersion, 0) require.NoError(t, err) records = nil for stream.Next(false) { @@ -112,7 +120,7 @@ func TestConcurrency(t *testing.T) { }) eg.Go(func() error { for i := 0; i < 1000; i++ { - _ = backend.Put(ctx, &databroker.Record{ + _, _ = backend.Put(ctx, &databroker.Record{ Id: fmt.Sprint(i), }) } @@ -126,7 +134,7 @@ func TestStream(t *testing.T) { backend := New() defer func() { _ = backend.Close() }() - stream, err := backend.Sync(ctx, 0) + stream, err := backend.Sync(ctx, backend.serverVersion, 0) require.NoError(t, err) defer func() { _ = stream.Close() }() @@ -142,10 +150,11 @@ func TestStream(t *testing.T) { }) eg.Go(func() error { for i := 0; i < 10000; i++ { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + _, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - })) + }) + assert.NoError(t, err) } return nil }) @@ -163,7 +172,7 @@ func TestCapacity(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - err = backend.Put(ctx, &databroker.Record{ + _, err = backend.Put(ctx, &databroker.Record{ Type: "EXAMPLE", Id: fmt.Sprint(i), }) diff --git a/pkg/storage/inmemory/stream.go b/pkg/storage/inmemory/stream.go index 3693cd380..7fd451558 100644 --- a/pkg/storage/inmemory/stream.go +++ b/pkg/storage/inmemory/stream.go @@ -12,21 +12,21 @@ type recordStream struct { ctx context.Context backend *Backend - changed chan context.Context - ready []*databroker.Record - version uint64 + changed chan context.Context + ready []*databroker.Record + recordVersion uint64 closeOnce sync.Once closed chan struct{} } -func newRecordStream(ctx context.Context, backend *Backend, version uint64) *recordStream { +func newRecordStream(ctx context.Context, backend *Backend, recordVersion uint64) *recordStream { stream := &recordStream{ ctx: ctx, backend: backend, - changed: backend.onChange.Bind(), - version: version, + changed: backend.onChange.Bind(), + recordVersion: recordVersion, closed: make(chan struct{}), } @@ -42,11 +42,11 @@ func newRecordStream(ctx context.Context, backend *Backend, version uint64) *rec } func (stream *recordStream) fill() { - stream.ready = stream.backend.getSince(stream.version) + stream.ready = stream.backend.getSince(stream.recordVersion) if len(stream.ready) > 0 { // records are sorted by version, // so update the local version to the last record - stream.version = stream.ready[len(stream.ready)-1].GetVersion() + stream.recordVersion = stream.ready[len(stream.ready)-1].GetVersion() } } diff --git a/pkg/storage/redis/redis.go b/pkg/storage/redis/redis.go index 9f824a381..361e11dfc 100644 --- a/pkg/storage/redis/redis.go +++ b/pkg/storage/redis/redis.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/internal/signal" "github.com/pomerium/pomerium/internal/telemetry/metrics" "github.com/pomerium/pomerium/internal/telemetry/trace" + "github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/storage" ) @@ -27,6 +28,7 @@ const ( // we rely on transactions in redis, so all redis-cluster keys need to be // on the same node. Using a `hash tag` gives us this capability. + serverVersionKey = "{pomerium_v3}.server_version" lastVersionKey = "{pomerium_v3}.last_version" lastVersionChKey = "{pomerium_v3}.last_version_ch" recordHashKey = "{pomerium_v3}.records" @@ -45,10 +47,10 @@ var ( // // What's stored: // -// - last_version: an integer version number -// - last_version_ch: a PubSub channel for version number updates +// - last_version: an integer recordVersion number +// - last_version_ch: a PubSub channel for recordVersion number updates // - records: a Hash of records. The hash key is {recordType}/{recordID}, the hash value the protobuf record. -// - changes: a Sorted Set of all the changes. The score is the version number, the member the protobuf record. +// - changes: a Sorted Set of all the changes. The score is the recordVersion number, the member the protobuf record. // - options: a Hash of options. The hash key is {recordType}, the hash value the protobuf options. // - changes.{recordType}: a Sorted Set of the changes for a record type. The score is the current time, // the value the record id. @@ -133,29 +135,39 @@ func (backend *Backend) Get(ctx context.Context, recordType, id string) (_ *data } // GetAll gets all the records from redis. -func (backend *Backend) GetAll(ctx context.Context) (records []*databroker.Record, latestRecordVersion uint64, err error) { - _, span := trace.StartSpan(ctx, "databroker.redis.GetAll") +func (backend *Backend) GetAll(ctx context.Context) (records []*databroker.Record, versions *databroker.Versions, err error) { + ctx, span := trace.StartSpan(ctx, "databroker.redis.GetAll") defer span.End() defer func(start time.Time) { recordOperation(ctx, start, "getall", err) }(time.Now()) + versions = new(databroker.Versions) + + versions.ServerVersion, err = backend.getOrCreateServerVersion(ctx) + if err != nil { + return nil, nil, err + } + p := backend.client.Pipeline() lastVersionCmd := p.Get(ctx, lastVersionKey) resultsCmd := p.HVals(ctx, recordHashKey) _, err = p.Exec(ctx) - if err != nil { - return nil, 0, err - } - - latestRecordVersion, err = lastVersionCmd.Uint64() if errors.Is(err, redis.Nil) { - latestRecordVersion = 0 + // nil is returned when there are no records + return nil, versions, nil } else if err != nil { - return nil, 0, err + return nil, nil, fmt.Errorf("redis: error beginning GetAll pipeline: %w", err) } - results, err := resultsCmd.Result() + versions.LatestRecordVersion, err = lastVersionCmd.Uint64() + if errors.Is(err, redis.Nil) { + } else if err != nil { + return nil, nil, fmt.Errorf("redis: error retrieving GetAll latest record version: %w", err) + } + + var results []string + results, err = resultsCmd.Result() if err != nil { - return nil, 0, err + return nil, nil, fmt.Errorf("redis: error retrieving GetAll records: %w", err) } for _, result := range results { @@ -167,7 +179,7 @@ func (backend *Backend) GetAll(ctx context.Context) (records []*databroker.Recor } records = append(records, &record) } - return records, latestRecordVersion, nil + return records, versions, nil } // GetOptions gets the options for the given record type. @@ -190,22 +202,27 @@ func (backend *Backend) GetOptions(ctx context.Context, recordType string) (*dat } // Put puts a record into redis. -func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (err error) { +func (backend *Backend) Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) { ctx, span := trace.StartSpan(ctx, "databroker.redis.Put") defer span.End() defer func(start time.Time) { recordOperation(ctx, start, "put", err) }(time.Now()) + serverVersion, err = backend.getOrCreateServerVersion(ctx) + if err != nil { + return serverVersion, err + } + err = backend.put(ctx, record) if err != nil { - return err + return serverVersion, err } err = backend.enforceOptions(ctx, record.GetType()) if err != nil { - return err + return serverVersion, err } - return nil + return serverVersion, nil } // SetOptions sets the options for the given record type. @@ -233,9 +250,9 @@ func (backend *Backend) SetOptions(ctx context.Context, recordType string, optio return nil } -// Sync returns a record stream of any records changed after the specified version. -func (backend *Backend) Sync(ctx context.Context, version uint64) (storage.RecordStream, error) { - return newRecordStream(ctx, backend, version), nil +// Sync returns a record stream of any records changed after the specified recordVersion. +func (backend *Backend) Sync(ctx context.Context, serverVersion, recordVersion uint64) (storage.RecordStream, error) { + return newRecordStream(ctx, backend, serverVersion, recordVersion), nil } func (backend *Backend) put(ctx context.Context, record *databroker.Record) error { @@ -328,11 +345,11 @@ func (backend *Backend) enforceOptions(ctx context.Context, recordType string) e return nil } -// incrementVersion increments the last version key, runs the code in `query`, then attempts to commit the code in -// `commit`. If the last version changes in the interim, we will retry the transaction. +// incrementVersion increments the last recordVersion key, runs the code in `query`, then attempts to commit the code in +// `commit`. If the last recordVersion changes in the interim, we will retry the transaction. func (backend *Backend) incrementVersion(ctx context.Context, - query func(tx *redis.Tx, version uint64) error, - commit func(p redis.Pipeliner, version uint64) error, + query func(tx *redis.Tx, recordVersion uint64) error, + commit func(p redis.Pipeliner, recordVersion uint64) error, ) error { // code is modeled on https://pkg.go.dev/github.com/go-redis/redis/v8#example-Client.Watch txf := func(tx *redis.Tx) error { @@ -457,6 +474,20 @@ func (backend *Backend) removeChangesBefore(ctx context.Context, cutoff time.Tim } } +func (backend *Backend) getOrCreateServerVersion(ctx context.Context) (serverVersion uint64, err error) { + serverVersion, err = backend.client.Get(ctx, serverVersionKey).Uint64() + // if the server version hasn't been set yet, set it to a random value and immediately retrieve it + // this should properly handle a data race by only setting the key if it doesn't already exist + if errors.Is(err, redis.Nil) { + _, _ = backend.client.SetNX(ctx, serverVersionKey, cryptutil.NewRandomUInt64(), 0).Result() + serverVersion, err = backend.client.Get(ctx, serverVersionKey).Uint64() + } + if err != nil { + return 0, fmt.Errorf("redis: error retrieving server version: %w", err) + } + return serverVersion, err +} + func getRecordTypeChangesKey(recordType string) string { return fmt.Sprintf(recordTypeChangesKeyTpl, recordType) } diff --git a/pkg/storage/redis/redis_test.go b/pkg/storage/redis/redis_test.go index 41264adec..f06606507 100644 --- a/pkg/storage/redis/redis_test.go +++ b/pkg/storage/redis/redis_test.go @@ -32,6 +32,10 @@ func TestBackend(t *testing.T) { backend, err := New(rawURL, opts...) require.NoError(t, err) defer func() { _ = backend.Close() }() + + serverVersion, err := backend.getOrCreateServerVersion(ctx) + require.NoError(t, err) + t.Run("get missing record", func(t *testing.T) { record, err := backend.Get(ctx, "TYPE", "abcd") require.Error(t, err) @@ -39,11 +43,13 @@ func TestBackend(t *testing.T) { }) t.Run("get record", func(t *testing.T) { data := new(anypb.Any) - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "abcd", Data: data, - })) + }) + assert.NoError(t, err) + assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") require.NoError(t, err) if assert.NotNil(t, record) { @@ -56,26 +62,30 @@ func TestBackend(t *testing.T) { } }) t.Run("delete record", func(t *testing.T) { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "abcd", DeletedAt: timestamppb.Now(), - })) + }) + assert.NoError(t, err) + assert.Equal(t, serverVersion, sv) record, err := backend.Get(ctx, "TYPE", "abcd") assert.Error(t, err) assert.Nil(t, record) }) t.Run("get all records", func(t *testing.T) { for i := 0; i < 1000; i++ { - require.NoError(t, backend.Put(ctx, &databroker.Record{ + sv, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - })) + }) + assert.NoError(t, err) + assert.Equal(t, serverVersion, sv) } - records, version, err := backend.GetAll(ctx) + records, versions, err := backend.GetAll(ctx) assert.NoError(t, err) assert.Len(t, records, 1000) - assert.Equal(t, uint64(1002), version) + assert.Equal(t, uint64(1002), versions.LatestRecordVersion) }) return nil } @@ -134,7 +144,7 @@ func TestChangeSignal(t *testing.T) { ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() for { - _ = backend2.Put(ctx, &databroker.Record{ + _, _ = backend2.Put(ctx, &databroker.Record{ Type: "TYPE", Id: "ID", }) @@ -167,13 +177,17 @@ func TestExpiry(t *testing.T) { require.NoError(t, err) defer func() { _ = backend.Close() }() + serverVersion, err := backend.getOrCreateServerVersion(ctx) + require.NoError(t, err) + for i := 0; i < 1000; i++ { - assert.NoError(t, backend.Put(ctx, &databroker.Record{ + _, err := backend.Put(ctx, &databroker.Record{ Type: "TYPE", Id: fmt.Sprint(i), - })) + }) + assert.NoError(t, err) } - stream, err := backend.Sync(ctx, 0) + stream, err := backend.Sync(ctx, serverVersion, 0) require.NoError(t, err) var records []*databroker.Record for stream.Next(false) { @@ -184,7 +198,7 @@ func TestExpiry(t *testing.T) { backend.removeChangesBefore(ctx, time.Now().Add(time.Second)) - stream, err = backend.Sync(ctx, 0) + stream, err = backend.Sync(ctx, serverVersion, 0) require.NoError(t, err) records = nil for stream.Next(false) { @@ -214,7 +228,7 @@ func TestCapacity(t *testing.T) { require.NoError(t, err) for i := 0; i < 10; i++ { - err = backend.Put(ctx, &databroker.Record{ + _, err = backend.Put(ctx, &databroker.Record{ Type: "EXAMPLE", Id: fmt.Sprint(i), }) diff --git a/pkg/storage/redis/stream.go b/pkg/storage/redis/stream.go index 5cc303614..3f766af2a 100644 --- a/pkg/storage/redis/stream.go +++ b/pkg/storage/redis/stream.go @@ -11,28 +11,31 @@ import ( "github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/storage" ) type recordStream struct { ctx context.Context backend *Backend - changed chan context.Context - version uint64 - record *databroker.Record - err error + changed chan context.Context + serverVersion uint64 + recordVersion uint64 + record *databroker.Record + err error closeOnce sync.Once closed chan struct{} } -func newRecordStream(ctx context.Context, backend *Backend, version uint64) *recordStream { +func newRecordStream(ctx context.Context, backend *Backend, serverVersion, recordVersion uint64) *recordStream { stream := &recordStream{ ctx: ctx, backend: backend, - changed: backend.onChange.Bind(), - version: version, + changed: backend.onChange.Bind(), + serverVersion: serverVersion, + recordVersion: recordVersion, closed: make(chan struct{}), } @@ -65,8 +68,18 @@ func (stream *recordStream) Next(block bool) bool { changeCtx := context.Background() for { + serverVersion, err := stream.backend.getOrCreateServerVersion(stream.ctx) + if err != nil { + stream.err = err + return false + } + if stream.serverVersion != serverVersion { + stream.err = storage.ErrInvalidServerVersion + return false + } + cmd := stream.backend.client.ZRangeByScore(stream.ctx, changesSetKey, &redis.ZRangeBy{ - Min: fmt.Sprintf("(%d", stream.version), + Min: fmt.Sprintf("(%d", stream.recordVersion), Max: "+inf", Offset: 0, Count: 1, @@ -86,7 +99,7 @@ func (stream *recordStream) Next(block bool) bool { } else { stream.record = &record } - stream.version++ + stream.recordVersion++ return true } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index e9a361ad1..073da6649 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -6,6 +6,8 @@ import ( "errors" "strings" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/types/known/anypb" @@ -15,8 +17,9 @@ import ( // Errors var ( - ErrNotFound = errors.New("record not found") - ErrStreamClosed = errors.New("record stream closed") + ErrNotFound = errors.New("record not found") + ErrStreamClosed = errors.New("record stream closed") + ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version") ) // A RecordStream is a stream of records. @@ -41,15 +44,15 @@ type Backend interface { // Get is used to retrieve a record. Get(ctx context.Context, recordType, id string) (*databroker.Record, error) // GetAll gets all the records. - GetAll(ctx context.Context) (records []*databroker.Record, version uint64, err error) + GetAll(ctx context.Context) (records []*databroker.Record, version *databroker.Versions, err error) // GetOptions gets the options for a type. GetOptions(ctx context.Context, recordType string) (*databroker.Options, error) // Put is used to insert or update a record. - Put(ctx context.Context, record *databroker.Record) error + Put(ctx context.Context, record *databroker.Record) (serverVersion uint64, err error) // SetOptions sets the options for a type. SetOptions(ctx context.Context, recordType string, options *databroker.Options) error // Sync syncs record changes after the specified version. - Sync(ctx context.Context, version uint64) (RecordStream, error) + Sync(ctx context.Context, serverVersion, recordVersion uint64) (RecordStream, error) } // MatchAny searches any data with a query. diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 7315a9930..212988a24 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -13,16 +13,16 @@ import ( type mockBackend struct { Backend - put func(ctx context.Context, record *databroker.Record) error + put func(ctx context.Context, record *databroker.Record) (uint64, error) get func(ctx context.Context, recordType, id string) (*databroker.Record, error) - getAll func(ctx context.Context) ([]*databroker.Record, uint64, error) + getAll func(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) } func (m *mockBackend) Close() error { return nil } -func (m *mockBackend) Put(ctx context.Context, record *databroker.Record) error { +func (m *mockBackend) Put(ctx context.Context, record *databroker.Record) (uint64, error) { return m.put(ctx, record) } @@ -30,14 +30,10 @@ func (m *mockBackend) Get(ctx context.Context, recordType, id string) (*databrok return m.get(ctx, recordType, id) } -func (m *mockBackend) GetAll(ctx context.Context) ([]*databroker.Record, uint64, error) { +func (m *mockBackend) GetAll(ctx context.Context) ([]*databroker.Record, *databroker.Versions, error) { return m.getAll(ctx) } -func (m *mockBackend) Sync(ctx context.Context, version uint64) (RecordStream, error) { - panic("implement me") -} - func TestMatchAny(t *testing.T) { u := &user.User{Id: "id", Name: "name", Email: "email"} data, _ := anypb.New(u)