databroker: add list types method (#3937)

* databroker: add list types method

* fix test

* Update pkg/storage/redis/redis.go

Co-authored-by: Denis Mishin <dmishin@pomerium.com>

---------

Co-authored-by: Denis Mishin <dmishin@pomerium.com>
This commit is contained in:
Caleb Doxsey 2023-02-03 13:16:28 -07:00 committed by GitHub
parent 424b743b11
commit 7895bf431f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 495 additions and 232 deletions

View file

@ -181,6 +181,19 @@ func (backend *Backend) Lease(
return leaseHolderID == leaseID, nil
}
// ListTypes lists the record types.
func (backend *Backend) ListTypes(ctx context.Context) ([]string, error) {
ctx, cancel := contextutil.Merge(ctx, backend.closeCtx)
defer cancel()
_, conn, err := backend.init(ctx)
if err != nil {
return nil, err
}
return listTypes(ctx, conn)
}
// Put puts a record into Postgres.
func (backend *Backend) Put(
ctx context.Context,

View file

@ -182,6 +182,12 @@ func TestBackend(t *testing.T) {
}
})
t.Run("list types", func(t *testing.T) {
types, err := backend.ListTypes(ctx)
assert.NoError(t, err)
assert.Equal(t, []string{"capacity-test", "latest-test", "sync-test", "test-1", "unknown"}, types)
})
return nil
}))
}

View file

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"
@ -284,6 +285,36 @@ func listServices(ctx context.Context, q querier) ([]*registry.Service, error) {
return services, nil
}
func listTypes(ctx context.Context, q querier) ([]string, error) {
query := `
SELECT DISTINCT type
FROM ` + schemaName + `.` + recordsTableName + `
`
rows, err := q.Query(ctx, query)
if err != nil {
return nil, fmt.Errorf("postgres: failed to execute query: %w", err)
}
defer rows.Close()
var types []string
for rows.Next() {
var recordType string
err = rows.Scan(&recordType)
if err != nil {
return nil, fmt.Errorf("postgres: failed to scan row: %w", err)
}
types = append(types, recordType)
}
err = rows.Err()
if err != nil {
return nil, fmt.Errorf("postgres: error iterating over rows: %w", err)
}
sort.Strings(types)
return types, nil
}
func maybeAcquireLease(ctx context.Context, q querier, leaseName, leaseID string, ttl time.Duration) (leaseHolderID string, err error) {
tbl := schemaName + "." + leasesTableName
expiresAt := timestamptzFromTimestamppb(timestamppb.New(time.Now().Add(ttl)))