mirror of
https://github.com/pomerium/pomerium.git
synced 2025-05-23 22:17:14 +02:00
remove memberlist (#1615)
This commit is contained in:
parent
251dd8e03e
commit
ee03d0e9f8
6 changed files with 16 additions and 174 deletions
17
cache/cache.go
vendored
17
cache/cache.go
vendored
|
@ -10,9 +10,9 @@ import (
|
|||
"net"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"gopkg.in/tomb.v2"
|
||||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/internal/directory"
|
||||
|
@ -118,24 +118,19 @@ func (c *Cache) Register(grpcServer *grpc.Server) {
|
|||
|
||||
// Run runs the cache components.
|
||||
func (c *Cache) Run(ctx context.Context) error {
|
||||
t, ctx := tomb.WithContext(ctx)
|
||||
if c.dataBrokerStorageType == config.StorageInMemoryName {
|
||||
t.Go(func() error {
|
||||
return c.runMemberList(ctx)
|
||||
})
|
||||
}
|
||||
t.Go(func() error {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
return c.localGRPCServer.Serve(c.localListener)
|
||||
})
|
||||
t.Go(func() error {
|
||||
eg.Go(func() error {
|
||||
<-ctx.Done()
|
||||
c.localGRPCServer.Stop()
|
||||
return nil
|
||||
})
|
||||
t.Go(func() error {
|
||||
eg.Go(func() error {
|
||||
return c.manager.Run(ctx)
|
||||
})
|
||||
return t.Wait()
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (c *Cache) update(cfg *config.Config) error {
|
||||
|
|
96
cache/memberlist.go
vendored
96
cache/memberlist.go
vendored
|
@ -1,96 +0,0 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
stdlog "log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/memberlist"
|
||||
"github.com/rs/zerolog"
|
||||
|
||||
"github.com/pomerium/pomerium/internal/log"
|
||||
)
|
||||
|
||||
type memberlistHandler struct {
|
||||
cfg *memberlist.Config
|
||||
memberlist *memberlist.Memberlist
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func (c *Cache) runMemberList(ctx context.Context) error {
|
||||
mh := new(memberlistHandler)
|
||||
mh.log = log.With().Str("service", "memberlist").Logger()
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
defer pw.Close()
|
||||
defer pr.Close()
|
||||
|
||||
mh.cfg = memberlist.DefaultLANConfig()
|
||||
mh.cfg.Events = mh
|
||||
mh.cfg.Logger = stdlog.New(pw, "", 0)
|
||||
go mh.runLogHandler(pr)
|
||||
|
||||
var err error
|
||||
mh.memberlist, err = memberlist.Create(mh.cfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("memberlist: error creating memberlist: %w", err)
|
||||
}
|
||||
|
||||
// the only way memberlist would be empty here, following create is if
|
||||
// the current node suddenly died. Still, we check to be safe.
|
||||
if len(mh.memberlist.Members()) == 0 {
|
||||
return errors.New("memberlist: can't find self")
|
||||
}
|
||||
|
||||
mh.log.Info().Str("cluster_url", c.deprecatedCacheClusterDomain).Msg("checking for existing cluster members")
|
||||
|
||||
joined, err := mh.memberlist.Join([]string{c.deprecatedCacheClusterDomain, mh.memberlist.Members()[0].Addr.String()})
|
||||
if err != nil {
|
||||
return fmt.Errorf("memberlist: failed to join cluster: %w", err)
|
||||
}
|
||||
|
||||
mh.log.Info().Int("contacted", joined).Interface("members", mh.memberlist.Members()).Msg("contacted nodes")
|
||||
if mh.memberlist.NumMembers() > 1 {
|
||||
mh.log.Error().Msg("multiple cache servers not supported")
|
||||
}
|
||||
<-ctx.Done()
|
||||
err = mh.memberlist.Leave(1 * time.Second)
|
||||
if err != nil {
|
||||
mh.log.Error().Err(err).Msg("failed to leave cluster")
|
||||
}
|
||||
return mh.memberlist.Shutdown()
|
||||
}
|
||||
|
||||
func (mh *memberlistHandler) NotifyJoin(node *memberlist.Node) {
|
||||
mh.log.Debug().Interface("node", node).Msg("node joined")
|
||||
|
||||
go func(memberListNotNil bool) {
|
||||
if memberListNotNil && mh.memberlist.NumMembers() > 1 {
|
||||
mh.log.Error().Msg("detected multiple cache servers, which is not supported")
|
||||
}
|
||||
}(mh.memberlist != nil)
|
||||
}
|
||||
|
||||
func (mh *memberlistHandler) NotifyLeave(node *memberlist.Node) {
|
||||
mh.log.Debug().Interface("node", node).Msg("node left")
|
||||
}
|
||||
|
||||
func (mh *memberlistHandler) NotifyUpdate(node *memberlist.Node) {
|
||||
mh.log.Debug().Interface("node", node).Msg("node updated")
|
||||
}
|
||||
|
||||
func (mh *memberlistHandler) runLogHandler(r io.Reader) {
|
||||
br := bufio.NewReader(r)
|
||||
for {
|
||||
str, err := br.ReadString('\n')
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
mh.log.Debug().Msg(strings.TrimSpace(str))
|
||||
}
|
||||
}
|
49
cache/memberlist_test.go
vendored
49
cache/memberlist_test.go
vendored
|
@ -1,49 +0,0 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/pomerium/pomerium/config"
|
||||
"github.com/pomerium/pomerium/pkg/cryptutil"
|
||||
)
|
||||
|
||||
func TestCache_runMemberList(t *testing.T) {
|
||||
c, err := New(&config.Config{
|
||||
Options: &config.Options{
|
||||
SharedKey: cryptutil.NewBase64Key(),
|
||||
DataBrokerURL: &url.URL{Scheme: "http", Host: "member1"},
|
||||
Provider: "google",
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancelFunc()
|
||||
|
||||
ch := make(chan error)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
ch <- c.runMemberList(ctx)
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// No error
|
||||
case err := <-ch:
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// When we're here, either there an error, or ch was closed already.
|
||||
assert.NoError(t, <-ch)
|
||||
wg.Wait()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue