diff --git a/cache/cache.go b/cache/cache.go index 66eaf1cc1..9ad68afa4 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -10,9 +10,9 @@ import ( "net" "sync" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "gopkg.in/tomb.v2" "github.com/pomerium/pomerium/config" "github.com/pomerium/pomerium/internal/directory" @@ -118,24 +118,19 @@ func (c *Cache) Register(grpcServer *grpc.Server) { // Run runs the cache components. func (c *Cache) Run(ctx context.Context) error { - t, ctx := tomb.WithContext(ctx) - if c.dataBrokerStorageType == config.StorageInMemoryName { - t.Go(func() error { - return c.runMemberList(ctx) - }) - } - t.Go(func() error { + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { return c.localGRPCServer.Serve(c.localListener) }) - t.Go(func() error { + eg.Go(func() error { <-ctx.Done() c.localGRPCServer.Stop() return nil }) - t.Go(func() error { + eg.Go(func() error { return c.manager.Run(ctx) }) - return t.Wait() + return eg.Wait() } func (c *Cache) update(cfg *config.Config) error { diff --git a/cache/memberlist.go b/cache/memberlist.go deleted file mode 100644 index 4c68d8a9c..000000000 --- a/cache/memberlist.go +++ /dev/null @@ -1,96 +0,0 @@ -package cache - -import ( - "bufio" - "context" - "errors" - "fmt" - "io" - stdlog "log" - "strings" - "time" - - "github.com/hashicorp/memberlist" - "github.com/rs/zerolog" - - "github.com/pomerium/pomerium/internal/log" -) - -type memberlistHandler struct { - cfg *memberlist.Config - memberlist *memberlist.Memberlist - log zerolog.Logger -} - -func (c *Cache) runMemberList(ctx context.Context) error { - mh := new(memberlistHandler) - mh.log = log.With().Str("service", "memberlist").Logger() - - pr, pw := io.Pipe() - defer pw.Close() - defer pr.Close() - - mh.cfg = memberlist.DefaultLANConfig() - mh.cfg.Events = mh - mh.cfg.Logger = stdlog.New(pw, "", 0) - go mh.runLogHandler(pr) - - var err error - mh.memberlist, err = memberlist.Create(mh.cfg) - if err != nil { - return fmt.Errorf("memberlist: error creating memberlist: %w", err) - } - - // the only way memberlist would be empty here, following create is if - // the current node suddenly died. Still, we check to be safe. - if len(mh.memberlist.Members()) == 0 { - return errors.New("memberlist: can't find self") - } - - mh.log.Info().Str("cluster_url", c.deprecatedCacheClusterDomain).Msg("checking for existing cluster members") - - joined, err := mh.memberlist.Join([]string{c.deprecatedCacheClusterDomain, mh.memberlist.Members()[0].Addr.String()}) - if err != nil { - return fmt.Errorf("memberlist: failed to join cluster: %w", err) - } - - mh.log.Info().Int("contacted", joined).Interface("members", mh.memberlist.Members()).Msg("contacted nodes") - if mh.memberlist.NumMembers() > 1 { - mh.log.Error().Msg("multiple cache servers not supported") - } - <-ctx.Done() - err = mh.memberlist.Leave(1 * time.Second) - if err != nil { - mh.log.Error().Err(err).Msg("failed to leave cluster") - } - return mh.memberlist.Shutdown() -} - -func (mh *memberlistHandler) NotifyJoin(node *memberlist.Node) { - mh.log.Debug().Interface("node", node).Msg("node joined") - - go func(memberListNotNil bool) { - if memberListNotNil && mh.memberlist.NumMembers() > 1 { - mh.log.Error().Msg("detected multiple cache servers, which is not supported") - } - }(mh.memberlist != nil) -} - -func (mh *memberlistHandler) NotifyLeave(node *memberlist.Node) { - mh.log.Debug().Interface("node", node).Msg("node left") -} - -func (mh *memberlistHandler) NotifyUpdate(node *memberlist.Node) { - mh.log.Debug().Interface("node", node).Msg("node updated") -} - -func (mh *memberlistHandler) runLogHandler(r io.Reader) { - br := bufio.NewReader(r) - for { - str, err := br.ReadString('\n') - if err != nil { - break - } - mh.log.Debug().Msg(strings.TrimSpace(str)) - } -} diff --git a/cache/memberlist_test.go b/cache/memberlist_test.go deleted file mode 100644 index 2ff12b5b9..000000000 --- a/cache/memberlist_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package cache - -import ( - "context" - "net/url" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/pomerium/pomerium/config" - "github.com/pomerium/pomerium/pkg/cryptutil" -) - -func TestCache_runMemberList(t *testing.T) { - c, err := New(&config.Config{ - Options: &config.Options{ - SharedKey: cryptutil.NewBase64Key(), - DataBrokerURL: &url.URL{Scheme: "http", Host: "member1"}, - Provider: "google", - }, - }) - require.NoError(t, err) - - ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) - defer cancelFunc() - - ch := make(chan error) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch <- c.runMemberList(ctx) - close(ch) - }() - - select { - case <-ctx.Done(): - // No error - case err := <-ch: - assert.NoError(t, err) - } - - // When we're here, either there an error, or ch was closed already. - assert.NoError(t, <-ch) - wg.Wait() -} diff --git a/go.mod b/go.mod index 725a3b015..5987b7267 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,6 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 - github.com/hashicorp/memberlist v0.2.2 github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect github.com/lithammer/shortuuid/v3 v3.0.4 github.com/mitchellh/hashstructure v1.0.0 @@ -70,6 +69,5 @@ require ( gopkg.in/cookieo9/resources-go.v2 v2.0.0-20150225115733-d27c04069d0d gopkg.in/ini.v1 v1.51.1 // indirect gopkg.in/square/go-jose.v2 v2.5.1 - gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 62a36e396..524d2f276 100644 --- a/go.sum +++ b/go.sum @@ -323,8 +323,6 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= -github.com/hashicorp/memberlist v0.2.2 h1:5+RffWKwqJ71YPu9mWsF7ZOscZmwfasdA8kbdC7AO2g= -github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -391,7 +389,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/mholt/acmez v0.1.1 h1:KQODCqk+hBn3O7qfCRPj6L96uG65T5BSS95FKNEqtdA= github.com/mholt/acmez v0.1.1/go.mod h1:8qnn8QA/Ewx8E3ZSsmscqsIjhhpxuy9vqdgbX2ceceM= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.30 h1:Qww6FseFn8PRfw07jueqIXqodm0JKiiKuK0DeXSqfyo= github.com/miekg/dns v1.1.30/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= @@ -646,11 +643,11 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 h1:phUcVbl53swtrUN8kQEXFhUxPlIlWyBfKmidCu7P95o= golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -762,7 +759,6 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -826,7 +822,6 @@ golang.org/x/tools v0.0.0-20190624222133-a101b041ded4/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -925,6 +920,7 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 h1:Rt0FRalMgdSlXAVJvX4pr65KfqaxHXSLkSJRD9pw6g0= google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -979,8 +975,6 @@ gopkg.in/square/go-jose.v2 v2.5.1 h1:7odma5RETjNHWJnR32wx8t+Io4djHE1PqxCFx3iiZ2w gopkg.in/square/go-jose.v2 v2.5.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= -gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= -gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/identity/manager/manager.go b/internal/identity/manager/manager.go index 1d277941b..2c9dadf3d 100644 --- a/internal/identity/manager/manager.go +++ b/internal/identity/manager/manager.go @@ -12,7 +12,7 @@ import ( "github.com/google/btree" "github.com/rs/zerolog" "golang.org/x/oauth2" - "gopkg.in/tomb.v2" + "golang.org/x/sync/errgroup" "github.com/pomerium/pomerium/internal/directory" "github.com/pomerium/pomerium/internal/identity/identity" @@ -101,33 +101,33 @@ func (mgr *Manager) Run(ctx context.Context) error { return fmt.Errorf("failed to initialize directory users: %w", err) } - t, ctx := tomb.WithContext(ctx) + eg, ctx := errgroup.WithContext(ctx) updatedSession := make(chan sessionMessage, 1) - t.Go(func() error { + eg.Go(func() error { return mgr.syncSessions(ctx, updatedSession) }) updatedUser := make(chan userMessage, 1) - t.Go(func() error { + eg.Go(func() error { return mgr.syncUsers(ctx, updatedUser) }) updatedDirectoryGroup := make(chan *directory.Group, 1) - t.Go(func() error { + eg.Go(func() error { return mgr.syncDirectoryGroups(ctx, updatedDirectoryGroup) }) updatedDirectoryUser := make(chan *directory.User, 1) - t.Go(func() error { + eg.Go(func() error { return mgr.syncDirectoryUsers(ctx, updatedDirectoryUser) }) - t.Go(func() error { + eg.Go(func() error { return mgr.refreshLoop(ctx, updatedSession, updatedUser, updatedDirectoryUser, updatedDirectoryGroup) }) - return t.Wait() + return eg.Wait() } func (mgr *Manager) refreshLoop(