connect: add health check

This commit is contained in:
Denis Mishin 2024-04-25 20:55:46 -04:00
parent 498c3aa108
commit 1546241053
2 changed files with 11 additions and 6 deletions

View file

@ -3,15 +3,16 @@ package mux
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync/atomic" "sync/atomic"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/pomerium/pomerium/internal/zero/apierror" "github.com/pomerium/pomerium/internal/zero/apierror"
"github.com/pomerium/pomerium/pkg/fanout" "github.com/pomerium/pomerium/pkg/fanout"
"github.com/pomerium/pomerium/pkg/health"
"github.com/pomerium/pomerium/pkg/zero/connect" "github.com/pomerium/pomerium/pkg/zero/connect"
) )
@ -56,6 +57,7 @@ func (svc *Mux) run(ctx context.Context) error {
return backoff.Retry(func() error { return backoff.Retry(func() error {
err := svc.subscribeAndDispatch(ctx, b.Reset) err := svc.subscribeAndDispatch(ctx, b.Reset)
health.ReportError(health.ZeroConnect, err)
if apierror.IsTerminalError(err) { if apierror.IsTerminalError(err) {
return backoff.Permanent(err) return backoff.Permanent(err)
} }
@ -71,13 +73,14 @@ func (svc *Mux) subscribeAndDispatch(ctx context.Context, onConnected func()) (e
if err != nil { if err != nil {
return fmt.Errorf("subscribe: %w", err) return fmt.Errorf("subscribe: %w", err)
} }
health.ReportOK(health.ZeroConnect)
onConnected() onConnected()
if err = svc.onConnected(ctx); err != nil { if err = svc.onConnected(ctx); err != nil {
return err return fmt.Errorf("onConnected: %w", err)
} }
defer func() { defer func() {
err = multierror.Append(err, svc.onDisconnected(ctx)).ErrorOrNil() err = errors.Join(err, svc.onDisconnected(ctx))
}() }()
log.Ctx(ctx).Debug().Msg("subscribed to connect service") log.Ctx(ctx).Debug().Msg("subscribed to connect service")
@ -89,7 +92,7 @@ func (svc *Mux) subscribeAndDispatch(ctx context.Context, onConnected func()) (e
} }
err = svc.onMessage(ctx, msg) err = svc.onMessage(ctx, msg)
if err != nil { if err != nil {
return err return fmt.Errorf("onMessage: %w", err)
} }
} }
} }

View file

@ -5,6 +5,8 @@ import "fmt"
type Check string type Check string
const ( const (
// StorageBackend checks whether the storage backend is healthy
StorageBackend = Check("storage.backend")
// XDSCluster checks whether the XDS Cluster resources were applied // XDSCluster checks whether the XDS Cluster resources were applied
XDSCluster = Check("xds.cluster") XDSCluster = Check("xds.cluster")
// XDSListener checks whether the XDS Listener resources were applied // XDSListener checks whether the XDS Listener resources were applied
@ -15,8 +17,8 @@ const (
XDSOther = Check("xds.other") XDSOther = Check("xds.other")
// ZeroBootstrapConfigSave checks whether the Zero bootstrap config was saved // ZeroBootstrapConfigSave checks whether the Zero bootstrap config was saved
ZeroBootstrapConfigSave = Check("zero.bootstrap-config.save") ZeroBootstrapConfigSave = Check("zero.bootstrap-config.save")
// StorageBackend checks whether the storage backend is healthy // ZeroConnect checks whether the Zero Connect service is connected
StorageBackend = Check("storage.backend") ZeroConnect = Check("zero.connect")
) )
// ZeroResourceBundle checks whether the Zero resource bundle was applied // ZeroResourceBundle checks whether the Zero resource bundle was applied