pomerium/authorize/authorize.go
Kenneth Jenkins 1da95d334c refactor to share more authorize check logic
should restore authorize log entries for ssh auth
2025-02-27 13:04:44 -08:00

252 lines
8.1 KiB
Go

// Package authorize is a pomerium service that is responsible for determining
// if a given request should be authorized (AuthZ).
package authorize
import (
"context"
"fmt"
"slices"
"sync"
"time"
"github.com/rs/zerolog"
oteltrace "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/pomerium/datasource/pkg/directory"
"github.com/pomerium/pomerium/authorize/evaluator"
"github.com/pomerium/pomerium/authorize/internal/store"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/atomicutil"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/sessions"
"github.com/pomerium/pomerium/internal/telemetry/metrics"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/contextutil"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/policy/criteria"
"github.com/pomerium/pomerium/pkg/storage"
"github.com/pomerium/pomerium/pkg/telemetry/requestid"
)
// Authorize holds the long-lived state of the authorize service: the current
// evaluator state and options, the stores and caches consulted while
// evaluating requests, and the tracing handles.
type Authorize struct {
// state is the current authorize state; OnConfigChange replaces it when a
// new state is successfully built from an updated config.
state *atomicutil.Value[*authorizeState]
// store is created once in New and passed to every state rebuild.
store *store.Store
// currentOptions holds the options from the most recent config change.
currentOptions *atomicutil.Value[*config.Options]
// accessTracker is started by Run.
accessTracker *AccessTracker
// globalCache backs the caching querier used by evaluate and is shared with
// the groups cache warmer.
globalCache storage.Cache
// groupsCacheWarmer is started by Run; its connection is updated when the
// databroker client connection changes.
groupsCacheWarmer *cacheWarmer
tracerProvider oteltrace.TracerProvider
tracer oteltrace.Tracer
// activeStreamsMu guards activeStreams.
activeStreamsMu sync.Mutex
// activeStreams are channels that OnConfigChange sends an error on when the
// streams should be terminated.
// NOTE(review): the code that registers channels here is not visible in
// this part of the file.
activeStreams []chan error
}
// New creates an Authorize service from cfg. It returns an error when the
// initial authorize state cannot be built from the configuration.
func New(ctx context.Context, cfg *config.Config) (*Authorize, error) {
	provider := trace.NewTracerProvider(ctx, "Authorize")
	coreTracer := provider.Tracer(trace.PomeriumCoreTracer)

	svc := &Authorize{
		currentOptions: config.NewAtomicOptions(),
		store:          store.New(),
		globalCache:    storage.NewGlobalCache(time.Minute),
		tracerProvider: provider,
		tracer:         coreTracer,
		activeStreams:  []chan error{},
	}
	// The access tracker needs a reference back to the service, so it is
	// attached after the struct has been allocated.
	svc.accessTracker = NewAccessTracker(svc, accessTrackerMaxSize, accessTrackerDebouncePeriod)

	initial, err := newAuthorizeStateFromConfig(ctx, provider, cfg, svc.store, nil)
	if err != nil {
		return nil, err
	}
	svc.state = atomicutil.NewValue(initial)

	// The groups cache warmer uses the databroker connection of the initial
	// state; OnConfigChange updates it when the connection changes.
	svc.groupsCacheWarmer = newCacheWarmer(
		initial.dataBrokerClientConnection,
		svc.globalCache,
		directory.GroupRecordType,
	)
	return svc, nil
}
// GetDataBrokerServiceClient returns the databroker client held by the
// currently loaded authorize state.
func (a *Authorize) GetDataBrokerServiceClient() databroker.DataBrokerServiceClient {
	current := a.state.Load()
	return current.dataBrokerClient
}
// Run starts the access tracker and the groups cache warmer in their own
// goroutines and blocks until both have returned. Both goroutines report a
// nil error to the group.
func (a *Authorize) Run(ctx context.Context) error {
	group, groupCtx := errgroup.WithContext(ctx)

	trackAccess := func() error {
		a.accessTracker.Run(groupCtx)
		return nil
	}
	warmGroups := func() error {
		a.groupsCacheWarmer.Run(groupCtx)
		return nil
	}

	group.Go(trackAccess)
	group.Go(warmGroups)
	return group.Wait()
}
// validateOptions checks that the shared secret can be read from o and that
// an AEAD cipher can be constructed from it. Either failure is reported as a
// bad 'SHARED_SECRET' error wrapping the underlying cause.
func validateOptions(o *config.Options) error {
	const badSecretFormat = "authorize: bad 'SHARED_SECRET': %w"

	key, keyErr := o.GetSharedKey()
	if keyErr != nil {
		return fmt.Errorf(badSecretFormat, keyErr)
	}

	_, cipherErr := cryptutil.NewAEADCipher(key)
	if cipherErr != nil {
		return fmt.Errorf(badSecretFormat, cipherErr)
	}
	return nil
}
// newPolicyEvaluator returns a policy evaluator built from opts.
//
// It registers a policy-count metrics callback, reads the downstream mTLS
// settings (CA, CRL, certificate constraints), the internal authenticate URL
// and the signing key from opts, and passes them together with all policies
// to evaluator.New. store and previous are forwarded to evaluator.New
// unchanged. Any setting that cannot be read yields a wrapped error and a nil
// evaluator.
func newPolicyEvaluator(
	ctx context.Context,
	opts *config.Options, store *store.Store, previous *evaluator.Evaluator,
) (*evaluator.Evaluator, error) {
	countPolicies := func() int64 {
		return int64(opts.NumPolicies())
	}
	metrics.AddPolicyCountCallback("pomerium-authorize", countPolicies)

	tagService := func(c zerolog.Context) zerolog.Context {
		return c.Str("service", "authorize")
	}
	ctx = log.WithContext(ctx, tagService)

	ctx, span := trace.Continue(ctx, "authorize.newPolicyEvaluator")
	defer span.End()

	mtls := &opts.DownstreamMTLS

	ca, err := mtls.GetCA()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid client CA: %w", err)
	}

	crl, err := mtls.GetCRL()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid client CRL: %w", err)
	}

	authURL, err := opts.GetInternalAuthenticateURL()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid authenticate url: %w", err)
	}

	key, err := opts.GetSigningKey()
	if err != nil {
		return nil, fmt.Errorf("authorize: invalid signing key: %w", err)
	}

	// The invalid_client_certificate rule is still needed when mTLS is
	// enforced by rejecting connections at the listener level, because a
	// route may set its own TLSDownstreamClientCA.
	needsDefaultCertRule := opts.HasAnyDownstreamMTLSClientCA() &&
		mtls.GetEnforcement() != config.MTLSEnforcementPolicy

	constraints, err := evaluator.ClientCertConstraintsFromConfig(mtls)
	if err != nil {
		return nil, fmt.Errorf(
			"authorize: internal error: couldn't build client cert constraints: %w", err)
	}

	policies := slices.Collect(opts.GetAllPolicies())
	serviceAccount := opts.GetGoogleCloudServerlessAuthenticationServiceAccount()

	return evaluator.New(ctx, store, previous,
		evaluator.WithPolicies(policies),
		evaluator.WithClientCA(ca),
		evaluator.WithAddDefaultClientCertificateRule(needsDefaultCertRule),
		evaluator.WithClientCRL(crl),
		evaluator.WithClientCertConstraints(constraints),
		evaluator.WithSigningKey(key),
		evaluator.WithAuthenticateURL(authURL.String()),
		evaluator.WithGoogleCloudServerlessAuthenticationServiceAccount(serviceAccount),
		evaluator.WithJWTClaimsHeaders(opts.JWTClaimsHeaders),
		evaluator.WithJWTGroupsFilter(opts.JWTGroupsFilter),
	)
}
// OnConfigChange updates internal structures based on config.Options.
//
// It first decides whether the currently active streams must be terminated
// and, if so, sends each of them an error. It then stores the new options and
// attempts to build a new authorize state. When that fails the error is
// logged and the previous state stays in effect; otherwise the new state is
// stored and the groups cache warmer is pointed at the new databroker
// connection if it changed.
func (a *Authorize) OnConfigChange(ctx context.Context, cfg *config.Config) {
	// demo code: only the first route is consulted. A config with no routes
	// is treated the same as a first route that does not allow any
	// authenticated user, instead of panicking on Routes[0].
	// NOTE(review): confirm that terminating streams is the desired outcome
	// for a config without routes.
	routes := cfg.Options.Routes
	if len(routes) == 0 || !routes[0].AllowAnyAuthenticatedUser {
		a.activeStreamsMu.Lock()
		for _, s := range a.activeStreams {
			// NOTE(review): this send blocks while the mutex is held until
			// the stream receives the error.
			s <- fmt.Errorf("no longer authorized")
		}
		// clear only zeroes the elements and keeps the length, which would
		// leave nil channels behind; a later send on a nil channel blocks
		// forever. Drop the references and truncate so the slice is empty.
		clear(a.activeStreams)
		a.activeStreams = a.activeStreams[:0]
		a.activeStreamsMu.Unlock()
	}

	currentState := a.state.Load()
	a.currentOptions.Store(cfg.Options)

	newState, err := newAuthorizeStateFromConfig(ctx, a.tracerProvider, cfg, a.store, currentState.evaluator)
	if err != nil {
		log.Ctx(ctx).Error().Err(err).Msg("authorize: error updating state")
		return
	}

	a.state.Store(newState)
	if currentState.dataBrokerClientConnection != newState.dataBrokerClientConnection {
		a.groupsCacheWarmer.UpdateConn(newState.dataBrokerClientConnection)
	}
}
// evaluateResult is a condensed view of a policy evaluation: the overall
// decision, the reasons behind it, and optional evaluation traces.
// NOTE(review): no live code in the visible part of this file constructs this
// type; confirm whether it is still used elsewhere in the package.
type evaluateResult struct {
// Overall allow/deny result.
Allowed bool
// Reasons for the overall result.
Reasons criteria.Reasons
// Reason detail traces. (Populated only if enabled by the policy.)
Traces []contextutil.PolicyEvaluationTrace
}
// evaluate runs the policy evaluator for req and returns the evaluator's
// result.
//
// When sessionState is non-nil, the matching session or service account (and,
// if found, its user) is looked up so it can be included in the authorize
// check log entry. A databroker Unavailable error aborts the evaluation and
// is returned; any other lookup error is logged and the request is evaluated
// as if no session had been supplied. An error from the evaluator itself is
// logged and returned.
func (a *Authorize) evaluate(
	ctx context.Context,
	req *evaluator.Request,
	sessionState *sessions.State,
) (*evaluator.Result, error) {
	// Load the state once so that the databroker client behind the querier
	// and the evaluator used below come from the same state, even if
	// OnConfigChange stores a new state while this request is in flight.
	state := a.state.Load()

	querier := storage.NewCachingQuerier(
		storage.NewQuerier(state.dataBrokerClient),
		a.globalCache,
	)
	ctx = storage.WithQuerier(ctx, querier)

	requestID := requestid.FromContext(ctx)

	var s sessionOrServiceAccount
	var u *user.User
	var err error
	if sessionState != nil {
		s, err = a.getDataBrokerSessionOrServiceAccount(ctx, sessionState.ID, sessionState.DatabrokerRecordVersion)
		switch {
		case status.Code(err) == codes.Unavailable:
			log.Ctx(ctx).Debug().Str("request-id", requestID).Err(err).Msg("temporary error checking authorization: data broker unavailable")
			return nil, err
		case err != nil:
			log.Ctx(ctx).Info().Err(err).Str("request-id", requestID).Msg("missing or invalid session or service account")
			// Continue without a session.
			sessionState = nil
		}
	}
	if sessionState != nil && s != nil {
		u, _ = a.getDataBrokerUser(ctx, s.GetUserId()) // ignore any missing user error
	}

	res, err := state.evaluator.Evaluate(ctx, req)
	if err != nil {
		log.Ctx(ctx).Error().Err(err).Str("request-id", requestID).Msg("error during OPA evaluation")
		return nil, err
	}

	a.logAuthorizeCheck(ctx, req, res, s, u)

	// TODO(review): the conversion below is disabled and the raw evaluator
	// result is returned instead; restore or delete it.
	//
	//	result := &evaluateResult{
	//		Allowed: res.Allow.Value && !res.Deny.Value,
	//	}
	//	// if show error details is enabled, attach the policy evaluation traces
	//	if req.Policy != nil && req.Policy.ShowErrorDetails {
	//		result.Traces = res.Traces
	//	}
	return res, nil
}