databroker: implement leases (#2172)

* databroker: implement leases

* return error

* handle gRPC errors
This commit is contained in:
Caleb Doxsey 2021-05-10 13:30:25 -06:00 committed by GitHub
parent a54d43b937
commit 94aa0b1a48
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 2135 additions and 149 deletions

View file

@ -10,8 +10,10 @@ import (
"sync"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/pomerium/pomerium/config"
"github.com/pomerium/pomerium/internal/log"
@ -72,6 +74,34 @@ func (srv *Server) UpdateConfig(options ...ServerOption) {
}
}
// AcquireLease acquires a lease.
func (srv *Server) AcquireLease(ctx context.Context, req *databroker.AcquireLeaseRequest) (*databroker.AcquireLeaseResponse, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.AcquireLease")
defer span.End()
log.Info(ctx).
Str("peer", grpcutil.GetPeerAddr(ctx)).
Str("name", req.GetName()).
Dur("duration", req.GetDuration().AsDuration()).
Msg("acquire lease")
db, err := srv.getBackend()
if err != nil {
return nil, err
}
leaseID := uuid.NewString()
acquired, err := db.Lease(ctx, req.GetName(), leaseID, req.GetDuration().AsDuration())
if err != nil {
return nil, err
} else if !acquired {
return nil, status.Error(codes.AlreadyExists, "lease is already taken")
}
return &databroker.AcquireLeaseResponse{
Id: leaseID,
}, nil
}
// Get gets a record from the in-memory list.
func (srv *Server) Get(ctx context.Context, req *databroker.GetRequest) (*databroker.GetResponse, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.Get")
@ -169,6 +199,55 @@ func (srv *Server) Put(ctx context.Context, req *databroker.PutRequest) (*databr
}, nil
}
// ReleaseLease releases a lease.
func (srv *Server) ReleaseLease(ctx context.Context, req *databroker.ReleaseLeaseRequest) (*emptypb.Empty, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.ReleaseLease")
defer span.End()
log.Info(ctx).
Str("peer", grpcutil.GetPeerAddr(ctx)).
Str("name", req.GetName()).
Str("id", req.GetId()).
Msg("release lease")
db, err := srv.getBackend()
if err != nil {
return nil, err
}
_, err = db.Lease(ctx, req.GetName(), req.GetId(), -1)
if err != nil {
return nil, err
}
return new(emptypb.Empty), nil
}
// RenewLease releases a lease.
func (srv *Server) RenewLease(ctx context.Context, req *databroker.RenewLeaseRequest) (*emptypb.Empty, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.RenewLease")
defer span.End()
log.Info(ctx).
Str("peer", grpcutil.GetPeerAddr(ctx)).
Str("name", req.GetName()).
Str("id", req.GetId()).
Dur("duration", req.GetDuration().AsDuration()).
Msg("renew lease")
db, err := srv.getBackend()
if err != nil {
return nil, err
}
acquired, err := db.Lease(ctx, req.GetName(), req.GetId(), req.GetDuration().AsDuration())
if err != nil {
return nil, err
} else if !acquired {
return nil, status.Error(codes.AlreadyExists, "lease no longer held")
}
return new(emptypb.Empty), nil
}
// SetOptions sets options for a type in the databroker.
func (srv *Server) SetOptions(ctx context.Context, req *databroker.SetOptionsRequest) (*databroker.SetOptionsResponse, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.SetOptions")