authorize: remove DataBrokerData (#1846)

* authorize: remove DataBrokerData

* fix method name
This commit is contained in:
Caleb Doxsey 2021-02-02 11:40:21 -07:00 committed by GitHub
parent 2f3c73baf3
commit eed873b263
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 263 additions and 322 deletions

View file

@ -6,7 +6,6 @@ import (
"context"
"fmt"
"html/template"
"sync"
"github.com/pomerium/pomerium/authorize/evaluator"
"github.com/pomerium/pomerium/config"
@ -25,9 +24,6 @@ type Authorize struct {
currentOptions *config.AtomicOptions
templates *template.Template
dataBrokerDataLock sync.RWMutex
dataBrokerData evaluator.DataBrokerData
dataBrokerInitialSync map[string]chan struct{}
}
@ -37,7 +33,6 @@ func New(cfg *config.Config) (*Authorize, error) {
currentOptions: config.NewAtomicOptions(),
store: evaluator.NewStore(),
templates: template.Must(frontend.NewTemplates()),
dataBrokerData: make(evaluator.DataBrokerData),
dataBrokerInitialSync: map[string]chan struct{}{
"type.googleapis.com/directory.Group": make(chan struct{}, 1),
"type.googleapis.com/directory.User": make(chan struct{}, 1),

View file

@ -15,11 +15,13 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/encoding/protojson"
"github.com/pomerium/pomerium/authorize/evaluator"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/encoding/jws"
"github.com/pomerium/pomerium/internal/frontend"
"github.com/pomerium/pomerium/internal/testutil"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user"
)
@ -39,25 +41,21 @@ func TestAuthorize_okResponse(t *testing.T) {
encoder, _ := jws.NewHS256Signer([]byte{0, 0, 0, 0})
a.state.Load().encoder = encoder
a.currentOptions.Store(opt)
a.store = evaluator.NewStore()
pe, err := newPolicyEvaluator(opt, a.store)
require.NoError(t, err)
a.state.Load().evaluator = pe
validJWT, _ := pe.SignedJWT(pe.JWTPayload(&evaluator.Request{
DataBrokerData: evaluator.DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
a.store = evaluator.NewStoreFromProtos(
&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
},
},
"type.googleapis.com/user.User": map[string]interface{}{
"USER_ID": &user.User{
&user.User{
Id: "USER_ID",
Name: "foo",
Email: "foo@example.com",
},
},
},
)
pe, err := newPolicyEvaluator(opt, a.store)
require.NoError(t, err)
a.state.Load().evaluator = pe
validJWT, _ := pe.SignedJWT(pe.JWTPayload(&evaluator.Request{
HTTP: evaluator.RequestHTTP{URL: "https://example.com"},
Session: evaluator.RequestSession{
ID: "SESSION_ID",
@ -198,7 +196,8 @@ func TestAuthorize_okResponse(t *testing.T) {
got := a.okResponse(tc.reply)
assert.Equal(t, tc.want.Status.Code, got.Status.Code)
assert.Equal(t, tc.want.Status.Message, got.Status.Message)
assert.Equal(t, tc.want.GetOkResponse().GetHeaders(), got.GetOkResponse().GetHeaders())
want, _ := protojson.Marshal(tc.want.GetOkResponse())
testutil.AssertProtoJSONEqual(t, string(want), got.GetOkResponse())
})
}
}

View file

@ -11,18 +11,13 @@ import (
"net/http"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/open-policy-agent/opa/rego"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/anypb"
"gopkg.in/square/go-jose.v2"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/directory"
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
)
const (
@ -39,6 +34,7 @@ type Evaluator struct {
rego *rego.Rego
query rego.PreparedEvalQuery
policies []config.Policy
store *Store
authenticateHost string
jwk *jose.JSONWebKey
@ -51,6 +47,7 @@ func New(options *config.Options, store *Store) (*Evaluator, error) {
custom: NewCustomEvaluator(store.opaStore),
authenticateHost: options.AuthenticateURL.Host,
policies: options.GetAllPolicies(),
store: store,
}
var err error
e.signer, e.jwk, err = newSigner(options)
@ -165,7 +162,7 @@ func (e *Evaluator) JWTPayload(req *Request) map[string]interface{} {
payload := map[string]interface{}{
"iss": e.authenticateHost,
}
req.fillJWTPayload(payload)
req.fillJWTPayload(e.store, payload)
return payload
}
@ -247,9 +244,9 @@ type dataBrokerDataInput struct {
func (e *Evaluator) newInput(req *Request, isValidClientCertificate bool) *input {
i := new(input)
i.DataBrokerData.Session = req.DataBrokerData.Get(sessionTypeURL, req.Session.ID)
i.DataBrokerData.Session = e.store.GetRecordData(sessionTypeURL, req.Session.ID)
if i.DataBrokerData.Session == nil {
i.DataBrokerData.Session = req.DataBrokerData.Get(serviceAccountTypeURL, req.Session.ID)
i.DataBrokerData.Session = e.store.GetRecordData(serviceAccountTypeURL, req.Session.ID)
}
var userIDs []string
if obj, ok := i.DataBrokerData.Session.(interface{ GetUserId() string }); ok && obj.GetUserId() != "" {
@ -260,13 +257,13 @@ func (e *Evaluator) newInput(req *Request, isValidClientCertificate bool) *input
}
for _, userID := range userIDs {
i.DataBrokerData.User = req.DataBrokerData.Get(userTypeURL, userID)
i.DataBrokerData.User = e.store.GetRecordData(userTypeURL, userID)
user, ok := req.DataBrokerData.Get(directoryUserTypeURL, userID).(*directory.User)
user, ok := e.store.GetRecordData(directoryUserTypeURL, userID).(*directory.User)
if ok {
var groups []string
for _, groupID := range user.GetGroupIds() {
if dg, ok := req.DataBrokerData.Get(directoryGroupTypeURL, groupID).(*directory.Group); ok {
if dg, ok := e.store.GetRecordData(directoryGroupTypeURL, groupID).(*directory.Group); ok {
if dg.Name != "" {
groups = append(groups, dg.Name)
}
@ -359,54 +356,3 @@ func getDenyVar(vars rego.Vars) []Result {
}
return results
}
// DataBrokerData stores the data broker data by type => id => record
type DataBrokerData map[string]map[string]interface{}
// Clear removes all the data for the given type URL from the databroekr data.
func (dbd DataBrokerData) Clear(typeURL string) {
delete(dbd, typeURL)
}
// Count returns the number of entries for the given type URL.
func (dbd DataBrokerData) Count(typeURL string) int {
return len(dbd[typeURL])
}
// Get gets a record from the DataBrokerData.
func (dbd DataBrokerData) Get(typeURL, id string) interface{} {
m, ok := dbd[typeURL]
if !ok {
return nil
}
return m[id]
}
// Update updates a record in the DataBrokerData.
func (dbd DataBrokerData) Update(record *databroker.Record) {
db, ok := dbd[record.GetType()]
if !ok {
db = make(map[string]interface{})
dbd[record.GetType()] = db
}
if record.GetDeletedAt() != nil {
delete(db, record.GetId())
} else {
if obj, err := unmarshalAny(record.GetData()); err == nil {
db[record.GetId()] = obj
} else {
log.Warn().Err(err).Msg("failed to unmarshal unknown any type")
delete(db, record.GetId())
}
}
}
func unmarshalAny(any *anypb.Any) (proto.Message, error) {
messageType, err := protoregistry.GlobalTypes.FindMessageByURL(any.GetTypeUrl())
if err != nil {
return nil, err
}
msg := proto.MessageV1(messageType.New())
return msg, ptypes.UnmarshalAny(any, msg)
}

View file

@ -23,33 +23,28 @@ import (
)
func TestJSONMarshal(t *testing.T) {
dbd := DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
opt := config.NewDefaultOptions()
opt.AuthenticateURL = mustParseURL("https://authenticate.example.com")
e, err := New(opt, NewStoreFromProtos(
&session.Session{
UserId: "user1",
},
},
"type.googleapis.com/directory.User": map[string]interface{}{
"user1": &directory.User{
&directory.User{
Id: "user1",
GroupIds: []string{"group1", "group2"},
},
},
"type.googleapis.com/directory.Group": map[string]interface{}{
"group1": &directory.Group{
&directory.Group{
Id: "group1",
Name: "admin",
Email: "admin@example.com",
},
"group2": &directory.Group{
&directory.Group{
Id: "group2",
Name: "test",
},
},
}
bs, _ := json.Marshal(new(Evaluator).newInput(&Request{
DataBrokerData: dbd,
))
require.NoError(t, err)
bs, _ := json.Marshal(e.newInput(&Request{
HTTP: RequestHTTP{
Method: "GET",
URL: "https://example.com",
@ -63,12 +58,7 @@ func TestJSONMarshal(t *testing.T) {
},
}, true))
assert.JSONEq(t, `{
"databroker_data": {
"groups": ["admin", "admin@example.com", "test", "group1", "group2"],
"session": {
"user_id": "user1"
}
},
"databroker_data": {},
"http": {
"client_certificate": "CLIENT_CERTIFICATE",
"headers": {
@ -131,11 +121,13 @@ func TestEvaluator_JWTPayload(t *testing.T) {
now, _ := ptypes.Timestamp(nowPb)
tests := []struct {
name string
store *Store
req *Request
want map[string]interface{}
}{
{
"iss and aud",
NewStore(),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
},
@ -146,19 +138,15 @@ func TestEvaluator_JWTPayload(t *testing.T) {
},
{
"with session",
&Request{
DataBrokerData: DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
NewStoreFromProtos(&session.Session{
Id: "SESSION_ID",
IdToken: &session.IDToken{
ExpiresAt: nowPb,
IssuedAt: nowPb,
},
ExpiresAt: nowPb,
},
},
},
}),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SESSION_ID",
@ -174,16 +162,12 @@ func TestEvaluator_JWTPayload(t *testing.T) {
},
{
"with service account",
&Request{
DataBrokerData: DataBrokerData{
"type.googleapis.com/user.ServiceAccount": map[string]interface{}{
"SERVICE_ACCOUNT_ID": &user.ServiceAccount{
NewStoreFromProtos(&user.ServiceAccount{
Id: "SERVICE_ACCOUNT_ID",
IssuedAt: nowPb,
ExpiresAt: nowPb,
},
},
},
}),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SERVICE_ACCOUNT_ID",
@ -199,22 +183,15 @@ func TestEvaluator_JWTPayload(t *testing.T) {
},
{
"with user",
&Request{
DataBrokerData: DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
NewStoreFromProtos(&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
},
},
"type.googleapis.com/user.User": map[string]interface{}{
"USER_ID": &user.User{
}, &user.User{
Id: "USER_ID",
Name: "foo",
Email: "foo@example.com",
},
},
},
}),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SESSION_ID",
@ -231,33 +208,27 @@ func TestEvaluator_JWTPayload(t *testing.T) {
},
{
"with directory user",
&Request{
DataBrokerData: DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
NewStoreFromProtos(
&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
},
},
"type.googleapis.com/directory.User": map[string]interface{}{
"USER_ID": &directory.User{
&directory.User{
Id: "USER_ID",
GroupIds: []string{"group1", "group2"},
},
},
"type.googleapis.com/directory.Group": map[string]interface{}{
"group1": &directory.Group{
&directory.Group{
Id: "group1",
Name: "admin",
Email: "admin@example.com",
},
"group2": &directory.Group{
&directory.Group{
Id: "group2",
Name: "test",
Email: "test@example.com",
},
},
},
),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SESSION_ID",
@ -272,20 +243,18 @@ func TestEvaluator_JWTPayload(t *testing.T) {
},
{
"with impersonate",
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SESSION_ID",
},
DataBrokerData: DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
NewStoreFromProtos(
&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
ImpersonateEmail: proto.String("user@example.com"),
ImpersonateGroups: []string{"admin", "test"},
},
},
),
&Request{
HTTP: RequestHTTP{URL: "https://example.com"},
Session: RequestSession{
ID: "SESSION_ID",
},
},
map[string]interface{}{
@ -304,7 +273,7 @@ func TestEvaluator_JWTPayload(t *testing.T) {
t.Parallel()
e, err := New(&config.Options{
AuthenticateURL: mustParseURL("https://authn.example.com"),
}, NewStore())
}, tc.store)
require.NoError(t, err)
assert.Equal(t, tc.want, e.JWTPayload(tc.req))
})
@ -312,41 +281,8 @@ func TestEvaluator_JWTPayload(t *testing.T) {
}
func TestEvaluator_Evaluate(t *testing.T) {
dbd := make(DataBrokerData)
sessionID := uuid.New().String()
userID := uuid.New().String()
data, _ := ptypes.MarshalAny(&session.Session{
Version: "1",
Id: sessionID,
UserId: userID,
IdToken: &session.IDToken{
Issuer: "TestEvaluatorEvaluate",
Subject: userID,
IssuedAt: ptypes.TimestampNow(),
},
OauthToken: &session.OAuthToken{
AccessToken: "ACCESS TOKEN",
TokenType: "Bearer",
RefreshToken: "REFRESH TOKEN",
},
})
dbd.Update(&databroker.Record{
Version: "1",
Type: "type.googleapis.com/session.Session",
Id: sessionID,
Data: data,
})
data, _ = ptypes.MarshalAny(&user.User{
Version: "1",
Id: userID,
Email: "foo@example.com",
})
dbd.Update(&databroker.Record{
Version: "1",
Type: "type.googleapis.com/user.User",
Id: userID,
Data: data,
})
ctx := context.Background()
allowedPolicy := []config.Policy{{From: "https://foo.com", AllowedUsers: []string{"foo@example.com"}}}
@ -370,13 +306,47 @@ func TestEvaluator_Evaluate(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
store := NewStoreFromProtos()
data, _ := ptypes.MarshalAny(&session.Session{
Version: "1",
Id: sessionID,
UserId: userID,
IdToken: &session.IDToken{
Issuer: "TestEvaluatorEvaluate",
Subject: userID,
IssuedAt: ptypes.TimestampNow(),
},
OauthToken: &session.OAuthToken{
AccessToken: "ACCESS TOKEN",
TokenType: "Bearer",
RefreshToken: "REFRESH TOKEN",
},
})
store.UpdateRecord(&databroker.Record{
Version: "1",
Type: "type.googleapis.com/session.Session",
Id: sessionID,
Data: data,
})
data, _ = ptypes.MarshalAny(&user.User{
Version: "1",
Id: userID,
Email: "foo@example.com",
})
store.UpdateRecord(&databroker.Record{
Version: "1",
Type: "type.googleapis.com/user.User",
Id: userID,
Data: data,
})
e, err := New(&config.Options{
AuthenticateURL: mustParseURL("https://authn.example.com"),
Policies: tc.policies,
}, NewStore())
}, store)
require.NoError(t, err)
res, err := e.Evaluate(ctx, &Request{
DataBrokerData: dbd,
HTTP: RequestHTTP{Method: "GET", URL: tc.reqURL},
Session: RequestSession{ID: tc.sessionID},
CustomPolicies: tc.customPolicies,
@ -397,16 +367,16 @@ func mustParseURL(str string) *url.URL {
}
func BenchmarkEvaluator_Evaluate(b *testing.B) {
store := NewStore()
e, err := New(&config.Options{
AuthenticateURL: mustParseURL("https://authn.example.com"),
}, NewStore())
}, store)
if !assert.NoError(b, err) {
return
}
lastSessionID := ""
dbd := make(DataBrokerData)
for i := 0; i < 100; i++ {
sessionID := uuid.New().String()
lastSessionID = sessionID
@ -426,7 +396,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
RefreshToken: "REFRESH TOKEN",
},
})
dbd.Update(&databroker.Record{
store.UpdateRecord(&databroker.Record{
Version: fmt.Sprint(i),
Type: "type.googleapis.com/session.Session",
Id: sessionID,
@ -436,7 +406,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Version: fmt.Sprint(i),
Id: userID,
})
dbd.Update(&databroker.Record{
store.UpdateRecord(&databroker.Record{
Version: fmt.Sprint(i),
Type: "type.googleapis.com/user.User",
Id: userID,
@ -448,7 +418,6 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
ctx := context.Background()
for i := 0; i < b.N; i++ {
e.Evaluate(ctx, &Request{
DataBrokerData: dbd,
HTTP: RequestHTTP{
Method: "GET",
URL: "https://example.com/path",

View file

@ -13,7 +13,6 @@ import (
type (
// Request is the request data used for the evaluator.
Request struct {
DataBrokerData DataBrokerData `json:"databroker_data"`
HTTP RequestHTTP `json:"http"`
Session RequestSession `json:"session"`
CustomPolicies []string
@ -44,21 +43,21 @@ type sessionOrServiceAccount interface {
GetImpersonateUserId() string
}
func (req *Request) fillJWTPayload(payload map[string]interface{}) {
func (req *Request) fillJWTPayload(store *Store, payload map[string]interface{}) {
if u, err := url.Parse(req.HTTP.URL); err == nil {
payload["aud"] = u.Hostname()
}
if s, ok := req.DataBrokerData.Get("type.googleapis.com/session.Session", req.Session.ID).(*session.Session); ok {
req.fillJWTPayloadSessionOrServiceAccount(payload, s)
if s, ok := store.GetRecordData("type.googleapis.com/session.Session", req.Session.ID).(*session.Session); ok {
req.fillJWTPayloadSessionOrServiceAccount(store, payload, s)
}
if sa, ok := req.DataBrokerData.Get("type.googleapis.com/user.ServiceAccount", req.Session.ID).(*user.ServiceAccount); ok {
req.fillJWTPayloadSessionOrServiceAccount(payload, sa)
if sa, ok := store.GetRecordData("type.googleapis.com/user.ServiceAccount", req.Session.ID).(*user.ServiceAccount); ok {
req.fillJWTPayloadSessionOrServiceAccount(store, payload, sa)
}
}
func (req *Request) fillJWTPayloadSessionOrServiceAccount(payload map[string]interface{}, s sessionOrServiceAccount) {
func (req *Request) fillJWTPayloadSessionOrServiceAccount(store *Store, payload map[string]interface{}, s sessionOrServiceAccount) {
payload["jti"] = s.GetId()
if s.GetExpiresAt().IsValid() {
payload["exp"] = s.GetExpiresAt().AsTime().Unix()
@ -71,18 +70,18 @@ func (req *Request) fillJWTPayloadSessionOrServiceAccount(payload map[string]int
if s.GetImpersonateUserId() != "" {
userID = s.GetImpersonateUserId()
}
if u, ok := req.DataBrokerData.Get("type.googleapis.com/user.User", userID).(*user.User); ok {
if u, ok := store.GetRecordData("type.googleapis.com/user.User", userID).(*user.User); ok {
payload["sub"] = u.GetId()
payload["user"] = u.GetId()
payload["email"] = u.GetEmail()
}
if du, ok := req.DataBrokerData.Get("type.googleapis.com/directory.User", userID).(*directory.User); ok {
if du, ok := store.GetRecordData("type.googleapis.com/directory.User", userID).(*directory.User); ok {
if du.GetEmail() != "" {
payload["email"] = du.GetEmail()
}
var groupNames []string
for _, groupID := range du.GetGroupIds() {
if dg, ok := req.DataBrokerData.Get("type.googleapis.com/directory.Group", groupID).(*directory.Group); ok {
if dg, ok := store.GetRecordData("type.googleapis.com/directory.Group", groupID).(*directory.Group); ok {
groupNames = append(groupNames, dg.Name)
}
}

View file

@ -2,10 +2,15 @@ package evaluator
import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/inmem"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
@ -24,12 +29,66 @@ func NewStore() *Store {
}
}
// NewStoreFromProtos creates a new Store from an existing set of protobuf messages.
func NewStoreFromProtos(msgs ...proto.Message) *Store {
s := NewStore()
for _, msg := range msgs {
any, err := anypb.New(msg)
if err != nil {
continue
}
record := new(databroker.Record)
record.CreatedAt = timestamppb.Now()
record.ModifiedAt = timestamppb.Now()
record.Version = uuid.New().String()
record.Id = uuid.New().String()
record.Data = any
record.Type = any.TypeUrl
if hasID, ok := msg.(interface{ GetId() string }); ok {
record.Id = hasID.GetId()
}
s.UpdateRecord(record)
}
return s
}
// ClearRecords removes all the records from the store.
func (s *Store) ClearRecords(typeURL string) {
rawPath := fmt.Sprintf("/databroker_data/%s", typeURL)
s.delete(rawPath)
}
// GetRecordData gets a record's data from the store. `nil` is returned
// if no record exists for the given type and id.
func (s *Store) GetRecordData(typeURL, id string) proto.Message {
rawPath := fmt.Sprintf("/databroker_data/%s/%s", typeURL, id)
data := s.get(rawPath)
if data == nil {
return nil
}
any := anypb.Any{
TypeUrl: typeURL,
}
msg, err := any.UnmarshalNew()
if err != nil {
return nil
}
bs, err := json.Marshal(data)
if err != nil {
return nil
}
err = json.Unmarshal(bs, &msg)
if err != nil {
return nil
}
return msg
}
// UpdateRoutePolicies updates the route policies in the store.
func (s *Store) UpdateRoutePolicies(routePolicies []config.Policy) {
s.write("/route_policies", routePolicies)
@ -85,6 +144,27 @@ func (s *Store) delete(rawPath string) {
}
}
func (s *Store) get(rawPath string) (value interface{}) {
p, ok := storage.ParsePath(rawPath)
if !ok {
log.Error().
Str("path", rawPath).
Msg("opa-store: invalid path, ignoring data")
return nil
}
var err error
value, err = storage.ReadOne(context.Background(), s.opaStore, p)
if storage.IsNotFound(err) {
return nil
} else if err != nil {
log.Error().Err(err).Msg("opa-store: error reading data")
return nil
}
return value
}
func (s *Store) write(rawPath string, value interface{}) {
p, ok := storage.ParsePath(rawPath)
if !ok {

View file

@ -63,9 +63,6 @@ func (a *Authorize) Check(ctx context.Context, in *envoy_service_auth_v2.CheckRe
sessionState = nil
}
a.dataBrokerDataLock.RLock()
defer a.dataBrokerDataLock.RUnlock()
req, err := a.getEvaluatorRequestFromCheckRequest(in, sessionState)
if err != nil {
log.Warn().Err(err).Msg("error building evaluator request")
@ -111,16 +108,12 @@ func (a *Authorize) forceSyncSession(ctx context.Context, sessionID string) inte
state := a.state.Load()
a.dataBrokerDataLock.RLock()
s, ok := a.dataBrokerData.Get(sessionTypeURL, sessionID).(*session.Session)
a.dataBrokerDataLock.RUnlock()
s, ok := a.store.GetRecordData(sessionTypeURL, sessionID).(*session.Session)
if ok {
return s
}
a.dataBrokerDataLock.RLock()
sa, ok := a.dataBrokerData.Get(serviceAccountTypeURL, sessionID).(*user.ServiceAccount)
a.dataBrokerDataLock.RUnlock()
sa, ok := a.store.GetRecordData(serviceAccountTypeURL, sessionID).(*user.ServiceAccount)
if ok {
return sa
}
@ -134,12 +127,10 @@ func (a *Authorize) forceSyncSession(ctx context.Context, sessionID string) inte
return nil
}
a.dataBrokerDataLock.Lock()
if current := a.dataBrokerData.Get(sessionTypeURL, sessionID); current == nil {
a.dataBrokerData.Update(res.GetRecord())
if current := a.store.GetRecordData(sessionTypeURL, sessionID); current == nil {
a.store.UpdateRecord(res.GetRecord())
}
s, _ = a.dataBrokerData.Get(sessionTypeURL, sessionID).(*session.Session)
a.dataBrokerDataLock.Unlock()
s, _ = a.store.GetRecordData(sessionTypeURL, sessionID).(*session.Session)
return s
}
@ -150,9 +141,7 @@ func (a *Authorize) forceSyncUser(ctx context.Context, userID string) *user.User
state := a.state.Load()
a.dataBrokerDataLock.RLock()
u, ok := a.dataBrokerData.Get(userTypeURL, userID).(*user.User)
a.dataBrokerDataLock.RUnlock()
u, ok := a.store.GetRecordData(userTypeURL, userID).(*user.User)
if ok {
return u
}
@ -166,12 +155,10 @@ func (a *Authorize) forceSyncUser(ctx context.Context, userID string) *user.User
return nil
}
a.dataBrokerDataLock.Lock()
if current := a.dataBrokerData.Get(userTypeURL, userID); current == nil {
a.dataBrokerData.Update(res.GetRecord())
if current := a.store.GetRecordData(userTypeURL, userID); current == nil {
a.store.UpdateRecord(res.GetRecord())
}
u, _ = a.dataBrokerData.Get(userTypeURL, userID).(*user.User)
a.dataBrokerDataLock.Unlock()
u, _ = a.store.GetRecordData(userTypeURL, userID).(*user.User)
return u
}
@ -234,7 +221,6 @@ func (a *Authorize) getEvaluatorRequestFromCheckRequest(
) (*evaluator.Request, error) {
requestURL := getCheckRequestURL(in)
req := &evaluator.Request{
DataBrokerData: a.dataBrokerData,
HTTP: evaluator.RequestHTTP{
Method: in.GetAttributes().GetRequest().GetHttp().GetMethod(),
URL: requestURL.String(),

View file

@ -436,15 +436,7 @@ func TestSync(t *testing.T) {
t.Parallel()
a, err := New(&config.Config{Options: o})
require.NoError(t, err)
a.dataBrokerData = evaluator.DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"dbd_session_id": &session.Session{UserId: "dbd_user1"},
},
"type.googleapis.com/user.User": map[string]interface{}{
"dbd_user1": &user.User{Id: "dbd_user1"},
},
}
a.state.Load().dataBrokerClient = tc.databrokerClient
a.state.Load().dataBrokerClient = dbdClient
assert.True(t, (a.forceSync(ctx, tc.sessionState) != nil) == tc.wantErr)
})
}

View file

@ -121,15 +121,12 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
serverVersion = res.GetServerVersion()
for _, record := range res.GetRecords() {
a.updateRecord(record)
a.store.UpdateRecord(record)
recordVersion = record.GetVersion()
}
break
}
a.dataBrokerDataLock.Lock()
log.Info().Str("type_url", typeURL).Int("count", a.dataBrokerData.Count(typeURL)).Msg("initial data load complete")
a.dataBrokerDataLock.Unlock()
span.End()
if ch, ok := a.dataBrokerInitialSync[typeURL]; ok {
@ -169,7 +166,7 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
Msg("detected new server version, clearing data")
serverVersion = res.GetServerVersion()
recordVersion = ""
a.clearRecords(typeURL)
a.store.ClearRecords(typeURL)
}
for _, record := range res.GetRecords() {
if record.GetVersion() > recordVersion {
@ -178,26 +175,12 @@ func (a *Authorize) runDataTypeSyncer(ctx context.Context, typeURL string) error
}
for _, record := range res.GetRecords() {
a.updateRecord(record)
a.store.UpdateRecord(record)
}
}
})
}
func (a *Authorize) clearRecords(typeURL string) {
a.store.ClearRecords(typeURL)
a.dataBrokerDataLock.Lock()
a.dataBrokerData.Clear(typeURL)
a.dataBrokerDataLock.Unlock()
}
func (a *Authorize) updateRecord(record *databroker.Record) {
a.store.UpdateRecord(record)
a.dataBrokerDataLock.Lock()
a.dataBrokerData.Update(record)
a.dataBrokerDataLock.Unlock()
}
func tryForever(ctx context.Context, callback func(onSuccess interface{ Reset() }) error) error {
backoff := backoff.NewExponentialBackOff()
backoff.MaxElapsedTime = 0

View file

@ -120,41 +120,33 @@ func TestAuthorize_getJWTClaimHeaders(t *testing.T) {
encoder, _ := jws.NewHS256Signer([]byte{0, 0, 0, 0})
a.state.Load().encoder = encoder
a.currentOptions.Store(opt)
a.store = evaluator.NewStore()
pe, err := newPolicyEvaluator(opt, a.store)
require.NoError(t, err)
a.state.Load().evaluator = pe
signedJWT, _ := pe.SignedJWT(pe.JWTPayload(&evaluator.Request{
DataBrokerData: evaluator.DataBrokerData{
"type.googleapis.com/session.Session": map[string]interface{}{
"SESSION_ID": &session.Session{
a.store = evaluator.NewStoreFromProtos(
&session.Session{
Id: "SESSION_ID",
UserId: "USER_ID",
},
},
"type.googleapis.com/user.User": map[string]interface{}{
"USER_ID": &user.User{
&user.User{
Id: "USER_ID",
Name: "foo",
Email: "foo@example.com",
},
},
"type.googleapis.com/directory.User": map[string]interface{}{
"USER_ID": &directory.User{
&directory.User{
Id: "USER_ID",
GroupIds: []string{"admin_id", "test_id"},
},
},
"type.googleapis.com/directory.Group": map[string]interface{}{
"admin_id": &directory.Group{
&directory.Group{
Id: "admin_id",
Name: "admin",
},
"test_id": &directory.Group{
&directory.Group{
Id: "test_id",
Name: "test",
},
},
},
)
pe, err := newPolicyEvaluator(opt, a.store)
require.NoError(t, err)
a.state.Load().evaluator = pe
signedJWT, _ := pe.SignedJWT(pe.JWTPayload(&evaluator.Request{
HTTP: evaluator.RequestHTTP{URL: "https://example.com"},
Session: evaluator.RequestSession{
ID: "SESSION_ID",