diff --git a/authorize/grpc.go b/authorize/grpc.go index a15378b45..348c8a6d7 100644 --- a/authorize/grpc.go +++ b/authorize/grpc.go @@ -86,6 +86,8 @@ func (a *Authorize) Check(ctx context.Context, in *envoy_service_auth_v2.CheckRe } func (a *Authorize) forceSync(ctx context.Context, sessionID string) { + ctx, span := trace.StartSpan(ctx, "authorize.forceSync") + defer span.End() s := a.forceSyncSession(ctx, sessionID) if s == nil { return @@ -94,6 +96,9 @@ func (a *Authorize) forceSync(ctx context.Context, sessionID string) { } func (a *Authorize) forceSyncSession(ctx context.Context, sessionID string) *session.Session { + ctx, span := trace.StartSpan(ctx, "authorize.forceSyncSession") + defer span.End() + a.dataBrokerDataLock.RLock() s, ok := a.dataBrokerData.Get(sessionTypeURL, sessionID).(*session.Session) a.dataBrokerDataLock.RUnlock() @@ -121,6 +126,9 @@ func (a *Authorize) forceSyncSession(ctx context.Context, sessionID string) *ses } func (a *Authorize) forceSyncUser(ctx context.Context, userID string) *user.User { + ctx, span := trace.StartSpan(ctx, "authorize.forceSyncUser") + defer span.End() + a.dataBrokerDataLock.RLock() s, ok := a.dataBrokerData.Get(userTypeURL, userID).(*user.User) a.dataBrokerDataLock.RUnlock() diff --git a/authorize/run.go b/authorize/run.go index 993b23d03..8985dc189 100644 --- a/authorize/run.go +++ b/authorize/run.go @@ -5,6 +5,8 @@ import ( "io" "time" + "github.com/pomerium/pomerium/internal/telemetry/trace" + backoff "github.com/cenkalti/backoff/v4" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" @@ -37,6 +39,8 @@ func (a *Authorize) Run(ctx context.Context) error { func (a *Authorize) runTypesSyncer(ctx context.Context, updateTypes chan<- []string) error { log.Info().Msg("starting type sync") return tryForever(ctx, func(backoff interface{ Reset() }) error { + ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.Sync") + defer span.End() stream, err := a.dataBrokerClient.SyncTypes(ctx, new(emptypb.Empty)) if err != nil { return err @@ -89,6 +93,7 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string, updat var serverVersion, recordVersion string log.Info().Str("type_url", typeURL).Msg("starting data initial load") + ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.GetAll") backoff := backoff.NewExponentialBackOff() for { res, err := a.dataBrokerClient.GetAll(ctx, &databroker.GetAllRequest{ @@ -122,9 +127,12 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string, updat break } + span.End() log.Info().Str("type_url", typeURL).Msg("starting data syncer") return tryForever(ctx, func(backoff interface{ Reset() }) error { + ctx, span := trace.StartSpan(ctx, "authorize.dataBrokerClient.Sync") + defer span.End() stream, err := a.dataBrokerClient.Sync(ctx, &databroker.SyncRequest{ ServerVersion: serverVersion, RecordVersion: recordVersion, diff --git a/cache/session.go b/cache/session.go index dacc0b381..877e979a2 100644 --- a/cache/session.go +++ b/cache/session.go @@ -10,6 +10,7 @@ import ( "github.com/pomerium/pomerium/internal/grpc/databroker" "github.com/pomerium/pomerium/internal/grpc/session" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/trace" ) // SessionServer implements the session service interface for adding and syncing sessions. @@ -28,6 +29,8 @@ func NewSessionServer(grpcServer *grpc.Server, dataBrokerClient databroker.DataB // Delete deletes a session from the session server. func (srv *SessionServer) Delete(ctx context.Context, req *session.DeleteRequest) (*emptypb.Empty, error) { + ctx, span := trace.StartSpan(ctx, "session.grpc.Delete") + defer span.End() log.Info(). Str("service", "session"). Str("session_id", req.GetId()). @@ -46,6 +49,8 @@ func (srv *SessionServer) Delete(ctx context.Context, req *session.DeleteRequest // Add adds a session to the session server. func (srv *SessionServer) Add(ctx context.Context, req *session.AddRequest) (*session.AddResponse, error) { + ctx, span := trace.StartSpan(ctx, "session.grpc.Add") + defer span.End() log.Info(). Str("service", "session"). Str("session_id", req.GetSession().GetId()). diff --git a/cache/user.go b/cache/user.go index 6ba10130d..2f4c1620b 100644 --- a/cache/user.go +++ b/cache/user.go @@ -10,6 +10,7 @@ import ( "github.com/pomerium/pomerium/internal/grpc/databroker" "github.com/pomerium/pomerium/internal/grpc/user" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/trace" ) // UserServer implements the user service interface for syncing users. @@ -28,6 +29,8 @@ func NewUserServer(grpcServer *grpc.Server, dataBrokerClient databroker.DataBrok // Add adds a user to the user server. func (srv *UserServer) Add(ctx context.Context, req *user.AddRequest) (*emptypb.Empty, error) { + ctx, span := trace.StartSpan(ctx, "user.grpc.Add") + defer span.End() log.Info(). Str("service", "user"). Str("user_id", req.GetUser().GetId()). diff --git a/internal/databroker/memory/server.go b/internal/databroker/memory/server.go index ef6856e35..026f4fe25 100644 --- a/internal/databroker/memory/server.go +++ b/internal/databroker/memory/server.go @@ -17,6 +17,7 @@ import ( "github.com/pomerium/pomerium/internal/grpc/databroker" "github.com/pomerium/pomerium/internal/log" + "github.com/pomerium/pomerium/internal/telemetry/trace" ) // Server implements the databroker service using an in memory database. @@ -63,6 +64,8 @@ func New(options ...ServerOption) *Server { // Delete deletes a record from the in-memory list. func (srv *Server) Delete(ctx context.Context, req *databroker.DeleteRequest) (*empty.Empty, error) { + _, span := trace.StartSpan(ctx, "databroker.grpc.Delete") + defer span.End() srv.log.Info(). Str("type", req.GetType()). Str("id", req.GetId()). @@ -77,6 +80,8 @@ func (srv *Server) Delete(ctx context.Context, req *databroker.DeleteRequest) (* // Get gets a record from the in-memory list. func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databroker.GetResponse, error) { + _, span := trace.StartSpan(ctx, "databroker.grpc.Get") + defer span.End() srv.log.Info(). Str("type", req.GetType()). Str("id", req.GetId()). @@ -91,6 +96,8 @@ func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databr // GetAll gets all the records from the in-memory list. func (srv *Server) GetAll(ctx context.Context, req *databroker.GetAllRequest) (*databroker.GetAllResponse, error) { + _, span := trace.StartSpan(ctx, "databroker.grpc.GetAll") + defer span.End() srv.log.Info(). Str("type", req.GetType()). Msg("get all") @@ -111,6 +118,8 @@ func (srv *Server) GetAll(ctx context.Context, req *databroker.GetAllRequest) (* // Set updates a record in the in-memory list, or adds a new one. func (srv *Server) Set(ctx context.Context, req *databroker.SetRequest) (*databroker.SetResponse, error) { + _, span := trace.StartSpan(ctx, "databroker.grpc.Set") + defer span.End() srv.log.Info(). Str("type", req.GetType()). Str("id", req.GetId()). @@ -130,6 +139,8 @@ func (srv *Server) Set(ctx context.Context, req *databroker.SetRequest) (*databr // Sync streams updates for the given record type. func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBrokerService_SyncServer) error { + _, span := trace.StartSpan(stream.Context(), "databroker.grpc.Sync") + defer span.End() srv.log.Info(). Str("type", req.GetType()). Str("server_version", req.GetServerVersion()). @@ -172,7 +183,9 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke } // GetTypes returns all the known record types. -func (srv *Server) GetTypes(_ context.Context, _ *emptypb.Empty) (*databroker.GetTypesResponse, error) { +func (srv *Server) GetTypes(ctx context.Context, _ *emptypb.Empty) (*databroker.GetTypesResponse, error) { + _, span := trace.StartSpan(ctx, "databroker.grpc.GetTypes") + defer span.End() var recordTypes []string srv.mu.RLock() for recordType := range srv.byType { @@ -188,6 +201,8 @@ func (srv *Server) GetTypes(_ context.Context, _ *emptypb.Empty) (*databroker.Ge // SyncTypes synchronizes all the known record types. func (srv *Server) SyncTypes(req *emptypb.Empty, stream databroker.DataBrokerService_SyncTypesServer) error { + _, span := trace.StartSpan(stream.Context(), "databroker.grpc.SyncTypes") + defer span.End() srv.log.Info(). Msg("sync types")