databroker: remove unused installation id, close streams when backend is closed (#2062)

This commit is contained in:
Caleb Doxsey 2021-04-06 13:41:19 -06:00 committed by GitHub
parent 187d0a0195
commit 294addd857
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 10 deletions

View file

@ -35,7 +35,6 @@ func (srv *dataBrokerServer) OnConfigChange(cfg *config.Config) {
func (srv *dataBrokerServer) getOptions(cfg *config.Config) []databroker.ServerOption {
cert, _ := cfg.Options.GetDataBrokerCertificate()
return []databroker.ServerOption{
databroker.WithInstallationID(cfg.Options.InstallationID),
databroker.WithSharedKey(cfg.Options.SharedKey),
databroker.WithStorageType(cfg.Options.DataBrokerStorageType),
databroker.WithStorageConnectionString(cfg.Options.DataBrokerStorageConnectionString),

View file

@ -20,7 +20,6 @@ var (
)
type serverConfig struct {
installationID string
deletePermanentlyAfter time.Duration
secret []byte
storageType string
@ -61,13 +60,6 @@ func WithGetAllPageSize(pageSize int) ServerOption {
}
}
// WithInstallationID sets the installation id in the config.
func WithInstallationID(installationID string) ServerOption {
return func(cfg *serverConfig) {
cfg.installationID = installationID
}
}
// WithSharedKey sets the secret in the config.
func WithSharedKey(sharedKey string) ServerOption {
return func(cfg *serverConfig) {

View file

@ -30,6 +30,14 @@ func newRecordStream(ctx context.Context, backend *Backend, version uint64) *rec
closed: make(chan struct{}),
}
// if the backend is closed, close the stream
go func() {
select {
case <-stream.closed:
case <-backend.closed:
_ = stream.Close()
}
}()
return stream
}

View file

@ -27,7 +27,7 @@ type recordStream struct {
}
func newRecordStream(ctx context.Context, backend *Backend, version uint64) *recordStream {
return &recordStream{
stream := &recordStream{
ctx: ctx,
backend: backend,
@ -36,6 +36,15 @@ func newRecordStream(ctx context.Context, backend *Backend, version uint64) *rec
closed: make(chan struct{}),
}
// if the backend is closed, close the stream
go func() {
select {
case <-stream.closed:
case <-backend.closed:
_ = stream.Close()
}
}()
return stream
}
func (stream *recordStream) Close() error {