mirror of
https://github.com/pomerium/pomerium.git
synced 2025-04-29 18:36:30 +02:00
* upgrade to go v1.24 * add a macOS-specific //nolint comment too --------- Co-authored-by: Kenneth Jenkins <51246568+kenjenkins@users.noreply.github.com>
157 lines
4 KiB
Go
157 lines
4 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"net"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/cenkalti/backoff/v4"
|
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
|
oteltrace "go.opentelemetry.io/otel/trace"
|
|
googlegrpc "google.golang.org/grpc"
|
|
|
|
"github.com/pomerium/pomerium/config"
|
|
"github.com/pomerium/pomerium/internal/log"
|
|
"github.com/pomerium/pomerium/pkg/grpc"
|
|
pb "github.com/pomerium/pomerium/pkg/grpc/registry"
|
|
)
|
|
|
|
// Reporter periodically submits a list of services available on this instance to the service registry
|
|
type Reporter struct {
|
|
cancel func()
|
|
outboundGRPCConnection *grpc.CachedOutboundGRPClientConn
|
|
tracerProvider oteltrace.TracerProvider
|
|
}
|
|
|
|
// NewReporter creates a new Reporter.
|
|
func NewReporter(tracerProvider oteltrace.TracerProvider) *Reporter {
|
|
return &Reporter{
|
|
outboundGRPCConnection: new(grpc.CachedOutboundGRPClientConn),
|
|
tracerProvider: tracerProvider,
|
|
}
|
|
}
|
|
|
|
// OnConfigChange applies configuration changes to the reporter
|
|
func (r *Reporter) OnConfigChange(ctx context.Context, cfg *config.Config) {
|
|
if r.cancel != nil {
|
|
r.cancel()
|
|
}
|
|
|
|
services, err := getReportedServices(cfg)
|
|
if err != nil {
|
|
log.Ctx(ctx).Error().Err(err).Msg("metrics announce to service registry is disabled")
|
|
}
|
|
|
|
sharedKey, err := cfg.Options.GetSharedKey()
|
|
if err != nil {
|
|
log.Ctx(ctx).Error().Err(err).Msg("decoding shared key")
|
|
return
|
|
}
|
|
|
|
registryConn, err := r.outboundGRPCConnection.Get(ctx, &grpc.OutboundOptions{
|
|
OutboundPort: cfg.OutboundPort,
|
|
InstallationID: cfg.Options.InstallationID,
|
|
ServiceName: cfg.Options.Services,
|
|
SignedJWTKey: sharedKey,
|
|
}, googlegrpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(r.tracerProvider))))
|
|
if err != nil {
|
|
log.Ctx(ctx).Error().Err(err).Msg("connecting to registry")
|
|
return
|
|
}
|
|
|
|
if len(services) > 0 {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
go runReporter(ctx, pb.NewRegistryClient(registryConn), services)
|
|
r.cancel = cancel
|
|
}
|
|
}
|
|
|
|
func getReportedServices(cfg *config.Config) ([]*pb.Service, error) {
|
|
if cfg.Options.MetricsAddr == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
mu, err := metricsURL(*cfg.Options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return []*pb.Service{
|
|
{Kind: pb.ServiceKind_PROMETHEUS_METRICS, Endpoint: mu.String()},
|
|
}, nil
|
|
}
|
|
|
|
func metricsURL(o config.Options) (*url.URL, error) {
|
|
host, port, err := net.SplitHostPort(o.MetricsAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid metrics address %q: %w", o.MetricsAddr, err)
|
|
}
|
|
if port == "" {
|
|
return nil, fmt.Errorf("invalid metrics value %q: port is required", o.MetricsAddr)
|
|
}
|
|
if host == "" {
|
|
if host, err = getHostOrIP(); err != nil {
|
|
return nil, fmt.Errorf("could not guess hostname: %w", err)
|
|
}
|
|
}
|
|
|
|
u := url.URL{
|
|
Scheme: "http",
|
|
Host: net.JoinHostPort(host, port),
|
|
Path: defaultMetricsPath,
|
|
}
|
|
|
|
if o.MetricsBasicAuth != "" {
|
|
txt, err := base64.StdEncoding.DecodeString(o.MetricsBasicAuth)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("metrics basic auth: %w", err)
|
|
}
|
|
parts := strings.SplitN(string(txt), ":", 2)
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("expected username:password for basic auth")
|
|
}
|
|
u.User = url.UserPassword(parts[0], parts[1])
|
|
}
|
|
|
|
if o.MetricsCertificate != "" || o.MetricsCertificateFile != "" {
|
|
u.Scheme = "https"
|
|
}
|
|
|
|
if o.MetricsAddr == "" {
|
|
return nil, fmt.Errorf("no metrics address provided")
|
|
}
|
|
|
|
return &u, nil
|
|
}
|
|
|
|
func runReporter(
|
|
ctx context.Context,
|
|
client pb.RegistryClient,
|
|
services []*pb.Service,
|
|
) {
|
|
backoff := backoff.NewExponentialBackOff()
|
|
backoff.MaxElapsedTime = 0
|
|
|
|
req := &pb.RegisterRequest{Services: services}
|
|
after := minTTL
|
|
for {
|
|
select {
|
|
case <-time.After(after):
|
|
resp, err := client.Report(ctx, req)
|
|
if err != nil {
|
|
log.Ctx(ctx).Error().Err(err).Msg("grpc.service_registry.Report")
|
|
after = backoff.NextBackOff()
|
|
continue
|
|
}
|
|
after = resp.CallBackAfter.AsDuration()
|
|
backoff.Reset()
|
|
case <-ctx.Done():
|
|
log.Ctx(ctx).Info().Msg("service registry reporter stopping")
|
|
return
|
|
}
|
|
}
|
|
}
|