cache : add cache service (#457)

Signed-off-by: Bobby DeSimone <bobbydesimone@gmail.com>
This commit is contained in:
Bobby DeSimone 2020-01-20 18:25:34 -08:00 committed by GitHub
parent 8a9cb0f803
commit dccc7cd2ff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 1837 additions and 587 deletions

View file

@ -0,0 +1,235 @@
package autocache
import (
"context"
"errors"
"fmt"
stdlog "log"
"net/http"
"sync"
"github.com/golang/groupcache"
"github.com/pomerium/autocache"
"github.com/pomerium/pomerium/internal/httputil"
"github.com/pomerium/pomerium/internal/kv"
"github.com/pomerium/pomerium/internal/urlutil"
)
// Name represents autocache's shorthand named.
const Name = "autocache"
const defaultQueryParamKey = "ati"
var _ kv.Store = &Store{}
// Store implements a the store interface for autocache, a distributed cache
// with gossip based peer membership enrollment.
// https://github.com/pomerium/autocache
type Store struct {
db *groupcache.Group
cluster *autocache.Autocache
sharedKey string
srv *http.Server
}
// Options represent autocache options.
type Options struct {
Addr string
CacheSize int64
ClusterDomain string
GetterFn groupcache.GetterFunc
Group string
Log *stdlog.Logger
Port int
Scheme string
SharedKey string
}
// DefaultOptions are the default options used by the autocache service.
var DefaultOptions = &Options{
Addr: ":8333",
Port: 8333,
Scheme: "http",
CacheSize: 10 << 20,
Group: "default",
GetterFn: func(ctx context.Context, id string, dest groupcache.Sink) error {
b := fromContext(ctx)
if len(b) == 0 {
return fmt.Errorf("autocache: empty ctx for id: %s", id)
}
if err := dest.SetBytes(b); err != nil {
return fmt.Errorf("autocache: sink error %w", err)
}
return nil
},
}
// New creates a new autocache key value store. Autocache will start several
// services to support distributed cluster management and membership.
// A HTTP server will be used by groupcache to perform cross node-RPC. By
// default that server will start on port ``:8333`.
// Memberlist will likewise start and listen for group membership on port
//
//
// NOTE: RPC communication between nodes is _authenticated_ but not encrypted.
// NOTE: Groupchache starts a HTTP listener (Default: :8333)
// NOTE: Memberlist starts a GOSSIP listener on TCP/UDP. (Default: :7946)
func New(o *Options) (*Store, error) {
var s Store
var err error
if o.SharedKey == "" {
return nil, errors.New("autocache: shared secret must be set")
}
if o.Addr == "" {
o.Addr = DefaultOptions.Addr
}
if o.Scheme == "" {
o.Scheme = DefaultOptions.Scheme
}
if o.Port == 0 {
o.Port = DefaultOptions.Port
}
if o.Group == "" {
o.Group = DefaultOptions.Group
}
if o.GetterFn == nil {
o.GetterFn = DefaultOptions.GetterFn
}
if o.CacheSize == 0 {
o.CacheSize = DefaultOptions.CacheSize
}
if o.ClusterDomain == "" {
o.Log.Println("")
}
s.db = groupcache.NewGroup(o.Group, o.CacheSize, o.GetterFn)
s.cluster, err = autocache.New(&autocache.Options{
PoolTransportFn: s.addSessionToCtx,
PoolScheme: o.Scheme,
PoolPort: o.Port,
Logger: o.Log,
})
if err != nil {
return nil, err
}
serverOpts := &httputil.ServerOptions{Addr: o.Addr}
var wg sync.WaitGroup
s.srv, err = httputil.NewServer(serverOpts, QueryParamToCtx(s.cluster), &wg)
if err != nil {
return nil, err
}
if _, err := s.cluster.Join([]string{o.ClusterDomain}); err != nil {
return nil, err
}
return &s, nil
}
// Set stores a key value pair. Since group cache actually only implements
// Get, we have to be a little creative in how we smuggle in value using
// context.
func (s Store) Set(ctx context.Context, k string, v []byte) error {
// smuggle the the value pair as a context value
ctx = newContext(ctx, v)
if err := s.db.Get(ctx, k, groupcache.AllocatingByteSliceSink(&v)); err != nil {
return fmt.Errorf("autocache: set %s failed: %w", k, err)
}
return nil
}
// Get retrieves the value for a key in the bucket.
func (s *Store) Get(ctx context.Context, k string) (bool, []byte, error) {
var value []byte
if err := s.db.Get(ctx, k, groupcache.AllocatingByteSliceSink(&value)); err != nil {
return false, nil, fmt.Errorf("autocache: get %s failed: %w", k, err)
}
return true, value, nil
}
// Close shuts down any HTTP server used for groupcache pool, and
// also stop any background maintenance of memberlist.
func (s Store) Close(ctx context.Context) error {
var retErr error
if s.srv != nil {
if err := s.srv.Shutdown(ctx); err != nil {
retErr = fmt.Errorf("autocache: http shutdown error: %w", err)
}
}
if s.cluster.Memberlist != nil {
if err := s.cluster.Memberlist.Shutdown(); err != nil {
retErr = fmt.Errorf("autocache: memberlist shutdown error: %w", err)
}
}
return retErr
}
// addSessionToCtx is a wrapper function that allows us to add a session
// into http client's round trip and sign the outgoing request.
func (s *Store) addSessionToCtx(ctx context.Context) http.RoundTripper {
var sh signedSession
sh.session = string(fromContext(ctx))
sh.sharedKey = s.sharedKey
return sh
}
type signedSession struct {
session string
sharedKey string
}
// RoundTrip copies the request's session context and adds it to the
// outgoing client request as a query param. The whole URL is then signed for
// authenticity.
func (s signedSession) RoundTrip(req *http.Request) (*http.Response, error) {
// clone request before mutating
// https://golang.org/src/net/http/client.go?s=4306:5535#L105
newReq := cloneRequest(req)
session := s.session
newReqURL := *newReq.URL
q := newReqURL.Query()
q.Set(defaultQueryParamKey, session)
newReqURL.RawQuery = q.Encode()
newReq.URL = urlutil.NewSignedURL(s.sharedKey, &newReqURL).Sign()
return http.DefaultTransport.RoundTrip(newReq)
}
// QueryParamToCtx takes a value from a query param and adds it to the
// current request request context.
func QueryParamToCtx(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
session := r.FormValue(defaultQueryParamKey)
ctx := newContext(r.Context(), []byte(session))
next.ServeHTTP(w, r.WithContext(ctx))
})
}
var sessionCtxKey = &contextKey{"PomeriumCachedSessionBytes"}
type contextKey struct {
name string
}
func newContext(ctx context.Context, b []byte) context.Context {
ctx = context.WithValue(ctx, sessionCtxKey, b)
return ctx
}
func fromContext(ctx context.Context) []byte {
b, _ := ctx.Value(sessionCtxKey).([]byte)
return b
}
func cloneRequest(req *http.Request) *http.Request {
r := new(http.Request)
*r = *req
r.Header = cloneHeaders(req.Header)
return r
}
func cloneHeaders(in http.Header) http.Header {
out := make(http.Header, len(in))
for key, values := range in {
newValues := make([]string, len(values))
copy(newValues, values)
out[key] = newValues
}
return out
}

109
internal/kv/bolt/bolt.go Normal file
View file

@ -0,0 +1,109 @@
package bolt
import (
"context"
"github.com/pomerium/pomerium/internal/kv"
bolt "go.etcd.io/bbolt"
)
var _ kv.Store = &Store{}
// Name represents bbolt's shorthand named.
const Name = "bolt"
// Store implements a the Store interface for bolt.
// https://godoc.org/github.com/etcd-io/bbolt
type Store struct {
db *bolt.DB
bucket string
}
// Options represents options for configuring the boltdb cache store.
type Options struct {
// Buckets are collections of key/value pairs within the database.
// All keys in a bucket must be unique.
Bucket string
// Path is where the database file will be stored.
Path string
}
// DefaultOptions contain's bolts default options.
var DefaultOptions = &Options{
Bucket: "default",
Path: Name + ".db",
}
// New creates a new bolt cache store.
// It is up to the operator to make sure that the store's path
// is writeable.
func New(o *Options) (*Store, error) {
if o.Path == "" {
o.Path = DefaultOptions.Path
}
if o.Bucket == "" {
o.Bucket = DefaultOptions.Bucket
}
db, err := bolt.Open(o.Path, 0600, nil)
if err != nil {
return nil, err
}
err = db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(o.Bucket))
if err != nil {
return err
}
return nil
})
if err != nil {
return nil, err
}
return &Store{db: db, bucket: o.Bucket}, nil
}
// Set sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction,
// if the key is blank, if the key is too large, or if the value is too large.
func (s Store) Set(ctx context.Context, k string, v []byte) error {
err := s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(s.bucket))
return b.Put([]byte(k), v)
})
if err != nil {
return err
}
return nil
}
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (s *Store) Get(ctx context.Context, k string) (bool, []byte, error) {
var value []byte
err := s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(s.bucket))
txData := b.Get([]byte(k)) // only valid in transaction
value = append(txData[:0:0], txData...)
return nil
})
if err != nil {
return false, nil, err
}
if value == nil {
return false, nil, nil
}
return true, value, nil
}
// Close releases all database resources.
// It will block waiting for any open transactions to finish
// before closing the database and returning.
func (s Store) Close(ctx context.Context) error {
return s.db.Close()
}

View file

@ -0,0 +1,89 @@
package redis
import (
"context"
"crypto/tls"
"errors"
"fmt"
"github.com/go-redis/redis"
"github.com/pomerium/pomerium/internal/kv"
)
var _ kv.Store = &Store{}
// Name represents redis's shorthand name.
const Name = "redis"
// Store implements a the Store interface for redis.
// https://godoc.org/github.com/go-redis/redis
type Store struct {
db *redis.Client
}
// Options represents options for configuring the redis store.
type Options struct {
// host:port Addr.
Addr string
// Optional password. Must match the password specified in the
// requirepass server configuration option.
Password string
// Database to be selected after connecting to the server.
DB int
// TLS Config to use. When set TLS will be negotiated.
TLSConfig *tls.Config
}
// New creates a new redis cache store.
// It is up to the operator to make sure that the store's path
// is writeable.
func New(o *Options) (*Store, error) {
if o.Addr == "" {
return nil, fmt.Errorf("kv/redis: connection address is required")
}
db := redis.NewClient(
&redis.Options{
Addr: o.Addr,
Password: o.Password,
DB: o.DB,
TLSConfig: o.TLSConfig,
})
if _, err := db.Ping().Result(); err != nil {
return nil, fmt.Errorf("kv/redis: error connecting to redis: %w", err)
}
return &Store{db: db}, nil
}
// Set is equivalent to redis `SET key value [expiration]` command.
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
func (s Store) Set(ctx context.Context, k string, v []byte) error {
if err := s.db.Set(k, string(v), 0).Err(); err != nil {
return err
}
return nil
}
// Get is equivalent to Redis `GET key` command.
// It returns redis.Nil error when key does not exist.
func (s *Store) Get(ctx context.Context, k string) (bool, []byte, error) {
v, err := s.db.Get(k).Result()
if errors.Is(err, redis.Nil) {
return false, nil, nil
} else if err != nil {
return false, nil, err
}
return true, []byte(v), nil
}
// Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (s Store) Close(ctx context.Context) error {
return s.db.Close()
}

10
internal/kv/store.go Normal file
View file

@ -0,0 +1,10 @@
package kv
import "context"
// Store specifies a key value storage interface.
type Store interface {
Set(ctx context.Context, key string, value []byte) error
Get(ctx context.Context, key string) (keyExists bool, value []byte, err error)
Close(ctx context.Context) error
}