databroker: fix fast forward (#4194)

databroker: fix fast forward (#4192)

* databroker: sort configs

* databroker: fix fast-forward

* newest not oldest

Co-authored-by: Caleb Doxsey <cdoxsey@pomerium.com>
This commit is contained in:
backport-actions-token[bot] 2023-05-23 15:47:07 -06:00 committed by GitHub
parent ca59798540
commit 4aa6960e06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 47 deletions

View file

@ -81,7 +81,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
// start the updater // start the updater
src.runUpdater(cfg) src.runUpdater(cfg)
seen := map[uint64]struct{}{} seen := map[uint64]string{}
for _, policy := range cfg.Options.GetAllPolicies() { for _, policy := range cfg.Options.GetAllPolicies() {
id, err := policy.RouteID() id, err := policy.RouteID()
if err != nil { if err != nil {
@ -90,7 +90,7 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
Msg("databroker: invalid policy config, ignoring") Msg("databroker: invalid policy config, ignoring")
return return
} }
seen[id] = struct{}{} seen[id] = ""
} }
var additionalPolicies []config.Policy var additionalPolicies []config.Policy
@ -145,11 +145,12 @@ func (src *ConfigSource) rebuild(ctx context.Context, firstTime firstTime) {
errCount++ errCount++
log.Warn(ctx).Err(err). log.Warn(ctx).Err(err).
Str("db_config_id", id). Str("db_config_id", id).
Str("seen-in", seen[routeID]).
Str("policy", policy.String()). Str("policy", policy.String()).
Msg("databroker: duplicate policy detected, ignoring") Msg("databroker: duplicate policy detected, ignoring")
continue continue
} }
seen[routeID] = struct{}{} seen[routeID] = id
additionalPolicies = append(additionalPolicies, *policy) additionalPolicies = append(additionalPolicies, *policy)
} }

View file

@ -2,15 +2,18 @@ package databroker
import ( import (
"context" "context"
"sync"
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/pkg/slices"
) )
// fastForwardHandler will skip // fastForwardHandler will skip
type fastForwardHandler struct { type fastForwardHandler struct {
handler SyncerHandler handler SyncerHandler
in chan *ffCmd pending chan ffCmd
exec chan *ffCmd
mu sync.Mutex
} }
type ffCmd struct { type ffCmd struct {
@ -22,52 +25,23 @@ type ffCmd struct {
func newFastForwardHandler(ctx context.Context, handler SyncerHandler) SyncerHandler { func newFastForwardHandler(ctx context.Context, handler SyncerHandler) SyncerHandler {
ff := &fastForwardHandler{ ff := &fastForwardHandler{
handler: handler, handler: handler,
in: make(chan *ffCmd, 20), pending: make(chan ffCmd, 1),
exec: make(chan *ffCmd),
} }
go ff.runSelect(ctx) go ff.run(ctx)
go ff.runExec(ctx)
return ff return ff
} }
func (ff *fastForwardHandler) update(ctx context.Context, c *ffCmd) { func (ff *fastForwardHandler) run(ctx context.Context) {
ff.handler.UpdateRecords(ctx, c.serverVersion, c.records)
}
func (ff *fastForwardHandler) runSelect(ctx context.Context) {
var update *ffCmd
for {
if update == nil {
select {
case <-ctx.Done():
return
case update = <-ff.in:
}
} else {
select {
case <-ctx.Done():
return
case update = <-ff.in:
case ff.exec <- update:
update = nil
}
}
}
}
func (ff *fastForwardHandler) runExec(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case update := <-ff.exec: case cmd := <-ff.pending:
if update.clearRecords { if cmd.clearRecords {
ff.handler.ClearRecords(ctx) ff.handler.ClearRecords(ctx)
continue } else {
ff.handler.UpdateRecords(ctx, cmd.serverVersion, cmd.records)
} }
ff.update(ctx, update)
} }
} }
} }
@ -77,19 +51,57 @@ func (ff *fastForwardHandler) GetDataBrokerServiceClient() DataBrokerServiceClie
} }
func (ff *fastForwardHandler) ClearRecords(ctx context.Context) { func (ff *fastForwardHandler) ClearRecords(ctx context.Context) {
ff.mu.Lock()
defer ff.mu.Unlock()
var cmd ffCmd
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Error(ctx). return
Msg("ff_handler: ClearRecords: context canceled") case cmd = <-ff.pending:
case ff.exec <- &ffCmd{clearRecords: true}: default:
}
cmd.clearRecords = true
cmd.records = nil
select {
case <-ctx.Done():
case ff.pending <- cmd:
} }
} }
func (ff *fastForwardHandler) UpdateRecords(ctx context.Context, serverVersion uint64, records []*Record) { func (ff *fastForwardHandler) UpdateRecords(ctx context.Context, serverVersion uint64, records []*Record) {
ff.mu.Lock()
defer ff.mu.Unlock()
var cmd ffCmd
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Error(ctx). return
Msg("ff_handler: UpdateRecords: context canceled") case cmd = <-ff.pending:
case ff.in <- &ffCmd{serverVersion: serverVersion, records: records}: default:
}
records = append(cmd.records, records...)
// reverse, so that when we get the unique records, the newest take precedence
slices.Reverse(records)
cnt := len(records)
records = slices.UniqueBy(records, func(record *Record) [2]string {
return [2]string{record.GetType(), record.GetId()}
})
dropped := cnt - len(records)
if dropped > 0 {
log.Info(ctx).Msgf("databroker: fast-forwarded %d records", dropped)
}
// reverse back so they appear in the order they were delivered
slices.Reverse(records)
cmd.clearRecords = false
cmd.serverVersion = serverVersion
cmd.records = records
select {
case <-ctx.Done():
case ff.pending <- cmd:
} }
} }

View file

@ -33,6 +33,13 @@ func Remove[S ~[]E, E comparable](s S, e E) S {
return ns return ns
} }
// Reverse reverses a slice's order.
func Reverse[S ~[]E, E comparable](s S) {
for i := 0; i < len(s)/2; i++ {
s[i], s[len(s)-1-i] = s[len(s)-1-i], s[i]
}
}
// Unique returns the unique elements of s. // Unique returns the unique elements of s.
func Unique[S ~[]E, E comparable](s S) S { func Unique[S ~[]E, E comparable](s S) S {
var ns S var ns S
@ -45,3 +52,17 @@ func Unique[S ~[]E, E comparable](s S) S {
} }
return ns return ns
} }
// UniqueBy returns the unique elements of s using a function to map elements.
func UniqueBy[S ~[]E, E any, V comparable](s S, by func(E) V) S {
var ns S
h := map[V]struct{}{}
for _, el := range s {
v := by(el)
if _, ok := h[v]; !ok {
h[v] = struct{}{}
ns = append(ns, el)
}
}
return ns
}

32
pkg/slices/slices_test.go Normal file
View file

@ -0,0 +1,32 @@
package slices
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestReverse(t *testing.T) {
t.Parallel()
for _, tc := range []struct {
in []int
expect []int
}{
{in: []int{1, 2, 3}, expect: []int{3, 2, 1}},
{in: []int{1, 2}, expect: []int{2, 1}},
{in: []int{1}, expect: []int{1}},
} {
s := make([]int, len(tc.in))
copy(s, tc.in)
Reverse(s)
assert.Equal(t, tc.expect, s)
}
}
func TestUniqueBy(t *testing.T) {
t.Parallel()
s := UniqueBy([]int{1, 2, 3, 4, 3, 1, 1, 4, 2}, func(i int) int { return i % 3 })
assert.Equal(t, []int{1, 2, 3}, s)
}