zero/bundle-download: update metadata (#5212)

* zero/download: refresh metadata

* fix cmp
This commit is contained in:
Denis Mishin 2024-08-22 16:18:17 -04:00 committed by GitHub
parent 0503b41108
commit 99d7a73cef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 143 additions and 13 deletions

View file

@ -5,6 +5,7 @@ import (
"compress/gzip" "compress/gzip"
"context" "context"
"encoding/xml" "encoding/xml"
"errors"
"fmt" "fmt"
"io" "io"
"mime" "mime"
@ -24,6 +25,8 @@ const (
maxUncompressedBlobSize = 2 << 30 // 1gb maxUncompressedBlobSize = 2 << 30 // 1gb
) )
var ErrEtagChanged = errors.New("etag changed")
// DownloadClusterResourceBundle downloads given cluster resource bundle to given writer. // DownloadClusterResourceBundle downloads given cluster resource bundle to given writer.
func (api *API) DownloadClusterResourceBundle( func (api *API) DownloadClusterResourceBundle(
ctx context.Context, ctx context.Context,
@ -42,8 +45,15 @@ func (api *API) DownloadClusterResourceBundle(
} }
defer resp.Body.Close() defer resp.Body.Close()
log.Ctx(ctx).Trace().
Str("url_path", req.Request.URL.Path).
Interface("request_headers", req.Header).
Interface("response_headers", resp.Header).
Str("status", resp.Status).
Msg("bundle download request")
if resp.StatusCode == http.StatusNotModified { if resp.StatusCode == http.StatusNotModified {
return &DownloadResult{NotModified: true}, nil return newContentNotModifiedDownloadResult(resp.Header.Get("Last-Modified") != current.LastModified), nil
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
@ -71,10 +81,46 @@ func (api *API) DownloadClusterResourceBundle(
return nil, fmt.Errorf("cannot obtain cache conditions from response: %w", err) return nil, fmt.Errorf("cannot obtain cache conditions from response: %w", err)
} }
return &DownloadResult{ return newUpdatedDownloadResult(updated, extractMetadata(resp.Header, req.CaptureHeaders)), nil
DownloadConditional: updated, }
Metadata: extractMetadata(resp.Header, req.CaptureHeaders),
}, nil func (api *API) HeadClusterResourceBundle(
ctx context.Context,
id string,
etag string,
) (*DownloadResult, error) {
req, err := api.getHeadRequest(ctx, id)
if err != nil {
return nil, fmt.Errorf("get head request: %w", err)
}
resp, err := api.cfg.httpClient.Do(req.Request)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}
defer resp.Body.Close()
log.Ctx(ctx).Trace().
Str("url_path", req.Request.URL.Path).
Interface("request_headers", req.Header).
Interface("response_headers", resp.Header).
Str("status", resp.Status).
Msg("bundle metadata request")
if resp.StatusCode != http.StatusOK {
return nil, httpDownloadError(ctx, resp)
}
if gotEtag := resp.Header.Get("ETag"); gotEtag != etag {
return nil, ErrEtagChanged
}
updated, err := newConditionalFromResponse(resp)
if err != nil {
return nil, fmt.Errorf("cannot obtain cache conditions from response: %w", err)
}
return newUpdatedDownloadResult(updated, extractMetadata(resp.Header, req.CaptureHeaders)), nil
} }
type downloadRequest struct { type downloadRequest struct {
@ -105,6 +151,23 @@ func (api *API) getDownloadRequest(ctx context.Context, id string, current *Down
}, nil }, nil
} }
func (api *API) getHeadRequest(ctx context.Context, id string) (*downloadRequest, error) {
params, err := api.getDownloadParams(ctx, id)
if err != nil {
return nil, fmt.Errorf("get download URL: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, params.URL.String(), nil)
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
return &downloadRequest{
Request: req,
DownloadCacheEntry: *params,
}, nil
}
func (api *API) getDownloadParams(ctx context.Context, id string) (*cluster_api.DownloadCacheEntry, error) { func (api *API) getDownloadParams(ctx context.Context, id string) (*cluster_api.DownloadCacheEntry, error) {
param, ok := api.downloadURLCache.Get(id, api.cfg.downloadURLCacheTTL) param, ok := api.downloadURLCache.Get(id, api.cfg.downloadURLCacheTTL)
if ok { if ok {
@ -145,14 +208,35 @@ func (api *API) updateBundleDownloadParams(ctx context.Context, id string) (*clu
// DownloadResult contains the result of a download operation // DownloadResult contains the result of a download operation
type DownloadResult struct { type DownloadResult struct {
// NotModified is true if the bundle has not been modified // ContentUpdated indicates the bundle contents were updated
NotModified bool ContentUpdated bool
// MetadataUpdated indicates the metadata was updated
MetadataUpdated bool
// DownloadConditional contains the new conditional // DownloadConditional contains the new conditional
*DownloadConditional *DownloadConditional
// Metadata contains the metadata of the downloaded bundle // Metadata contains the metadata of the downloaded bundle
Metadata map[string]string Metadata map[string]string
} }
func newUpdatedDownloadResult(
updated *DownloadConditional,
metadata map[string]string,
) *DownloadResult {
return &DownloadResult{
ContentUpdated: true,
MetadataUpdated: true,
DownloadConditional: updated,
Metadata: metadata,
}
}
func newContentNotModifiedDownloadResult(metadataUpdated bool) *DownloadResult {
return &DownloadResult{
ContentUpdated: false,
MetadataUpdated: metadataUpdated,
}
}
// DownloadConditional contains the conditional headers for a download operation // DownloadConditional contains the conditional headers for a download operation
type DownloadConditional struct { type DownloadConditional struct {
ETag string ETag string
@ -161,8 +245,8 @@ type DownloadConditional struct {
// Validate validates the conditional headers // Validate validates the conditional headers
func (c *DownloadConditional) Validate() error { func (c *DownloadConditional) Validate() error {
if c.ETag == "" && c.LastModified == "" { if c.ETag == "" {
return fmt.Errorf("either ETag or LastModified must be set") return fmt.Errorf("ETag must be set")
} }
return nil return nil
} }
@ -176,7 +260,6 @@ func (c *DownloadConditional) SetHeaders(req *http.Request) error {
return err return err
} }
req.Header.Set("If-None-Match", c.ETag) req.Header.Set("If-None-Match", c.ETag)
req.Header.Set("If-Modified-Since", c.LastModified)
return nil return nil
} }
@ -240,6 +323,7 @@ func isXML(ct string) bool {
} }
func extractMetadata(header http.Header, keys []string) map[string]string { func extractMetadata(header http.Header, keys []string) map[string]string {
log.Info().Interface("header", header).Msg("extract metadata")
m := make(map[string]string) m := make(map[string]string)
for _, k := range keys { for _, k := range keys {
v := header.Get(k) v := header.Get(k)

View file

@ -20,6 +20,7 @@ import (
"github.com/pomerium/pomerium/internal/log" "github.com/pomerium/pomerium/internal/log"
"github.com/pomerium/pomerium/internal/retry" "github.com/pomerium/pomerium/internal/retry"
zero "github.com/pomerium/pomerium/internal/zero/api"
"github.com/pomerium/pomerium/pkg/grpc/databroker" "github.com/pomerium/pomerium/pkg/grpc/databroker"
) )
@ -140,17 +141,27 @@ func (c *service) syncBundle(ctx context.Context, key string) error {
return fmt.Errorf("download bundle: %w", err) return fmt.Errorf("download bundle: %w", err)
} }
if result.NotModified { if result.ContentUpdated {
log.Ctx(ctx).Debug().Str("bundle", key).Msg("bundle not changed") return c.syncUpdatedBundle(ctx, key, cached, result, fd)
}
if !result.MetadataUpdated {
log.Ctx(ctx).Debug().Str("id", key).Msg("bundle not updated")
return nil return nil
} }
if cached == nil {
return fmt.Errorf("invalid state: bundle metadata updated but no cached entry")
}
return c.getUpdatedMetadata(ctx, key, *cached, result)
}
func (c *service) syncUpdatedBundle(ctx context.Context, key string, cached *BundleCacheEntry, result *zero.DownloadResult, fd ReadWriteSeekCloser) error {
log.Ctx(ctx).Debug().Str("bundle", key). log.Ctx(ctx).Debug().Str("bundle", key).
Interface("cached-entry", cached). Interface("cached-entry", cached).
Interface("current-entry", result.DownloadConditional). Interface("current-entry", result.DownloadConditional).
Msg("bundle updated") Msg("bundle updated")
_, err = fd.Seek(0, io.SeekStart) _, err := fd.Seek(0, io.SeekStart)
if err != nil { if err != nil {
return fmt.Errorf("seek to start: %w", err) return fmt.Errorf("seek to start: %w", err)
} }
@ -184,6 +195,41 @@ func (c *service) syncBundle(ctx context.Context, key string) error {
return nil return nil
} }
func (c *service) getUpdatedMetadata(ctx context.Context, key string, cached BundleCacheEntry, result *zero.DownloadResult) error {
log.Ctx(ctx).Debug().Str("bundle", key).
Interface("cached-entry", cached).
Interface("current-entry", result.DownloadConditional).
Msg("bundle metadata updated")
result, err := c.config.api.HeadClusterResourceBundle(ctx, key, cached.ETag)
if err != nil {
return fmt.Errorf("get bundle metadata: %w", err)
}
current := BundleCacheEntry{
DownloadConditional: *result.DownloadConditional,
RecordTypes: cached.GetRecordTypes(),
}
log.Ctx(ctx).Debug().
Str("bundle", key).
Strs("record_types", current.RecordTypes).
Str("etag", current.ETag).
Str("last_modified", current.LastModified).
Interface("metadata", result.Metadata).
Msg("metadata updated")
err = c.SetBundleCacheEntry(ctx, key, current)
if err != nil {
err = fmt.Errorf("set bundle cache entry: %w", err)
c.ReportBundleAppliedFailure(key, fmt.Errorf("set bundle cache entry: %w", err))
return err
}
c.ReportBundleAppliedSuccess(key, result.Metadata)
return nil
}
func strUnion(a, b []string) []string { func strUnion(a, b []string) []string {
m := make(map[string]struct{}, len(a)+len(b)) m := make(map[string]struct{}, len(a)+len(b))
for _, s := range a { for _, s := range a {