diff --git a/config/config.go b/config/config.go index 30a878693..53bd8a699 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,10 @@ type Config struct { // MetricsScrapeEndpoints additional metrics endpoints to scrape and provide part of metrics MetricsScrapeEndpoints []MetricsScrapeEndpoint + + // Zero-specific configuration options. + ZeroClusterID string + ZeroOrganizationID string } // Clone creates a clone of the config. diff --git a/internal/testutil/grpc.go b/internal/testutil/grpc.go new file mode 100644 index 000000000..3b34f4575 --- /dev/null +++ b/internal/testutil/grpc.go @@ -0,0 +1,44 @@ +package testutil + +import ( + "context" + "errors" + "net" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +// NewGRPCServer starts a gRPC server and returns a client connection to it. +func NewGRPCServer(t testing.TB, register func(s *grpc.Server)) *grpc.ClientConn { + t.Helper() + + li := bufconn.Listen(1024 * 1024) + s := grpc.NewServer() + register(s) + go func() { + err := s.Serve(li) + if errors.Is(err, grpc.ErrServerStopped) { + err = nil + } + require.NoError(t, err) + }() + t.Cleanup(func() { + s.Stop() + }) + + cc, err := grpc.NewClient("passthrough://bufnet", + grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) { + return li.Dial() + }), + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + t.Cleanup(func() { + cc.Close() + }) + + return cc +} diff --git a/internal/zero/api/api.go b/internal/zero/api/api.go index b83d9b1a7..45e85c57b 100644 --- a/internal/zero/api/api.go +++ b/internal/zero/api/api.go @@ -119,3 +119,16 @@ func (api *API) GetClusterResourceBundles(ctx context.Context) (*cluster_api.Get func (api *API) GetTelemetryConn() *grpc.ClientConn { return api.telemetryConn } + +func (api *API) ReportUsage(ctx context.Context, req cluster_api.ReportUsageRequest) error { + res, err := api.cluster.ReportUsageWithResponse(ctx, req) + if err != nil { + return err + } + + if res.StatusCode()/100 != 2 { + return fmt.Errorf("unexpected response from ReportUsage: %d", res.StatusCode()) + } + + return nil +} diff --git a/internal/zero/bootstrap/source.go b/internal/zero/bootstrap/source.go index 2186f622c..ad1bfe657 100644 --- a/internal/zero/bootstrap/source.go +++ b/internal/zero/bootstrap/source.go @@ -55,9 +55,9 @@ func (src *source) OnConfigChange(_ context.Context, l config.ChangeListener) { func (src *source) UpdateBootstrap(ctx context.Context, cfg cluster_api.BootstrapConfig) bool { current := src.cfg.Load() incoming := current.Clone() - applyBootstrapConfig(incoming.Options, &cfg) + applyBootstrapConfig(incoming, &cfg) - if cmp.Equal(incoming.Options, current.Options, cmpOpts...) { + if cmp.Equal(incoming, current, cmpOpts...) { return false } @@ -81,13 +81,15 @@ func (src *source) notifyListeners(ctx context.Context, cfg *config.Config) { } } -func applyBootstrapConfig(dst *config.Options, src *cluster_api.BootstrapConfig) { - dst.SharedKey = base64.StdEncoding.EncodeToString(src.SharedSecret) +func applyBootstrapConfig(dst *config.Config, src *cluster_api.BootstrapConfig) { + dst.Options.SharedKey = base64.StdEncoding.EncodeToString(src.SharedSecret) if src.DatabrokerStorageConnection != nil { - dst.DataBrokerStorageType = config.StoragePostgresName - dst.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection + dst.Options.DataBrokerStorageType = config.StoragePostgresName + dst.Options.DataBrokerStorageConnectionString = *src.DatabrokerStorageConnection } else { - dst.DataBrokerStorageType = config.StorageInMemoryName - dst.DataBrokerStorageConnectionString = "" + dst.Options.DataBrokerStorageType = config.StorageInMemoryName + dst.Options.DataBrokerStorageConnectionString = "" } + dst.ZeroClusterID = src.ClusterId + dst.ZeroOrganizationID = src.OrganizationId } diff --git a/internal/zero/bootstrap/writers/k8s/secret_test.go b/internal/zero/bootstrap/writers/k8s/secret_test.go index 25c66e522..9b82875ae 100644 --- a/internal/zero/bootstrap/writers/k8s/secret_test.go +++ b/internal/zero/bootstrap/writers/k8s/secret_test.go @@ -95,7 +95,7 @@ func TestSecretWriter(t *testing.T) { "namespace": "pomerium", }, "data": map[string]any{ - "bootstrap.dat": `{"databrokerStorageConnection":"test","sharedSecret":null}`, + "bootstrap.dat": `{"clusterId":"","databrokerStorageConnection":"test","organizationId":"","sharedSecret":null}`, }, }, unstructured) }) diff --git a/internal/zero/controller/controller.go b/internal/zero/controller/controller.go index f53590611..2c2364409 100644 --- a/internal/zero/controller/controller.go +++ b/internal/zero/controller/controller.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/internal/zero/bootstrap" "github.com/pomerium/pomerium/internal/zero/bootstrap/writers" connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux" + "github.com/pomerium/pomerium/internal/zero/controller/usagereporter" "github.com/pomerium/pomerium/internal/zero/healthcheck" "github.com/pomerium/pomerium/internal/zero/reconciler" "github.com/pomerium/pomerium/internal/zero/telemetry" @@ -160,6 +161,7 @@ func (c *controller) runZeroControlLoop(ctx context.Context) error { c.runSessionAnalyticsLeased, c.runHealthChecksLeased, leaseStatus.MonitorLease, + c.runUsageReporter, ), ) }) @@ -208,6 +210,20 @@ func (c *controller) runHealthChecksLeased(ctx context.Context, client databroke }) } +func (c *controller) runUsageReporter(ctx context.Context, client databroker.DataBrokerServiceClient) error { + // the usage reporter depends on the zero organization id, so wait for bootstrap to complete + err := c.bootstrapConfig.WaitReady(ctx) + if err != nil { + return fmt.Errorf("error waiting for bootstrap: %w", err) + } + + ur := usagereporter.New(c.api, c.bootstrapConfig.GetConfig().ZeroOrganizationID) + return retry.WithBackoff(ctx, "zero-usage-reporter", func(ctx context.Context) error { + // start the usage reporter + return ur.Run(ctx, client) + }) +} + func (c *controller) getEnvoyScrapeURL() string { return (&url.URL{ Scheme: "http", diff --git a/internal/zero/controller/usagereporter/usagereporter.go b/internal/zero/controller/usagereporter/usagereporter.go new file mode 100644 index 000000000..33ca20962 --- /dev/null +++ b/internal/zero/controller/usagereporter/usagereporter.go @@ -0,0 +1,181 @@ +// Package usagereporter reports usage for a cluster. +package usagereporter + +import ( + "context" + "sync" + "time" + + backoff "github.com/cenkalti/backoff/v4" + "golang.org/x/sync/errgroup" + + "github.com/pomerium/pomerium/internal/log" + sdk "github.com/pomerium/pomerium/internal/zero/api" + "github.com/pomerium/pomerium/pkg/cryptutil" + "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpc/session" + "github.com/pomerium/pomerium/pkg/grpc/user" + "github.com/pomerium/pomerium/pkg/zero/cluster" +) + +type usageReporterRecord struct { + userID string + userEmail string + lastSignedInAt time.Time +} + +// A UsageReporter reports usage to the zero api. +type UsageReporter struct { + api *sdk.API + organizationID string + + mu sync.Mutex + byUserID map[string]usageReporterRecord + updates map[string]struct{} +} + +// New creates a new UsageReporter. +func New(api *sdk.API, organizationID string) *UsageReporter { + return &UsageReporter{ + api: api, + organizationID: organizationID, + byUserID: make(map[string]usageReporterRecord), + updates: make(map[string]struct{}), + } +} + +// Run runs the usage reporter. +func (ur *UsageReporter) Run(ctx context.Context, client databroker.DataBrokerServiceClient) error { + // first initialize the user collection + serverVersion, latestRecordVersion, err := ur.runInit(ctx, client) + if err != nil { + return err + } + + // run the continuous sync calls and periodically report usage + return ur.runSync(ctx, client, serverVersion, latestRecordVersion) +} + +func (ur *UsageReporter) report(ctx context.Context, records []usageReporterRecord) error { + req := cluster.ReportUsageRequest{} + for _, record := range records { + req.Users = append(req.Users, cluster.ReportUsageUser{ + LastSignedInAt: record.lastSignedInAt, + PseudonymousEmail: cryptutil.Pseudonymize(ur.organizationID, record.userEmail), + PseudonymousId: cryptutil.Pseudonymize(ur.organizationID, record.userID), + }) + } + return backoff.Retry(func() error { + log.Debug(ctx).Int("updated-users", len(req.Users)).Msg("reporting usage") + err := ur.api.ReportUsage(ctx, req) + if err != nil { + log.Warn(ctx).Err(err).Msg("error reporting usage") + } + return err + }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) +} + +func (ur *UsageReporter) runInit(ctx context.Context, client databroker.DataBrokerServiceClient) (serverVersion, latestRecordVersion uint64, err error) { + _, _, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateSession) + if err != nil { + return 0, 0, err + } + + serverVersion, latestRecordVersion, err = databroker.SyncLatestRecords(ctx, client, ur.onUpdateUser) + if err != nil { + return 0, 0, err + } + + return serverVersion, latestRecordVersion, nil +} + +func (ur *UsageReporter) runSync(ctx context.Context, client databroker.DataBrokerServiceClient, serverVersion, latestRecordVersion uint64) error { + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return databroker.SyncRecords(ctx, client, serverVersion, latestRecordVersion, ur.onUpdateSession) + }) + eg.Go(func() error { + return databroker.SyncRecords(ctx, client, serverVersion, latestRecordVersion, ur.onUpdateUser) + }) + eg.Go(func() error { + return ur.runReporter(ctx) + }) + return eg.Wait() +} + +func (ur *UsageReporter) runReporter(ctx context.Context) error { + // every minute collect any updates and submit them to the API + timer := time.NewTicker(time.Minute) + defer timer.Stop() + + for { + // collect the updated records since last run + ur.mu.Lock() + records := make([]usageReporterRecord, 0, len(ur.updates)) + for userID := range ur.updates { + records = append(records, ur.byUserID[userID]) + } + clear(ur.updates) + ur.mu.Unlock() + + if len(records) > 0 { + err := ur.report(ctx, records) + if err != nil { + return err + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + } + } +} + +func (ur *UsageReporter) onUpdateSession(s *session.Session) { + userID := s.GetUserId() + if userID == "" { + // ignore sessions without a user id + return + } + + ur.mu.Lock() + defer ur.mu.Unlock() + + r := ur.byUserID[userID] + nr := r + nr.lastSignedInAt = latest(nr.lastSignedInAt, s.GetIssuedAt().AsTime()) + nr.userID = userID + if nr != r { + ur.byUserID[userID] = nr + ur.updates[userID] = struct{}{} + } +} + +func (ur *UsageReporter) onUpdateUser(u *user.User) { + userID := u.GetId() + if userID == "" { + // ignore users without a user id + return + } + + ur.mu.Lock() + defer ur.mu.Unlock() + + r := ur.byUserID[userID] + nr := r + nr.userID = userID + nr.userEmail = u.GetEmail() + if nr != r { + ur.byUserID[userID] = nr + ur.updates[userID] = struct{}{} + } +} + +func latest(t1, t2 time.Time) time.Time { + if t2.After(t1) { + return t2 + } + return t1 +} diff --git a/internal/zero/controller/usagereporter/usagereporter_test.go b/internal/zero/controller/usagereporter/usagereporter_test.go new file mode 100644 index 000000000..7d8a81116 --- /dev/null +++ b/internal/zero/controller/usagereporter/usagereporter_test.go @@ -0,0 +1,7 @@ +package usagereporter + +import "testing" + +func Test_report(t *testing.T) { + t.Parallel() +} diff --git a/pkg/cryptutil/pseudonymize.go b/pkg/cryptutil/pseudonymize.go new file mode 100644 index 000000000..c791a5696 --- /dev/null +++ b/pkg/cryptutil/pseudonymize.go @@ -0,0 +1,16 @@ +package cryptutil + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/base64" + "io" +) + +// Pseudonymize pseudonymizes data by computing the HMAC-SHA256 of the data. +func Pseudonymize(organizationID string, data string) string { + h := hmac.New(sha256.New, []byte(organizationID)) + _, _ = io.WriteString(h, data) + bs := h.Sum(nil) + return base64.StdEncoding.EncodeToString(bs) +} diff --git a/pkg/grpc/databroker/sync.go b/pkg/grpc/databroker/sync.go new file mode 100644 index 000000000..e894e81e1 --- /dev/null +++ b/pkg/grpc/databroker/sync.go @@ -0,0 +1,110 @@ +package databroker + +import ( + "context" + "errors" + "fmt" + "io" + + "google.golang.org/protobuf/proto" + + "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/pkg/protoutil" +) + +// SyncRecords calls fn for every record using Sync. +func SyncRecords[T any, TMessage interface { + *T + proto.Message +}]( + ctx context.Context, + client DataBrokerServiceClient, + serverVersion, latestRecordVersion uint64, + fn func(TMessage), +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var msg TMessage = new(T) + stream, err := client.Sync(ctx, &SyncRequest{ + Type: protoutil.GetTypeURL(msg), + ServerVersion: serverVersion, + RecordVersion: latestRecordVersion, + }) + if err != nil { + return fmt.Errorf("error syncing %T: %w", msg, err) + } + + for { + res, err := stream.Recv() + switch { + case errors.Is(err, io.EOF): + return nil + case err != nil: + return fmt.Errorf("error receiving record for %T: %w", msg, err) + } + + msg = new(T) + err = res.GetRecord().GetData().UnmarshalTo(msg) + if err != nil { + log.Ctx(ctx).Error().Err(err). + Str("record-type", res.Record.Type). + Str("record-id", res.Record.GetId()). + Msgf("unexpected data in %T stream", msg) + continue + } + + fn(msg) + } +} + +// SyncRecords calls fn for every record using SyncLatest. +func SyncLatestRecords[T any, TMessage interface { + *T + proto.Message +}]( + ctx context.Context, + client DataBrokerServiceClient, + fn func(TMessage), +) (serverVersion, latestRecordVersion uint64, err error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var msg TMessage = new(T) + stream, err := client.SyncLatest(ctx, &SyncLatestRequest{ + Type: protoutil.GetTypeURL(msg), + }) + if err != nil { + return 0, 0, fmt.Errorf("error syncing latest %T: %w", msg, err) + } + + for { + res, err := stream.Recv() + switch { + case errors.Is(err, io.EOF): + return serverVersion, latestRecordVersion, nil + case err != nil: + return 0, 0, fmt.Errorf("error receiving record for latest %T: %w", msg, err) + } + + switch res := res.GetResponse().(type) { + case *SyncLatestResponse_Versions: + serverVersion = res.Versions.GetServerVersion() + latestRecordVersion = res.Versions.GetLatestRecordVersion() + case *SyncLatestResponse_Record: + msg = new(T) + err = res.Record.GetData().UnmarshalTo(msg) + if err != nil { + log.Ctx(ctx).Error().Err(err). + Str("record-type", res.Record.Type). + Str("record-id", res.Record.GetId()). + Msgf("unexpected data in latest %T stream", msg) + continue + } + + fn(msg) + default: + panic(fmt.Sprintf("unexpected response: %T", res)) + } + } +} diff --git a/pkg/grpc/databroker/sync_test.go b/pkg/grpc/databroker/sync_test.go new file mode 100644 index 000000000..85a454876 --- /dev/null +++ b/pkg/grpc/databroker/sync_test.go @@ -0,0 +1,67 @@ +package databroker_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + grpc "google.golang.org/grpc" + + "github.com/pomerium/pomerium/internal/databroker" + "github.com/pomerium/pomerium/internal/testutil" + databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker" + "github.com/pomerium/pomerium/pkg/grpc/session" + "github.com/pomerium/pomerium/pkg/grpc/user" + "github.com/pomerium/pomerium/pkg/protoutil" +) + +func Test_SyncLatestRecords(t *testing.T) { + t.Parallel() + + ctx, clearTimeout := context.WithTimeout(context.Background(), time.Minute) + defer clearTimeout() + + cc := testutil.NewGRPCServer(t, func(s *grpc.Server) { + databrokerpb.RegisterDataBrokerServiceServer(s, databroker.New()) + }) + + c := databrokerpb.NewDataBrokerServiceClient(cc) + + expected := []*user.User{ + {Id: "u1"}, + {Id: "u2"}, + {Id: "u3"}, + } + + for _, u := range expected { + _, err := c.Put(ctx, &databrokerpb.PutRequest{ + Records: []*databrokerpb.Record{ + databrokerpb.NewRecord(u), + }, + }) + require.NoError(t, err) + } + + // add a non-user record to make sure it gets ignored + _, err := c.Put(ctx, &databrokerpb.PutRequest{ + Records: []*databrokerpb.Record{ + { + Id: "u4", + Type: protoutil.GetTypeURL(new(user.User)), + Data: protoutil.NewAny(&session.Session{Id: "s1"}), + }, + }, + }) + require.NoError(t, err) + + var actual []*user.User + serverVersion, latestRecordVersion, err := databrokerpb.SyncLatestRecords(context.Background(), c, func(u *user.User) { + actual = append(actual, u) + }) + assert.NoError(t, err) + assert.NotZero(t, serverVersion) + assert.Equal(t, uint64(4), latestRecordVersion) + testutil.AssertProtoEqual(t, expected, actual) +} diff --git a/pkg/zero/cluster/client.gen.go b/pkg/zero/cluster/client.gen.go index 21e183063..4ec9ec035 100644 --- a/pkg/zero/cluster/client.gen.go +++ b/pkg/zero/cluster/client.gen.go @@ -107,6 +107,11 @@ type ClientInterface interface { ExchangeClusterIdentityTokenWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) ExchangeClusterIdentityToken(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + + // ReportUsageWithBody request with any body + ReportUsageWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + ReportUsage(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) } func (c *Client) GetClusterBootstrapConfig(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { @@ -193,6 +198,30 @@ func (c *Client) ExchangeClusterIdentityToken(ctx context.Context, body Exchange return c.Client.Do(req) } +func (c *Client) ReportUsageWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewReportUsageRequestWithBody(c.Server, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) ReportUsage(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewReportUsageRequest(c.Server, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + // NewGetClusterBootstrapConfigRequest generates requests for GetClusterBootstrapConfig func NewGetClusterBootstrapConfigRequest(server string) (*http.Request, error) { var err error @@ -368,6 +397,46 @@ func NewExchangeClusterIdentityTokenRequestWithBody(server string, contentType s return req, nil } +// NewReportUsageRequest calls the generic ReportUsage builder with application/json body +func NewReportUsageRequest(server string, body ReportUsageJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewReportUsageRequestWithBody(server, "application/json", bodyReader) +} + +// NewReportUsageRequestWithBody generates requests for ReportUsage with any type of body +func NewReportUsageRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/reportUsage") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + func (c *Client) applyEditors(ctx context.Context, req *http.Request, additionalEditors []RequestEditorFn) error { for _, r := range c.RequestEditors { if err := r(ctx, req); err != nil { @@ -429,6 +498,11 @@ type ClientWithResponsesInterface interface { ExchangeClusterIdentityTokenWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error) ExchangeClusterIdentityTokenWithResponse(ctx context.Context, body ExchangeClusterIdentityTokenJSONRequestBody, reqEditors ...RequestEditorFn) (*ExchangeClusterIdentityTokenResp, error) + + // ReportUsageWithBodyWithResponse request with any body + ReportUsageWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) + + ReportUsageWithResponse(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) } type GetClusterBootstrapConfigResp struct { @@ -551,6 +625,27 @@ func (r ExchangeClusterIdentityTokenResp) StatusCode() int { return 0 } +type ReportUsageResp struct { + Body []byte + HTTPResponse *http.Response +} + +// Status returns HTTPResponse.Status +func (r ReportUsageResp) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r ReportUsageResp) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + // GetClusterBootstrapConfigWithResponse request returning *GetClusterBootstrapConfigResp func (c *ClientWithResponses) GetClusterBootstrapConfigWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*GetClusterBootstrapConfigResp, error) { rsp, err := c.GetClusterBootstrapConfig(ctx, reqEditors...) @@ -612,6 +707,23 @@ func (c *ClientWithResponses) ExchangeClusterIdentityTokenWithResponse(ctx conte return ParseExchangeClusterIdentityTokenResp(rsp) } +// ReportUsageWithBodyWithResponse request with arbitrary body returning *ReportUsageResp +func (c *ClientWithResponses) ReportUsageWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) { + rsp, err := c.ReportUsageWithBody(ctx, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseReportUsageResp(rsp) +} + +func (c *ClientWithResponses) ReportUsageWithResponse(ctx context.Context, body ReportUsageJSONRequestBody, reqEditors ...RequestEditorFn) (*ReportUsageResp, error) { + rsp, err := c.ReportUsage(ctx, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseReportUsageResp(rsp) +} + // ParseGetClusterBootstrapConfigResp parses an HTTP response from a GetClusterBootstrapConfigWithResponse call func ParseGetClusterBootstrapConfigResp(rsp *http.Response) (*GetClusterBootstrapConfigResp, error) { bodyBytes, err := io.ReadAll(rsp.Body) @@ -812,6 +924,22 @@ func ParseExchangeClusterIdentityTokenResp(rsp *http.Response) (*ExchangeCluster return response, nil } +// ParseReportUsageResp parses an HTTP response from a ReportUsageWithResponse call +func ParseReportUsageResp(rsp *http.Response) (*ReportUsageResp, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &ReportUsageResp{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + return response, nil +} + // GetHTTPResponse implements apierror.APIResponse func (r *GetClusterBootstrapConfigResp) GetHTTPResponse() *http.Response { return r.HTTPResponse @@ -951,3 +1079,16 @@ func (r *ExchangeClusterIdentityTokenResp) GetInternalServerError() (string, boo } return r.JSON500.Error, true } + +// GetHTTPResponse implements apierror.APIResponse +func (r *ReportUsageResp) GetHTTPResponse() *http.Response { + return r.HTTPResponse +} + +// GetValue implements apierror.APIResponse +func (r *ReportUsageResp) GetValue() *EmptyResponse { + if r.StatusCode()/100 != 2 { + return nil + } + return &EmptyResponse{} +} diff --git a/pkg/zero/cluster/models.gen.go b/pkg/zero/cluster/models.gen.go index b3716332c..2aec18fa3 100644 --- a/pkg/zero/cluster/models.gen.go +++ b/pkg/zero/cluster/models.gen.go @@ -3,6 +3,10 @@ // Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.3.0 DO NOT EDIT. package cluster +import ( + "time" +) + const ( BearerAuthScopes = "bearerAuth.Scopes" ) @@ -18,8 +22,11 @@ const ( // BootstrapConfig defines model for BootstrapConfig. type BootstrapConfig struct { + ClusterId string `json:"clusterId"` + // DatabrokerStorageConnection databroker storage connection string DatabrokerStorageConnection *string `json:"databrokerStorageConnection,omitempty"` + OrganizationId string `json:"organizationId"` // SharedSecret shared secret SharedSecret []byte `json:"sharedSecret"` @@ -92,6 +99,18 @@ type GetBundlesResponse struct { Bundles []Bundle `json:"bundles"` } +// ReportUsageRequest defines model for ReportUsageRequest. +type ReportUsageRequest struct { + Users []ReportUsageUser `json:"users"` +} + +// ReportUsageUser defines model for ReportUsageUser. +type ReportUsageUser struct { + LastSignedInAt time.Time `json:"lastSignedInAt"` + PseudonymousEmail string `json:"pseudonymousEmail"` + PseudonymousId string `json:"pseudonymousId"` +} + // BundleId defines model for bundleId. type BundleId = string @@ -100,3 +119,6 @@ type ReportClusterResourceBundleStatusJSONRequestBody = BundleStatus // ExchangeClusterIdentityTokenJSONRequestBody defines body for ExchangeClusterIdentityToken for application/json ContentType. type ExchangeClusterIdentityTokenJSONRequestBody = ExchangeTokenRequest + +// ReportUsageJSONRequestBody defines body for ReportUsage for application/json ContentType. +type ReportUsageJSONRequestBody = ReportUsageRequest diff --git a/pkg/zero/cluster/openapi.yaml b/pkg/zero/cluster/openapi.yaml index e9c43bf71..73a329b0b 100644 --- a/pkg/zero/cluster/openapi.yaml +++ b/pkg/zero/cluster/openapi.yaml @@ -149,6 +149,21 @@ paths: schema: $ref: "#/components/schemas/ErrorResponse" + /reportUsage: + post: + description: Report usage for the cluster + operationId: reportUsage + tags: [usage] + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ReportUsageRequest" + responses: + "204": + description: OK + components: parameters: bundleId: @@ -163,14 +178,20 @@ components: BootstrapConfig: type: object properties: + clusterId: + type: string databrokerStorageConnection: type: string description: databroker storage connection string + organizationId: + type: string sharedSecret: type: string format: byte description: shared secret required: + - clusterId + - organizationId - sharedSecret Bundle: @@ -273,3 +294,26 @@ components: $ref: "#/components/schemas/Bundle" required: - bundles + ReportUsageRequest: + type: object + properties: + users: + type: array + items: + $ref: "#/components/schemas/ReportUsageUser" + required: + - users + ReportUsageUser: + type: object + properties: + lastSignedInAt: + type: string + format: "date-time" + pseudonymousEmail: + type: string + pseudonymousId: + type: string + required: + - lastSignedInAt + - pseudonymousEmail + - pseudonymousId diff --git a/pkg/zero/cluster/server.gen.go b/pkg/zero/cluster/server.gen.go index d12303d69..4dad40256 100644 --- a/pkg/zero/cluster/server.gen.go +++ b/pkg/zero/cluster/server.gen.go @@ -31,6 +31,9 @@ type ServerInterface interface { // (POST /exchangeToken) ExchangeClusterIdentityToken(w http.ResponseWriter, r *http.Request) + + // (POST /reportUsage) + ReportUsage(w http.ResponseWriter, r *http.Request) } // Unimplemented server implementation that returns http.StatusNotImplemented for each endpoint. @@ -62,6 +65,11 @@ func (_ Unimplemented) ExchangeClusterIdentityToken(w http.ResponseWriter, r *ht w.WriteHeader(http.StatusNotImplemented) } +// (POST /reportUsage) +func (_ Unimplemented) ReportUsage(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) +} + // ServerInterfaceWrapper converts contexts to parameters. type ServerInterfaceWrapper struct { Handler ServerInterface @@ -176,6 +184,23 @@ func (siw *ServerInterfaceWrapper) ExchangeClusterIdentityToken(w http.ResponseW handler.ServeHTTP(w, r.WithContext(ctx)) } +// ReportUsage operation middleware +func (siw *ServerInterfaceWrapper) ReportUsage(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + ctx = context.WithValue(ctx, BearerAuthScopes, []string{}) + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.ReportUsage(w, r) + })) + + for i := len(siw.HandlerMiddlewares) - 1; i >= 0; i-- { + handler = siw.HandlerMiddlewares[i](handler) + } + + handler.ServeHTTP(w, r.WithContext(ctx)) +} + type UnescapedCookieParamError struct { ParamName string Err error @@ -304,6 +329,9 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl r.Group(func(r chi.Router) { r.Post(options.BaseURL+"/exchangeToken", wrapper.ExchangeClusterIdentityToken) }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/reportUsage", wrapper.ReportUsage) + }) return r } @@ -490,6 +518,14 @@ func (response ExchangeClusterIdentityToken500JSONResponse) VisitExchangeCluster return json.NewEncoder(w).Encode(response) } +type ReportUsageRequestObject struct { + Body *ReportUsageJSONRequestBody +} + +type ReportUsageResponseObject interface { + VisitReportUsageResponse(w http.ResponseWriter) error +} + // StrictServerInterface represents all server handlers. type StrictServerInterface interface { @@ -507,6 +543,9 @@ type StrictServerInterface interface { // (POST /exchangeToken) ExchangeClusterIdentityToken(ctx context.Context, request ExchangeClusterIdentityTokenRequestObject) (ExchangeClusterIdentityTokenResponseObject, error) + + // (POST /reportUsage) + ReportUsage(ctx context.Context, request ReportUsageRequestObject) (ReportUsageResponseObject, error) } type StrictHandlerFunc = strictnethttp.StrictHTTPHandlerFunc @@ -675,3 +714,34 @@ func (sh *strictHandler) ExchangeClusterIdentityToken(w http.ResponseWriter, r * sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response)) } } + +// ReportUsage operation middleware +func (sh *strictHandler) ReportUsage(w http.ResponseWriter, r *http.Request) { + var request ReportUsageRequestObject + + var body ReportUsageJSONRequestBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + sh.options.RequestErrorHandlerFunc(w, r, fmt.Errorf("can't decode JSON body: %w", err)) + return + } + request.Body = &body + + handler := func(ctx context.Context, w http.ResponseWriter, r *http.Request, request interface{}) (interface{}, error) { + return sh.ssi.ReportUsage(ctx, request.(ReportUsageRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "ReportUsage") + } + + response, err := handler(r.Context(), w, r, request) + + if err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } else if validResponse, ok := response.(ReportUsageResponseObject); ok { + if err := validResponse.VisitReportUsageResponse(w); err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } + } else if response != nil { + sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response)) + } +}