mcp: storage scaffolding (#5581)

This commit is contained in:
Denis Mishin 2025-04-23 13:39:27 -04:00 committed by GitHub
parent f1a9401ddc
commit db221cb826
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 68 additions and 8 deletions

View file

@ -2,14 +2,19 @@ package mcp
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"path" "path"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/rs/cors" "github.com/rs/cors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
oteltrace "go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace"
googlegrpc "google.golang.org/grpc"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/pkg/grpc"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/telemetry/trace" "github.com/pomerium/pomerium/pkg/telemetry/trace"
) )
@ -24,19 +29,27 @@ const (
) )
type Handler struct { type Handler struct {
prefix string prefix string
trace oteltrace.TracerProvider trace oteltrace.TracerProvider
storage *Storage
} }
func New( func New(
ctx context.Context, ctx context.Context,
prefix string, prefix string,
_ *config.Config, cfg *config.Config,
) (*Handler, error) { ) (*Handler, error) {
tracerProvider := trace.NewTracerProvider(ctx, "MCP") tracerProvider := trace.NewTracerProvider(ctx, "MCP")
client, err := getDatabrokerServiceClient(ctx, cfg, tracerProvider)
if err != nil {
return nil, fmt.Errorf("databroker client: %w", err)
}
return &Handler{ return &Handler{
prefix: prefix, prefix: prefix,
trace: tracerProvider, trace: tracerProvider,
storage: NewStorage(client),
}, nil }, nil
} }
@ -58,3 +71,27 @@ func (srv *Handler) HandlerFunc() http.HandlerFunc {
return r.ServeHTTP return r.ServeHTTP
} }
var outboundGRPCConnection = new(grpc.CachedOutboundGRPClientConn)
func getDatabrokerServiceClient(
ctx context.Context,
cfg *config.Config,
tracerProvider oteltrace.TracerProvider,
) (databroker.DataBrokerServiceClient, error) {
sharedKey, err := cfg.Options.GetSharedKey()
if err != nil {
return nil, fmt.Errorf("shared key: %w", err)
}
dataBrokerConn, err := outboundGRPCConnection.Get(ctx, &grpc.OutboundOptions{
OutboundPort: cfg.OutboundPort,
InstallationID: cfg.Options.InstallationID,
ServiceName: cfg.Options.Services,
SignedJWTKey: sharedKey,
}, googlegrpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))))
if err != nil {
return nil, fmt.Errorf("databroker connection: %w", err)
}
return databroker.NewDataBrokerServiceClient(dataBrokerConn), nil
}

16
internal/mcp/storage.go Normal file
View file

@ -0,0 +1,16 @@
package mcp
import "github.com/pomerium/pomerium/pkg/grpc/databroker"
type Storage struct {
client databroker.DataBrokerServiceClient
}
// NewStorage creates a new Storage instance.
func NewStorage(
client databroker.DataBrokerServiceClient,
) *Storage {
return &Storage{
client: client,
}
}

View file

@ -25,7 +25,7 @@ func (p *Proxy) registerDashboardHandlers(r *mux.Router, opts *config.Options) *
h.Use(middleware.SetHeaders(httputil.HeadersContentSecurityPolicy)) h.Use(middleware.SetHeaders(httputil.HeadersContentSecurityPolicy))
// model context protocol // model context protocol
h.PathPrefix("/mcp").Handler(p.mcp.HandlerFunc()) h.PathPrefix("/mcp").Handler(p.mcp.Load().HandlerFunc())
// special pomerium endpoints for users to view their session // special pomerium endpoints for users to view their session
h.Path("/").Handler(httputil.HandlerFunc(p.userInfo)).Methods(http.MethodGet) h.Path("/").Handler(httputil.HandlerFunc(p.userInfo)).Methods(http.MethodGet)

View file

@ -64,7 +64,7 @@ type Proxy struct {
webauthn *webauthn.Handler webauthn *webauthn.Handler
tracerProvider oteltrace.TracerProvider tracerProvider oteltrace.TracerProvider
logoProvider portal.LogoProvider logoProvider portal.LogoProvider
mcp *mcp.Handler mcp *atomicutil.Value[*mcp.Handler]
} }
// New takes a Proxy service from options and a validation function. // New takes a Proxy service from options and a validation function.
@ -87,7 +87,7 @@ func New(ctx context.Context, cfg *config.Config) (*Proxy, error) {
currentConfig: atomicutil.NewValue(&config.Config{Options: config.NewDefaultOptions()}), currentConfig: atomicutil.NewValue(&config.Config{Options: config.NewDefaultOptions()}),
currentRouter: atomicutil.NewValue(httputil.NewRouter()), currentRouter: atomicutil.NewValue(httputil.NewRouter()),
logoProvider: portal.NewLogoProvider(), logoProvider: portal.NewLogoProvider(),
mcp: mcp, mcp: atomicutil.NewValue(mcp),
} }
p.OnConfigChange(ctx, cfg) p.OnConfigChange(ctx, cfg)
p.webauthn = webauthn.New(p.getWebauthnState) p.webauthn = webauthn.New(p.getWebauthnState)
@ -110,6 +110,13 @@ func (p *Proxy) OnConfigChange(ctx context.Context, cfg *config.Config) {
return return
} }
mcp, err := mcp.New(ctx, mcp.DefaultPrefix, cfg)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("proxy: failed to update proxy state from configuration settings")
} else {
p.mcp.Store(mcp)
}
p.currentConfig.Store(cfg) p.currentConfig.Store(cfg)
if err := p.setHandlers(ctx, cfg.Options); err != nil { if err := p.setHandlers(ctx, cfg.Options); err != nil {
log.Ctx(ctx).Error().Err(err).Msg("proxy: failed to update proxy handlers from configuration settings") log.Ctx(ctx).Error().Err(err).Msg("proxy: failed to update proxy handlers from configuration settings")