protoutil: add NewAny method for deterministic serialization (#2462) (#2662)

This commit is contained in:
backport-actions-token[bot] 2021-10-05 15:41:32 -04:00 committed by GitHub
parent 46a1426925
commit 0f6cc036ae
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 90 additions and 148 deletions

View file

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
@ -23,6 +22,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/directory" "github.com/pomerium/pomerium/pkg/grpc/directory"
"github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func TestEvaluator(t *testing.T) { func TestEvaluator(t *testing.T) {
@ -480,7 +480,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
sessionID := uuid.New().String() sessionID := uuid.New().String()
lastSessionID = sessionID lastSessionID = sessionID
userID := uuid.New().String() userID := uuid.New().String()
data, _ := anypb.New(&session.Session{ data := protoutil.NewAny(&session.Session{
Version: fmt.Sprint(i), Version: fmt.Sprint(i),
Id: sessionID, Id: sessionID,
UserId: userID, UserId: userID,
@ -501,7 +501,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Id: sessionID, Id: sessionID,
Data: data, Data: data,
}) })
data, _ = anypb.New(&user.User{ data = protoutil.NewAny(&user.User{
Version: fmt.Sprint(i), Version: fmt.Sprint(i),
Id: userID, Id: userID,
}) })
@ -512,7 +512,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Data: data, Data: data,
}) })
data, _ = anypb.New(&directory.User{ data = protoutil.NewAny(&directory.User{
Version: fmt.Sprint(i), Version: fmt.Sprint(i),
Id: userID, Id: userID,
GroupIds: []string{"1", "2", "3", "4"}, GroupIds: []string{"1", "2", "3", "4"},
@ -524,7 +524,7 @@ func BenchmarkEvaluator_Evaluate(b *testing.B) {
Data: data, Data: data,
}) })
data, _ = anypb.New(&directory.Group{ data = protoutil.NewAny(&directory.Group{
Version: fmt.Sprint(i), Version: fmt.Sprint(i),
Id: fmt.Sprint(i), Id: fmt.Sprint(i),
}) })

View file

@ -15,13 +15,13 @@ import (
"github.com/open-policy-agent/opa/storage/inmem" "github.com/open-policy-agent/opa/storage/inmem"
"github.com/open-policy-agent/opa/types" "github.com/open-policy-agent/opa/types"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
type dataBrokerData struct { type dataBrokerData struct {
@ -102,11 +102,7 @@ func NewStore() *Store {
func NewStoreFromProtos(serverVersion uint64, msgs ...proto.Message) *Store { func NewStoreFromProtos(serverVersion uint64, msgs ...proto.Message) *Store {
s := NewStore() s := NewStore()
for _, msg := range msgs { for _, msg := range msgs {
any, err := anypb.New(msg) any := protoutil.NewAny(msg)
if err != nil {
continue
}
record := new(databroker.Record) record := new(databroker.Record)
record.ModifiedAt = timestamppb.Now() record.ModifiedAt = timestamppb.Now()
record.Version = cryptutil.NewRandomUInt64() record.Version = cryptutil.NewRandomUInt64()

View file

@ -4,11 +4,11 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func TestStore(t *testing.T) { func TestStore(t *testing.T) {
@ -20,7 +20,7 @@ func TestStore(t *testing.T) {
Name: "name", Name: "name",
Email: "name@example.com", Email: "name@example.com",
} }
any, _ := anypb.New(u) any := protoutil.NewAny(u)
s.UpdateRecord(0, &databroker.Record{ s.UpdateRecord(0, &databroker.Record{
Version: 1, Version: 1,
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),

View file

@ -11,12 +11,12 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpcutil" "github.com/pomerium/pomerium/pkg/grpcutil"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func TestAuthorize_waitForRecordSync(t *testing.T) { func TestAuthorize_waitForRecordSync(t *testing.T) {
@ -103,10 +103,7 @@ type storableMessage interface {
} }
func newRecord(msg storableMessage) *databroker.Record { func newRecord(msg storableMessage) *databroker.Record {
any, err := anypb.New(msg) any := protoutil.NewAny(msg)
if err != nil {
panic(err)
}
return &databroker.Record{ return &databroker.Record{
Version: 1, Version: 1,
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),

View file

@ -7,11 +7,11 @@ import (
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_config_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3" envoy_config_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/telemetry/trace" "github.com/pomerium/pomerium/internal/telemetry/trace"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func buildTracingCluster(options *config.Options) (*envoy_config_cluster_v3.Cluster, error) { func buildTracingCluster(options *config.Options) (*envoy_config_cluster_v3.Cluster, error) {
@ -102,7 +102,7 @@ func buildTracingHTTP(options *config.Options) (*envoy_config_trace_v3.Tracing_H
switch tracingOptions.Provider { switch tracingOptions.Provider {
case trace.DatadogTracingProviderName: case trace.DatadogTracingProviderName:
tracingTC, _ := anypb.New(&envoy_config_trace_v3.DatadogConfig{ tracingTC := protoutil.NewAny(&envoy_config_trace_v3.DatadogConfig{
CollectorCluster: "datadog-apm", CollectorCluster: "datadog-apm",
ServiceName: tracingOptions.Service, ServiceName: tracingOptions.Service,
}) })
@ -117,7 +117,7 @@ func buildTracingHTTP(options *config.Options) (*envoy_config_trace_v3.Tracing_H
if path == "" { if path == "" {
path = "/" path = "/"
} }
tracingTC, _ := anypb.New(&envoy_config_trace_v3.ZipkinConfig{ tracingTC := protoutil.NewAny(&envoy_config_trace_v3.ZipkinConfig{
CollectorCluster: "zipkin", CollectorCluster: "zipkin",
CollectorEndpoint: path, CollectorEndpoint: path,
CollectorEndpointVersion: envoy_config_trace_v3.ZipkinConfig_HTTP_JSON, CollectorEndpointVersion: envoy_config_trace_v3.ZipkinConfig_HTTP_JSON,

View file

@ -4,11 +4,11 @@ import (
"context" "context"
"errors" "errors"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/directory" "github.com/pomerium/pomerium/pkg/grpc/directory"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
// RefreshUser refreshes a user's directory information. // RefreshUser refreshes a user's directory information.
@ -26,11 +26,7 @@ func (c *DataBroker) RefreshUser(ctx context.Context, req *directory.RefreshUser
return nil, err return nil, err
} }
any, err := anypb.New(u) any := protoutil.NewAny(u)
if err != nil {
return nil, err
}
_, err = c.dataBrokerServer.Put(ctx, &databroker.PutRequest{ _, err = c.dataBrokerServer.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),

View file

@ -10,12 +10,12 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/grpc" "github.com/pomerium/pomerium/pkg/grpc"
databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker" databrokerpb "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/events" "github.com/pomerium/pomerium/pkg/grpc/events"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
const maxEnvoyConfigurationEvents = 50 const maxEnvoyConfigurationEvents = 50
@ -46,10 +46,7 @@ func (srv *Server) runEnvoyConfigurationEventHandler(ctx context.Context) error
} }
func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events.EnvoyConfigurationEvent) error { func (srv *Server) storeEnvoyConfigurationEvent(ctx context.Context, evt *events.EnvoyConfigurationEvent) error {
any, err := anypb.New(evt) any := protoutil.NewAny(evt)
if err != nil {
return err
}
client, err := srv.getDataBrokerClient(ctx) client, err := srv.getDataBrokerClient(ctx)
if err != nil { if err != nil {

View file

@ -5,9 +5,9 @@ import (
"encoding/hex" "encoding/hex"
envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
const ( const (
@ -24,7 +24,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
return nil, err return nil, err
} }
for _, cluster := range clusters { for _, cluster := range clusters {
any, _ := anypb.New(cluster) any := protoutil.NewAny(cluster)
resources[clusterTypeURL] = append(resources[clusterTypeURL], &envoy_service_discovery_v3.Resource{ resources[clusterTypeURL] = append(resources[clusterTypeURL], &envoy_service_discovery_v3.Resource{
Name: cluster.Name, Name: cluster.Name,
Version: hex.EncodeToString(cryptutil.HashProto(cluster)), Version: hex.EncodeToString(cryptutil.HashProto(cluster)),
@ -37,7 +37,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
return nil, err return nil, err
} }
for _, listener := range listeners { for _, listener := range listeners {
any, _ := anypb.New(listener) any := protoutil.NewAny(listener)
resources[listenerTypeURL] = append(resources[listenerTypeURL], &envoy_service_discovery_v3.Resource{ resources[listenerTypeURL] = append(resources[listenerTypeURL], &envoy_service_discovery_v3.Resource{
Name: listener.Name, Name: listener.Name,
Version: hex.EncodeToString(cryptutil.HashProto(listener)), Version: hex.EncodeToString(cryptutil.HashProto(listener)),

View file

@ -9,11 +9,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
configpb "github.com/pomerium/pomerium/pkg/grpc/config" configpb "github.com/pomerium/pomerium/pkg/grpc/config"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func TestConfigSource(t *testing.T) { func TestConfigSource(t *testing.T) {
@ -52,7 +52,7 @@ func TestConfigSource(t *testing.T) {
}) })
cfgs <- src.GetConfig() cfgs <- src.GetConfig()
data, _ := anypb.New(&configpb.Config{ data := protoutil.NewAny(&configpb.Config{
Name: "config", Name: "config",
Routes: []*configpb.Route{ Routes: []*configpb.Route{
{ {

View file

@ -15,16 +15,14 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/internal/testutil" "github.com/pomerium/pomerium/internal/testutil"
"github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
type testSyncerHandler struct { type testSyncerHandler struct {
@ -58,10 +56,8 @@ func TestServer_Get(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,
@ -92,10 +88,8 @@ func TestServer_Options(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,
@ -143,10 +137,8 @@ func TestServer_Query(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,
@ -166,10 +158,8 @@ func TestServer_Sync(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,
@ -252,10 +242,8 @@ func TestServerInvalidStorage(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,
@ -275,10 +263,8 @@ func TestServerRedis(t *testing.T) {
s := new(session.Session) s := new(session.Session)
s.Id = "1" s.Id = "1"
any, err := anypb.New(s) any := protoutil.NewAny(s)
assert.NoError(t, err) _, err := srv.Put(context.Background(), &databroker.PutRequest{
_, err = srv.Put(context.Background(), &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.TypeUrl, Type: any.TypeUrl,
Id: s.Id, Id: s.Id,

View file

@ -13,7 +13,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/internal/directory"
@ -25,6 +24,7 @@ import (
"github.com/pomerium/pomerium/pkg/grpc/session" "github.com/pomerium/pomerium/pkg/grpc/session"
"github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/grpcutil" "github.com/pomerium/pomerium/pkg/grpcutil"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
const ( const (
@ -246,18 +246,14 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director
curDG, ok := mgr.directoryGroups[groupID] curDG, ok := mgr.directoryGroups[groupID]
if !ok || !proto.Equal(newDG, curDG) { if !ok || !proto.Equal(newDG, curDG) {
id := newDG.GetId() id := newDG.GetId()
any, err := anypb.New(newDG) any := protoutil.NewAny(newDG)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to marshal directory group")
return
}
eg.Go(func() error { eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err return err
} }
defer mgr.dataBrokerSemaphore.Release(1) defer mgr.dataBrokerSemaphore.Release(1)
_, err = mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
Id: id, Id: id,
@ -276,18 +272,14 @@ func (mgr *Manager) mergeGroups(ctx context.Context, directoryGroups []*director
_, ok := lookup[groupID] _, ok := lookup[groupID]
if !ok { if !ok {
id := curDG.GetId() id := curDG.GetId()
any, err := anypb.New(curDG) any := protoutil.NewAny(curDG)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to marshal directory group")
return
}
eg.Go(func() error { eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err return err
} }
defer mgr.dataBrokerSemaphore.Release(1) defer mgr.dataBrokerSemaphore.Release(1)
_, err = mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{ _, err := mgr.cfg.Load().dataBrokerClient.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
Id: id, Id: id,
@ -319,11 +311,7 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.
curDU, ok := mgr.directoryUsers[userID] curDU, ok := mgr.directoryUsers[userID]
if !ok || !proto.Equal(newDU, curDU) { if !ok || !proto.Equal(newDU, curDU) {
id := newDU.GetId() id := newDU.GetId()
any, err := anypb.New(newDU) any := protoutil.NewAny(newDU)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to marshal directory user")
return
}
eg.Go(func() error { eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err return err
@ -349,11 +337,7 @@ func (mgr *Manager) mergeUsers(ctx context.Context, directoryUsers []*directory.
_, ok := lookup[userID] _, ok := lookup[userID]
if !ok { if !ok {
id := curDU.GetId() id := curDU.GetId()
any, err := anypb.New(curDU) any := protoutil.NewAny(curDU)
if err != nil {
log.Warn(ctx).Err(err).Msg("failed to marshal directory user")
return
}
eg.Go(func() error { eg.Go(func() error {
if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil { if err := mgr.dataBrokerSemaphore.Acquire(ctx, 1); err != nil {
return err return err

View file

@ -5,17 +5,17 @@ import (
context "context" context "context"
"fmt" "fmt"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"github.com/pomerium/pomerium/internal/identity" "github.com/pomerium/pomerium/internal/identity"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
// Delete deletes a session from the databroker. // Delete deletes a session from the databroker.
func Delete(ctx context.Context, client databroker.DataBrokerServiceClient, sessionID string) error { func Delete(ctx context.Context, client databroker.DataBrokerServiceClient, sessionID string) error {
any, _ := anypb.New(new(Session)) any := protoutil.NewAny(new(Session))
_, err := client.Put(ctx, &databroker.PutRequest{ _, err := client.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
@ -29,8 +29,7 @@ func Delete(ctx context.Context, client databroker.DataBrokerServiceClient, sess
// Get gets a session from the databroker. // Get gets a session from the databroker.
func Get(ctx context.Context, client databroker.DataBrokerServiceClient, sessionID string) (*Session, error) { func Get(ctx context.Context, client databroker.DataBrokerServiceClient, sessionID string) (*Session, error) {
any, _ := anypb.New(new(Session)) any := protoutil.NewAny(new(Session))
res, err := client.Get(ctx, &databroker.GetRequest{ res, err := client.Get(ctx, &databroker.GetRequest{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
Id: sessionID, Id: sessionID,
@ -49,7 +48,7 @@ func Get(ctx context.Context, client databroker.DataBrokerServiceClient, session
// Put sets a session in the databroker. // Put sets a session in the databroker.
func Put(ctx context.Context, client databroker.DataBrokerServiceClient, s *Session) (*databroker.PutResponse, error) { func Put(ctx context.Context, client databroker.DataBrokerServiceClient, s *Session) (*databroker.PutResponse, error) {
any, _ := anypb.New(s) any := protoutil.NewAny(s)
res, err := client.Put(ctx, &databroker.PutRequest{ res, err := client.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),

View file

@ -5,16 +5,16 @@ import (
context "context" context "context"
"fmt" "fmt"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/structpb"
"github.com/pomerium/pomerium/internal/identity" "github.com/pomerium/pomerium/internal/identity"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
// Get gets a user from the databroker. // Get gets a user from the databroker.
func Get(ctx context.Context, client databroker.DataBrokerServiceClient, userID string) (*User, error) { func Get(ctx context.Context, client databroker.DataBrokerServiceClient, userID string) (*User, error) {
any, _ := anypb.New(new(User)) any := protoutil.NewAny(new(User))
res, err := client.Get(ctx, &databroker.GetRequest{ res, err := client.Get(ctx, &databroker.GetRequest{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
@ -34,7 +34,7 @@ func Get(ctx context.Context, client databroker.DataBrokerServiceClient, userID
// Put sets a user in the databroker. // Put sets a user in the databroker.
func Put(ctx context.Context, client databroker.DataBrokerServiceClient, u *User) (*databroker.Record, error) { func Put(ctx context.Context, client databroker.DataBrokerServiceClient, u *User) (*databroker.Record, error) {
any, _ := anypb.New(u) any := protoutil.NewAny(u)
res, err := client.Put(ctx, &databroker.PutRequest{ res, err := client.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),
@ -50,7 +50,7 @@ func Put(ctx context.Context, client databroker.DataBrokerServiceClient, u *User
// PutServiceAccount sets a service account in the databroker. // PutServiceAccount sets a service account in the databroker.
func PutServiceAccount(ctx context.Context, client databroker.DataBrokerServiceClient, sa *ServiceAccount) (*databroker.Record, error) { func PutServiceAccount(ctx context.Context, client databroker.DataBrokerServiceClient, sa *ServiceAccount) (*databroker.Record, error) {
any, _ := anypb.New(sa) any := protoutil.NewAny(sa)
res, err := client.Put(ctx, &databroker.PutRequest{ res, err := client.Put(ctx, &databroker.PutRequest{
Record: &databroker.Record{ Record: &databroker.Record{
Type: any.GetTypeUrl(), Type: any.GetTypeUrl(),

View file

@ -9,6 +9,8 @@ import (
"google.golang.org/protobuf/reflect/protoreflect" "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb" "google.golang.org/protobuf/types/known/wrapperspb"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
// A Scrubber scrubs potentially sensitive strings from protobuf messages. // A Scrubber scrubs potentially sensitive strings from protobuf messages.
@ -90,7 +92,7 @@ func (s *Scrubber) scrubProtoAny(dst, src *anypb.Any) {
if err != nil { if err != nil {
// this will happen if a type isn't registered. // this will happen if a type isn't registered.
// So we will just hash the raw data. // So we will just hash the raw data.
a, _ := anypb.New(wrapperspb.Bytes(s.hmacBytes(src.Value))) a := protoutil.NewAny(wrapperspb.Bytes(s.hmacBytes(src.Value)))
dst.TypeUrl = a.TypeUrl dst.TypeUrl = a.TypeUrl
dst.Value = a.Value dst.Value = a.Value
return return
@ -101,12 +103,7 @@ func (s *Scrubber) scrubProtoAny(dst, src *anypb.Any) {
s.scrubProtoMessage(dstmsg, srcmsg) s.scrubProtoMessage(dstmsg, srcmsg)
a, err := anypb.New(dstmsg.Interface()) a := protoutil.NewAny(dstmsg.Interface())
if err != nil {
// this really shouldn't happen, but in case it does,
// we hash the raw data as above.
a, _ = anypb.New(wrapperspb.Bytes(s.hmacBytes(src.Value)))
}
dst.TypeUrl = a.TypeUrl dst.TypeUrl = a.TypeUrl
dst.Value = a.Value dst.Value = a.Value
} }

View file

@ -14,10 +14,10 @@ import (
"github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/rego"
"github.com/open-policy-agent/opa/types" "github.com/open-policy-agent/opa/types"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/pkg/policy/generator" "github.com/pomerium/pomerium/pkg/policy/generator"
"github.com/pomerium/pomerium/pkg/policy/parser" "github.com/pomerium/pomerium/pkg/policy/parser"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
var testingNow = time.Date(2021, 5, 11, 13, 43, 0, 0, time.Local) var testingNow = time.Date(2021, 5, 11, 13, 43, 0, 0, time.Local)
@ -94,11 +94,7 @@ func evaluate(t *testing.T,
} }
for _, record := range dataBrokerRecords { for _, record := range dataBrokerRecords {
any, err := anypb.New(record) any := protoutil.NewAny(record)
if err != nil {
return nil, err
}
if string(recordType) == any.GetTypeUrl() && if string(recordType) == any.GetTypeUrl() &&
string(recordID) == record.GetId() { string(recordID) == record.GetId() {
bs, _ := json.Marshal(record) bs, _ := json.Marshal(record)

View file

@ -40,72 +40,72 @@ func ToAny(value interface{}) *anypb.Any {
case uint64: case uint64:
return NewAnyUInt64(v) return NewAnyUInt64(v)
default: default:
a, err := anypb.New(ToStruct(value)) return NewAny(ToStruct(value))
}
}
// NewAny creates a new Any using deterministic serialization.
func NewAny(msg proto.Message) *anypb.Any {
a := new(anypb.Any)
err := anypb.MarshalFrom(a, msg, proto.MarshalOptions{
AllowPartial: true,
Deterministic: true,
})
if err != nil { if err != nil {
// on error, which doesn't really happen in practice, return null
return NewAnyNull() return NewAnyNull()
} }
return a return a
} }
}
// NewAnyBool creates a new any type from a bool. // NewAnyBool creates a new any type from a bool.
func NewAnyBool(v bool) *anypb.Any { func NewAnyBool(v bool) *anypb.Any {
a, _ := anypb.New(wrapperspb.Bool(v)) return NewAny(wrapperspb.Bool(v))
return a
} }
// NewAnyBytes creates a new any type from bytes. // NewAnyBytes creates a new any type from bytes.
func NewAnyBytes(v []byte) *anypb.Any { func NewAnyBytes(v []byte) *anypb.Any {
a, _ := anypb.New(wrapperspb.Bytes(v)) return NewAny(wrapperspb.Bytes(v))
return a
} }
// NewAnyDouble creates a new any type from a float64. // NewAnyDouble creates a new any type from a float64.
func NewAnyDouble(v float64) *anypb.Any { func NewAnyDouble(v float64) *anypb.Any {
a, _ := anypb.New(wrapperspb.Double(v)) return NewAny(wrapperspb.Double(v))
return a
} }
// NewAnyFloat creates a new any type from a float32. // NewAnyFloat creates a new any type from a float32.
func NewAnyFloat(v float32) *anypb.Any { func NewAnyFloat(v float32) *anypb.Any {
a, _ := anypb.New(wrapperspb.Float(v)) return NewAny(wrapperspb.Float(v))
return a
} }
// NewAnyInt64 creates a new any type from an int64. // NewAnyInt64 creates a new any type from an int64.
func NewAnyInt64(v int64) *anypb.Any { func NewAnyInt64(v int64) *anypb.Any {
a, _ := anypb.New(wrapperspb.Int64(v)) return NewAny(wrapperspb.Int64(v))
return a
} }
// NewAnyInt32 creates a new any type from an int32. // NewAnyInt32 creates a new any type from an int32.
func NewAnyInt32(v int32) *anypb.Any { func NewAnyInt32(v int32) *anypb.Any {
a, _ := anypb.New(wrapperspb.Int32(v)) return NewAny(wrapperspb.Int32(v))
return a
} }
// NewAnyNull creates a new any type from a null struct. // NewAnyNull creates a new any type from a null struct.
func NewAnyNull() *anypb.Any { func NewAnyNull() *anypb.Any {
a, _ := anypb.New(NewStructNull()) return NewAny(NewStructNull())
return a
} }
// NewAnyString creates a new any type from a string. // NewAnyString creates a new any type from a string.
func NewAnyString(v string) *anypb.Any { func NewAnyString(v string) *anypb.Any {
a, _ := anypb.New(wrapperspb.String(v)) return NewAny(wrapperspb.String(v))
return a
} }
// NewAnyUInt64 creates a new any type from an uint64. // NewAnyUInt64 creates a new any type from an uint64.
func NewAnyUInt64(v uint64) *anypb.Any { func NewAnyUInt64(v uint64) *anypb.Any {
a, _ := anypb.New(wrapperspb.UInt64(v)) return NewAny(wrapperspb.UInt64(v))
return a
} }
// NewAnyUInt32 creates a new any type from an uint32. // NewAnyUInt32 creates a new any type from an uint32.
func NewAnyUInt32(v uint32) *anypb.Any { func NewAnyUInt32(v uint32) *anypb.Any {
a, _ := anypb.New(wrapperspb.UInt32(v)) return NewAny(wrapperspb.UInt32(v))
return a
} }
// GetTypeURL gets the TypeURL for a protobuf message. // GetTypeURL gets the TypeURL for a protobuf message.

View file

@ -39,10 +39,7 @@ func (t transformer) transformAny(dst, src *anypb.Any) error {
return err return err
} }
a, err := anypb.New(dstMsg.Interface()) a := NewAny(dstMsg.Interface())
if err != nil {
return err
}
dst.TypeUrl = a.TypeUrl dst.TypeUrl = a.TypeUrl
dst.Value = a.Value dst.Value = a.Value
return nil return nil

View file

@ -11,6 +11,7 @@ import (
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
type encryptedRecordStream struct { type encryptedRecordStream struct {
@ -185,13 +186,8 @@ func (e *encryptedBackend) encrypt(in *anypb.Any) (out *anypb.Any, err error) {
} }
encrypted := cryptutil.Encrypt(e.cipher, plaintext, nil) encrypted := cryptutil.Encrypt(e.cipher, plaintext, nil)
out = protoutil.NewAny(&wrapperspb.BytesValue{
out, err = anypb.New(&wrapperspb.BytesValue{
Value: encrypted, Value: encrypted,
}) })
if err != nil {
return nil, err
}
return out, nil return out, nil
} }

View file

@ -12,6 +12,7 @@ import (
"github.com/pomerium/pomerium/pkg/cryptutil" "github.com/pomerium/pomerium/pkg/cryptutil"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
func TestEncryptedBackend(t *testing.T) { func TestEncryptedBackend(t *testing.T) {
@ -56,7 +57,7 @@ func TestEncryptedBackend(t *testing.T) {
return return
} }
any, _ := anypb.New(wrapperspb.String("HELLO WORLD")) any := protoutil.NewAny(wrapperspb.String("HELLO WORLD"))
rec := &databroker.Record{ rec := &databroker.Record{
Type: "", Type: "",

View file

@ -5,10 +5,10 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
"github.com/pomerium/pomerium/pkg/grpc/user" "github.com/pomerium/pomerium/pkg/grpc/user"
"github.com/pomerium/pomerium/pkg/protoutil"
) )
type mockBackend struct { type mockBackend struct {
@ -36,7 +36,7 @@ func (m *mockBackend) GetAll(ctx context.Context) ([]*databroker.Record, *databr
func TestMatchAny(t *testing.T) { func TestMatchAny(t *testing.T) {
u := &user.User{Id: "id", Name: "name", Email: "email"} u := &user.User{Id: "id", Name: "name", Email: "email"}
data, _ := anypb.New(u) data := protoutil.NewAny(u)
assert.True(t, MatchAny(data, "")) assert.True(t, MatchAny(data, ""))
assert.True(t, MatchAny(data, "id")) assert.True(t, MatchAny(data, "id"))
assert.True(t, MatchAny(data, "name")) assert.True(t, MatchAny(data, "name"))