pomerium/pkg/fanout/fanout.go
2023-12-11 17:31:39 -05:00

70 lines
1.8 KiB
Go

// Package fanout implements a fan-out pattern that allows publishing messages to multiple subscribers
package fanout
import (
"context"
"errors"
)
var (
// ErrSubscriberClosed is returned when a subscriber is closed on the subscriber side (Receive)
ErrSubscriberClosed = errors.New("subscriber closed")
// ErrSubscriberEvicted is returned when a subscriber is unable to keep up with the messages
ErrSubscriberEvicted = errors.New("subscriber evicted, cannot keep up consuming messages")
// ErrStopped is returned when the fanout is stopped
ErrStopped = errors.New("fanout is stopped, no more messages will be accepted")
)
// FanOut is a fan-out pattern implementation that allows publishing messages to multiple subscribers
type FanOut[T any] struct {
cfg config
done <-chan struct{}
messages chan T
subscribers chan *subscriber[T]
}
// Start creates and runs a new FanOut
func Start[T any](ctx context.Context, opts ...Option) *FanOut[T] {
cfg := defaultFanOutConfig()
cfg.apply(opts...)
f := &FanOut[T]{
cfg: cfg,
done: ctx.Done(),
messages: make(chan T, cfg.publishBufferSize),
subscribers: make(chan *subscriber[T], cfg.subscriberBufferSize),
}
go f.dispatchLoop(ctx)
return f
}
func (f *FanOut[T]) dispatchLoop(ctx context.Context) {
subscribers := make(subscribers[T])
defer subscribers.closeAll(ErrStopped)
for {
select {
case <-ctx.Done():
return
case sub := <-f.subscribers:
subscribers.add(sub)
continue
case msg := <-f.messages:
subscribers.dispatch(ctx, msg)
}
}
}
func (f *FanOut[T]) addSubscriber(ctx context.Context, sub *subscriber[T]) error {
ctx, cancel := context.WithTimeout(ctx, f.cfg.addSubscriberTimeout)
defer cancel()
select {
case <-ctx.Done():
return context.Cause(ctx)
case <-f.done:
return ErrStopped
case f.subscribers <- sub:
return nil
}
}