internal/databroker: store server version (#1121)

Storing server version when creating new server. After then, we can
retrieve the version from backend when server restart.

With storage backend which supports persistent, the server version
won't change after restarting.
This commit is contained in:
Cuong Manh Le 2020-07-22 03:50:22 +07:00 committed by GitHub
parent 26f099b49d
commit 99785cbb5b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 347 additions and 178 deletions

View file

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
"github.com/rs/zerolog"
@ -22,6 +23,11 @@ import (
"github.com/pomerium/pomerium/pkg/storage/inmemory"
)
const (
recordTypeServerVersion = "server_version"
serverVersionKey = "version"
)
// Server implements the databroker service using an in memory database.
type Server struct {
version string
@ -44,6 +50,8 @@ func New(options ...ServerOption) *Server {
byType: make(map[string]storage.Backend),
onchange: NewSignal(),
}
srv.initVersion()
go func() {
ticker := time.NewTicker(cfg.deletePermanentlyAfter / 2)
defer ticker.Stop()
@ -64,6 +72,28 @@ func New(options ...ServerOption) *Server {
return srv
}
func (srv *Server) initVersion() {
dbServerVersion := srv.getDB(recordTypeServerVersion)
if dbServerVersion == nil {
return
}
// Get version from storage first.
if r := dbServerVersion.Get(context.Background(), serverVersionKey); r != nil {
var sv databroker.ServerVersion
if err := ptypes.UnmarshalAny(r.GetData(), &sv); err != nil {
srv.log.Debug().Str("server_version", sv.Version).Msg("got db version from DB")
srv.version = sv.Version
}
return
}
data, _ := ptypes.MarshalAny(&databroker.ServerVersion{Version: srv.version})
if err := dbServerVersion.Put(context.Background(), serverVersionKey, data); err != nil {
srv.log.Warn().Err(err).Msg("failed to save server version.")
}
}
// Delete deletes a record from the in-memory list.
func (srv *Server) Delete(ctx context.Context, req *databroker.DeleteRequest) (*empty.Empty, error) {
_, span := trace.StartSpan(ctx, "databroker.grpc.Delete")