diff --git a/cache/cache.go b/cache/cache.go index dcb02d0db..f77ec6c49 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -31,9 +31,10 @@ type Cache struct { userServer *UserServer manager *manager.Manager - localListener net.Listener - localGRPCServer *grpc.Server - localGRPCConnection *grpc.ClientConn + localListener net.Listener + localGRPCServer *grpc.Server + localGRPCConnection *grpc.ClientConn + deprecatedCacheClusterDomain string //TODO: remove in v0.11 } // New creates a new cache service. @@ -85,9 +86,10 @@ func New(opts config.Options) (*Cache, error) { userServer: userServer, manager: manager, - localListener: localListener, - localGRPCServer: localGRPCServer, - localGRPCConnection: localGRPCConnection, + localListener: localListener, + localGRPCServer: localGRPCServer, + localGRPCConnection: localGRPCConnection, + deprecatedCacheClusterDomain: opts.GetDataBrokerURL().Hostname(), }, nil } diff --git a/cache/memberlist.go b/cache/memberlist.go index bc2b2ab71..409540942 100644 --- a/cache/memberlist.go +++ b/cache/memberlist.go @@ -46,16 +46,30 @@ func (c *Cache) runMemberList(ctx context.Context) error { return errors.New("memberlist: can't find self") } + mh.log.Info().Str("cluster_url", c.deprecatedCacheClusterDomain).Msg("checking for existing cluster members") + + joined, err := mh.memberlist.Join([]string{c.deprecatedCacheClusterDomain, mh.memberlist.Members()[0].Addr.String()}) + if err != nil { + return fmt.Errorf("memberlist: failed to join cluster: %w", err) + } + + mh.log.Info().Int("joined", joined).Interface("members", mh.memberlist.Members()).Msg("joined nodes") + if joined > 1 { + mh.log.Error().Msg("multiple cache servers not supported") + } <-ctx.Done() + mh.memberlist.Leave() return mh.memberlist.Shutdown() } func (mh *memberlistHandler) NotifyJoin(node *memberlist.Node) { mh.log.Debug().Interface("node", node).Msg("node joined") - if mh.memberlist != nil && len(mh.memberlist.Members()) > 1 { - mh.log.Error().Msg("detected multiple cache servers, which is not supported") - } + go func() { + if mh.memberlist != nil && len(mh.memberlist.Members()) > 1 { + mh.log.Error().Msg("detected multiple cache servers, which is not supported") + } + }() } func (mh *memberlistHandler) NotifyLeave(node *memberlist.Node) {