package reconciler

/*
 * This is a main control loop for the reconciler service.
 *
 */

import (
	"context"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/time/rate"

	"github.com/pomerium/pomerium/internal/atomicutil"
	connect_mux "github.com/pomerium/pomerium/internal/zero/connect-mux"
)

type service struct {
	config *reconcilerConfig

	databrokerRateLimit *rate.Limiter

	bundles BundleQueue

	fullSyncRequest        chan struct{}
	bundleSyncRequest      chan struct{}
	periodicUpdateInterval atomicutil.Value[time.Duration]
}

// Run creates a new bundle updater client
// that runs until the context is canceled or a fatal error occurs.
func Run(ctx context.Context, opts ...Option) error {
	config := newConfig(opts...)

	c := &service{
		config:              config,
		databrokerRateLimit: rate.NewLimiter(rate.Limit(config.databrokerRPS), 1),
		fullSyncRequest:     make(chan struct{}, 1),
	}
	c.periodicUpdateInterval.Store(config.checkForUpdateIntervalWhenDisconnected)

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return c.watchUpdates(ctx) })
	eg.Go(func() error { return c.SyncLoop(ctx) })

	return eg.Wait()
}

func (c *service) watchUpdates(ctx context.Context) error {
	return c.config.api.Watch(ctx,
		connect_mux.WithOnConnected(func(_ context.Context) {
			c.triggerFullUpdate(true)
		}),
		connect_mux.WithOnDisconnected(func(_ context.Context) {
			c.triggerFullUpdate(false)
		}),
		connect_mux.WithOnBundleUpdated(func(_ context.Context, key string) {
			c.triggerBundleUpdate(key)
		}),
	)
}

func (c *service) triggerBundleUpdate(id string) {
	c.periodicUpdateInterval.Store(c.config.checkForUpdateIntervalWhenConnected)
	c.bundles.MarkForSync(id)

	select {
	case c.fullSyncRequest <- struct{}{}:
	default:
	}
}

func (c *service) triggerFullUpdate(connected bool) {
	timeout := c.config.checkForUpdateIntervalWhenDisconnected
	if connected {
		timeout = c.config.checkForUpdateIntervalWhenConnected
	}
	c.periodicUpdateInterval.Store(timeout)

	select {
	case c.fullSyncRequest <- struct{}{}:
	default:
	}
}