mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
linter pass
This commit is contained in:
parent
d8abcb979d
commit
e26fb04e93
13 changed files with 104 additions and 102 deletions
|
@ -254,18 +254,18 @@ func (a *Authorize) requireLoginResponse(
|
||||||
// always assume https scheme
|
// always assume https scheme
|
||||||
checkRequestURL := getCheckRequestURL(in)
|
checkRequestURL := getCheckRequestURL(in)
|
||||||
checkRequestURL.Scheme = "https"
|
checkRequestURL.Scheme = "https"
|
||||||
var signInUrlQuery url.Values
|
var signInURLQuery url.Values
|
||||||
|
|
||||||
headers := map[string]string{}
|
headers := map[string]string{}
|
||||||
if id := in.GetAttributes().GetRequest().GetHttp().GetHeaders()["traceparent"]; id != "" {
|
if id := in.GetAttributes().GetRequest().GetHttp().GetHeaders()["traceparent"]; id != "" {
|
||||||
headers["X-Pomerium-Traceparent"] = id
|
headers["X-Pomerium-Traceparent"] = id
|
||||||
headers["X-Pomerium-Tracestate"] = "pomerium.traceparent=" + id // TODO: this might not be necessary anymore
|
headers["X-Pomerium-Tracestate"] = "pomerium.traceparent=" + id // TODO: this might not be necessary anymore
|
||||||
signInUrlQuery = url.Values{}
|
signInURLQuery = url.Values{}
|
||||||
signInUrlQuery.Add("pomerium_traceparent", id)
|
signInURLQuery.Add("pomerium_traceparent", id)
|
||||||
signInUrlQuery.Add("pomerium_tracestate", "pomerium.traceparent="+id)
|
signInURLQuery.Add("pomerium_tracestate", "pomerium.traceparent="+id)
|
||||||
}
|
}
|
||||||
redirectTo, err := state.authenticateFlow.AuthenticateSignInURL(
|
redirectTo, err := state.authenticateFlow.AuthenticateSignInURL(
|
||||||
ctx, signInUrlQuery, &checkRequestURL, idp.GetId())
|
ctx, signInURLQuery, &checkRequestURL, idp.GetId())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -77,16 +77,16 @@ func guessProtocol() string {
|
||||||
type noopClient struct{}
|
type noopClient struct{}
|
||||||
|
|
||||||
// Start implements otlptrace.Client.
|
// Start implements otlptrace.Client.
|
||||||
func (n noopClient) Start(ctx context.Context) error {
|
func (n noopClient) Start(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop implements otlptrace.Client.
|
// Stop implements otlptrace.Client.
|
||||||
func (n noopClient) Stop(ctx context.Context) error {
|
func (n noopClient) Stop(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UploadTraces implements otlptrace.Client.
|
// UploadTraces implements otlptrace.Client.
|
||||||
func (n noopClient) UploadTraces(ctx context.Context, protoSpans []*v1.ResourceSpans) error {
|
func (n noopClient) UploadTraces(context.Context, []*v1.ResourceSpans) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,22 +70,22 @@ func (df DebugFlags) Check(flags DebugFlags) bool {
|
||||||
type stackTraceProcessor struct{}
|
type stackTraceProcessor struct{}
|
||||||
|
|
||||||
// ForceFlush implements trace.SpanProcessor.
|
// ForceFlush implements trace.SpanProcessor.
|
||||||
func (s *stackTraceProcessor) ForceFlush(ctx context.Context) error {
|
func (s *stackTraceProcessor) ForceFlush(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEnd implements trace.SpanProcessor.
|
// OnEnd implements trace.SpanProcessor.
|
||||||
func (*stackTraceProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
|
func (*stackTraceProcessor) OnEnd(sdktrace.ReadOnlySpan) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStart implements trace.SpanProcessor.
|
// OnStart implements trace.SpanProcessor.
|
||||||
func (*stackTraceProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {
|
func (*stackTraceProcessor) OnStart(_ context.Context, s sdktrace.ReadWriteSpan) {
|
||||||
_, file, line, _ := runtime.Caller(2)
|
_, file, line, _ := runtime.Caller(2)
|
||||||
s.SetAttributes(attribute.String("caller", fmt.Sprintf("%s:%d", file, line)))
|
s.SetAttributes(attribute.String("caller", fmt.Sprintf("%s:%d", file, line)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown implements trace.SpanProcessor.
|
// Shutdown implements trace.SpanProcessor.
|
||||||
func (s *stackTraceProcessor) Shutdown(ctx context.Context) error {
|
func (s *stackTraceProcessor) Shutdown(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,18 +3,32 @@ package trace
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.opentelemetry.io/otel/trace/embedded"
|
"go.opentelemetry.io/otel/trace/embedded"
|
||||||
)
|
)
|
||||||
|
|
||||||
const PomeriumCoreTracer = "pomerium.io/core"
|
const PomeriumCoreTracer = "pomerium.io/core"
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
|
||||||
|
}
|
||||||
|
|
||||||
|
func Continue(ctx context.Context, name string, o ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||||
|
return trace.SpanFromContext(ctx).TracerProvider().Tracer(PomeriumCoreTracer).Start(ctx, name, o...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UseGlobalPanicTracer() {
|
||||||
|
otel.SetTracerProvider(panicTracerProvider{})
|
||||||
|
}
|
||||||
|
|
||||||
type panicTracerProvider struct {
|
type panicTracerProvider struct {
|
||||||
embedded.TracerProvider
|
embedded.TracerProvider
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tracer implements trace.TracerProvider.
|
// Tracer implements trace.TracerProvider.
|
||||||
func (w panicTracerProvider) Tracer(name string, options ...trace.TracerOption) trace.Tracer {
|
func (w panicTracerProvider) Tracer(string, ...trace.TracerOption) trace.Tracer {
|
||||||
return panicTracer{}
|
return panicTracer{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,10 +37,6 @@ type panicTracer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start implements trace.Tracer.
|
// Start implements trace.Tracer.
|
||||||
func (p panicTracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
func (p panicTracer) Start(context.Context, string, ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||||
panic("global tracer used")
|
panic("global tracer used")
|
||||||
}
|
}
|
||||||
|
|
||||||
func Continue(ctx context.Context, name string, o ...trace.SpanStartOption) (context.Context, trace.Span) {
|
|
||||||
return trace.SpanFromContext(ctx).TracerProvider().Tracer(PomeriumCoreTracer).Start(ctx, name, o...)
|
|
||||||
}
|
|
||||||
|
|
|
@ -25,8 +25,8 @@ import (
|
||||||
|
|
||||||
type SpanExportQueue struct {
|
type SpanExportQueue struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
pendingResourcesByTraceId map[unique.Handle[oteltrace.TraceID]]*PendingResources
|
pendingResourcesByTraceID map[unique.Handle[oteltrace.TraceID]]*PendingResources
|
||||||
knownTraceIdMappings map[unique.Handle[oteltrace.TraceID]]unique.Handle[oteltrace.TraceID]
|
knownTraceIDMappings map[unique.Handle[oteltrace.TraceID]]unique.Handle[oteltrace.TraceID]
|
||||||
uploadC chan []*tracev1.ResourceSpans
|
uploadC chan []*tracev1.ResourceSpans
|
||||||
closing bool
|
closing bool
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
@ -45,8 +45,8 @@ func NewSpanExportQueue(ctx context.Context, client otlptrace.Client) *SpanExpor
|
||||||
observer = noopSpanObserver{}
|
observer = noopSpanObserver{}
|
||||||
}
|
}
|
||||||
q := &SpanExportQueue{
|
q := &SpanExportQueue{
|
||||||
pendingResourcesByTraceId: make(map[unique.Handle[oteltrace.TraceID]]*PendingResources),
|
pendingResourcesByTraceID: make(map[unique.Handle[oteltrace.TraceID]]*PendingResources),
|
||||||
knownTraceIdMappings: make(map[unique.Handle[oteltrace.TraceID]]unique.Handle[oteltrace.TraceID]),
|
knownTraceIDMappings: make(map[unique.Handle[oteltrace.TraceID]]unique.Handle[oteltrace.TraceID]),
|
||||||
uploadC: make(chan []*tracev1.ResourceSpans, 8),
|
uploadC: make(chan []*tracev1.ResourceSpans, 8),
|
||||||
closed: make(chan struct{}),
|
closed: make(chan struct{}),
|
||||||
debugFlags: debug,
|
debugFlags: debug,
|
||||||
|
@ -73,30 +73,30 @@ func (q *SpanExportQueue) insertPendingSpanLocked(
|
||||||
span *tracev1.Span,
|
span *tracev1.Span,
|
||||||
) {
|
) {
|
||||||
var pendingTraceResources *PendingResources
|
var pendingTraceResources *PendingResources
|
||||||
if ptr, ok := q.pendingResourcesByTraceId[traceID]; ok {
|
if ptr, ok := q.pendingResourcesByTraceID[traceID]; ok {
|
||||||
pendingTraceResources = ptr
|
pendingTraceResources = ptr
|
||||||
} else {
|
} else {
|
||||||
pendingTraceResources = NewPendingResources()
|
pendingTraceResources = NewPendingResources()
|
||||||
q.pendingResourcesByTraceId[traceID] = pendingTraceResources
|
q.pendingResourcesByTraceID[traceID] = pendingTraceResources
|
||||||
}
|
}
|
||||||
pendingTraceResources.Insert(resource, scope, scopeSchema, span)
|
pendingTraceResources.Insert(resource, scope, scopeSchema, span)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *SpanExportQueue) resolveTraceIdMappingLocked(original, mapping unique.Handle[oteltrace.TraceID]) [][]*tracev1.ResourceSpans {
|
func (q *SpanExportQueue) resolveTraceIDMappingLocked(original, mapping unique.Handle[oteltrace.TraceID]) [][]*tracev1.ResourceSpans {
|
||||||
q.knownTraceIdMappings[original] = mapping
|
q.knownTraceIDMappings[original] = mapping
|
||||||
|
|
||||||
toUpload := [][]*tracev1.ResourceSpans{}
|
toUpload := [][]*tracev1.ResourceSpans{}
|
||||||
if originalPending, ok := q.pendingResourcesByTraceId[original]; ok {
|
if originalPending, ok := q.pendingResourcesByTraceID[original]; ok {
|
||||||
resourceSpans := originalPending.AsResourceSpans(mapping)
|
resourceSpans := originalPending.AsResourceSpans(mapping)
|
||||||
delete(q.pendingResourcesByTraceId, original)
|
delete(q.pendingResourcesByTraceID, original)
|
||||||
toUpload = append(toUpload, resourceSpans)
|
toUpload = append(toUpload, resourceSpans)
|
||||||
}
|
}
|
||||||
|
|
||||||
if original != mapping {
|
if original != mapping {
|
||||||
q.knownTraceIdMappings[mapping] = mapping
|
q.knownTraceIDMappings[mapping] = mapping
|
||||||
if targetPending, ok := q.pendingResourcesByTraceId[mapping]; ok {
|
if targetPending, ok := q.pendingResourcesByTraceID[mapping]; ok {
|
||||||
resourceSpans := targetPending.AsResourceSpans(mapping)
|
resourceSpans := targetPending.AsResourceSpans(mapping)
|
||||||
delete(q.pendingResourcesByTraceId, mapping)
|
delete(q.pendingResourcesByTraceID, mapping)
|
||||||
toUpload = append(toUpload, resourceSpans)
|
toUpload = append(toUpload, resourceSpans)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -117,27 +117,27 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra
|
||||||
for _, scope := range resource.ScopeSpans {
|
for _, scope := range resource.ScopeSpans {
|
||||||
for _, span := range scope.Spans {
|
for _, span := range scope.Spans {
|
||||||
formatSpanName(span)
|
formatSpanName(span)
|
||||||
spanId, ok := toSpanID(span.SpanId)
|
spanID, ok := toSpanID(span.SpanId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if q.debugFlags.Check(TrackAllSpans) {
|
if q.debugFlags.Check(TrackAllSpans) {
|
||||||
q.debugAllObservedSpans[spanId] = span
|
q.debugAllObservedSpans[spanID] = span
|
||||||
}
|
}
|
||||||
parentSpanId, ok := toSpanID(span.ParentSpanId)
|
parentSpanID, ok := toSpanID(span.ParentSpanId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if parentSpanId.IsValid() { // if parent is not a root span
|
if parentSpanID.IsValid() { // if parent is not a root span
|
||||||
q.observer.ObserveReference(parentSpanId)
|
q.observer.ObserveReference(parentSpanID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
traceId, ok := toTraceID(span.TraceId)
|
traceID, ok := toTraceID(span.TraceId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := q.knownTraceIdMappings[traceId]; !ok {
|
if _, ok := q.knownTraceIDMappings[traceID]; !ok {
|
||||||
// observed a new root span with an unknown trace id
|
// observed a new root span with an unknown trace id
|
||||||
var pomeriumTraceparent string
|
var pomeriumTraceparent string
|
||||||
for _, attr := range span.Attributes {
|
for _, attr := range span.Attributes {
|
||||||
|
@ -150,7 +150,7 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra
|
||||||
|
|
||||||
if pomeriumTraceparent == "" {
|
if pomeriumTraceparent == "" {
|
||||||
// no replacement id, map the trace to itself and release pending spans
|
// no replacement id, map the trace to itself and release pending spans
|
||||||
mappedTraceID = traceId
|
mappedTraceID = traceID
|
||||||
} else {
|
} else {
|
||||||
// this root span has an alternate traceparent. permanently rewrite
|
// this root span has an alternate traceparent. permanently rewrite
|
||||||
// all spans of the old trace id to use the new trace id
|
// all spans of the old trace id to use the new trace id
|
||||||
|
@ -162,7 +162,7 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra
|
||||||
mappedTraceID = unique.Make(tp.TraceID())
|
mappedTraceID = unique.Make(tp.TraceID())
|
||||||
}
|
}
|
||||||
|
|
||||||
toUpload = append(toUpload, q.resolveTraceIdMappingLocked(traceId, mappedTraceID)...)
|
toUpload = append(toUpload, q.resolveTraceIDMappingLocked(traceID, mappedTraceID)...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ func (q *SpanExportQueue) Enqueue(ctx context.Context, req *coltracepb.ExportTra
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
q.observer.Observe(spanID)
|
q.observer.Observe(spanID)
|
||||||
if mapping, ok := q.knownTraceIdMappings[traceID]; ok {
|
if mapping, ok := q.knownTraceIDMappings[traceID]; ok {
|
||||||
id := mapping.Value()
|
id := mapping.Value()
|
||||||
copy(span.TraceId, id[:])
|
copy(span.TraceId, id[:])
|
||||||
knownSpans = append(knownSpans, span)
|
knownSpans = append(knownSpans, span)
|
||||||
|
@ -248,15 +248,15 @@ func (q *SpanExportQueue) Close(ctx context.Context) error {
|
||||||
q.mu.Lock()
|
q.mu.Lock()
|
||||||
defer q.mu.Unlock()
|
defer q.mu.Unlock()
|
||||||
if q.debugFlags.Check(TrackSpanReferences) {
|
if q.debugFlags.Check(TrackSpanReferences) {
|
||||||
var unknownParentIds []string
|
var unknownParentIDs []string
|
||||||
for id, known := range q.observer.(*spanObserver).referencedIDs {
|
for id, known := range q.observer.(*spanObserver).referencedIDs {
|
||||||
if !known {
|
if !known {
|
||||||
unknownParentIds = append(unknownParentIds, id.String())
|
unknownParentIDs = append(unknownParentIDs, id.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(unknownParentIds) > 0 {
|
if len(unknownParentIDs) > 0 {
|
||||||
msg := startMsg("WARNING: parent spans referenced but never seen:\n")
|
msg := startMsg("WARNING: parent spans referenced but never seen:\n")
|
||||||
for _, str := range unknownParentIds {
|
for _, str := range unknownParentIDs {
|
||||||
msg.WriteString(str)
|
msg.WriteString(str)
|
||||||
msg.WriteString("\n")
|
msg.WriteString("\n")
|
||||||
}
|
}
|
||||||
|
@ -264,11 +264,11 @@ func (q *SpanExportQueue) Close(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
didWarn := false
|
didWarn := false
|
||||||
incomplete := len(q.pendingResourcesByTraceId) > 0
|
incomplete := len(q.pendingResourcesByTraceID) > 0
|
||||||
if incomplete && q.debugFlags.Check(WarnOnIncompleteTraces) {
|
if incomplete && q.debugFlags.Check(WarnOnIncompleteTraces) {
|
||||||
didWarn = true
|
didWarn = true
|
||||||
msg := startMsg("WARNING: exporter shut down with incomplete traces\n")
|
msg := startMsg("WARNING: exporter shut down with incomplete traces\n")
|
||||||
for k, v := range q.pendingResourcesByTraceId {
|
for k, v := range q.pendingResourcesByTraceID {
|
||||||
fmt.Fprintf(msg, "- Trace: %s\n", k.Value())
|
fmt.Fprintf(msg, "- Trace: %s\n", k.Value())
|
||||||
for _, pendingScope := range v.scopesByResourceID {
|
for _, pendingScope := range v.scopesByResourceID {
|
||||||
msg.WriteString(" - Resource:\n")
|
msg.WriteString(" - Resource:\n")
|
||||||
|
@ -291,21 +291,21 @@ func (q *SpanExportQueue) Close(ctx context.Context) error {
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
traceId, ok := toTraceID(span.TraceId)
|
traceID, ok := toTraceID(span.TraceId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parentSpanId, ok := toSpanID(span.ParentSpanId)
|
parentSpanID, ok := toSpanID(span.ParentSpanId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, seenParent := q.debugAllObservedSpans[parentSpanId]
|
_, seenParent := q.debugAllObservedSpans[parentSpanID]
|
||||||
var missing string
|
var missing string
|
||||||
if !seenParent {
|
if !seenParent {
|
||||||
missing = " [missing]"
|
missing = " [missing]"
|
||||||
}
|
}
|
||||||
fmt.Fprintf(msg, " - %-*s (trace: %s | span: %s | parent:%s %s)\n", longestName,
|
fmt.Fprintf(msg, " - %-*s (trace: %s | span: %s | parent:%s %s)\n", longestName,
|
||||||
"'"+span.Name+"'", traceId.Value(), spanID, missing, parentSpanId)
|
"'"+span.Name+"'", traceID.Value(), spanID, missing, parentSpanID)
|
||||||
for _, attr := range span.Attributes {
|
for _, attr := range span.Attributes {
|
||||||
if attr.Key == "caller" {
|
if attr.Key == "caller" {
|
||||||
fmt.Fprintf(msg, " => caller: '%s'\n", attr.Value.GetStringValue())
|
fmt.Fprintf(msg, " => caller: '%s'\n", attr.Value.GetStringValue())
|
||||||
|
@ -321,7 +321,7 @@ func (q *SpanExportQueue) Close(ctx context.Context) error {
|
||||||
|
|
||||||
if q.debugFlags.Check(LogTraceIDMappings) || (didWarn && q.debugFlags.Check(LogTraceIDMappingsOnWarn)) {
|
if q.debugFlags.Check(LogTraceIDMappings) || (didWarn && q.debugFlags.Check(LogTraceIDMappingsOnWarn)) {
|
||||||
msg := startMsg("Known trace ids:\n")
|
msg := startMsg("Known trace ids:\n")
|
||||||
for k, v := range q.knownTraceIdMappings {
|
for k, v := range q.knownTraceIDMappings {
|
||||||
if k != v {
|
if k != v {
|
||||||
fmt.Fprintf(msg, "%s => %s\n", k.Value(), v.Value())
|
fmt.Fprintf(msg, "%s => %s\n", k.Value(), v.Value())
|
||||||
} else {
|
} else {
|
||||||
|
@ -341,16 +341,16 @@ func (q *SpanExportQueue) Close(ctx context.Context) error {
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
traceId, ok := toTraceID(span.TraceId)
|
traceID, ok := toTraceID(span.TraceId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parentSpanId, ok := toSpanID(span.ParentSpanId)
|
parentSpanID, ok := toSpanID(span.ParentSpanId)
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Fprintf(msg, "%-*s (trace: %s | span: %s | parent: %s)", longestName,
|
fmt.Fprintf(msg, "%-*s (trace: %s | span: %s | parent: %s)", longestName,
|
||||||
"'"+span.Name+"'", traceId.Value(), spanID, parentSpanId)
|
"'"+span.Name+"'", traceID.Value(), spanID, parentSpanID)
|
||||||
var foundCaller bool
|
var foundCaller bool
|
||||||
for _, attr := range span.Attributes {
|
for _, attr := range span.Attributes {
|
||||||
if attr.Key == "caller" {
|
if attr.Key == "caller" {
|
||||||
|
@ -389,7 +389,7 @@ type spanInfo struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForceFlush implements trace.SpanProcessor.
|
// ForceFlush implements trace.SpanProcessor.
|
||||||
func (t *spanTracker) ForceFlush(ctx context.Context) error {
|
func (t *spanTracker) ForceFlush(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,7 +400,7 @@ func (t *spanTracker) OnEnd(s sdktrace.ReadOnlySpan) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStart implements trace.SpanProcessor.
|
// OnStart implements trace.SpanProcessor.
|
||||||
func (t *spanTracker) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {
|
func (t *spanTracker) OnStart(_ context.Context, s sdktrace.ReadWriteSpan) {
|
||||||
id := s.SpanContext().SpanID()
|
id := s.SpanContext().SpanID()
|
||||||
t.inflightSpans.Store(id, struct{}{})
|
t.inflightSpans.Store(id, struct{}{})
|
||||||
t.observer.Observe(id)
|
t.observer.Observe(id)
|
||||||
|
@ -414,7 +414,7 @@ func (t *spanTracker) OnStart(parent context.Context, s sdktrace.ReadWriteSpan)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Shutdown implements trace.SpanProcessor.
|
// Shutdown implements trace.SpanProcessor.
|
||||||
func (t *spanTracker) Shutdown(ctx context.Context) error {
|
func (t *spanTracker) Shutdown(_ context.Context) error {
|
||||||
if t.debugFlags == 0 {
|
if t.debugFlags == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -423,7 +423,7 @@ func (t *spanTracker) Shutdown(ctx context.Context) error {
|
||||||
if t.debugFlags.Check(WarnOnIncompleteSpans) {
|
if t.debugFlags.Check(WarnOnIncompleteSpans) {
|
||||||
if t.debugFlags.Check(TrackAllSpans) {
|
if t.debugFlags.Check(TrackAllSpans) {
|
||||||
incompleteSpans := []*spanInfo{}
|
incompleteSpans := []*spanInfo{}
|
||||||
t.inflightSpans.Range(func(key, value any) bool {
|
t.inflightSpans.Range(func(key, _ any) bool {
|
||||||
if info, ok := t.allSpans.Load(key); ok {
|
if info, ok := t.allSpans.Load(key); ok {
|
||||||
incompleteSpans = append(incompleteSpans, info.(*spanInfo))
|
incompleteSpans = append(incompleteSpans, info.(*spanInfo))
|
||||||
}
|
}
|
||||||
|
@ -444,7 +444,7 @@ func (t *spanTracker) Shutdown(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
incompleteSpans := []string{}
|
incompleteSpans := []string{}
|
||||||
t.inflightSpans.Range(func(key, value any) bool {
|
t.inflightSpans.Range(func(key, _ any) bool {
|
||||||
incompleteSpans = append(incompleteSpans, key.(string))
|
incompleteSpans = append(incompleteSpans, key.(string))
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -462,7 +462,7 @@ func (t *spanTracker) Shutdown(ctx context.Context) error {
|
||||||
|
|
||||||
if t.debugFlags.Check(LogAllSpans) || (t.debugFlags.Check(LogAllSpansOnWarn) && didWarn) {
|
if t.debugFlags.Check(LogAllSpans) || (t.debugFlags.Check(LogAllSpansOnWarn) && didWarn) {
|
||||||
allSpans := []*spanInfo{}
|
allSpans := []*spanInfo{}
|
||||||
t.allSpans.Range(func(key, value any) bool {
|
t.allSpans.Range(func(_, value any) bool {
|
||||||
allSpans = append(allSpans, value.(*spanInfo))
|
allSpans = append(allSpans, value.(*spanInfo))
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -504,22 +504,22 @@ type PendingScopes struct {
|
||||||
spansByScope map[string]*PendingSpans
|
spansByScope map[string]*PendingSpans
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptr *PendingScopes) Insert(scope *commonv1.InstrumentationScope, scopeSchema string, span *tracev1.Span) {
|
func (ps *PendingScopes) Insert(scope *commonv1.InstrumentationScope, scopeSchema string, span *tracev1.Span) {
|
||||||
var spans *PendingSpans
|
var spans *PendingSpans
|
||||||
if sp, ok := ptr.spansByScope[scope.GetName()]; ok {
|
if sp, ok := ps.spansByScope[scope.GetName()]; ok {
|
||||||
spans = sp
|
spans = sp
|
||||||
} else {
|
} else {
|
||||||
spans = NewPendingSpans(scope, scopeSchema)
|
spans = NewPendingSpans(scope, scopeSchema)
|
||||||
ptr.spansByScope[scope.GetName()] = spans
|
ps.spansByScope[scope.GetName()] = spans
|
||||||
}
|
}
|
||||||
spans.Insert(span)
|
spans.Insert(span)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptr *PendingScopes) AsScopeSpansList(rewriteTraceId unique.Handle[oteltrace.TraceID]) []*tracev1.ScopeSpans {
|
func (ps *PendingScopes) AsScopeSpansList(rewriteTraceID unique.Handle[oteltrace.TraceID]) []*tracev1.ScopeSpans {
|
||||||
out := make([]*tracev1.ScopeSpans, 0, len(ptr.spansByScope))
|
out := make([]*tracev1.ScopeSpans, 0, len(ps.spansByScope))
|
||||||
for _, spans := range ptr.spansByScope {
|
for _, spans := range ps.spansByScope {
|
||||||
for _, span := range spans.spans {
|
for _, span := range spans.spans {
|
||||||
id := rewriteTraceId.Value()
|
id := rewriteTraceID.Value()
|
||||||
copy(span.TraceId, id[:])
|
copy(span.TraceId, id[:])
|
||||||
}
|
}
|
||||||
scopeSpans := &tracev1.ScopeSpans{
|
scopeSpans := &tracev1.ScopeSpans{
|
||||||
|
@ -543,24 +543,24 @@ type PendingResources struct {
|
||||||
scopesByResourceID map[string]*PendingScopes
|
scopesByResourceID map[string]*PendingScopes
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptr *PendingResources) Insert(resource *ResourceInfo, scope *commonv1.InstrumentationScope, scopeSchema string, span *tracev1.Span) {
|
func (pr *PendingResources) Insert(resource *ResourceInfo, scope *commonv1.InstrumentationScope, scopeSchema string, span *tracev1.Span) {
|
||||||
resourceEq := resource.ID()
|
resourceEq := resource.ID()
|
||||||
var scopes *PendingScopes
|
var scopes *PendingScopes
|
||||||
if sc, ok := ptr.scopesByResourceID[resourceEq]; ok {
|
if sc, ok := pr.scopesByResourceID[resourceEq]; ok {
|
||||||
scopes = sc
|
scopes = sc
|
||||||
} else {
|
} else {
|
||||||
scopes = NewPendingScopes(resource)
|
scopes = NewPendingScopes(resource)
|
||||||
ptr.scopesByResourceID[resourceEq] = scopes
|
pr.scopesByResourceID[resourceEq] = scopes
|
||||||
}
|
}
|
||||||
scopes.Insert(scope, scopeSchema, span)
|
scopes.Insert(scope, scopeSchema, span)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptr *PendingResources) AsResourceSpans(rewriteTraceId unique.Handle[oteltrace.TraceID]) []*tracev1.ResourceSpans {
|
func (pr *PendingResources) AsResourceSpans(rewriteTraceID unique.Handle[oteltrace.TraceID]) []*tracev1.ResourceSpans {
|
||||||
out := make([]*tracev1.ResourceSpans, 0, len(ptr.scopesByResourceID))
|
out := make([]*tracev1.ResourceSpans, 0, len(pr.scopesByResourceID))
|
||||||
for _, scopes := range ptr.scopesByResourceID {
|
for _, scopes := range pr.scopesByResourceID {
|
||||||
resourceSpans := &tracev1.ResourceSpans{
|
resourceSpans := &tracev1.ResourceSpans{
|
||||||
Resource: scopes.resource.Resource,
|
Resource: scopes.resource.Resource,
|
||||||
ScopeSpans: scopes.AsScopeSpansList(rewriteTraceId),
|
ScopeSpans: scopes.AsScopeSpansList(rewriteTraceID),
|
||||||
SchemaUrl: scopes.resource.Schema,
|
SchemaUrl: scopes.resource.Schema,
|
||||||
}
|
}
|
||||||
out = append(out, resourceSpans)
|
out = append(out, resourceSpans)
|
||||||
|
|
|
@ -18,7 +18,9 @@ import (
|
||||||
|
|
||||||
// Export implements ptraceotlp.GRPCServer.
|
// Export implements ptraceotlp.GRPCServer.
|
||||||
func (srv *ExporterServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
|
func (srv *ExporterServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
|
||||||
srv.spanExportQueue.Enqueue(ctx, req)
|
if err := srv.spanExportQueue.Enqueue(ctx, req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return &coltracepb.ExportTraceServiceResponse{}, nil
|
return &coltracepb.ExportTraceServiceResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +47,7 @@ func NewServer(ctx context.Context, remoteClient otlptrace.Client) *ExporterServ
|
||||||
|
|
||||||
func (srv *ExporterServer) Start(ctx context.Context) {
|
func (srv *ExporterServer) Start(ctx context.Context) {
|
||||||
lis := bufconn.Listen(4096)
|
lis := bufconn.Listen(4096)
|
||||||
go srv.server.Serve(lis)
|
go func() { _ = srv.server.Serve(lis) }()
|
||||||
cc, err := grpc.NewClient("passthrough://ignore",
|
cc, err := grpc.NewClient("passthrough://ignore",
|
||||||
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
|
||||||
return lis.Dial()
|
return lis.Dial()
|
||||||
|
|
|
@ -8,10 +8,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||||
"go.opentelemetry.io/otel/propagation"
|
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||||
|
@ -39,14 +37,6 @@ func systemContextFromContext(ctx context.Context) *systemContext {
|
||||||
return sys
|
return sys
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func UseGlobalPanicTracer() {
|
|
||||||
otel.SetTracerProvider(panicTracerProvider{})
|
|
||||||
}
|
|
||||||
|
|
||||||
var _ trace.Tracer = panicTracer{}
|
var _ trace.Tracer = panicTracer{}
|
||||||
|
|
||||||
type tracerProviderManager struct {
|
type tracerProviderManager struct {
|
||||||
|
|
|
@ -14,11 +14,11 @@ func ParseTraceparent(traceparent string) (trace.SpanContext, error) {
|
||||||
if len(parts) != 4 {
|
if len(parts) != 4 {
|
||||||
return trace.SpanContext{}, errors.New("malformed traceparent")
|
return trace.SpanContext{}, errors.New("malformed traceparent")
|
||||||
}
|
}
|
||||||
traceId, err := trace.TraceIDFromHex(parts[1])
|
traceID, err := trace.TraceIDFromHex(parts[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return trace.SpanContext{}, err
|
return trace.SpanContext{}, err
|
||||||
}
|
}
|
||||||
spanId, err := trace.SpanIDFromHex(parts[2])
|
spanID, err := trace.SpanIDFromHex(parts[2])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return trace.SpanContext{}, err
|
return trace.SpanContext{}, err
|
||||||
}
|
}
|
||||||
|
@ -26,12 +26,12 @@ func ParseTraceparent(traceparent string) (trace.SpanContext, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return trace.SpanContext{}, err
|
return trace.SpanContext{}, err
|
||||||
}
|
}
|
||||||
if len(traceId) != 16 || len(spanId) != 8 {
|
if len(traceID) != 16 || len(spanID) != 8 {
|
||||||
return trace.SpanContext{}, errors.New("malformed traceparent")
|
return trace.SpanContext{}, errors.New("malformed traceparent")
|
||||||
}
|
}
|
||||||
return trace.NewSpanContext(trace.SpanContextConfig{
|
return trace.NewSpanContext(trace.SpanContextConfig{
|
||||||
TraceID: traceId,
|
TraceID: traceID,
|
||||||
SpanID: spanId,
|
SpanID: spanID,
|
||||||
TraceFlags: trace.TraceFlags(traceFlags),
|
TraceFlags: trace.TraceFlags(traceFlags),
|
||||||
}), nil
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -642,7 +642,7 @@ func (e *environment) Start() {
|
||||||
e.debugf("pomerium server shut down without error")
|
e.debugf("pomerium server shut down without error")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
pom.Start(ctx, e.tracerProvider, e.src)
|
require.NoError(e.t, pom.Start(ctx, e.tracerProvider, e.src))
|
||||||
return pom.Wait()
|
return pom.Wait()
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
|
@ -48,7 +48,7 @@ func TestOTLPTracing(t *testing.T) {
|
||||||
env := testenv.New(t, testenv.AddTraceDebugFlags(testenv.StandardTraceDebugFlags))
|
env := testenv.New(t, testenv.AddTraceDebugFlags(testenv.StandardTraceDebugFlags))
|
||||||
defer env.Stop()
|
defer env.Stop()
|
||||||
up := upstreams.HTTP(nil, upstreams.WithDisplayName("Upstream"))
|
up := upstreams.HTTP(nil, upstreams.WithDisplayName("Upstream"))
|
||||||
up.Handle("/foo", func(w http.ResponseWriter, req *http.Request) {
|
up.Handle("/foo", func(w http.ResponseWriter, _ *http.Request) {
|
||||||
w.Write([]byte("OK"))
|
w.Write([]byte("OK"))
|
||||||
})
|
})
|
||||||
env.Add(scenarios.NewIDP([]*scenarios.User{
|
env.Add(scenarios.NewIDP([]*scenarios.User{
|
||||||
|
|
|
@ -300,7 +300,7 @@ func (h *httpUpstream) Do(method string, r testenv.Route, opts ...RequestOption)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
otelhttp.WithTracerProvider(h.clientTracerProvider.Value()),
|
otelhttp.WithTracerProvider(h.clientTracerProvider.Value()),
|
||||||
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
|
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
|
||||||
return fmt.Sprintf("Client: %s %s", r.Method, r.URL.Path)
|
return fmt.Sprintf("Client: %s %s", r.Method, r.URL.Path)
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
|
@ -355,9 +355,9 @@ func (h *httpUpstream) Do(method string, r testenv.Route, opts ...RequestOption)
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.authenticateAs != "" {
|
if options.authenticateAs != "" {
|
||||||
resp, err = authenticateFlow(ctx, client, req, options.authenticateAs)
|
resp, err = authenticateFlow(ctx, client, req, options.authenticateAs) //nolint:bodyclose
|
||||||
} else {
|
} else {
|
||||||
resp, err = client.Do(req)
|
resp, err = client.Do(req) //nolint:bodyclose
|
||||||
}
|
}
|
||||||
// retry on connection refused
|
// retry on connection refused
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -371,8 +371,8 @@ func (h *httpUpstream) Do(method string, r testenv.Route, opts ...RequestOption)
|
||||||
}
|
}
|
||||||
if resp.StatusCode/100 == 5 {
|
if resp.StatusCode/100 == 5 {
|
||||||
resendCount++
|
resendCount++
|
||||||
io.ReadAll(resp.Body)
|
_, _ = io.ReadAll(resp.Body)
|
||||||
resp.Body.Close()
|
_ = resp.Body.Close()
|
||||||
span.SetAttributes(semconv.HTTPRequestResendCount(resendCount))
|
span.SetAttributes(semconv.HTTPRequestResendCount(resendCount))
|
||||||
span.AddEvent("Retrying on 5xx error", oteltrace.WithAttributes(
|
span.AddEvent("Retrying on 5xx error", oteltrace.WithAttributes(
|
||||||
attribute.String("status", resp.Status),
|
attribute.String("status", resp.Status),
|
||||||
|
|
|
@ -61,8 +61,8 @@ type ServerOptions struct {
|
||||||
exitGracePeriod time.Duration
|
exitGracePeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opts *ServerOptions) ExitGracePeriod() time.Duration {
|
func (o *ServerOptions) ExitGracePeriod() time.Duration {
|
||||||
return opts.exitGracePeriod
|
return o.exitGracePeriod
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerOption func(*ServerOptions)
|
type ServerOption func(*ServerOptions)
|
||||||
|
|
|
@ -74,7 +74,7 @@ func NewAuthenticator(ctx context.Context, tracerProvider oteltrace.TracerProvid
|
||||||
ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{
|
ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{
|
||||||
Transport: otelhttp.NewTransport(nil,
|
Transport: otelhttp.NewTransport(nil,
|
||||||
otelhttp.WithTracerProvider(tracerProvider),
|
otelhttp.WithTracerProvider(tracerProvider),
|
||||||
otelhttp.WithSpanNameFormatter(func(operation string, r *http.Request) string {
|
otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string {
|
||||||
return fmt.Sprintf("OAuth2 Client: %s %s", r.Method, r.URL.Path)
|
return fmt.Sprintf("OAuth2 Client: %s %s", r.Method, r.URL.Path)
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue