pomerium/internal/events/target.go

package events

import (
	"context"
	"errors"
	"sync"

	"github.com/google/uuid"
)

type (
	// A Listener is a function that listens for events of type T.
	Listener[T any] func(ctx context.Context, event T)

	// A Handle represents a registered listener.
	Handle string

	// A dispatchEvent pairs an event with the context it was dispatched with.
	dispatchEvent[T any] struct {
		ctx   context.Context
		event T
	}
)

// A Target is a target for events.
//
// Listeners are added with AddListener, passing a function to be called when
// an event occurs. AddListener returns a Handle which can be used to remove
// the listener with RemoveListener.
//
// Dispatch dispatches events to all registered listeners.
//
// The zero value of a Target is ready to use.
//
// Each listener runs in its own goroutine.
//
// A slow listener will cause the next event dispatch to block. This is the
// opposite of Manager's behavior.
//
// Close removes and cancels all listeners.
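//
// A minimal usage sketch (the string event type and handler body are
// illustrative):
//
//	var target events.Target[string]
//	handle := target.AddListener(func(ctx context.Context, evt string) {
//		log.Println("received:", evt)
//	})
//	target.Dispatch(context.Background(), "hello")
//	target.RemoveListener(handle)
//	target.Close()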
type Target[T any] struct {
	mu        sync.RWMutex
	listeners map[Handle]targetListener[T]
}

// AddListener adds a listener to the target and returns a Handle that can be
// used to remove it.
func (t *Target[T]) AddListener(listener Listener[T]) Handle {
	// A handle is necessary because a function cannot be used as a map key.
	h := Handle(uuid.NewString())
	tl := newTargetListener(listener)

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.listeners == nil {
		t.listeners = make(map[Handle]targetListener[T])
	}
	t.listeners[h] = tl

	return h
}

// Close closes the event target. It is safe to call Close multiple times.
func (t *Target[T]) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	for _, tl := range t.listeners {
		tl.close()
	}
	t.listeners = nil
}

// Dispatch dispatches an event to all listeners.
func (t *Target[T]) Dispatch(ctx context.Context, evt T) {
	// Copy the listeners into a slice so the lock is not held while dispatching.
	var tls []targetListener[T]
	t.mu.RLock()
	tls = make([]targetListener[T], 0, len(t.listeners))
	for _, tl := range t.listeners {
		tls = append(tls, tl)
	}
	t.mu.RUnlock()

	// Because we're outside of the lock, it's possible we may dispatch to a
	// listener that's been removed if Dispatch and RemoveListener are called
	// from separate goroutines. There should be no possibility of a deadlock,
	// however.
	for _, tl := range tls {
		tl.dispatch(dispatchEvent[T]{ctx: ctx, event: evt})
	}
}

// RemoveListener removes a listener from the target.
func (t *Target[T]) RemoveListener(handle Handle) {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.listeners == nil {
		t.listeners = make(map[Handle]targetListener[T])
	}

	tl, ok := t.listeners[handle]
	if !ok {
		return
	}
	tl.close()
	delete(t.listeners, handle)
}

// A targetListener starts a goroutine that pulls events from "ch" and
// calls the listener for each event.
//
// The goroutine is stopped when ".close()" is called. We don't rely
// on closing "ch" because sending to a closed channel results in a
// panic. Instead we signal closing via "ctx.Done()".
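//
// For example, a hypothetical implementation that closed "ch" instead could
// panic if a concurrent Dispatch raced with Close:
//
//	close(li.ch) // Close
//	li.ch <- evt // concurrent Dispatch panics: send on closed channel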
type targetListener[T any] struct {
	ctx      context.Context
	cancel   context.CancelCauseFunc
	ch       chan dispatchEvent[T]
	listener Listener[T]
}

func newTargetListener[T any](listener Listener[T]) targetListener[T] {
	li := targetListener[T]{}
	li.ctx, li.cancel = context.WithCancelCause(context.Background())
	// The channel is unbuffered: a send in dispatch blocks until the run
	// goroutine is ready to receive the next event.
	li.ch = make(chan dispatchEvent[T])
	li.listener = listener
	go li.run()
	return li
}

// close stops the run goroutine by cancelling the listener's context.
func (li targetListener[T]) close() {
	li.cancel(errors.New("events target listener closed"))
}

// dispatch delivers an event to the run goroutine, or drops it if the
// listener has been closed.
func (li targetListener[T]) dispatch(evt dispatchEvent[T]) {
	select {
	case <-li.ctx.Done():
	case li.ch <- evt:
	}
}

// run calls the listener for each event received on "ch" until the
// listener's context is cancelled.
func (li targetListener[T]) run() {
	for {
		select {
		case <-li.ctx.Done():
			return
		case evt := <-li.ch:
			li.listener(evt.ctx, evt.event)
		}
	}
}