databroker: add support for syncing by type (#3412)

* databroker: add support for syncing by type

* add type url, fix query
This commit is contained in:
Caleb Doxsey 2022-06-13 09:52:13 -06:00 committed by GitHub
parent 2487e9af20
commit 45a29ea879
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 163 additions and 125 deletions

View file

@ -210,6 +210,7 @@ func (backend *Backend) SetOptions(
// Sync syncs the records.
func (backend *Backend) Sync(
ctx context.Context,
recordType string,
serverVersion, recordVersion uint64,
) (storage.RecordStream, error) {
// the original ctx will be used for the stream, this ctx used for pre-stream calls
@ -224,7 +225,7 @@ func (backend *Backend) Sync(
return nil, storage.ErrInvalidServerVersion
}
return newChangedRecordStream(ctx, backend, recordVersion), nil
return newChangedRecordStream(ctx, backend, recordType, recordVersion), nil
}
// SyncLatest syncs the latest version of each record.

View file

@ -111,11 +111,11 @@ func TestBackend(t *testing.T) {
})
t.Run("changed", func(t *testing.T) {
serverVersion, recordVersion, stream, err := backend.SyncLatest(ctx, "", nil)
serverVersion, recordVersion, stream, err := backend.SyncLatest(ctx, "sync-test", nil)
require.NoError(t, err)
assert.NoError(t, stream.Close())
stream, err = backend.Sync(ctx, serverVersion, recordVersion)
stream, err = backend.Sync(ctx, "", serverVersion, recordVersion)
require.NoError(t, err)
defer stream.Close()

View file

@ -4,6 +4,7 @@ package postgres
import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgconn"
@ -78,27 +79,37 @@ func getLatestRecordVersion(ctx context.Context, q querier) (recordVersion uint6
return recordVersion, err
}
func getNextChangedRecord(ctx context.Context, q querier, afterRecordVersion uint64) (*databroker.Record, error) {
var recordType, recordID string
func getNextChangedRecord(ctx context.Context, q querier, recordType string, afterRecordVersion uint64) (*databroker.Record, error) {
var recordID string
var version uint64
var data pgtype.JSONB
var modifiedAt pgtype.Timestamptz
var deletedAt pgtype.Timestamptz
err := q.QueryRow(ctx, `
query := `
SELECT type, id, version, data, modified_at, deleted_at
FROM `+schemaName+`.`+recordChangesTableName+`
WHERE version > $1
`, afterRecordVersion).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
FROM ` + schemaName + `.` + recordChangesTableName + `
WHERE version > $1
`
args := []any{afterRecordVersion}
if recordType != "" {
query += ` AND type = $2`
args = append(args, recordType)
}
query += `
ORDER BY version ASC
LIMIT 1
`
err := q.QueryRow(ctx, query, args...).Scan(&recordType, &recordID, &version, &data, &modifiedAt, &deletedAt)
if isNotFound(err) {
return nil, storage.ErrNotFound
} else if err != nil {
return nil, err
return nil, fmt.Errorf("error querying next changed record: %w", err)
}
var any anypb.Any
err = protojson.Unmarshal(data.Bytes, &any)
if err != nil {
return nil, err
return nil, fmt.Errorf("error unmarshaling changed record data: %w", err)
}
return &databroker.Record{

View file

@ -82,6 +82,7 @@ const watchPollInterval = 30 * time.Second
type changedRecordStream struct {
backend *Backend
recordType string
recordVersion uint64
ctx context.Context
@ -95,10 +96,12 @@ type changedRecordStream struct {
func newChangedRecordStream(
ctx context.Context,
backend *Backend,
recordType string,
recordVersion uint64,
) storage.RecordStream {
stream := &changedRecordStream{
backend: backend,
recordType: recordType,
recordVersion: recordVersion,
ticker: time.NewTicker(watchPollInterval),
changed: backend.onChange.Bind(),
@ -129,6 +132,7 @@ func (stream *changedRecordStream) Next(block bool) bool {
stream.record, stream.err = getNextChangedRecord(
stream.ctx,
pool,
stream.recordType,
stream.recordVersion,
)
if isNotFound(stream.err) {