mirror of
https://github.com/pomerium/pomerium.git
synced 2025-08-06 10:21:05 +02:00
xds: retry storing configuration events (#2266)
This commit is contained in:
parent
c3286aa355
commit
4af12c4bbb
1 changed files with 36 additions and 4 deletions
|
@ -3,8 +3,12 @@ package controlplane
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
|
||||||
|
@ -34,10 +38,10 @@ func (srv *Server) runEnvoyConfigurationEventHandler(ctx context.Context) error
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case evt = <-srv.envoyConfigurationEvents:
|
case evt = <-srv.envoyConfigurationEvents:
|
||||||
}
|
}
|
||||||
err := srv.storeEnvoyConfigurationEvent(ctx, evt)
|
|
||||||
if err != nil {
|
withGRPCBackoff(ctx, func() error {
|
||||||
log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event")
|
return srv.storeEnvoyConfigurationEvent(ctx, evt)
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,3 +114,31 @@ func (srv *Server) getDataBrokerClient(ctx context.Context) (databrokerpb.DataBr
|
||||||
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
client := databrokerpb.NewDataBrokerServiceClient(cc)
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// withGRPCBackoff runs f. If an unavailable or resource exhausted error occurs, the request will be retried.
|
||||||
|
// All other errors return immediately.
|
||||||
|
func withGRPCBackoff(ctx context.Context, f func() error) {
|
||||||
|
bo := backoff.NewExponentialBackOff()
|
||||||
|
bo.MaxElapsedTime = 0
|
||||||
|
for {
|
||||||
|
err := f()
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
return
|
||||||
|
case status.Code(err) == codes.Unavailable,
|
||||||
|
status.Code(err) == codes.ResourceExhausted,
|
||||||
|
status.Code(err) == codes.DeadlineExceeded:
|
||||||
|
log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event, retrying")
|
||||||
|
// retry
|
||||||
|
default:
|
||||||
|
log.Error(ctx).Err(err).Msg("controlplane: error storing configuration event")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-time.After(bo.NextBackOff()):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue