mirror of
https://github.com/pomerium/pomerium.git
synced 2025-06-29 16:08:13 +02:00
wip
This commit is contained in:
parent
dad954ae16
commit
5b717c9aa5
15 changed files with 746 additions and 9 deletions
|
@ -50,6 +50,10 @@ type Config struct {
|
||||||
|
|
||||||
// MetricsScrapeEndpoints additional metrics endpoints to scrape and provide part of metrics
|
// MetricsScrapeEndpoints additional metrics endpoints to scrape and provide part of metrics
|
||||||
MetricsScrapeEndpoints []MetricsScrapeEndpoint
|
MetricsScrapeEndpoints []MetricsScrapeEndpoint
|
||||||
|
|
||||||
|
// Zero-specific configuration options.
|
||||||
|
ZeroClusterID string
|
||||||
|
ZeroOrganizationID string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clone creates a clone of the config.
|
// Clone creates a clone of the config.
|
||||||
|
|
44
internal/testutil/grpc.go
Normal file
44
internal/testutil/grpc.go
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
package testutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"google.golang.org/grpc/test/bufconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewGRPCServer starts a gRPC server and returns a client connection to it.
|
||||||
|
func NewGRPCServer(t testing.TB, register func(s *grpc.Server)) *grpc.ClientConn {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
li := bufconn.Listen(1024 * 1024)
|
||||||
|
s := grpc.NewServer()
|
||||||
|
register(s)
|
||||||
|
go func() {
|
||||||
|
err := s.Serve(li)
|
||||||
|
if errors.Is(err, grpc.ErrServerStopped) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
t.Cleanup(func() {
|
||||||
|
s.Stop()
|
||||||
|
})
|
||||||
|
|
||||||
|
cc, err := grpc.NewClient("passthrough://bufnet",
|
||||||
|
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||||
|
return li.Dial()
|
||||||
|
}),
|
||||||
|
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
cc.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
return cc
|
||||||
|
}
|
|
@ -119,3 +119,16 @@ func (api *API) GetClusterResourceBundles(ctx context.Context) (*cluster_api.Get
|
||||||
func (api *API) GetTelemetryConn() *grpc.ClientConn {
|
func (api *API) GetTelemetryConn() *grpc.ClientConn {
|
||||||
return api.telemetryConn
|
return api.telemetryConn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *API) ReportUsage(ctx context.Context, req cluster_api.ReportUsageRequest) error {
|
||||||
|
res, err := api.cluster.ReportUsageWithResponse(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.StatusCode()/100 != 2 {
|
||||||
|
return fmt.Errorf("unexpected response from ReportUsage: %d", res.StatusCode())
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -55,9 +55,9 @@ func (src *source) OnConfigChange(_ context.Context, l config.ChangeListener) {
|
||||||
func (src *source) UpdateBootstrap(ctx context.Context, cfg cluster_api.BootstrapConfig) bool {
|
func (src *source) UpdateBootstrap(ctx context.Context, cfg cluster_api.BootstrapConfig) bool {
|
||||||
current := src.cfg.Load()
|
current := src.cfg.Load()
|
||||||
incoming := current.Clone()
|
incoming := current.Clone()
|
||||||
applyBootstrapConfig(incoming.Options, &cfg)
|
applyBootstrapConfig(incoming, &cfg)
|
||||||
|
|
||||||
if cmp.Equal(incoming.Options, current.Options, cmpOpts...) {
|
if cmp.Equal(incoming, current, cmpOpts...) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,13 +81,15 @@ func (src *source) notifyListeners(ctx context.Context, cfg *config.Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func applyBootstrapConfig(dst *config.Options, src *cluster_api.BootstrapConfig) {
|
func applyBootstrapConfig(dst *config.Config, src *cluster_api.BootstrapConfig) {
|
||||||
dst.SharedKey = base64.StdEncoding.EncodeToString(src.SharedSecret)
|
dst.Options.SharedKey = base64.StdEncoding.EncodeToString(src.SharedSecret)
|
||||||
if src.DatabrokerStorageConnection != nil {
|
if src.DatabrokerStorageConnection != nil {
|
||||||
dst.DataBrokerStorageType = config.StoragePostgresName
|
dst.Options.DataBrokerStorageType = config.StoragePostgresName
|
||||||
dst.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection
|
dst.Options.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection
|
||||||
} else {
|
} else {
|
||||||
dst.DataBrokerStorageType = config.StorageInMemoryName
|
dst.Options.DataBrokerStorageType = config.StorageInMemoryName
|
||||||
dst.DataBrokerStorageConnectionString = ""
|
dst.Options.DataBrokerStorageConnectionString = ""
|
||||||
}
|
}
|
||||||
|
dst.ZeroClusterID = src.ClusterId
|
||||||
|
dst.ZeroOrganizationID = src.OrganizationId
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,7 @@ func TestSecretWriter(t *testing.T) {
|
||||||
"namespace": "pomerium",
|
"namespace": "pomerium",
|
||||||
},
|
},
|
||||||
"data": map[string]any{
|
"data": map[string]any{
|
||||||
"bootstrap.dat": `{"databrokerStorageConnection":"test","sharedSecret":null}`,
|
"bootstrap.dat": `{"clusterId":"","databrokerStorageConnection":"test","organizationId":"","sharedSecret":null}`,
|
||||||
},
|
},
|
||||||
}, unstructured)
|
}, unstructured)
|
||||||
})
|
})
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/pomerium/pomerium/internal/zero/bootstrap"
|
"github.com/pomerium/pomerium/internal/zero/bootstrap"
|
||||||
"github.com/pomerium/pomerium/internal/zero/bootstrap/writers"
|
"github.com/pomerium/pomerium/internal/zero/bootstrap/writers"
|
||||||
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
|
connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
|
||||||
|
"github.com/pomerium/pomerium/internal/zero/controller/usagereporter"
|
||||||
"github.com/pomerium/pomerium/internal/zero/healthcheck"
|
"github.com/pomerium/pomerium/internal/zero/healthcheck"
|
||||||
"github.com/pomerium/pomerium/internal/zero/reconciler"
|
"github.com/pomerium/pomerium/internal/zero/reconciler"
|
||||||
"github.com/pomerium/pomerium/internal/zero/telemetry"
|
"github.com/pomerium/pomerium/internal/zero/telemetry"
|
||||||
|
@ -160,6 +161,7 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error {
|
||||||
c.runSessionAnalyticsLeased,
|
c.runSessionAnalyticsLeased,
|
||||||
c.runHealthChecksLeased,
|
c.runHealthChecksLeased,
|
||||||
leaseStatus.MonitorLease,
|
leaseStatus.MonitorLease,
|
||||||
|
c.runUsageReporter,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
@ -208,6 +210,20 @@ func (c *controller) runHealthChecksLeased(ctx context.Context, client databroke
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *controller) runUsageReporter(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
|
// the usage reporter depends on the zero organization id, so wait for bootstrap to complete
|
||||||
|
err := c.bootstrapConfig.WaitReady(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error waiting for bootstrap: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ur := usagereporter.New(c.api, c.bootstrapConfig.GetConfig().ZeroOrganizationID)
|
||||||
|
return retry.WithBackoff(ctx, "zero-usage-reporter", func(ctx context.Context) error {
|
||||||
|
// start the usage reporter
|
||||||
|
return ur.Run(ctx, client)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (c *controller) getEnvoyScrapeURL() string {
|
func (c *controller) getEnvoyScrapeURL() string {
|
||||||
return (&url.URL{
|
return (&url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
|
|
181
internal/zero/controller/usagereporter/usagereporter.go
Normal file
181
internal/zero/controller/usagereporter/usagereporter.go
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
// Package usagereporter reports usage for a cluster.
|
||||||
|
package usagereporter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
backoff "github.com/cenkalti/backoff/v4"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
|
sdk "github.com/pomerium/pomerium/internal/zero/api"
|
||||||
|
"github.com/pomerium/pomerium/pkg/cryptutil"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/session"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/user"
|
||||||
|
"github.com/pomerium/pomerium/pkg/zero/cluster"
|
||||||
|
)
|
||||||
|
|
||||||
|
type usageReporterRecord struct {
|
||||||
|
userID string
|
||||||
|
userEmail string
|
||||||
|
lastSignedInAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// A UsageReporter reports usage to the zero api.
|
||||||
|
type UsageReporter struct {
|
||||||
|
api *sdk.API
|
||||||
|
organizationID string
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
byUserID map[string]usageReporterRecord
|
||||||
|
updates map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new UsageReporter.
|
||||||
|
func New(api *sdk.API, organizationID string) *UsageReporter {
|
||||||
|
return &UsageReporter{
|
||||||
|
api: api,
|
||||||
|
organizationID: organizationID,
|
||||||
|
byUserID: make(map[string]usageReporterRecord),
|
||||||
|
updates: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run runs the usage reporter.
|
||||||
|
func (ur *UsageReporter) Run(ctx context.Context, client databroker.DataBrokerServiceClient) error {
|
||||||
|
// first initialize the user collection
|
||||||
|
serverVersion, latestRecordVersion, err := ur.runInit(ctx, client)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the continuous sync calls and periodically report usage
|
||||||
|
return ur.runSync(ctx, client, serverVersion, latestRecordVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) report(ctx context.Context, records []usageReporterRecord) error {
|
||||||
|
req := cluster.ReportUsageRequest{}
|
||||||
|
for _, record := range records {
|
||||||
|
req.Users = append(req.Users, cluster.ReportUsageUser{
|
||||||
|
LastSignedInAt: record.lastSignedInAt,
|
||||||
|
PseudonymousEmail: cryptutil.Pseudonymize(ur.organizationID, record.userEmail),
|
||||||
|
PseudonymousId: cryptutil.Pseudonymize(ur.organizationID, record.userID),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return backoff.Retry(func() error {
|
||||||
|
log.Debug(ctx).Int("updated-users", len(req.Users)).Msg("reporting usage")
|
||||||
|
err := ur.api.ReportUsage(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(ctx).Err(err).Msg("error reporting usage")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) runInit(ctx context.Context, client databroker.DataBrokerServiceClient) (serverVersion, latestRecordVersion uint64, err error) {
|
||||||
|
_, _, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateSession)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
serverVersion, latestRecordVersion, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateUser)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return serverVersion, latestRecordVersion, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) runSync(ctx context.Context, client databroker.DataBrokerServiceClient, serverVersion, latestRecordVersion uint64) error {
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
eg.Go(func() error {
|
||||||
|
return databroker.SyncRecords(ctx, client, serverVersion, latestRecordVersion, ur.onUpdateSession)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
return databroker.SyncRecords(ctx, client, serverVersion, latestRecordVersion, ur.onUpdateUser)
|
||||||
|
})
|
||||||
|
eg.Go(func() error {
|
||||||
|
return ur.runReporter(ctx)
|
||||||
|
})
|
||||||
|
return eg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) runReporter(ctx context.Context) error {
|
||||||
|
// every minute collect any updates and submit them to the API
|
||||||
|
timer := time.NewTicker(time.Minute)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
// collect the updated records since last run
|
||||||
|
ur.mu.Lock()
|
||||||
|
records := make([]usageReporterRecord, 0, len(ur.updates))
|
||||||
|
for userID := range ur.updates {
|
||||||
|
records = append(records, ur.byUserID[userID])
|
||||||
|
}
|
||||||
|
clear(ur.updates)
|
||||||
|
ur.mu.Unlock()
|
||||||
|
|
||||||
|
if len(records) > 0 {
|
||||||
|
err := ur.report(ctx, records)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) onUpdateSession(s *session.Session) {
|
||||||
|
userID := s.GetUserId()
|
||||||
|
if userID == "" {
|
||||||
|
// ignore sessions without a user id
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ur.mu.Lock()
|
||||||
|
defer ur.mu.Unlock()
|
||||||
|
|
||||||
|
r := ur.byUserID[userID]
|
||||||
|
nr := r
|
||||||
|
nr.lastSignedInAt = latest(nr.lastSignedInAt, s.GetIssuedAt().AsTime())
|
||||||
|
nr.userID = userID
|
||||||
|
if nr != r {
|
||||||
|
ur.byUserID[userID] = nr
|
||||||
|
ur.updates[userID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ur *UsageReporter) onUpdateUser(u *user.User) {
|
||||||
|
userID := u.GetId()
|
||||||
|
if userID == "" {
|
||||||
|
// ignore users without a user id
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ur.mu.Lock()
|
||||||
|
defer ur.mu.Unlock()
|
||||||
|
|
||||||
|
r := ur.byUserID[userID]
|
||||||
|
nr := r
|
||||||
|
nr.userID = userID
|
||||||
|
nr.userEmail = u.GetEmail()
|
||||||
|
if nr != r {
|
||||||
|
ur.byUserID[userID] = nr
|
||||||
|
ur.updates[userID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func latest(t1, t2 time.Time) time.Time {
|
||||||
|
if t2.After(t1) {
|
||||||
|
return t2
|
||||||
|
}
|
||||||
|
return t1
|
||||||
|
}
|
|
@ -0,0 +1,7 @@
|
||||||
|
package usagereporter
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func Test_report(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
}
|
16
pkg/cryptutil/pseudonymize.go
Normal file
16
pkg/cryptutil/pseudonymize.go
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
package cryptutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/hmac"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/base64"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Pseudonymize pseudonymizes data by computing the HMAC-SHA256 of the data.
|
||||||
|
func Pseudonymize(organizationID string, data string) string {
|
||||||
|
h := hmac.New(sha256.New, []byte(organizationID))
|
||||||
|
_, _ = io.WriteString(h, data)
|
||||||
|
bs := h.Sum(nil)
|
||||||
|
return base64.StdEncoding.EncodeToString(bs)
|
||||||
|
}
|
110
pkg/grpc/databroker/sync.go
Normal file
110
pkg/grpc/databroker/sync.go
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
package databroker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/log"
|
||||||
|
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SyncRecords calls fn for every record using Sync.
|
||||||
|
func SyncRecords[T any, TMessage interface {
|
||||||
|
*T
|
||||||
|
proto.Message
|
||||||
|
}](
|
||||||
|
ctx context.Context,
|
||||||
|
client DataBrokerServiceClient,
|
||||||
|
serverVersion, latestRecordVersion uint64,
|
||||||
|
fn func(TMessage),
|
||||||
|
) error {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var msg TMessage = new(T)
|
||||||
|
stream, err := client.Sync(ctx, &SyncRequest{
|
||||||
|
Type: protoutil.GetTypeURL(msg),
|
||||||
|
ServerVersion: serverVersion,
|
||||||
|
RecordVersion: latestRecordVersion,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error syncing %T: %w", msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
res, err := stream.Recv()
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, io.EOF):
|
||||||
|
return nil
|
||||||
|
case err != nil:
|
||||||
|
return fmt.Errorf("error receiving record for %T: %w", msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = new(T)
|
||||||
|
err = res.GetRecord().GetData().UnmarshalTo(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Error().Err(err).
|
||||||
|
Str("record-type", res.Record.Type).
|
||||||
|
Str("record-id", res.Record.GetId()).
|
||||||
|
Msgf("unexpected data in %T stream", msg)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fn(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncRecords calls fn for every record using SyncLatest.
|
||||||
|
func SyncLatestRecords[T any, TMessage interface {
|
||||||
|
*T
|
||||||
|
proto.Message
|
||||||
|
}](
|
||||||
|
ctx context.Context,
|
||||||
|
client DataBrokerServiceClient,
|
||||||
|
fn func(TMessage),
|
||||||
|
) (serverVersion, latestRecordVersion uint64, err error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var msg TMessage = new(T)
|
||||||
|
stream, err := client.SyncLatest(ctx, &SyncLatestRequest{
|
||||||
|
Type: protoutil.GetTypeURL(msg),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("error syncing latest %T: %w", msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
res, err := stream.Recv()
|
||||||
|
switch {
|
||||||
|
case errors.Is(err, io.EOF):
|
||||||
|
return serverVersion, latestRecordVersion, nil
|
||||||
|
case err != nil:
|
||||||
|
return 0, 0, fmt.Errorf("error receiving record for latest %T: %w", msg, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch res := res.GetResponse().(type) {
|
||||||
|
case *SyncLatestResponse_Versions:
|
||||||
|
serverVersion = res.Versions.GetServerVersion()
|
||||||
|
latestRecordVersion = res.Versions.GetLatestRecordVersion()
|
||||||
|
case *SyncLatestResponse_Record:
|
||||||
|
msg = new(T)
|
||||||
|
err = res.Record.GetData().UnmarshalTo(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Ctx(ctx).Error().Err(err).
|
||||||
|
Str("record-type", res.Record.Type).
|
||||||
|
Str("record-id", res.Record.GetId()).
|
||||||
|
Msgf("unexpected data in latest %T stream", msg)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
fn(msg)
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("unexpected response: %T", res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
67
pkg/grpc/databroker/sync_test.go
Normal file
67
pkg/grpc/databroker/sync_test.go
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
package databroker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
grpc "google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/pomerium/pomerium/internal/databroker"
|
||||||
|
"github.com/pomerium/pomerium/internal/testutil"
|
||||||
|
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/session"
|
||||||
|
"github.com/pomerium/pomerium/pkg/grpc/user"
|
||||||
|
"github.com/pomerium/pomerium/pkg/protoutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_SyncLatestRecords(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Minute)
|
||||||
|
defer clearTimeout()
|
||||||
|
|
||||||
|
cc := testutil.NewGRPCServer(t, func(s *grpc.Server) {
|
||||||
|
databrokerpb.RegisterDataBrokerServiceServer(s, databroker.New())
|
||||||
|
})
|
||||||
|
|
||||||
|
c := databrokerpb.NewDataBrokerServiceClient(cc)
|
||||||
|
|
||||||
|
expected := []*user.User{
|
||||||
|
{Id: "u1"},
|
||||||
|
{Id: "u2"},
|
||||||
|
{Id: "u3"},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, u := range expected {
|
||||||
|
_, err := c.Put(ctx, &databrokerpb.PutRequest{
|
||||||
|
Records: []*databrokerpb.Record{
|
||||||
|
databrokerpb.NewRecord(u),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// add a non-user record to make sure it gets ignored
|
||||||
|
_, err := c.Put(ctx, &databrokerpb.PutRequest{
|
||||||
|
Records: []*databrokerpb.Record{
|
||||||
|
{
|
||||||
|
Id: "u4",
|
||||||
|
Type: protoutil.GetTypeURL(new(user.User)),
|
||||||
|
Data: protoutil.NewAny(&session.Session{Id: "s1"}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var actual []*user.User
|
||||||
|
serverVersion, latestRecordVersion, err := databrokerpb.SyncLatestRecords(context.Background(), c, func(u *user.User) {
|
||||||
|
actual = append(actual, u)
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotZero(t, serverVersion)
|
||||||
|
assert.Equal(t, uint64(4), latestRecordVersion)
|
||||||
|
testutil.AssertProtoEqual(t, expected, actual)
|
||||||
|
}
|
|
@ -107,6 +107,11 @@ type ClientInterface interface {
|
||||||
ExchangeClusterIdentityTokenWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error)
|
ExchangeClusterIdentityTokenWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error)
|
||||||
|
|
||||||
ExchangeClusterIdentityToken(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
|
ExchangeClusterIdentityToken(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
|
||||||
|
|
||||||
|
// ReportUsageWithBody request with any body
|
||||||
|
ReportUsageWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error)
|
||||||
|
|
||||||
|
ReportUsage(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) GetClusterBootstrapConfig(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) {
|
func (c *Client) GetClusterBootstrapConfig(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) {
|
||||||
|
@ -193,6 +198,30 @@ func (c *Client) ExchangeClusterIdentityToken(ctx context.Context, body Exchange
|
||||||
return c.Client.Do(req)
|
return c.Client.Do(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) ReportUsageWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) {
|
||||||
|
req, err := NewReportUsageRequestWithBody(c.Server, contentType, body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
if err := c.applyEditors(ctx, req, reqEditors); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return c.Client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ReportUsage(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) {
|
||||||
|
req, err := NewReportUsageRequest(c.Server, body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
if err := c.applyEditors(ctx, req, reqEditors); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return c.Client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
// NewGetClusterBootstrapConfigRequest generates requests for GetClusterBootstrapConfig
|
// NewGetClusterBootstrapConfigRequest generates requests for GetClusterBootstrapConfig
|
||||||
func NewGetClusterBootstrapConfigRequest(server string) (*http.Request, error) {
|
func NewGetClusterBootstrapConfigRequest(server string) (*http.Request, error) {
|
||||||
var err error
|
var err error
|
||||||
|
@ -368,6 +397,46 @@ func NewExchangeClusterIdentityTokenRequestWithBody(server string, contentType s
|
||||||
return req, nil
|
return req, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewReportUsageRequest calls the generic ReportUsage builder with application/json body
|
||||||
|
func NewReportUsageRequest(server string, body ReportUsageJSONRequestBody) (*http.Request, error) {
|
||||||
|
var bodyReader io.Reader
|
||||||
|
buf, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
bodyReader = bytes.NewReader(buf)
|
||||||
|
return NewReportUsageRequestWithBody(server, "application/json", bodyReader)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReportUsageRequestWithBody generates requests for ReportUsage with any type of body
|
||||||
|
func NewReportUsageRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
serverURL, err := url.Parse(server)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
operationPath := fmt.Sprintf("/reportUsage")
|
||||||
|
if operationPath[0] == '/' {
|
||||||
|
operationPath = "." + operationPath
|
||||||
|
}
|
||||||
|
|
||||||
|
queryURL, err := serverURL.Parse(operationPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("POST", queryURL.String(), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Add("Content-Type", contentType)
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) applyEditors(ctx context.Context, req *http.Request, additionalEditors []RequestEditorFn) error {
|
func (c *Client) applyEditors(ctx context.Context, req *http.Request, additionalEditors []RequestEditorFn) error {
|
||||||
for _, r := range c.RequestEditors {
|
for _, r := range c.RequestEditors {
|
||||||
if err := r(ctx, req); err != nil {
|
if err := r(ctx, req); err != nil {
|
||||||
|
@ -429,6 +498,11 @@ type ClientWithResponsesInterface interface {
|
||||||
ExchangeClusterIdentityTokenWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error)
|
ExchangeClusterIdentityTokenWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error)
|
||||||
|
|
||||||
ExchangeClusterIdentityTokenWithResponse(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error)
|
ExchangeClusterIdentityTokenWithResponse(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error)
|
||||||
|
|
||||||
|
// ReportUsageWithBodyWithResponse request with any body
|
||||||
|
ReportUsageWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ReportUsageResp, error)
|
||||||
|
|
||||||
|
ReportUsageWithResponse(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*ReportUsageResp, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetClusterBootstrapConfigResp struct {
|
type GetClusterBootstrapConfigResp struct {
|
||||||
|
@ -551,6 +625,27 @@ func (r ExchangeClusterIdentityTokenResp) StatusCode() int {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ReportUsageResp struct {
|
||||||
|
Body []byte
|
||||||
|
HTTPResponse *http.Response
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status returns HTTPResponse.Status
|
||||||
|
func (r ReportUsageResp) Status() string {
|
||||||
|
if r.HTTPResponse != nil {
|
||||||
|
return r.HTTPResponse.Status
|
||||||
|
}
|
||||||
|
return http.StatusText(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatusCode returns HTTPResponse.StatusCode
|
||||||
|
func (r ReportUsageResp) StatusCode() int {
|
||||||
|
if r.HTTPResponse != nil {
|
||||||
|
return r.HTTPResponse.StatusCode
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
// GetClusterBootstrapConfigWithResponse request returning *GetClusterBootstrapConfigResp
|
// GetClusterBootstrapConfigWithResponse request returning *GetClusterBootstrapConfigResp
|
||||||
func (c *ClientWithResponses) GetClusterBootstrapConfigWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetClusterBootstrapConfigResp, error) {
|
func (c *ClientWithResponses) GetClusterBootstrapConfigWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetClusterBootstrapConfigResp, error) {
|
||||||
rsp, err := c.GetClusterBootstrapConfig(ctx, reqEditors...)
|
rsp, err := c.GetClusterBootstrapConfig(ctx, reqEditors...)
|
||||||
|
@ -612,6 +707,23 @@ func (c *ClientWithResponses) ExchangeClusterIdentityTokenWithResponse(ctx conte
|
||||||
return ParseExchangeClusterIdentityTokenResp(rsp)
|
return ParseExchangeClusterIdentityTokenResp(rsp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReportUsageWithBodyWithResponse request with arbitrary body returning *ReportUsageResp
|
||||||
|
func (c *ClientWithResponses) ReportUsageWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) {
|
||||||
|
rsp, err := c.ReportUsageWithBody(ctx, contentType, body, reqEditors...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ParseReportUsageResp(rsp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ClientWithResponses) ReportUsageWithResponse(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) {
|
||||||
|
rsp, err := c.ReportUsage(ctx, body, reqEditors...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ParseReportUsageResp(rsp)
|
||||||
|
}
|
||||||
|
|
||||||
// ParseGetClusterBootstrapConfigResp parses an HTTP response from a GetClusterBootstrapConfigWithResponse call
|
// ParseGetClusterBootstrapConfigResp parses an HTTP response from a GetClusterBootstrapConfigWithResponse call
|
||||||
func ParseGetClusterBootstrapConfigResp(rsp *http.Response) (*GetClusterBootstrapConfigResp, error) {
|
func ParseGetClusterBootstrapConfigResp(rsp *http.Response) (*GetClusterBootstrapConfigResp, error) {
|
||||||
bodyBytes, err := io.ReadAll(rsp.Body)
|
bodyBytes, err := io.ReadAll(rsp.Body)
|
||||||
|
@ -812,6 +924,22 @@ func ParseExchangeClusterIdentityTokenResp(rsp *http.Response) (*ExchangeCluster
|
||||||
return response, nil
|
return response, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ParseReportUsageResp parses an HTTP response from a ReportUsageWithResponse call
|
||||||
|
func ParseReportUsageResp(rsp *http.Response) (*ReportUsageResp, error) {
|
||||||
|
bodyBytes, err := io.ReadAll(rsp.Body)
|
||||||
|
defer func() { _ = rsp.Body.Close() }()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &ReportUsageResp{
|
||||||
|
Body: bodyBytes,
|
||||||
|
HTTPResponse: rsp,
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetHTTPResponse implements apierror.APIResponse
|
// GetHTTPResponse implements apierror.APIResponse
|
||||||
func (r *GetClusterBootstrapConfigResp) GetHTTPResponse() *http.Response {
|
func (r *GetClusterBootstrapConfigResp) GetHTTPResponse() *http.Response {
|
||||||
return r.HTTPResponse
|
return r.HTTPResponse
|
||||||
|
@ -951,3 +1079,16 @@ func (r *ExchangeClusterIdentityTokenResp) GetInternalServerError() (string, boo
|
||||||
}
|
}
|
||||||
return r.JSON500.Error, true
|
return r.JSON500.Error, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetHTTPResponse implements apierror.APIResponse
|
||||||
|
func (r *ReportUsageResp) GetHTTPResponse() *http.Response {
|
||||||
|
return r.HTTPResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetValue implements apierror.APIResponse
|
||||||
|
func (r *ReportUsageResp) GetValue() *EmptyResponse {
|
||||||
|
if r.StatusCode()/100 != 2 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &EmptyResponse{}
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,10 @@
|
||||||
// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.3.0 DO NOT EDIT.
|
// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.3.0 DO NOT EDIT.
|
||||||
package cluster
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
BearerAuthScopes = "bearerAuth.Scopes"
|
BearerAuthScopes = "bearerAuth.Scopes"
|
||||||
)
|
)
|
||||||
|
@ -18,8 +22,11 @@ const (
|
||||||
|
|
||||||
// BootstrapConfig defines model for BootstrapConfig.
|
// BootstrapConfig defines model for BootstrapConfig.
|
||||||
type BootstrapConfig struct {
|
type BootstrapConfig struct {
|
||||||
|
ClusterId string `json:"clusterId"`
|
||||||
|
|
||||||
// DatabrokerStorageConnection databroker storage connection string
|
// DatabrokerStorageConnection databroker storage connection string
|
||||||
DatabrokerStorageConnection *string `json:"databrokerStorageConnection,omitempty"`
|
DatabrokerStorageConnection *string `json:"databrokerStorageConnection,omitempty"`
|
||||||
|
OrganizationId string `json:"organizationId"`
|
||||||
|
|
||||||
// SharedSecret shared secret
|
// SharedSecret shared secret
|
||||||
SharedSecret []byte `json:"sharedSecret"`
|
SharedSecret []byte `json:"sharedSecret"`
|
||||||
|
@ -92,6 +99,18 @@ type GetBundlesResponse struct {
|
||||||
Bundles []Bundle `json:"bundles"`
|
Bundles []Bundle `json:"bundles"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReportUsageRequest defines model for ReportUsageRequest.
|
||||||
|
type ReportUsageRequest struct {
|
||||||
|
Users []ReportUsageUser `json:"users"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReportUsageUser defines model for ReportUsageUser.
|
||||||
|
type ReportUsageUser struct {
|
||||||
|
LastSignedInAt time.Time `json:"lastSignedInAt"`
|
||||||
|
PseudonymousEmail string `json:"pseudonymousEmail"`
|
||||||
|
PseudonymousId string `json:"pseudonymousId"`
|
||||||
|
}
|
||||||
|
|
||||||
// BundleId defines model for bundleId.
|
// BundleId defines model for bundleId.
|
||||||
type BundleId = string
|
type BundleId = string
|
||||||
|
|
||||||
|
@ -100,3 +119,6 @@ type ReportClusterResourceBundleStatusJSONRequestBody = BundleStatus
|
||||||
|
|
||||||
// ExchangeClusterIdentityTokenJSONRequestBody defines body for ExchangeClusterIdentityToken for application/json ContentType.
|
// ExchangeClusterIdentityTokenJSONRequestBody defines body for ExchangeClusterIdentityToken for application/json ContentType.
|
||||||
type ExchangeClusterIdentityTokenJSONRequestBody = ExchangeTokenRequest
|
type ExchangeClusterIdentityTokenJSONRequestBody = ExchangeTokenRequest
|
||||||
|
|
||||||
|
// ReportUsageJSONRequestBody defines body for ReportUsage for application/json ContentType.
|
||||||
|
type ReportUsageJSONRequestBody = ReportUsageRequest
|
||||||
|
|
|
@ -149,6 +149,21 @@ paths:
|
||||||
schema:
|
schema:
|
||||||
$ref: "#/components/schemas/ErrorResponse"
|
$ref: "#/components/schemas/ErrorResponse"
|
||||||
|
|
||||||
|
/reportUsage:
|
||||||
|
post:
|
||||||
|
description: Report usage for the cluster
|
||||||
|
operationId: reportUsage
|
||||||
|
tags: [usage]
|
||||||
|
requestBody:
|
||||||
|
required: true
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/ReportUsageRequest"
|
||||||
|
responses:
|
||||||
|
"204":
|
||||||
|
description: OK
|
||||||
|
|
||||||
components:
|
components:
|
||||||
parameters:
|
parameters:
|
||||||
bundleId:
|
bundleId:
|
||||||
|
@ -163,14 +178,20 @@ components:
|
||||||
BootstrapConfig:
|
BootstrapConfig:
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
|
clusterId:
|
||||||
|
type: string
|
||||||
databrokerStorageConnection:
|
databrokerStorageConnection:
|
||||||
type: string
|
type: string
|
||||||
description: databroker storage connection string
|
description: databroker storage connection string
|
||||||
|
organizationId:
|
||||||
|
type: string
|
||||||
sharedSecret:
|
sharedSecret:
|
||||||
type: string
|
type: string
|
||||||
format: byte
|
format: byte
|
||||||
description: shared secret
|
description: shared secret
|
||||||
required:
|
required:
|
||||||
|
- clusterId
|
||||||
|
- organizationId
|
||||||
- sharedSecret
|
- sharedSecret
|
||||||
|
|
||||||
Bundle:
|
Bundle:
|
||||||
|
@ -273,3 +294,26 @@ components:
|
||||||
$ref: "#/components/schemas/Bundle"
|
$ref: "#/components/schemas/Bundle"
|
||||||
required:
|
required:
|
||||||
- bundles
|
- bundles
|
||||||
|
ReportUsageRequest:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
users:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
$ref: "#/components/schemas/ReportUsageUser"
|
||||||
|
required:
|
||||||
|
- users
|
||||||
|
ReportUsageUser:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
lastSignedInAt:
|
||||||
|
type: string
|
||||||
|
format: "date-time"
|
||||||
|
pseudonymousEmail:
|
||||||
|
type: string
|
||||||
|
pseudonymousId:
|
||||||
|
type: string
|
||||||
|
required:
|
||||||
|
- lastSignedInAt
|
||||||
|
- pseudonymousEmail
|
||||||
|
- pseudonymousId
|
||||||
|
|
|
@ -31,6 +31,9 @@ type ServerInterface interface {
|
||||||
|
|
||||||
// (POST /exchangeToken)
|
// (POST /exchangeToken)
|
||||||
ExchangeClusterIdentityToken(w http.ResponseWriter, r *http.Request)
|
ExchangeClusterIdentityToken(w http.ResponseWriter, r *http.Request)
|
||||||
|
|
||||||
|
// (POST /reportUsage)
|
||||||
|
ReportUsage(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unimplemented server implementation that returns http.StatusNotImplemented for each endpoint.
|
// Unimplemented server implementation that returns http.StatusNotImplemented for each endpoint.
|
||||||
|
@ -62,6 +65,11 @@ func (_ Unimplemented) ExchangeClusterIdentityToken(w http.ResponseWriter, r *ht
|
||||||
w.WriteHeader(http.StatusNotImplemented)
|
w.WriteHeader(http.StatusNotImplemented)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// (POST /reportUsage)
|
||||||
|
func (_ Unimplemented) ReportUsage(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusNotImplemented)
|
||||||
|
}
|
||||||
|
|
||||||
// ServerInterfaceWrapper converts contexts to parameters.
|
// ServerInterfaceWrapper converts contexts to parameters.
|
||||||
type ServerInterfaceWrapper struct {
|
type ServerInterfaceWrapper struct {
|
||||||
Handler ServerInterface
|
Handler ServerInterface
|
||||||
|
@ -176,6 +184,23 @@ func (siw *ServerInterfaceWrapper) ExchangeClusterIdentityToken(w http.ResponseW
|
||||||
handler.ServeHTTP(w, r.WithContext(ctx))
|
handler.ServeHTTP(w, r.WithContext(ctx))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReportUsage operation middleware
|
||||||
|
func (siw *ServerInterfaceWrapper) ReportUsage(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
ctx = context.WithValue(ctx, BearerAuthScopes, []string{})
|
||||||
|
|
||||||
|
handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
siw.Handler.ReportUsage(w, r)
|
||||||
|
}))
|
||||||
|
|
||||||
|
for i := len(siw.HandlerMiddlewares) - 1; i >= 0; i-- {
|
||||||
|
handler = siw.HandlerMiddlewares[i](handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
handler.ServeHTTP(w, r.WithContext(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
type UnescapedCookieParamError struct {
|
type UnescapedCookieParamError struct {
|
||||||
ParamName string
|
ParamName string
|
||||||
Err error
|
Err error
|
||||||
|
@ -304,6 +329,9 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl
|
||||||
r.Group(func(r chi.Router) {
|
r.Group(func(r chi.Router) {
|
||||||
r.Post(options.BaseURL+"/exchangeToken", wrapper.ExchangeClusterIdentityToken)
|
r.Post(options.BaseURL+"/exchangeToken", wrapper.ExchangeClusterIdentityToken)
|
||||||
})
|
})
|
||||||
|
r.Group(func(r chi.Router) {
|
||||||
|
r.Post(options.BaseURL+"/reportUsage", wrapper.ReportUsage)
|
||||||
|
})
|
||||||
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
@ -490,6 +518,14 @@ func (response ExchangeClusterIdentityToken500JSONResponse) VisitExchangeCluster
|
||||||
return json.NewEncoder(w).Encode(response)
|
return json.NewEncoder(w).Encode(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ReportUsageRequestObject struct {
|
||||||
|
Body *ReportUsageJSONRequestBody
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReportUsageResponseObject interface {
|
||||||
|
VisitReportUsageResponse(w http.ResponseWriter) error
|
||||||
|
}
|
||||||
|
|
||||||
// StrictServerInterface represents all server handlers.
|
// StrictServerInterface represents all server handlers.
|
||||||
type StrictServerInterface interface {
|
type StrictServerInterface interface {
|
||||||
|
|
||||||
|
@ -507,6 +543,9 @@ type StrictServerInterface interface {
|
||||||
|
|
||||||
// (POST /exchangeToken)
|
// (POST /exchangeToken)
|
||||||
ExchangeClusterIdentityToken(ctx context.Context, request ExchangeClusterIdentityTokenRequestObject) (ExchangeClusterIdentityTokenResponseObject, error)
|
ExchangeClusterIdentityToken(ctx context.Context, request ExchangeClusterIdentityTokenRequestObject) (ExchangeClusterIdentityTokenResponseObject, error)
|
||||||
|
|
||||||
|
// (POST /reportUsage)
|
||||||
|
ReportUsage(ctx context.Context, request ReportUsageRequestObject) (ReportUsageResponseObject, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StrictHandlerFunc = strictnethttp.StrictHTTPHandlerFunc
|
type StrictHandlerFunc = strictnethttp.StrictHTTPHandlerFunc
|
||||||
|
@ -675,3 +714,34 @@ func (sh *strictHandler) ExchangeClusterIdentityToken(w http.ResponseWriter, r *
|
||||||
sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response))
|
sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReportUsage operation middleware
|
||||||
|
func (sh *strictHandler) ReportUsage(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var request ReportUsageRequestObject
|
||||||
|
|
||||||
|
var body ReportUsageJSONRequestBody
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||||
|
sh.options.RequestErrorHandlerFunc(w, r, fmt.Errorf("can't decode JSON body: %w", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
request.Body = &body
|
||||||
|
|
||||||
|
handler := func(ctx context.Context, w http.ResponseWriter, r *http.Request, request interface{}) (interface{}, error) {
|
||||||
|
return sh.ssi.ReportUsage(ctx, request.(ReportUsageRequestObject))
|
||||||
|
}
|
||||||
|
for _, middleware := range sh.middlewares {
|
||||||
|
handler = middleware(handler, "ReportUsage")
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := handler(r.Context(), w, r, request)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
sh.options.ResponseErrorHandlerFunc(w, r, err)
|
||||||
|
} else if validResponse, ok := response.(ReportUsageResponseObject); ok {
|
||||||
|
if err := validResponse.VisitReportUsageResponse(w); err != nil {
|
||||||
|
sh.options.ResponseErrorHandlerFunc(w, r, err)
|
||||||
|
}
|
||||||
|
} else if response != nil {
|
||||||
|
sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue