pomerium/pkg/policy/criteria/mcp_tool.go
Denis Mishin f9e7308f12
mcp: add mcp_tool to ppl (#5662)
## Summary

Adds `mcp_tool` PPL criterion, that matches MCP tool names like 

```yaml
  - from: https://db.localhost.pomerium.io
    to: http://localhost:3000/mcp
    policy:
      allow:
        and:
          - email: 
              in: ["user@pomerium.com"]
          - mcp_tool:
              in: ["list_tables", "read_table", "search_records"]
    mcp: {}
```

## Related issues

Fix
https://linear.app/pomerium/issue/ENG-2393/mcp-authorize-each-incoming-request-to-an-mcp-route

## User Explanation

<!-- How would you explain this change to the user? If this
change doesn't create any user-facing changes, you can leave
this blank. If filled out, add the `docs` label -->

## Checklist

- [x] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review
2025-06-23 12:43:43 -04:00

60 lines
1.4 KiB
Go

package criteria
import (
"github.com/open-policy-agent/opa/ast"
"github.com/pomerium/pomerium/pkg/policy/generator"
"github.com/pomerium/pomerium/pkg/policy/parser"
)
type mcpToolCriterion struct {
g *Generator
}
func (mcpToolCriterion) DataType() CriterionDataType {
return CriterionDataTypeStringMatcher
}
func (mcpToolCriterion) Name() string {
return "mcp_tool"
}
func (c mcpToolCriterion) GenerateRule(_ string, data parser.Value) (*ast.Rule, []*ast.Rule, error) {
r1 := c.g.NewRule(c.Name())
r1.Head.Value = NewCriterionTerm(true, ReasonMCPNotAToolCall)
r1.Body = ast.Body{
ast.MustParseExpr(`input.mcp.method != "tools/call"`),
}
r2 := &ast.Rule{
Head: generator.NewHead("", NewCriterionTerm(true, ReasonMCPToolOK)),
Body: ast.Body{
ast.MustParseExpr(`input.mcp.method == "tools/call"`),
},
}
toolRef := ast.RefTerm(ast.VarTerm("input"), ast.VarTerm("mcp"), ast.VarTerm("tool"))
err := matchString(&r2.Body, toolRef, data)
if err != nil {
return nil, nil, err
}
r1.Else = r2
r3 := &ast.Rule{
Head: generator.NewHead("", NewCriterionTerm(false, ReasonMCPToolUnauthorized)),
Body: ast.Body{
ast.NewExpr(ast.BooleanTerm(true)),
},
}
r2.Else = r3
return r1, nil, nil
}
// MCPTool returns a Criterion which matches an MCP tool name.
func MCPTool(generator *Generator) Criterion {
return mcpToolCriterion{g: generator}
}
func init() {
Register(MCPTool)
}