Merge branch 'main' into kenjenkins/log-authorize-check-refactor

This commit is contained in:
Kenneth Jenkins 2025-04-22 13:45:20 -07:00 committed by GitHub
commit 0a904e25bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 988 additions and 874 deletions

View file

@ -138,7 +138,7 @@ func (a *Authenticate) VerifySession(next http.Handler) http.Handler {
if err != nil {
log.FromRequest(r).Info().
Err(err).
Str("idp_id", idpID).
Str("idp-id", idpID).
Msg("authenticate: session load error")
span.AddEvent("session load error",
oteltrace.WithAttributes(attribute.String("error", err.Error())))
@ -147,8 +147,8 @@ func (a *Authenticate) VerifySession(next http.Handler) http.Handler {
if sessionState.IdentityProviderID != idpID {
log.FromRequest(r).Info().
Str("idp_id", idpID).
Str("session_idp_id", sessionState.IdentityProviderID).
Str("idp-id", idpID).
Str("session-idp-id", sessionState.IdentityProviderID).
Str("id", sessionState.ID).
Msg("authenticate: session not associated with identity provider")
span.AddEvent("session not associated with identity provider")
@ -158,7 +158,7 @@ func (a *Authenticate) VerifySession(next http.Handler) http.Handler {
if err := state.flow.VerifySession(ctx, r, sessionState); err != nil {
log.FromRequest(r).Info().
Err(err).
Str("idp_id", idpID).
Str("idp-id", idpID).
Msg("authenticate: couldn't verify session")
span.AddEvent("couldn't verify session",
oteltrace.WithAttributes(attribute.String("error", err.Error())))

View file

@ -99,6 +99,9 @@ func (a *Authorize) handleResultDenied(
case invalidClientCertReason(reasons):
denyStatusCode = httputil.StatusInvalidClientCertificate
denyStatusText = httputil.DetailsText(httputil.StatusInvalidClientCertificate)
case request.Policy.IsMCP():
denyStatusCode = http.StatusUnauthorized
denyStatusText = httputil.DetailsText(http.StatusUnauthorized)
}
return a.deniedResponse(ctx, in, denyStatusCode, denyStatusText, nil)
@ -217,7 +220,7 @@ func (a *Authorize) requireLoginResponse(
options := a.currentConfig.Load().Options
state := a.state.Load()
if !a.shouldRedirect(in) {
if !a.shouldRedirect(in, request) {
return a.deniedResponse(ctx, in, http.StatusUnauthorized, "Unauthenticated", nil)
}
@ -269,7 +272,7 @@ func (a *Authorize) requireWebAuthnResponse(
return a.okResponse(result.Headers), nil
}
if !a.shouldRedirect(in) {
if !a.shouldRedirect(in, request) {
return a.deniedResponse(ctx, in, http.StatusUnauthorized, "Unauthenticated", nil)
}
@ -354,7 +357,11 @@ func (a *Authorize) userInfoEndpointURL(in *envoy_service_auth_v3.CheckRequest)
return urlutil.NewSignedURL(a.state.Load().sharedKey, debugEndpoint).Sign(), nil
}
func (a *Authorize) shouldRedirect(in *envoy_service_auth_v3.CheckRequest) bool {
func (a *Authorize) shouldRedirect(in *envoy_service_auth_v3.CheckRequest, request *evaluator.Request) bool {
if request.Policy.IsMCP() {
return false
}
requestHeaders := in.GetAttributes().GetRequest().GetHttp().GetHeaders()
if requestHeaders == nil {
return true

View file

@ -113,6 +113,18 @@ func TestAuthorize_handleResult(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 495, int(res.GetDeniedResponse().GetStatus().GetCode()))
})
t.Run("mcp-route-unauthenticated", func(t *testing.T) {
res, err := a.handleResult(context.Background(),
&envoy_service_auth_v3.CheckRequest{},
&evaluator.Request{
Policy: &config.Policy{MCP: &config.MCP{}},
},
&evaluator.Result{
Allow: evaluator.NewRuleResult(false, criteria.ReasonUserUnauthenticated),
})
assert.NoError(t, err)
assert.Equal(t, 401, int(res.GetDeniedResponse().GetStatus().GetCode()))
})
}
func TestAuthorize_okResponse(t *testing.T) {

View file

@ -163,7 +163,7 @@ func NewFileOrEnvironmentSource(
func (src *FileOrEnvironmentSource) check(ctx context.Context) {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("config_change_id", uuid.New().String())
return c.Str("config-change-id", uuid.New().String())
})
src.mu.Lock()
cfg := src.config

View file

@ -665,3 +665,17 @@ func (f JWTIssuerFormat) Valid() bool {
_, ok := knownJWTIssuerFormats[f]
return ok
}
func MCPFromPB(src *configpb.MCP) *MCP {
if src == nil {
return nil
}
return &MCP{}
}
func MCPToPB(src *MCP) *configpb.MCP {
if src == nil {
return nil
}
return &configpb.MCP{}
}

View file

@ -411,7 +411,7 @@ func checkConfigKeysErrors(configFile string, o *Options, unused []string) error
default:
evt = log.Ctx(ctx).Error()
}
evt.Str("config_file", configFile).Str("key", check.Key)
evt.Str("config-file", configFile).Str("key", check.Key)
if check.DocsURL != "" {
evt = evt.Str("help", check.DocsURL)
}
@ -421,7 +421,7 @@ func checkConfigKeysErrors(configFile string, o *Options, unused []string) error
// check for unknown runtime flags
for flag := range o.RuntimeFlags {
if _, ok := defaultRuntimeFlags[flag]; !ok {
log.Ctx(ctx).Error().Str("config_file", configFile).Str("flag", string(flag)).Msg("unknown runtime flag")
log.Ctx(ctx).Error().Str("config-file", configFile).Str("flag", string(flag)).Msg("unknown runtime flag")
}
}

View file

@ -202,8 +202,14 @@ type Policy struct {
Policy *PPLPolicy `mapstructure:"policy" yaml:"policy,omitempty" json:"policy,omitempty"`
DependsOn []string `mapstructure:"depends_on" yaml:"depends_on,omitempty" json:"depends_on,omitempty"`
// MCP is an experimental support for Model Context Protocol upstreams
MCP *MCP `mapstructure:"mcp" yaml:"mcp,omitempty" json:"mcp,omitempty"`
}
// MCP is an experimental support for Model Context Protocol upstreams configuration
type MCP struct{}
// RewriteHeader is a policy configuration option to rewrite an HTTP header.
type RewriteHeader struct {
Header string `mapstructure:"header" yaml:"header" json:"header"`
@ -316,6 +322,7 @@ func NewPolicyFromProto(pb *configpb.Route) (*Policy, error) {
KubernetesServiceAccountToken: pb.GetKubernetesServiceAccountToken(),
KubernetesServiceAccountTokenFile: pb.GetKubernetesServiceAccountTokenFile(),
LogoURL: pb.GetLogoUrl(),
MCP: MCPFromPB(pb.GetMcp()),
Name: pb.GetName(),
PassIdentityHeaders: pb.PassIdentityHeaders,
Path: pb.GetPath(),
@ -470,6 +477,7 @@ func (p *Policy) ToProto() (*configpb.Route, error) {
KubernetesServiceAccountToken: p.KubernetesServiceAccountToken,
KubernetesServiceAccountTokenFile: p.KubernetesServiceAccountTokenFile,
LogoUrl: p.LogoURL,
Mcp: MCPToPB(p.MCP),
Name: p.Name,
PassIdentityHeaders: p.PassIdentityHeaders,
Path: p.Path,
@ -824,6 +832,11 @@ func (p *Policy) IsForKubernetes() bool {
return p.KubernetesServiceAccountTokenFile != "" || p.KubernetesServiceAccountToken != ""
}
// IsMCP returns true if the route is for the Model Context Protocol upstream server.
func (p *Policy) IsMCP() bool {
return p != nil && p.MCP != nil
}
// IsTCP returns true if the route is for TCP.
func (p *Policy) IsTCP() bool {
return strings.HasPrefix(p.From, "tcp")

View file

@ -106,7 +106,7 @@ func New(ctx context.Context, cfg *config.Config, eventsMgr *events.Manager, opt
}
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("service", "databroker").Str("config_source", "bootstrap")
return c.Str("service", "databroker").Str("config-source", "bootstrap")
})
localGRPCConnection, err := grpc.DialContext(
ctx,

View file

@ -219,10 +219,10 @@ func (mgr *Manager) renewConfigCerts(ctx context.Context) error {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
if len(renew) > 0 {
c = c.Strs("renew_domains", renew)
c = c.Strs("renew-domains", renew)
}
if len(ocsp) > 0 {
c = c.Strs("ocsp_refresh", ocsp)
c = c.Strs("ocsp-refresh", ocsp)
}
return c
})

View file

@ -98,7 +98,7 @@ func NewServer(
}
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("server_name", cfg.Options.Services)
return c.Str("server-name", cfg.Options.Services)
})
var err error

View file

@ -280,7 +280,7 @@ func (src *ConfigSource) runUpdater(ctx context.Context, cfg *config.Config) {
databroker.WithFastForward())
go func() {
log.Ctx(ctx).Debug().
Str("outbound_port", cfg.OutboundPort).
Str("outbound-port", cfg.OutboundPort).
Msg("config: starting databroker config source syncer")
_ = syncer.Run(ctx)
}()

View file

@ -150,26 +150,26 @@ func ExampleContext() {
captureOutput(func() {
bg := context.Background()
ctx1 := log.WithContext(bg, func(c zerolog.Context) zerolog.Context {
return c.Str("param_one", "one")
return c.Str("param-one", "one")
})
ctx2 := log.WithContext(ctx1, func(c zerolog.Context) zerolog.Context {
return c.Str("param_two", "two")
return c.Str("param-two", "two")
})
log.Ctx(bg).Error().Str("non_context_param", "value").Msg("background")
log.Ctx(ctx1).Error().Str("non_context_param", "value").Msg("first")
log.Ctx(ctx2).Error().Str("non_context_param", "value").Msg("second")
log.Ctx(bg).Error().Str("non-context-param", "value").Msg("background")
log.Ctx(ctx1).Error().Str("non-context-param", "value").Msg("first")
log.Ctx(ctx2).Error().Str("non-context-param", "value").Msg("second")
for i := 0; i < 10; i++ {
ctx1 = log.WithContext(ctx1, func(c zerolog.Context) zerolog.Context {
return c.Int("counter", i)
})
}
log.Ctx(ctx1).Info().Str("non_ctx_param", "value").Msg("after counter")
log.Ctx(ctx1).Info().Str("non-ctx-param", "value").Msg("after counter")
})
// Output:
// {"level":"error","non_context_param":"value","time":"2008-01-08T17:05:05Z","message":"background"}
// {"level":"error","param_one":"one","non_context_param":"value","time":"2008-01-08T17:05:05Z","message":"first"}
// {"level":"error","param_one":"one","param_two":"two","non_context_param":"value","time":"2008-01-08T17:05:05Z","message":"second"}
// {"level":"info","param_one":"one","counter":0,"counter":1,"counter":2,"counter":3,"counter":4,"counter":5,"counter":6,"counter":7,"counter":8,"counter":9,"non_ctx_param":"value","time":"2008-01-08T17:05:05Z","message":"after counter"}
// {"level":"error","non-context-param":"value","time":"2008-01-08T17:05:05Z","message":"background"}
// {"level":"error","param-one":"one","non-context-param":"value","time":"2008-01-08T17:05:05Z","message":"first"}
// {"level":"error","param-one":"one","param-two":"two","non-context-param":"value","time":"2008-01-08T17:05:05Z","message":"second"}
// {"level":"info","param-one":"one","counter":0,"counter":1,"counter":2,"counter":3,"counter":4,"counter":5,"counter":6,"counter":7,"counter":8,"counter":9,"non-ctx-param":"value","time":"2008-01-08T17:05:05Z","message":"after counter"}
}

View file

@ -315,9 +315,9 @@ func RecordIdentityManagerSessionRefresh(ctx context.Context, err error) {
func SetDBConfigInfo(ctx context.Context, service, configID string, version uint64, errCount int64) {
log.Ctx(ctx).Info().
Str("service", service).
Str("config_id", configID).
Str("config-id", configID).
Uint64("version", version).
Int64("err_count", errCount).
Int64("err-count", errCount).
Msg("set db config info")
if err := stats.RecordWithTags(

View file

@ -27,8 +27,8 @@ func WaitStartupComplete(env testenv.Environment, timeout ...time.Duration) time
ctx, ca := context.WithTimeout(env.Context(), timeout[0])
defer ca()
recorder.WaitForMatch(map[string]any{
"syncer_id": "databroker",
"syncer_type": "type.googleapis.com/pomerium.config.Config",
"syncer-id": "databroker",
"syncer-type": "type.googleapis.com/pomerium.config.Config",
"message": "listening for updates",
}, timeout...)
cc, err := grpc.Dial(env.DatabrokerURL().Value(),

View file

@ -46,9 +46,9 @@ func (api *API) DownloadClusterResourceBundle(
defer resp.Body.Close()
log.Ctx(ctx).Trace().
Str("url_path", req.Request.URL.Path).
Interface("request_headers", req.Header).
Interface("response_headers", resp.Header).
Str("url-path", req.Request.URL.Path).
Interface("request-headers", req.Header).
Interface("response-headers", resp.Header).
Str("status", resp.Status).
Msg("bundle download request")
@ -105,9 +105,9 @@ func (api *API) HeadClusterResourceBundle(
defer resp.Body.Close()
log.Ctx(ctx).Trace().
Str("url_path", req.Request.URL.Path).
Interface("request_headers", req.Header).
Interface("response_headers", resp.Header).
Str("url-path", req.Request.URL.Path).
Interface("request-headers", req.Header).
Interface("response-headers", resp.Header).
Str("status", resp.Status).
Msg("bundle metadata request")

View file

@ -178,9 +178,9 @@ func (c *service) syncUpdatedBundle(ctx context.Context, key string, cached *Bun
log.Ctx(ctx).Debug().
Str("bundle", key).
Strs("record_types", bundleRecordTypes).
Strs("record-types", bundleRecordTypes).
Str("etag", current.ETag).
Str("last_modified", current.LastModified).
Str("last-modified", current.LastModified).
Interface("metadata", result.Metadata).
Msg("bundle synced")
@ -213,9 +213,9 @@ func (c *service) getUpdatedMetadata(ctx context.Context, key string, cached Bun
log.Ctx(ctx).Debug().
Str("bundle", key).
Strs("record_types", current.RecordTypes).
Strs("record-types", current.RecordTypes).
Str("etag", current.ETag).
Str("last_modified", current.LastModified).
Str("last-modified", current.LastModified).
Interface("metadata", result.Metadata).
Msg("metadata updated")

View file

@ -104,7 +104,7 @@ func (p *Pomerium) Start(ctx context.Context, tracerProvider oteltrace.TracerPro
_, _ = maxprocs.Set(maxprocs.Logger(func(s string, i ...any) { log.Ctx(ctx).Debug().Msgf(s, i...) }))
evt := log.Ctx(ctx).Info().
Str("envoy_version", files.FullVersion()).
Str("envoy-version", files.FullVersion()).
Str("version", version.FullVersion())
if buildTime := version.BuildTime(); buildTime != "" {
evt = evt.Str("built", buildTime)

File diff suppressed because it is too large Load diff

View file

@ -52,7 +52,7 @@ enum BearerTokenFormat {
BEARER_TOKEN_FORMAT_IDP_IDENTITY_TOKEN = 3;
}
// Next ID: 72.
// Next ID: 73.
message Route {
message StringList { repeated string values = 1; }
@ -143,8 +143,12 @@ message Route {
optional string idp_client_secret = 56;
optional StringList idp_access_token_allowed_audiences = 69;
bool show_error_details = 59;
optional MCP mcp = 72;
}
message MCP {}
message PPLPolicy { bytes raw = 1; }
message Policy {

View file

@ -95,15 +95,15 @@ func (locker *Leaser) runOnce(ctx context.Context, resetBackoff func()) error {
if status.Code(err) == codes.AlreadyExists {
return nil
} else if err != nil {
log.Ctx(ctx).Error().Err(err).Str("lease_name", locker.leaseName).Msg("leaser: error acquiring lease")
log.Ctx(ctx).Error().Err(err).Str("lease-name", locker.leaseName).Msg("leaser: error acquiring lease")
return retryableError{err}
}
resetBackoff()
leaseID := res.Id
log.Ctx(ctx).Debug().
Str("lease_name", locker.leaseName).
Str("lease_id", leaseID).
Str("lease-name", locker.leaseName).
Str("lease-id", leaseID).
Msg("leaser: lease acquired")
return locker.withLease(ctx, leaseID)
@ -143,13 +143,13 @@ func (locker *Leaser) withLease(ctx context.Context, leaseID string) error {
})
if status.Code(err) == codes.AlreadyExists {
log.Ctx(ctx).Debug().
Str("lease_name", locker.leaseName).
Str("lease_id", leaseID).
Str("lease-name", locker.leaseName).
Str("lease-id", leaseID).
Msg("leaser: lease lost")
// failed to renew lease
return nil
} else if err != nil {
log.Ctx(ctx).Error().Err(err).Str("lease_name", locker.leaseName).Msg("leaser: error renewing lease")
log.Ctx(ctx).Error().Err(err).Str("lease-name", locker.leaseName).Msg("leaser: error renewing lease")
return retryableError{err}
}
}

View file

@ -130,8 +130,8 @@ func (syncer *Syncer) Run(ctx context.Context) error {
if err != nil {
log.Ctx(ctx).Error().
Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL).
Str("syncer-id", syncer.id).
Str("syncer-type", syncer.cfg.typeURL).
Err(err).
Msg("sync")
select {
@ -145,8 +145,8 @@ func (syncer *Syncer) Run(ctx context.Context) error {
func (syncer *Syncer) init(ctx context.Context) error {
log.Ctx(ctx).Debug().
Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL).
Str("syncer-id", syncer.id).
Str("syncer-type", syncer.cfg.typeURL).
Msg("initial sync")
records, recordVersion, serverVersion, err := InitialSync(ctx, syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{
Type: syncer.cfg.typeURL,
@ -180,16 +180,16 @@ func (syncer *Syncer) sync(ctx context.Context) error {
}
log.Ctx(ctx).Debug().
Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL).
Str("syncer-id", syncer.id).
Str("syncer-type", syncer.cfg.typeURL).
Msg("listening for updates")
for {
res, err := stream.Recv()
if status.Code(err) == codes.Aborted {
log.Ctx(ctx).Error().Err(err).
Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL).
Str("syncer-id", syncer.id).
Str("syncer-type", syncer.cfg.typeURL).
Msg("aborted sync due to mismatched server version")
// server version changed, so re-init
syncer.serverVersion = 0
@ -203,8 +203,8 @@ func (syncer *Syncer) sync(ctx context.Context) error {
rec := res.GetRecord()
log.Ctx(logCtxRec(ctx, rec)).Debug().
Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL).
Str("syncer-id", syncer.id).
Str("syncer-type", syncer.cfg.typeURL).
Msg("syncer got record")
syncer.recordVersion = res.GetRecord().GetVersion()
@ -219,8 +219,8 @@ func (syncer *Syncer) sync(ctx context.Context) error {
// logCtxRecRec adds log params to context related to particular record
func logCtxRec(ctx context.Context, rec *Record) context.Context {
return log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("record_type", rec.GetType()).
Str("record_id", rec.GetId()).
Uint64("record_version", rec.GetVersion())
return c.Str("record-type", rec.GetType()).
Str("record-id", rec.GetId()).
Uint64("record-version", rec.GetVersion())
})
}

View file

@ -118,7 +118,7 @@ func (mgr *Manager) onDeleteAllUsers(ctx context.Context) {
}
func (mgr *Manager) onDeleteSession(ctx context.Context, sessionID string) {
log.Ctx(ctx).Debug().Str("session_id", sessionID).Msg("session deleted")
log.Ctx(ctx).Debug().Str("session-id", sessionID).Msg("session deleted")
mgr.mu.Lock()
mgr.dataStore.deleteSession(sessionID)
@ -130,7 +130,7 @@ func (mgr *Manager) onDeleteSession(ctx context.Context, sessionID string) {
}
func (mgr *Manager) onDeleteUser(ctx context.Context, userID string) {
log.Ctx(ctx).Debug().Str("user_id", userID).Msg("user deleted")
log.Ctx(ctx).Debug().Str("user-id", userID).Msg("user deleted")
mgr.mu.Lock()
mgr.dataStore.deleteUser(userID)
@ -142,7 +142,7 @@ func (mgr *Manager) onDeleteUser(ctx context.Context, userID string) {
}
func (mgr *Manager) onUpdateSession(ctx context.Context, s *session.Session) {
log.Ctx(ctx).Debug().Str("session_id", s.GetId()).Msg("session updated")
log.Ctx(ctx).Debug().Str("session-id", s.GetId()).Msg("session updated")
mgr.mu.Lock()
mgr.dataStore.putSession(s)
@ -163,7 +163,7 @@ func (mgr *Manager) onUpdateSession(ctx context.Context, s *session.Session) {
}
func (mgr *Manager) onUpdateUser(ctx context.Context, u *user.User) {
log.Ctx(ctx).Debug().Str("user_id", u.GetId()).Msg("user updated")
log.Ctx(ctx).Debug().Str("user-id", u.GetId()).Msg("user updated")
mgr.mu.Lock()
mgr.dataStore.putUser(u)
@ -182,7 +182,7 @@ func (mgr *Manager) onUpdateUser(ctx context.Context, u *user.User) {
func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
log.Ctx(ctx).Debug().
Str("session_id", sessionID).
Str("session-id", sessionID).
Msg("refreshing session")
mgr.mu.Lock()
@ -191,8 +191,8 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
if s == nil {
log.Ctx(ctx).Info().
Str("user_id", u.GetId()).
Str("session_id", sessionID).
Str("user-id", u.GetId()).
Str("session-id", sessionID).
Msg("no session found for refresh")
return
}
@ -200,8 +200,8 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
authenticator := mgr.cfg.Load().authenticator
if authenticator == nil {
log.Ctx(ctx).Info().
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("no authenticator defined, deleting session")
mgr.deleteSession(ctx, sessionID)
return
@ -210,8 +210,8 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
expiry := s.GetExpiresAt().AsTime()
if !expiry.After(mgr.cfg.Load().now()) {
log.Ctx(ctx).Info().
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("deleting expired session")
mgr.deleteSession(ctx, sessionID)
return
@ -224,8 +224,8 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
if s.GetOauthToken() == nil {
log.Ctx(ctx).Info().
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("no session oauth2 token found for refresh")
return
}
@ -235,14 +235,14 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
mgr.recordLastError(metrics_ids.IdentityManagerLastSessionRefreshError, err)
if isTemporaryError(err) {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to refresh oauth2 token")
return
} else if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to refresh oauth2 token, deleting session")
mgr.deleteSession(ctx, sessionID)
return
@ -254,14 +254,14 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to update user info")
return
} else if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to update user info, deleting session")
mgr.deleteSession(ctx, sessionID)
return
@ -274,7 +274,7 @@ func (mgr *Manager) refreshSession(ctx context.Context, sessionID string) {
}
func (mgr *Manager) updateUserInfo(ctx context.Context, userID string) {
log.Ctx(ctx).Info().Str("user_id", userID).Msg("updating user info")
log.Ctx(ctx).Info().Str("user-id", userID).Msg("updating user info")
authenticator := mgr.cfg.Load().authenticator
if authenticator == nil {
@ -287,7 +287,7 @@ func (mgr *Manager) updateUserInfo(ctx context.Context, userID string) {
if u == nil {
log.Ctx(ctx).Error().
Str("user_id", userID).
Str("user-id", userID).
Msg("no user found for update")
return
}
@ -295,8 +295,8 @@ func (mgr *Manager) updateUserInfo(ctx context.Context, userID string) {
for _, s := range ss {
if s.GetOauthToken() == nil {
log.Ctx(ctx).Error().
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("no session oauth2 token found for updating user info")
continue
}
@ -306,14 +306,14 @@ func (mgr *Manager) updateUserInfo(ctx context.Context, userID string) {
mgr.recordLastError(metrics_ids.IdentityManagerLastUserRefreshError, err)
if isTemporaryError(err) {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to update user info")
continue
} else if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to update user info, deleting session")
mgr.deleteSession(ctx, s.GetId())
continue
@ -326,7 +326,7 @@ func (mgr *Manager) updateUserInfo(ctx context.Context, userID string) {
// deleteSession deletes a session from the databroke, the local data store, and the schedulers
func (mgr *Manager) deleteSession(ctx context.Context, sessionID string) {
log.Ctx(ctx).Debug().
Str("session_id", sessionID).
Str("session-id", sessionID).
Msg("deleting session")
mgr.mu.Lock()
@ -345,7 +345,7 @@ func (mgr *Manager) deleteSession(ctx context.Context, sessionID string) {
return
} else if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("session_id", sessionID).
Str("session-id", sessionID).
Msg("failed to delete session")
return
}
@ -358,7 +358,7 @@ func (mgr *Manager) deleteSession(ctx context.Context, sessionID string) {
})
if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("session_id", sessionID).
Str("session-id", sessionID).
Msg("failed to delete session")
return
}
@ -366,15 +366,15 @@ func (mgr *Manager) deleteSession(ctx context.Context, sessionID string) {
func (mgr *Manager) updateSession(ctx context.Context, s *session.Session) {
log.Ctx(ctx).Debug().
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("updating session")
fm, err := fieldmaskpb.New(s, "oauth_token", "id_token", "claims")
if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to create fieldmask for session")
return
}
@ -382,8 +382,8 @@ func (mgr *Manager) updateSession(ctx context.Context, s *session.Session) {
_, err = session.Patch(ctx, mgr.cfg.Load().dataBrokerClient, s, fm)
if err != nil {
log.Ctx(ctx).Error().Err(err).
Str("user_id", s.GetUserId()).
Str("session_id", s.GetId()).
Str("user-id", s.GetUserId()).
Str("session-id", s.GetId()).
Msg("failed to patch updated session record")
return
}
@ -401,13 +401,13 @@ func (mgr *Manager) updateSession(ctx context.Context, s *session.Session) {
// scheduler here we can avoid refreshing user info more often than necessary.)
func (mgr *Manager) updateUser(ctx context.Context, u *user.User) {
log.Ctx(ctx).Debug().
Str("user_id", u.GetId()).
Str("user-id", u.GetId()).
Msg("updating user")
_, err := databroker.Put(ctx, mgr.cfg.Load().dataBrokerClient, u)
if err != nil {
log.Ctx(ctx).Error().
Str("user_id", u.GetId()).
Str("user-id", u.GetId()).
Err(err).
Msg("failed to store updated user record")
return

View file

@ -222,9 +222,9 @@ func (backend *Backend) Put(ctx context.Context, records []*databroker.Record) (
}
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("db_op", "put").
Str("db_id", record.Id).
Str("db_type", record.Type)
return c.Str("db-op", "put").
Str("db-id", record.Id).
Str("db-type", record.Type)
})
backend.update(record)