pomerium/internal/testenv/scenarios/trace_receiver.go
Joe Kralicky 5e94b2f8f1
Refactor trace config to match supported otel options (#5447)
* Refactor trace config to match supported otel options

* use duration instead of int64 for otel timeouts

* change 'trace client updated' log level to debug
2025-01-30 11:59:19 -05:00

231 lines
7.3 KiB
Go

package scenarios
import (
"compress/gzip"
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/testenv"
"github.com/pomerium/pomerium/internal/testenv/upstreams"
"github.com/pomerium/pomerium/internal/testenv/values"
"github.com/pomerium/pomerium/internal/testutil/tracetest"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
type RecordedExportRequest struct {
Request *coltracepb.ExportTraceServiceRequest
Metadata map[string][]string
}
type RecordedExportRequests []RecordedExportRequest
func (s RecordedExportRequests) AsExportTraceServiceRequests() []*coltracepb.ExportTraceServiceRequest {
out := make([]*coltracepb.ExportTraceServiceRequest, len(s))
for i, v := range s {
out[i] = v.Request
}
return out
}
type OTLPTraceReceiver struct {
coltracepb.UnimplementedTraceServiceServer
mu sync.Mutex
receivedRequests RecordedExportRequests
grpcUpstream values.MutableValue[upstreams.GRPCUpstream]
httpUpstream values.MutableValue[upstreams.HTTPUpstream]
}
func NewOTLPTraceReceiver() *OTLPTraceReceiver {
return &OTLPTraceReceiver{
grpcUpstream: values.Deferred[upstreams.GRPCUpstream](),
httpUpstream: values.Deferred[upstreams.HTTPUpstream](),
}
}
// Export implements v1.TraceServiceServer.
func (rec *OTLPTraceReceiver) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) {
rec.mu.Lock()
defer rec.mu.Unlock()
md, _ := metadata.FromIncomingContext(ctx)
rec.receivedRequests = append(rec.receivedRequests, RecordedExportRequest{
Request: req,
Metadata: md,
})
return &coltracepb.ExportTraceServiceResponse{}, nil
}
// Attach implements testenv.Modifier.
func (rec *OTLPTraceReceiver) Attach(ctx context.Context) {
env := testenv.EnvFromContext(ctx)
// NB: we cannot install tracing middleware into the receiver server, since
// it will cause a feedback loop of spans created when exporting other spans
grpcUpstream := upstreams.GRPC(nil,
upstreams.WithDisplayName("OTLP GRPC Receiver"),
upstreams.WithDelayedShutdown(),
upstreams.WithNoClientTracing(),
upstreams.WithNoServerTracing(),
)
httpUpstream := upstreams.HTTP(nil,
upstreams.WithDisplayName("OTLP HTTP Receiver"),
upstreams.WithDelayedShutdown(),
upstreams.WithNoClientTracing(),
upstreams.WithNoServerTracing(),
)
coltracepb.RegisterTraceServiceServer(grpcUpstream, rec)
httpUpstream.Handle("/v1/traces", rec.handleV1Traces)
env.AddUpstream(grpcUpstream)
env.AddUpstream(httpUpstream)
rec.grpcUpstream.Resolve(grpcUpstream)
rec.httpUpstream.Resolve(httpUpstream)
}
// Modify implements testenv.Modifier.
func (rec *OTLPTraceReceiver) Modify(cfg *config.Config) {
cfg.Options.Tracing.OtelTracesExporter = proto.String("otlp")
cfg.Options.Tracing.OtelExporterOtlpTracesEndpoint = proto.String(rec.GRPCEndpointURL().Value())
cfg.Options.Tracing.OtelExporterOtlpTracesProtocol = proto.String("grpc")
}
func (rec *OTLPTraceReceiver) handleV1Traces(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/x-protobuf" {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("invalid content type"))
return
}
reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
var err error
reader, err = gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
}
data, err := io.ReadAll(reader)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
var req coltracepb.ExportTraceServiceRequest
if err := proto.Unmarshal(data, &req); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte(err.Error()))
return
}
resp, err := rec.Export(metadata.NewIncomingContext(r.Context(), metadata.MD(r.Header)), &req)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
respData, err := proto.Marshal(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(respData)
}
func (rec *OTLPTraceReceiver) ReceivedRequests() []RecordedExportRequest {
rec.mu.Lock()
defer rec.mu.Unlock()
return rec.receivedRequests
}
func (rec *OTLPTraceReceiver) PeekResourceSpans() []*tracev1.ResourceSpans {
rec.mu.Lock()
defer rec.mu.Unlock()
return rec.peekResourceSpansLocked()
}
func (rec *OTLPTraceReceiver) peekResourceSpansLocked() []*tracev1.ResourceSpans {
return tracetest.FlattenExportRequests(rec.receivedRequests.AsExportTraceServiceRequests())
}
func (rec *OTLPTraceReceiver) FlushResourceSpans() []*tracev1.ResourceSpans {
rec.mu.Lock()
defer rec.mu.Unlock()
spans := rec.peekResourceSpansLocked()
rec.receivedRequests = nil
return spans
}
// GRPCEndpointURL returns a url suitable for use with the environment variable
// $OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or with [otlptracegrpc.WithEndpointURL].
func (rec *OTLPTraceReceiver) GRPCEndpointURL() values.Value[string] {
return values.Chain(rec.grpcUpstream, upstreams.GRPCUpstream.Port, func(port int) string {
return fmt.Sprintf("http://127.0.0.1:%d", port)
})
}
// GRPCEndpointURL returns a url suitable for use with the environment variable
// $OTEL_EXPORTER_OTLP_TRACES_ENDPOINT or with [otlptracehttp.WithEndpointURL].
func (rec *OTLPTraceReceiver) HTTPEndpointURL() values.Value[string] {
return values.Chain(rec.httpUpstream, upstreams.HTTPUpstream.Port, func(port int) string {
return fmt.Sprintf("http://127.0.0.1:%d/v1/traces", port)
})
}
func (rec *OTLPTraceReceiver) NewGRPCClient(opts ...otlptracegrpc.Option) otlptrace.Client {
return &deferredClient{
client: values.Bind(rec.grpcUpstream, func(up upstreams.GRPCUpstream) otlptrace.Client {
return otlptracegrpc.NewClient(append(opts,
otlptracegrpc.WithGRPCConn(up.DirectConnect()),
otlptracegrpc.WithTimeout(1*time.Minute),
)...)
}),
}
}
func (rec *OTLPTraceReceiver) NewHTTPClient(opts ...otlptracehttp.Option) otlptrace.Client {
return &deferredClient{
client: values.Chain(rec.httpUpstream, upstreams.HTTPUpstream.Port, func(port int) otlptrace.Client {
return otlptracehttp.NewClient(append(opts,
otlptracehttp.WithEndpointURL(fmt.Sprintf("http://127.0.0.1:%d/v1/traces", port)),
otlptracehttp.WithTimeout(1*time.Minute),
)...)
}),
}
}
type deferredClient struct {
client values.Value[otlptrace.Client]
}
// Start implements otlptrace.Client.
func (o *deferredClient) Start(ctx context.Context) error {
return o.client.Value().Start(ctx)
}
// Stop implements otlptrace.Client.
func (o *deferredClient) Stop(ctx context.Context) error {
return o.client.Value().Stop(ctx)
}
// UploadTraces implements otlptrace.Client.
func (o *deferredClient) UploadTraces(ctx context.Context, protoSpans []*tracev1.ResourceSpans) error {
return o.client.Value().UploadTraces(ctx, protoSpans)
}
var _ otlptrace.Client = (*deferredClient)(nil)