xds: only tag contexts used for UpdateRecords (#2269)

This commit is contained in:
wasaga 2021-06-04 14:01:25 -04:00 committed by GitHub
parent 2156dbc553
commit 744e2c7993
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 31 deletions

View file

@ -4,12 +4,12 @@ package contextkeys
type contextKey int type contextKey int
const ( const (
// DatabrokerConfigVersion identifies the uint64 databroker version of the config // UpdateRecordsVersion identifies the uint64 databroker version of the config
DatabrokerConfigVersion contextKey = iota UpdateRecordsVersion contextKey = iota
) )
func (x contextKey) String() string { func (x contextKey) String() string {
return map[contextKey]string{ return map[contextKey]string{
DatabrokerConfigVersion: "db_config_version", UpdateRecordsVersion: "update_records_version",
}[x] }[x]
} }

View file

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"os" "os"
"sync" "sync"
@ -254,7 +255,7 @@ func (mgr *Manager) Update(ctx context.Context, resources map[string][]*envoy_se
mgr.mu.Lock() mgr.mu.Lock()
mgr.nonce = nonce mgr.nonce = nonce
mgr.resources = resources mgr.resources = resources
mgr.nonceToConfig.Add(nonce, ctx.Value(contextkeys.DatabrokerConfigVersion)) mgr.nonceToConfig.Add(nonce, ctx.Value(contextkeys.UpdateRecordsVersion))
mgr.mu.Unlock() mgr.mu.Unlock()
mgr.signal.Broadcast(ctx) mgr.signal.Broadcast(ctx)
@ -303,7 +304,7 @@ func (mgr *Manager) ackEvent(ctx context.Context, req *envoy_service_discovery_v
ResourceSubscribed: req.ResourceNamesSubscribe, ResourceSubscribed: req.ResourceNamesSubscribe,
ResourceUnsubscribed: req.ResourceNamesUnsubscribe, ResourceUnsubscribed: req.ResourceNamesUnsubscribe,
TypeUrl: req.TypeUrl, TypeUrl: req.TypeUrl,
Message: "ok", Message: fmt.Sprintf("ok %s", req.ResponseNonce),
}) })
log.Debug(ctx). log.Debug(ctx).
@ -323,7 +324,7 @@ func (mgr *Manager) changeEvent(ctx context.Context, res *envoy_service_discover
ResourceSubscribed: resourceNames(res.Resources), ResourceSubscribed: resourceNames(res.Resources),
ResourceUnsubscribed: res.RemovedResources, ResourceUnsubscribed: res.RemovedResources,
TypeUrl: res.TypeUrl, TypeUrl: res.TypeUrl,
Message: "change", Message: fmt.Sprintf("change %s", res.Nonce),
}) })
log.Debug(ctx). log.Debug(ctx).
Uint64("ctx_config_version", mgr.nonceToConfigVersion(res.Nonce)). Uint64("ctx_config_version", mgr.nonceToConfigVersion(res.Nonce)).

View file

