pomerium/pkg/policy/criteria/mcp_tool_test.go
Denis Mishin 9363457849
mcp: add mcp method and tool logging to authorize (#5668)
## Summary

Adds support for extending authorization log with Model Context Protocol
details.

i.e. 
```json
{
  "level": "info",
  "server-name": "all",
  "service": "authorize",
  "mcp-method": "tools/call",
  "mcp-tool": "describe_table",
  "mcp-tool-parameters": { "table_name": "Categories" },
  "allow": true,
  "allow-why-true": ["email-ok", "mcp-tool-ok"],
  "deny": false,
  "deny-why-false": [],
  "time": "2025-06-24T17:40:41-04:00",
  "message": "authorize check"
}
```

## Related issues

Fixes
https://linear.app/pomerium/issue/ENG-2393/mcp-authorize-each-incoming-request-to-an-mcp-route

## User Explanation

<!-- How would you explain this change to the user? If this
change doesn't create any user-facing changes, you can leave
this blank. If filled out, add the `docs` label -->

## Checklist

- [x] reference any related issues
- [x] updated unit tests
- [x] add appropriate label (`enhancement`, `bug`, `breaking`,
`dependencies`, `ci`)
- [x] ready for review
2025-06-24 20:58:51 -04:00

71 lines
2.1 KiB
Go

package criteria
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
func TestMCPTool(t *testing.T) {
t.Run("ok", func(t *testing.T) {
res, err := evaluate(t, `
allow:
and:
- mcp_tool:
is: list_tables
`, []*databroker.Record{}, Input{MCP: InputMCP{Method: "tools/call", ToolCall: &InputMCPToolCall{Name: "list_tables"}}})
require.NoError(t, err)
require.Equal(t, A{true, A{ReasonMCPToolOK}, M{}}, res["allow"])
require.Equal(t, A{false, A{}}, res["deny"])
})
t.Run("unauthorized", func(t *testing.T) {
res, err := evaluate(t, `
allow:
and:
- mcp_tool:
is: list_tables
`, []*databroker.Record{}, Input{MCP: InputMCP{Method: "tools/call", ToolCall: &InputMCPToolCall{Name: "read_table"}}})
require.NoError(t, err)
require.Equal(t, A{false, A{ReasonMCPToolUnauthorized}, M{}}, res["allow"])
require.Equal(t, A{false, A{}}, res["deny"])
})
t.Run("in list", func(t *testing.T) {
res, err := evaluate(t, `
allow:
and:
- mcp_tool:
in: ["list_tables", "read_table"]
`, []*databroker.Record{}, Input{MCP: InputMCP{Method: "tools/call", ToolCall: &InputMCPToolCall{Name: "list_tables"}}})
require.NoError(t, err)
require.Equal(t, A{true, A{ReasonMCPToolOK}, M{}}, res["allow"])
require.Equal(t, A{false, A{}}, res["deny"])
})
t.Run("not in list", func(t *testing.T) {
res, err := evaluate(t, `
allow:
and:
- mcp_tool:
in: ["list_tables", "read_table"]
`, []*databroker.Record{}, Input{MCP: InputMCP{Method: "tools/call", ToolCall: &InputMCPToolCall{Name: "delete_table"}}})
require.NoError(t, err)
require.Equal(t, A{false, A{ReasonMCPToolUnauthorized}, M{}}, res["allow"])
require.Equal(t, A{false, A{}}, res["deny"])
})
t.Run("non-tools/call method should pass", func(t *testing.T) {
res, err := evaluate(t, `
allow:
and:
- mcp_tool:
is: list_tables
`, []*databroker.Record{}, Input{MCP: InputMCP{Method: "some/other_method"}})
require.NoError(t, err)
require.Equal(t, A{true, A{ReasonMCPNotAToolCall}, M{}}, res["allow"])
require.Equal(t, A{false, A{}}, res["deny"])
})
}