Merge branch 'main' into wasaga/pomerium-disable-validation

This commit is contained in:
Denis Mishin 2023-11-02 22:42:16 -04:00 committed by GitHub
commit 684a6dee41
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1150 additions and 2337 deletions

View file

@ -9,7 +9,6 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/registry/inmemory"
"github.com/pomerium/pomerium/internal/registry/redis"
"github.com/pomerium/pomerium/internal/telemetry/trace"
registrypb "github.com/pomerium/pomerium/pkg/grpc/registry"
"github.com/pomerium/pomerium/pkg/storage"
@ -110,16 +109,6 @@ func (srv *Server) newRegistryLocked(backend storage.Backend) (registry.Interfac
case config.StorageInMemoryName:
log.Info(ctx).Msg("using in-memory registry")
return inmemory.New(ctx, srv.cfg.registryTTL), nil
case config.StorageRedisName:
log.Info(ctx).Msg("using redis registry")
r, err := redis.New(
srv.cfg.storageConnectionString,
redis.WithTLSConfig(srv.getTLSConfigLocked(ctx)),
)
if err != nil {
return nil, fmt.Errorf("failed to create new redis registry: %w", err)
}
return r, nil
}
return nil, fmt.Errorf("unsupported registry type: %s", srv.cfg.storageType)

View file

@ -3,7 +3,6 @@ package databroker
import (
"context"
"crypto/tls"
"errors"
"fmt"
"strings"
@ -19,12 +18,10 @@ import (
"github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/registry"
"github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/storage"
"github.com/pomerium/pomerium/pkg/storage/inmemory"
"github.com/pomerium/pomerium/pkg/storage/postgres"
"github.com/pomerium/pomerium/pkg/storage/redis"
)
// Server implements the databroker service using an in memory database.
@ -237,6 +234,45 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr
return res, nil
}
// Patch updates specific fields of an existing record.
func (srv *Server) Patch(ctx context.Context, req *databroker.PatchRequest) (*databroker.PatchResponse, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.Patch")
defer span.End()
records := req.GetRecords()
if len(records) == 1 {
log.Info(ctx).
Str("record-type", records[0].GetType()).
Str("record-id", records[0].GetId()).
Msg("patch")
} else {
var recordType string
for _, record := range records {
recordType = record.GetType()
}
log.Info(ctx).
Int("record-count", len(records)).
Str("record-type", recordType).
Msg("patch")
}
db, err := srv.getBackend()
if err != nil {
return nil, err
}
serverVersion, patchedRecords, err := db.Patch(ctx, records, req.GetFieldMask())
if err != nil {
return nil, err
}
res := &databroker.PatchResponse{
ServerVersion: serverVersion,
Records: patchedRecords,
}
return res, nil
}
// ReleaseLease releases a lease.
func (srv *Server) ReleaseLease(ctx context.Context, req *databroker.ReleaseLeaseRequest) (*emptypb.Empty, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.ReleaseLease")
@ -426,39 +462,8 @@ func (srv *Server) newBackendLocked() (backend storage.Backend, err error) {
case config.StoragePostgresName:
log.Info(ctx).Msg("using postgres store")
backend = postgres.New(srv.cfg.storageConnectionString)
case config.StorageRedisName:
log.Info(ctx).Msg("using redis store")
backend, err = redis.New(
srv.cfg.storageConnectionString,
redis.WithTLSConfig(srv.getTLSConfigLocked(ctx)),
)
if err != nil {
return nil, fmt.Errorf("failed to create new redis storage: %w", err)
}
if srv.cfg.secret != nil {
backend, err = storage.NewEncryptedBackend(srv.cfg.secret, backend)
if err != nil {
return nil, err
}
}
default:
return nil, fmt.Errorf("unsupported storage type: %s", srv.cfg.storageType)
}
return backend, nil
}
func (srv *Server) getTLSConfigLocked(ctx context.Context) *tls.Config {
caCertPool, err := cryptutil.GetCertPool("", srv.cfg.storageCAFile)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to read databroker CA file")
}
tlsConfig := &tls.Config{
RootCAs: caCertPool,
//nolint: gosec
InsecureSkipVerify: srv.cfg.storageCertSkipVerify,
}
if srv.cfg.storageCertificate != nil {
tlsConfig.Certificates = []tls.Certificate{*srv.cfg.storageCertificate}
}
return tlsConfig
}

View file

@ -18,11 +18,11 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/testutil"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/protoutil"
@ -85,6 +85,58 @@ func TestServer_Get(t *testing.T) {
})
}
func TestServer_Patch(t *testing.T) {
cfg := newServerConfig()
srv := newServer(cfg)
s := &session.Session{
Id: "1",
OauthToken: &session.OAuthToken{AccessToken: "access-token"},
}
data := protoutil.NewAny(s)
_, err := srv.Put(context.Background(), &databroker.PutRequest{
Records: []*databroker.Record{{
Type: data.TypeUrl,
Id: s.Id,
Data: data,
}},
})
require.NoError(t, err)
fm, err := fieldmaskpb.New(s, "accessed_at")
require.NoError(t, err)
now := timestamppb.Now()
s.AccessedAt = now
s.OauthToken.AccessToken = "access-token-field-ignored"
data = protoutil.NewAny(s)
patchResponse, err := srv.Patch(context.Background(), &databroker.PatchRequest{
Records: []*databroker.Record{{
Type: data.TypeUrl,
Id: s.Id,
Data: data,
}},
FieldMask: fm,
})
require.NoError(t, err)
testutil.AssertProtoEqual(t, protoutil.NewAny(&session.Session{
Id: "1",
AccessedAt: now,
OauthToken: &session.OAuthToken{AccessToken: "access-token"},
}), patchResponse.GetRecord().GetData())
getResponse, err := srv.Get(context.Background(), &databroker.GetRequest{
Type: data.TypeUrl,
Id: s.Id,
})
require.NoError(t, err)
testutil.AssertProtoEqual(t, protoutil.NewAny(&session.Session{
Id: "1",
AccessedAt: now,
OauthToken: &session.OAuthToken{AccessToken: "access-token"},
}), getResponse.GetRecord().GetData())
}
func TestServer_Options(t *testing.T) {
cfg := newServerConfig()
srv := newServer(cfg)
@ -287,12 +339,11 @@ func TestServerInvalidStorage(t *testing.T) {
_ = assert.Error(t, err) && assert.Contains(t, err.Error(), "unsupported storage type")
}
func TestServerRedis(t *testing.T) {
testutil.WithTestRedis(false, func(rawURL string) error {
func TestServerPostgres(t *testing.T) {
testutil.WithTestPostgres(func(dsn string) error {
srv := newServer(&serverConfig{
storageType: "redis",
storageConnectionString: rawURL,
secret: cryptutil.NewKey(),
storageType: "postgres",
storageConnectionString: dsn,
})
s := new(session.Session)