@ -93,6 +93,7 @@ func (syncer *Syncer) Run(ctx context.Context) error {
cancel() cancel()
}() }()
ctx = syncer.logCtx(ctx)
for { for {
var err error var err error
if syncer.serverVersion == 0 { if syncer.serverVersion == 0 {
@ -102,7 +103,7 @@ func (syncer *Syncer) Run(ctx context.Context) error {
} }
if err != nil { if err != nil {
log.Error(syncer.logCtx(ctx)).Err(err).Msg("sync") log.Error(ctx).Err(err).Msg("sync")
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@ -113,42 +114,42 @@ func (syncer *Syncer) Run(ctx context.Context) error {
} }
func (syncer *Syncer) init(ctx context.Context) error { func (syncer *Syncer) init(ctx context.Context) error {
log.Info(syncer.logCtx(ctx)).Msg("initial sync") log.Info(ctx).Msg("initial sync")
records, recordVersion, serverVersion, err := InitialSync(syncer.logCtx(ctx), syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{ records, recordVersion, serverVersion, err := InitialSync(ctx, syncer.handler.GetDataBrokerServiceClient(), &SyncLatestRequest{
Type: syncer.cfg.typeURL, Type: syncer.cfg.typeURL,
}) })
if err != nil { if err != nil {
log.Error(syncer.logCtx(ctx)).Err(err).Msg("error during initial sync") log.Error(ctx).Err(err).Msg("error during initial sync")
return err return err
} }
syncer.backoff.Reset() syncer.backoff.Reset()
// reset the records as we have to sync latest // reset the records as we have to sync latest
syncer.handler.ClearRecords(syncer.logCtx(ctx)) syncer.handler.ClearRecords(ctx)
syncer.recordVersion = recordVersion syncer.recordVersion = recordVersion
syncer.serverVersion = serverVersion syncer.serverVersion = serverVersion
syncer.handler.UpdateRecords(syncer.logCtx(ctx), serverVersion, records) syncer.handler.UpdateRecords(ctx, serverVersion, records)
return nil return nil
} }
func (syncer *Syncer) sync(ctx context.Context) error { func (syncer *Syncer) sync(ctx context.Context) error {
stream, err := syncer.handler.GetDataBrokerServiceClient().Sync(syncer.logCtx(ctx), &SyncRequest{ stream, err := syncer.handler.GetDataBrokerServiceClient().Sync(ctx, &SyncRequest{
ServerVersion: syncer.serverVersion, ServerVersion: syncer.serverVersion,
RecordVersion: syncer.recordVersion, RecordVersion: syncer.recordVersion,
}) })
if err != nil { if err != nil {
log.Error(syncer.logCtx(ctx)).Err(err).Msg("error during sync") log.Error(ctx).Err(err).Msg("error during sync")
return err return err
} }
log.Info(syncer.logCtx(ctx)).Msg("listening for updates") log.Info(ctx).Msg("listening for updates")
for { for {
res, err := stream.Recv() res, err := stream.Recv()
if status.Code(err) == codes.Aborted { if status.Code(err) == codes.Aborted {
log.Error(syncer.logCtx(ctx)).Err(err).Msg("aborted sync due to mismatched server version") log.Error(ctx).Err(err).Msg("aborted sync due to mismatched server version")
// server version changed, so re-init // server version changed, so re-init
syncer.serverVersion = 0 syncer.serverVersion = 0
return nil return nil
@ -156,32 +157,38 @@ func (syncer *Syncer) sync(ctx context.Context) error {
return err return err
} }
log.Debug(syncer.logCtx(ctx)). rec := res.GetRecord()
Uint("version", uint(res.GetRecord().GetVersion())). log.Debug(logCtxRec(ctx, rec)).Msg("syncer got record")
Str("id", res.GetRecord().GetId()).
Msg("syncer got record")
if syncer.recordVersion != res.GetRecord().GetVersion()-1 { if syncer.recordVersion != res.GetRecord().GetVersion()-1 {
log.Error(syncer.logCtx(ctx)).Err(err). log.Error(logCtxRec(ctx, rec)).Err(err).
Uint64("received", res.GetRecord().GetVersion()).
Msg("aborted sync due to missing record") Msg("aborted sync due to missing record")
syncer.serverVersion = 0 syncer.serverVersion = 0
return fmt.Errorf("missing record version") return fmt.Errorf("missing record version")
} }
syncer.recordVersion = res.GetRecord().GetVersion() syncer.recordVersion = res.GetRecord().GetVersion()
if syncer.cfg.typeURL == "" || syncer.cfg.typeURL == res.GetRecord().GetType() { if syncer.cfg.typeURL == "" || syncer.cfg.typeURL == res.GetRecord().GetType() {
syncer.handler.UpdateRecords(syncer.logCtx(ctx), syncer.serverVersion, []*Record{res.GetRecord()}) ctx := logCtxRec(ctx, rec)
log.Debug(ctx).Msg("update records")
syncer.handler.UpdateRecords(
context.WithValue(ctx, contextkeys.UpdateRecordsVersion, rec.GetVersion()),
syncer.serverVersion, []*Record{rec})
} }
} }
} }
// logCtx adds log params to context which // logCtxRecRec adds log params to context related to particular record
func (syncer *Syncer) logCtx(ctx context.Context) context.Context { func logCtxRec(ctx context.Context, rec *Record) context.Context {
ctx = log.WithContext(ctx, func(c zerolog.Context) zerolog.Context { return log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("syncer_id", syncer.id). return c.Str("record_type", rec.GetType()).
Str("type", syncer.cfg.typeURL). Str("record_id", rec.GetId()).
Uint64("server_version", syncer.serverVersion). Uint64("record_version", rec.GetVersion())
Uint64("record_version", syncer.recordVersion) })
}
func (syncer *Syncer) logCtx(ctx context.Context) context.Context {
return log.WithContext(ctx, func(c zerolog.Context) zerolog.Context {
return c.Str("syncer_id", syncer.id).
Str("syncer_type", syncer.cfg.typeURL)
}) })
return context.WithValue(ctx, contextkeys.DatabrokerConfigVersion, syncer.recordVersion)
} }