mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
authorize: add evaluator store (#1105)
* add evaluator store * handle arrays
This commit is contained in:
parent
d2656ecd67
commit
fff782e04c
6 changed files with 203 additions and 13 deletions
|
@ -50,6 +50,7 @@ func (a *atomicMarshalUnmarshaler) Store(encoder encoding.MarshalUnmarshaler) {
|
|||
// Authorize struct holds
|
||||
type Authorize struct {
|
||||
pe *evaluator.Evaluator
|
||||
store *evaluator.Store
|
||||
|
||||
currentOptions atomicOptions
|
||||
currentEncoder atomicMarshalUnmarshaler
|
||||
|
@ -84,6 +85,7 @@ func New(opts *config.Options) (*Authorize, error) {
|
|||
}
|
||||
|
||||
a := Authorize{
|
||||
store: evaluator.NewStore(),
|
||||
templates: template.Must(frontend.NewTemplates()),
|
||||
dataBrokerClient: databroker.NewDataBrokerServiceClient(dataBrokerConn),
|
||||
dataBrokerData: make(evaluator.DataBrokerData),
|
||||
|
@ -113,14 +115,14 @@ func validateOptions(o *config.Options) error {
|
|||
}
|
||||
|
||||
// newPolicyEvaluator returns an policy evaluator.
|
||||
func newPolicyEvaluator(opts *config.Options) (*evaluator.Evaluator, error) {
|
||||
func newPolicyEvaluator(opts *config.Options, store *evaluator.Store) (*evaluator.Evaluator, error) {
|
||||
metrics.AddPolicyCountCallback("pomerium-authorize", func() int64 {
|
||||
return int64(len(opts.Policies))
|
||||
})
|
||||
ctx := context.Background()
|
||||
_, span := trace.StartSpan(ctx, "authorize.newPolicyEvaluator")
|
||||
defer span.End()
|
||||
return evaluator.New(opts)
|
||||
return evaluator.New(opts, store)
|
||||
}
|
||||
|
||||
// OnConfigChange implements the OptionsUpdater interface and updates internal
|
||||
|
@ -134,7 +136,7 @@ func (a *Authorize) OnConfigChange(cfg *config.Config) {
|
|||
a.currentOptions.Store(cfg.Options)
|
||||
|
||||
var err error
|
||||
if a.pe, err = newPolicyEvaluator(cfg.Options); err != nil {
|
||||
if a.pe, err = newPolicyEvaluator(cfg.Options, a.store); err != nil {
|
||||
log.Error().Err(err).Msg("authorize: failed to update policy with options")
|
||||
return
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/open-policy-agent/opa/rego"
|
||||
"github.com/open-policy-agent/opa/storage/inmem"
|
||||
"google.golang.org/protobuf/reflect/protoregistry"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
"gopkg.in/square/go-jose.v2"
|
||||
|
@ -49,7 +48,7 @@ type Evaluator struct {
|
|||
}
|
||||
|
||||
// New creates a new Evaluator.
|
||||
func New(options *config.Options) (*Evaluator, error) {
|
||||
func New(options *config.Options, store *Store) (*Evaluator, error) {
|
||||
e := &Evaluator{
|
||||
authenticateHost: options.AuthenticateURL.Host,
|
||||
policies: options.Policies,
|
||||
|
@ -97,11 +96,11 @@ func New(options *config.Options) (*Evaluator, error) {
|
|||
return nil, fmt.Errorf("error loading rego policy: %w", err)
|
||||
}
|
||||
|
||||
store.UpdateAdmins(options.Administrators)
|
||||
store.UpdateRoutePolicies(options.Policies)
|
||||
|
||||
e.rego = rego.New(
|
||||
rego.Store(inmem.NewFromObject(map[string]interface{}{
|
||||
"admins": options.Administrators,
|
||||
"route_policies": options.Policies,
|
||||
})),
|
||||
rego.Store(store.opaStore),
|
||||
rego.Module("pomerium.authz", string(authzPolicy)),
|
||||
rego.Query("result = data.pomerium.authz"),
|
||||
)
|
||||
|
|
|
@ -72,7 +72,7 @@ func TestJSONMarshal(t *testing.T) {
|
|||
func TestEvaluator_SignedJWT(t *testing.T) {
|
||||
opt := config.NewDefaultOptions()
|
||||
opt.AuthenticateURL = mustParseURL("https://authenticate.example.com")
|
||||
e, err := New(opt)
|
||||
e, err := New(opt, NewStore())
|
||||
require.NoError(t, err)
|
||||
req := &Request{
|
||||
HTTP: RequestHTTP{
|
||||
|
@ -93,7 +93,7 @@ func TestEvaluator_JWTWithKID(t *testing.T) {
|
|||
opt := config.NewDefaultOptions()
|
||||
opt.AuthenticateURL = mustParseURL("https://authenticate.example.com")
|
||||
opt.SigningKey = "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUpCMFZkbko1VjEvbVlpYUlIWHhnd2Q0Yzd5YWRTeXMxb3Y0bzA1b0F3ekdvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUc1eENQMEpUVDFINklvbDhqS3VUSVBWTE0wNENnVzlQbEV5cE5SbVdsb29LRVhSOUhUMwpPYnp6aktZaWN6YjArMUt3VjJmTVRFMTh1dy82MXJVQ0JBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
e, err := New(opt)
|
||||
e, err := New(opt, NewStore())
|
||||
require.NoError(t, err)
|
||||
req := &Request{
|
||||
HTTP: RequestHTTP{
|
||||
|
@ -122,7 +122,7 @@ func mustParseURL(str string) *url.URL {
|
|||
func BenchmarkEvaluator_Evaluate(b *testing.B) {
|
||||
e, err := New(&config.Options{
|
||||
AuthenticateURL: mustParseURL("https://authn.example.com"),
|
||||
})
|
||||
}, NewStore())
|
||||
if !assert.NoError(b, err) {
|
||||
return
|
||||
}
|
||||
|
|
118
authorize/evaluator/store.go
Normal file
118
authorize/evaluator/store.go
Normal file
|
@ -0,0 +1,118 @@
|
|||
package evaluator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/open-policy-agent/opa/storage"
|
||||
"github.com/open-policy-agent/opa/storage/inmem"
|
||||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||
)
|
||||
|
||||
// A Store stores data for the OPA rego policy evaluation.
|
||||
type Store struct {
|
||||
opaStore storage.Store
|
||||
}
|
||||
|
||||
// NewStore creates a new Store.
|
||||
func NewStore() *Store {
|
||||
return &Store{
|
||||
opaStore: inmem.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateAdmins updates the admins in the store.
|
||||
func (s *Store) UpdateAdmins(admins []string) {
|
||||
s.write("/admins", admins)
|
||||
}
|
||||
|
||||
// UpdateRoutePolicies updates the route policies in the store.
|
||||
func (s *Store) UpdateRoutePolicies(routePolicies []config.Policy) {
|
||||
s.write("/route_policies", routePolicies)
|
||||
}
|
||||
|
||||
// UpdateRecord updates a record in the store.
|
||||
func (s *Store) UpdateRecord(record *databroker.Record) {
|
||||
rawPath := fmt.Sprintf("/databroker_data/%s/%s", record.GetType(), record.GetId())
|
||||
|
||||
if record.GetDeletedAt() != nil {
|
||||
s.delete(rawPath)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := record.GetData().UnmarshalNew()
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("path", rawPath).
|
||||
Msg("opa-store: error unmarshaling record data, ignoring")
|
||||
return
|
||||
}
|
||||
|
||||
s.write(rawPath, msg)
|
||||
}
|
||||
|
||||
func (s *Store) delete(rawPath string) {
|
||||
p, ok := storage.ParsePath(rawPath)
|
||||
if !ok {
|
||||
log.Error().
|
||||
Str("path", rawPath).
|
||||
Msg("opa-store: invalid path, ignoring data")
|
||||
return
|
||||
}
|
||||
|
||||
err := storage.Txn(context.Background(), s.opaStore, storage.WriteParams, func(txn storage.Transaction) error {
|
||||
_, err := s.opaStore.Read(context.Background(), txn, p)
|
||||
if storage.IsNotFound(err) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.opaStore.Write(context.Background(), txn, storage.RemoveOp, p, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("opa-store: error deleting data")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) write(rawPath string, value interface{}) {
|
||||
p, ok := storage.ParsePath(rawPath)
|
||||
if !ok {
|
||||
log.Error().
|
||||
Str("path", rawPath).
|
||||
Msg("opa-store: invalid path, ignoring data")
|
||||
return
|
||||
}
|
||||
|
||||
err := storage.Txn(context.Background(), s.opaStore, storage.WriteParams, func(txn storage.Transaction) error {
|
||||
if len(p) > 1 {
|
||||
err := storage.MakeDir(context.Background(), s.opaStore, txn, p[:len(p)-1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var op storage.PatchOp = storage.ReplaceOp
|
||||
_, err := s.opaStore.Read(context.Background(), txn, p)
|
||||
if storage.IsNotFound(err) {
|
||||
op = storage.AddOp
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.opaStore.Write(context.Background(), txn, op, p, value)
|
||||
})
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("opa-store: error writing data")
|
||||
return
|
||||
}
|
||||
}
|
69
authorize/evaluator/store_test.go
Normal file
69
authorize/evaluator/store_test.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
package evaluator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/open-policy-agent/opa/storage"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/pomerium/pomerium/pkg/grpc/databroker"
|
||||
"github.com/pomerium/pomerium/pkg/grpc/user"
|
||||
)
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
ctx, clearTimeout := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer clearTimeout()
|
||||
|
||||
s := NewStore()
|
||||
|
||||
t.Run("admins", func(t *testing.T) {
|
||||
s.UpdateAdmins([]string{"admin1", "admin2"})
|
||||
v, err := storage.ReadOne(ctx, s.opaStore, storage.MustParsePath("/admins"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []interface{}{"admin1", "admin2"}, v)
|
||||
|
||||
s.UpdateAdmins([]string{"admin3"})
|
||||
v, err = storage.ReadOne(ctx, s.opaStore, storage.MustParsePath("/admins"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, []interface{}{"admin3"}, v)
|
||||
})
|
||||
t.Run("records", func(t *testing.T) {
|
||||
u := &user.User{
|
||||
Version: "v1",
|
||||
Id: "u1",
|
||||
Name: "name",
|
||||
Email: "name@example.com",
|
||||
}
|
||||
any, _ := ptypes.MarshalAny(u)
|
||||
s.UpdateRecord(&databroker.Record{
|
||||
Version: "v1",
|
||||
Type: any.GetTypeUrl(),
|
||||
Id: u.GetId(),
|
||||
Data: any,
|
||||
})
|
||||
|
||||
v, err := storage.ReadOne(ctx, s.opaStore, storage.MustParsePath("/databroker_data/type.googleapis.com/user.User/u1"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, map[string]interface{}{
|
||||
"version": "v1",
|
||||
"id": "u1",
|
||||
"name": "name",
|
||||
"email": "name@example.com",
|
||||
}, v)
|
||||
|
||||
s.UpdateRecord(&databroker.Record{
|
||||
Version: "v2",
|
||||
Type: any.GetTypeUrl(),
|
||||
Id: u.GetId(),
|
||||
Data: any,
|
||||
DeletedAt: ptypes.TimestampNow(),
|
||||
})
|
||||
|
||||
v, err = storage.ReadOne(ctx, s.opaStore, storage.MustParsePath("/databroker_data/type.googleapis.com/user.User/u1"))
|
||||
assert.Error(t, err)
|
||||
assert.Nil(t, v)
|
||||
})
|
||||
}
|
|
@ -180,6 +180,8 @@ func (a *Authorize) runDataUpdater(ctx context.Context, updateRecord <-chan *dat
|
|||
case record = <-updateRecord:
|
||||
}
|
||||
|
||||
a.store.UpdateRecord(record)
|
||||
|
||||
a.dataBrokerDataLock.Lock()
|
||||
a.dataBrokerData.Update(record)
|
||||
a.dataBrokerDataLock.Unlock()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue