core/logging: less verbose logs (#5040)

This commit is contained in:
Caleb Doxsey 2024-03-29 15:26:20 -06:00 committed by GitHub
parent ecbd84b7df
commit 4ac06d3bbd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 51 additions and 42 deletions

View file

@ -191,7 +191,7 @@ func getOrCreatePolicyEvaluators(
m[p.id] = p.evaluator m[p.id] = p.evaluator
} }
log.Info(ctx). log.Debug(ctx).
Dur("duration", time.Since(now)). Dur("duration", time.Since(now)).
Int("reused-policies", reusedCount). Int("reused-policies", reusedCount).
Int("created-policies", len(cfg.Policies)-reusedCount). Int("created-policies", len(cfg.Policies)-reusedCount).

View file

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.uber.org/automaxprocs/maxprocs"
"github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
@ -31,8 +30,6 @@ func main() {
return return
} }
_, _ = maxprocs.Set(maxprocs.Logger(func(s string, i ...interface{}) { log.Info(context.Background()).Msgf(s, i...) }))
ctx := context.Background() ctx := context.Background()
runFn := run runFn := run
if zero_cmd.IsManagedMode(*configFile) { if zero_cmd.IsManagedMode(*configFile) {

View file

@ -24,6 +24,9 @@ func (c certMagicLoggerCore) With(fs []zapcore.Field) zapcore.Core {
} }
func (c certMagicLoggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { func (c certMagicLoggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if e.Level == zap.InfoLevel {
e.Level = zap.DebugLevel
}
if !c.Enabled(e.Level) { if !c.Enabled(e.Level) {
return ce return ce
} }
@ -31,6 +34,9 @@ func (c certMagicLoggerCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *z
} }
func (c certMagicLoggerCore) Write(e zapcore.Entry, fs []zapcore.Field) error { func (c certMagicLoggerCore) Write(e zapcore.Entry, fs []zapcore.Field) error {
if e.Level == zap.InfoLevel {
e.Level = zap.DebugLevel
}
fs = append(c.fields, fs...) fs = append(c.fields, fs...)
for _, f := range fs { for _, f := range fs {
if f.Type == zapcore.ErrorType && errors.Is(f.Interface.(error), certmagic.ErrNoOCSPServerSpecified) { if f.Type == zapcore.ErrorType && errors.Is(f.Interface.(error), certmagic.ErrNoOCSPServerSpecified) {

View file

@ -180,7 +180,7 @@ func (srv *Server) Run(ctx context.Context) error {
// start the gRPC server // start the gRPC server
eg.Go(func() error { eg.Go(func() error {
log.Info(ctx).Str("addr", srv.GRPCListener.Addr().String()).Msg("starting control-plane gRPC server") log.Debug(ctx).Str("addr", srv.GRPCListener.Addr().String()).Msg("starting control-plane gRPC server")
return grpcutil.ServeWithGracefulStop(ctx, srv.GRPCServer, srv.GRPCListener, time.Second*5) return grpcutil.ServeWithGracefulStop(ctx, srv.GRPCServer, srv.GRPCListener, time.Second*5)
}) })
@ -199,7 +199,7 @@ func (srv *Server) Run(ctx context.Context) error {
// start the HTTP server // start the HTTP server
eg.Go(func() error { eg.Go(func() error {
log.Info(ctx). log.Debug(ctx).
Str("addr", entry.Listener.Addr().String()). Str("addr", entry.Listener.Addr().String()).
Msgf("starting control-plane %s server", entry.Name) Msgf("starting control-plane %s server", entry.Name)
return httputil.ServeWithGracefulStop(ctx, entry.Handler, entry.Listener, time.Second*5) return httputil.ServeWithGracefulStop(ctx, entry.Handler, entry.Listener, time.Second*5)

View file

@ -26,7 +26,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
cfg := srv.currentConfig.Load() cfg := srv.currentConfig.Load()
log.Info(ctx).Msg("controlplane: building discovery resources") log.Debug(ctx).Msg("controlplane: building discovery resources")
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
@ -83,7 +83,7 @@ func (srv *Server) buildDiscoveryResources(ctx context.Context) (map[string][]*e
return nil, err return nil, err
} }
log.Info(ctx). log.Debug(ctx).
Int("cluster-count", len(clusterResources)). Int("cluster-count", len(clusterResources)).
Int("listener-count", len(listenerResources)). Int("listener-count", len(listenerResources)).
Int("route-configuration-count", len(routeConfigurationResources)). Int("route-configuration-count", len(routeConfigurationResources)).

View file

@ -107,7 +107,7 @@ func (mgr *Manager) DeltaAggregatedResources(
case req.GetResponseNonce() == "": case req.GetResponseNonce() == "":
// neither an ACK or a NACK // neither an ACK or a NACK
case req.GetErrorDetail() != nil: case req.GetErrorDetail() != nil:
log.Info(ctx). log.Debug(ctx).
Str("type-url", req.GetTypeUrl()). Str("type-url", req.GetTypeUrl()).
Any("error-detail", req.GetErrorDetail()). Any("error-detail", req.GetErrorDetail()).
Msg("xdsmgr: nack") Msg("xdsmgr: nack")
@ -118,7 +118,7 @@ func (mgr *Manager) DeltaAggregatedResources(
state.clientResourceVersions[resource.Name] = resource.Version state.clientResourceVersions[resource.Name] = resource.Version
} }
case req.GetResponseNonce() == mgr.nonce: case req.GetResponseNonce() == mgr.nonce:
log.Info(ctx). log.Debug(ctx).
Str("type-url", req.GetTypeUrl()). Str("type-url", req.GetTypeUrl()).
Msg("xdsmgr: ack") Msg("xdsmgr: ack")
// an ACK for the last response // an ACK for the last response
@ -129,7 +129,7 @@ func (mgr *Manager) DeltaAggregatedResources(
} }
default: default:
// an ACK for a response that's not the last response // an ACK for a response that's not the last response
log.Info(ctx). log.Debug(ctx).
Str("type-url", req.GetTypeUrl()). Str("type-url", req.GetTypeUrl()).
Msg("xdsmgr: ack") Msg("xdsmgr: ack")
} }
@ -211,7 +211,7 @@ func (mgr *Manager) DeltaAggregatedResources(
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case res := <-outgoing: case res := <-outgoing:
log.Info(ctx). log.Debug(ctx).
Str("type-url", res.GetTypeUrl()). Str("type-url", res.GetTypeUrl()).
Int("resource-count", len(res.GetResources())). Int("resource-count", len(res.GetResources())).
Int("removed-resource-count", len(res.GetRemovedResources())). Int("removed-resource-count", len(res.GetRemovedResources())).

View file

@ -90,7 +90,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
now := time.Now() now := time.Now()
src.mu.Lock() src.mu.Lock()
defer src.mu.Unlock() defer src.mu.Unlock()
log.Info(ctx).Str("lock-wait", time.Since(now).String()).Msg("databroker: rebuilding configuration") log.Debug(ctx).Str("lock-wait", time.Since(now).String()).Msg("databroker: rebuilding configuration")
cfg := src.underlyingConfig.Clone() cfg := src.underlyingConfig.Clone()
@ -103,7 +103,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
log.Error(ctx).Err(err).Msg("databroker: failed to build new config") log.Error(ctx).Err(err).Msg("databroker: failed to build new config")
return return
} }
log.Info(ctx).Str("elapsed", time.Since(now).String()).Msg("databroker: built new config") log.Debug(ctx).Str("elapsed", time.Since(now).String()).Msg("databroker: built new config")
src.computedConfig = cfg src.computedConfig = cfg
if !firstTime { if !firstTime {
@ -278,7 +278,7 @@ func (src *ConfigSource) runUpdater(cfg *config.Config) {
databrokerURLs = append(databrokerURLs, url.String()) databrokerURLs = append(databrokerURLs, url.String())
} }
log.Info(ctx). log.Debug(ctx).
Str("outbound_port", cfg.OutboundPort). Str("outbound_port", cfg.OutboundPort).
Strs("databroker_urls", databrokerURLs). Strs("databroker_urls", databrokerURLs).
Msg("config: starting databroker config source syncer") Msg("config: starting databroker config source syncer")

View file

@ -75,7 +75,7 @@ func (srv *Server) UpdateConfig(options ...ServerOption) {
func (srv *Server) AcquireLease(ctx context.Context, req *databroker.AcquireLeaseRequest) (*databroker.AcquireLeaseResponse, error) { func (srv *Server) AcquireLease(ctx context.Context, req *databroker.AcquireLeaseRequest) (*databroker.AcquireLeaseResponse, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.AcquireLease") ctx, span := trace.StartSpan(ctx, "databroker.grpc.AcquireLease")
defer span.End() defer span.End()
log.Info(ctx). log.Debug(ctx).
Str("name", req.GetName()). Str("name", req.GetName()).
Dur("duration", req.GetDuration().AsDuration()). Dur("duration", req.GetDuration().AsDuration()).
Msg("acquire lease") Msg("acquire lease")
@ -102,7 +102,7 @@ func (srv *Server) AcquireLease(ctx context.Context, req *databroker.AcquireLeas
func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databroker.GetResponse, error) { func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databroker.GetResponse, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.Get") ctx, span := trace.StartSpan(ctx, "databroker.grpc.Get")
defer span.End() defer span.End()
log.Info(ctx). log.Debug(ctx).
Str("type", req.GetType()). Str("type", req.GetType()).
Str("id", req.GetId()). Str("id", req.GetId()).
Msg("get") Msg("get")
@ -129,7 +129,7 @@ func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databr
func (srv *Server) ListTypes(ctx context.Context, _ *emptypb.Empty) (*databroker.ListTypesResponse, error) { func (srv *Server) ListTypes(ctx context.Context, _ *emptypb.Empty) (*databroker.ListTypesResponse, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.ListTypes") ctx, span := trace.StartSpan(ctx, "databroker.grpc.ListTypes")
defer span.End() defer span.End()
log.Info(ctx).Msg("list types") log.Debug(ctx).Msg("list types")
db, err := srv.getBackend() db, err := srv.getBackend()
if err != nil { if err != nil {
@ -146,7 +146,7 @@ func (srv *Server) ListTypes(ctx context.Context, _ *emptypb.Empty) (*databroker
func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*databroker.QueryResponse, error) { func (srv *Server) Query(ctx context.Context, req *databroker.QueryRequest) (*databroker.QueryResponse, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.Query") ctx, span := trace.StartSpan(ctx, "databroker.grpc.Query")
defer span.End() defer span.End()
log.Info(ctx). log.Debug(ctx).
Str("type", req.GetType()). Str("type", req.GetType()).
Str("query", req.GetQuery()). Str("query", req.GetQuery()).
Int64("offset", req.GetOffset()). Int64("offset", req.GetOffset()).
@ -202,7 +202,7 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr
records := req.GetRecords() records := req.GetRecords()
if len(records) == 1 { if len(records) == 1 {
log.Info(ctx). log.Debug(ctx).
Str("record-type", records[0].GetType()). Str("record-type", records[0].GetType()).
Str("record-id", records[0].GetId()). Str("record-id", records[0].GetId()).
Msg("put") Msg("put")
@ -211,7 +211,7 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr
for _, record := range records { for _, record := range records {
recordType = record.GetType() recordType = record.GetType()
} }
log.Info(ctx). log.Debug(ctx).
Int("record-count", len(records)). Int("record-count", len(records)).
Str("record-type", recordType). Str("record-type", recordType).
Msg("put") Msg("put")
@ -241,7 +241,7 @@ func (srv *Server) Patch(ctx context.Context, req *databroker.PatchRequest) (*da
records := req.GetRecords() records := req.GetRecords()
if len(records) == 1 { if len(records) == 1 {
log.Info(ctx). log.Debug(ctx).
Str("record-type", records[0].GetType()). Str("record-type", records[0].GetType()).
Str("record-id", records[0].GetId()). Str("record-id", records[0].GetId()).
Msg("patch") Msg("patch")
@ -250,7 +250,7 @@ func (srv *Server) Patch(ctx context.Context, req *databroker.PatchRequest) (*da
for _, record := range records { for _, record := range records {
recordType = record.GetType() recordType = record.GetType()
} }
log.Info(ctx). log.Debug(ctx).
Int("record-count", len(records)). Int("record-count", len(records)).
Str("record-type", recordType). Str("record-type", recordType).
Msg("patch") Msg("patch")
@ -277,7 +277,7 @@ func (srv *Server) Patch(ctx context.Context, req *databroker.PatchRequest) (*da
func (srv *Server) ReleaseLease(ctx context.Context, req *databroker.ReleaseLeaseRequest) (*emptypb.Empty, error) { func (srv *Server) ReleaseLease(ctx context.Context, req *databroker.ReleaseLeaseRequest) (*emptypb.Empty, error) {
ctx, span := trace.StartSpan(ctx, "databroker.grpc.ReleaseLease") ctx, span := trace.StartSpan(ctx, "databroker.grpc.ReleaseLease")
defer span.End() defer span.End()
log.Info(ctx). log.Debug(ctx).
Str("name", req.GetName()). Str("name", req.GetName()).
Str("id", req.GetId()). Str("id", req.GetId()).
Msg("release lease") Msg("release lease")
@ -351,7 +351,7 @@ func (srv *Server) Sync(req *databroker.SyncRequest, stream databroker.DataBroke
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
log.Info(ctx). log.Debug(ctx).
Uint64("server_version", req.GetServerVersion()). Uint64("server_version", req.GetServerVersion()).
Uint64("record_version", req.GetRecordVersion()). Uint64("record_version", req.GetRecordVersion()).
Msg("sync") Msg("sync")
@ -388,7 +388,7 @@ func (srv *Server) SyncLatest(req *databroker.SyncLatestRequest, stream databrok
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
log.Info(ctx). log.Debug(ctx).
Str("type", req.GetType()). Str("type", req.GetType()).
Msg("sync latest") Msg("sync latest")

View file

@ -127,7 +127,7 @@ func (mgr *Manager) refreshLoop(ctx context.Context, update <-chan updateRecords
mgr.onUpdateRecords(ctx, msg) mgr.onUpdateRecords(ctx, msg)
} }
log.Info(ctx). log.Debug(ctx).
Int("sessions", mgr.sessions.Len()). Int("sessions", mgr.sessions.Len()).
Int("users", mgr.users.Len()). Int("users", mgr.users.Len()).
Msg("initial sync complete") Msg("initial sync complete")

View file

@ -80,7 +80,7 @@ func (svc *Mux) subscribeAndDispatch(ctx context.Context, onConnected func()) (e
err = multierror.Append(err, svc.onDisconnected(ctx)).ErrorOrNil() err = multierror.Append(err, svc.onDisconnected(ctx)).ErrorOrNil()
}() }()
log.Ctx(ctx).Info().Msg("subscribed to connect service") log.Ctx(ctx).Debug().Msg("subscribed to connect service")
for { for {
msg, err := stream.Recv() msg, err := stream.Recv()
log.Ctx(ctx).Info().Interface("msg", msg).Err(err).Msg("receive") log.Ctx(ctx).Info().Interface("msg", msg).Err(err).Msg("receive")

View file

@ -70,7 +70,7 @@ func (c *controller) initAPI(ctx context.Context) error {
} }
func run(ctx context.Context, name string, runFn func(context.Context) error) error { func run(ctx context.Context, name string, runFn func(context.Context) error) error {
log.Ctx(ctx).Info().Str("name", name).Msg("starting") log.Ctx(ctx).Debug().Str("name", name).Msg("starting")
err := runFn(ctx) err := runFn(ctx)
if err != nil && !errors.Is(err, context.Canceled) { if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("%s: %w", name, err) return fmt.Errorf("%s: %w", name, err)

View file

@ -14,16 +14,16 @@ func (c *controller) RunConnectLog(ctx context.Context) error {
return c.api.Watch(ctx, return c.api.Watch(ctx,
connect_mux.WithOnConnected(func(ctx context.Context) { connect_mux.WithOnConnected(func(ctx context.Context) {
logger.Info().Msg("connected") logger.Debug().Msg("connected")
}), }),
connect_mux.WithOnDisconnected(func(ctx context.Context) { connect_mux.WithOnDisconnected(func(ctx context.Context) {
logger.Info().Msg("disconnected") logger.Debug().Msg("disconnected")
}), }),
connect_mux.WithOnBootstrapConfigUpdated(func(ctx context.Context) { connect_mux.WithOnBootstrapConfigUpdated(func(ctx context.Context) {
logger.Info().Msg("bootstrap config updated") logger.Debug().Msg("bootstrap config updated")
}), }),
connect_mux.WithOnBundleUpdated(func(ctx context.Context, key string) { connect_mux.WithOnBundleUpdated(func(ctx context.Context, key string) {
logger.Info().Str("key", key).Msg("bundle updated") logger.Debug().Str("key", key).Msg("bundle updated")
}), }),
) )
} }

View file

@ -91,7 +91,7 @@ func (c *client) logConnectionState(ctx context.Context, conn *grpc.ClientConn)
for ctx.Err() == nil && state != connectivity.Shutdown { for ctx.Err() == nil && state != connectivity.Shutdown {
_ = conn.WaitForStateChange(ctx, state) _ = conn.WaitForStateChange(ctx, state)
state = conn.GetState() state = conn.GetState()
log.Ctx(ctx).Info(). log.Ctx(ctx).Debug().
Str("endpoint", c.config.connectionURI). Str("endpoint", c.config.connectionURI).
Str("state", state.String()). Str("state", state.String()).
Msg("grpc connection state") Msg("grpc connection state")
@ -103,7 +103,7 @@ func interceptorLogger(ctx context.Context, lvl logging.Level, msg string, field
switch lvl { switch lvl {
case logging.LevelDebug: case logging.LevelDebug:
l.Info().Msg(msg) l.Debug().Msg(msg)
case logging.LevelInfo: case logging.LevelInfo:
l.Info().Msg(msg) l.Info().Msg(msg)
case logging.LevelWarn: case logging.LevelWarn:

View file

@ -165,7 +165,7 @@ func (c *service) syncBundle(ctx context.Context, key string) error {
RecordTypes: bundleRecordTypes, RecordTypes: bundleRecordTypes,
} }
log.Ctx(ctx).Info(). log.Ctx(ctx).Debug().
Str("bundle", key). Str("bundle", key).
Strs("record_types", bundleRecordTypes). Strs("record_types", bundleRecordTypes).
Str("etag", current.ETag). Str("etag", current.ETag).

View file

@ -10,6 +10,7 @@ import (
"syscall" "syscall"
envoy_service_auth_v3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3" envoy_service_auth_v3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3"
"go.uber.org/automaxprocs/maxprocs"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/pomerium/pomerium/authenticate" "github.com/pomerium/pomerium/authenticate"
@ -31,6 +32,8 @@ import (
// Run runs the main pomerium application. // Run runs the main pomerium application.
func Run(ctx context.Context, src config.Source) error { func Run(ctx context.Context, src config.Source) error {
_, _ = maxprocs.Set(maxprocs.Logger(func(s string, i ...interface{}) { log.Debug(context.Background()).Msgf(s, i...) }))
log.Info(ctx). log.Info(ctx).
Str("envoy_version", files.FullVersion()). Str("envoy_version", files.FullVersion()).
Str("version", version.FullVersion()). Str("version", version.FullVersion()).

View file

@ -76,7 +76,7 @@ func NewServer(ctx context.Context, src config.Source, builder *envoyconfig.Buil
src.OnConfigChange(ctx, srv.onConfigChange) src.OnConfigChange(ctx, srv.onConfigChange)
srv.onConfigChange(ctx, src.GetConfig()) srv.onConfigChange(ctx, src.GetConfig())
log.Info(ctx). log.Debug(ctx).
Str("path", envoyPath). Str("path", envoyPath).
Str("checksum", files.Checksum()). Str("checksum", files.Checksum()).
Msg("running envoy") Msg("running envoy")
@ -122,7 +122,7 @@ func (srv *Server) update(ctx context.Context, cfg *config.Config) {
} }
srv.options = options srv.options = options
log.Info(ctx).Msg("envoy: starting envoy process") log.Debug(ctx).Msg("envoy: starting envoy process")
if err := srv.run(ctx, cfg); err != nil { if err := srv.run(ctx, cfg); err != nil {
log.Error(ctx).Err(err).Str("service", "envoy").Msg("envoy: failed to run envoy process") log.Error(ctx).Err(err).Str("service", "envoy").Msg("envoy: failed to run envoy process")
return return
@ -249,6 +249,9 @@ func (srv *Server) handleLogs(ctx context.Context, rc io.ReadCloser) {
if x, err := zerolog.ParseLevel(logLevel); err == nil { if x, err := zerolog.ParseLevel(logLevel); err == nil {
lvl = x lvl = x
} }
if lvl == zerolog.InfoLevel {
lvl = zerolog.DebugLevel
}
if msg == "" { if msg == "" {
msg = ln msg = ln
} }
@ -270,7 +273,7 @@ func (srv *Server) handleLogs(ctx context.Context, rc io.ReadCloser) {
} }
func (srv *Server) monitorProcess(ctx context.Context, pid int32) { func (srv *Server) monitorProcess(ctx context.Context, pid int32) {
log.Info(ctx). log.Debug(ctx).
Int32("pid", pid). Int32("pid", pid).
Msg("envoy: start monitoring subprocess") Msg("envoy: start monitoring subprocess")

View file

@ -55,7 +55,7 @@ func NewGRPCClientConn(ctx context.Context, opts *Options, other ...grpc.DialOpt
grpc.WithInsecure(), grpc.WithInsecure(),
} }
dialOptions = append(dialOptions, other...) dialOptions = append(dialOptions, other...)
log.Info(ctx).Str("address", opts.Address).Msg("grpc: dialing") log.Debug(ctx).Str("address", opts.Address).Msg("grpc: dialing")
return grpc.DialContext(ctx, opts.Address, dialOptions...) return grpc.DialContext(ctx, opts.Address, dialOptions...)
} }

View file

@ -128,7 +128,7 @@ func (syncer *Syncer) Run(ctx context.Context) error {
} }
func (syncer *Syncer) init(ctx context.Context) error { func (syncer *Syncer) init(ctx context.Context) error {
log.Info(ctx).Msg("initial sync") log.Debug(ctx).Msg("initial sync")
records, recordVersion, serverVersion, err := InitialSync(ctx, syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{ records, recordVersion, serverVersion, err := InitialSync(ctx, syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{
Type: syncer.cfg.typeURL, Type: syncer.cfg.typeURL,
}) })
@ -157,7 +157,7 @@ func (syncer *Syncer) sync(ctx context.Context) error {
return fmt.Errorf("error calling sync: %w", err) return fmt.Errorf("error calling sync: %w", err)
} }
log.Info(ctx).Msg("listening for updates") log.Debug(ctx).Msg("listening for updates")
for { for {
res, err := stream.Recv() res, err := stream.Recv()