Add digest -> specific variant, annotation data to BIC
The cache implementations now record both the base and the specific compression variant;
the CandidateLocations2 implementations all call CandidateTemplateWithCompression to choose the
appropriate variants to return, based on CandidateLocations2Options.

This way, neither the BIC implementations nor the transports are responsible for
converting zstd:chunked entries to zstd entries if the user wants the latter.

Signed-off-by: Miloslav Trmač <[email protected]>
mtrmac committed Aug 6, 2024
1 parent 5dcb348 commit f9d27e8
Showing 10 changed files with 387 additions and 113 deletions.
8 changes: 6 additions & 2 deletions copy/compression.go
@@ -351,7 +351,9 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// with the same digest and both ZstdAlgorithmName and ZstdChunkedAlgorithmName , which causes warnings about
// inconsistent data to be logged.
c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
BaseVariantCompressor: d.uploadedCompressorName,
SpecificVariantCompressor: internalblobinfocache.UnknownCompression,
SpecificVariantAnnotations: nil,
})
}
}
@@ -361,7 +363,9 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// blob as is, or perhaps decompressed it; either way we don’t trust the TOC digest,
// so record neither the variant name, nor the TOC digest.
c.blobInfoCache.RecordDigestCompressorData(srcInfo.Digest, internalblobinfocache.DigestCompressorData{
BaseVariantCompressor: d.srcCompressorBaseVariantName,
SpecificVariantCompressor: internalblobinfocache.UnknownCompression,
SpecificVariantAnnotations: nil,
})
}
return nil
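Both hunks above deliberately record UnknownCompression for the specific variant: the generic copy path has not verified any chunked metadata at this point. A path that has locally verified a zstd:chunked blob would populate all three fields, roughly like this (a sketch, not code from this commit; the annotation key and value are hypothetical placeholders):

// Sketch only: recording a locally verified zstd:chunked upload, using the
// imports already present in copy/compression.go. tocAnnotations is a
// hypothetical, locally verified annotation map produced by the compressor.
tocAnnotations := map[string]string{
	"io.github.containers.zstd-chunked.manifest-checksum": "sha256:…", // hypothetical value
}
c.blobInfoCache.RecordDigestCompressorData(uploadedInfo.Digest, internalblobinfocache.DigestCompressorData{
	BaseVariantCompressor:      compressiontypes.ZstdAlgorithmName,        // the blob is consumable as plain zstd
	SpecificVariantCompressor:  compressiontypes.ZstdChunkedAlgorithmName, // the exact variant we verified
	SpecificVariantAnnotations: tocAnnotations,                            // verified locally, never copied from a manifest
})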
21 changes: 14 additions & 7 deletions internal/blobinfocache/types.go
@@ -37,8 +37,11 @@ type BlobInfoCache2 interface {

// RecordDigestCompressorData records data for the blob with the specified digest.
// WARNING: Only call this with LOCALLY VERIFIED data:
// - don’t record a compressor for a digest just because some remote author claims so
// (e.g. because a manifest says so);
// - don’t record the non-base variant or annotations if we are not _sure_ that the base variant
// and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them
// in a manifest)
// otherwise the cache could be poisoned and cause us to make incorrect edits to type
// information in a manifest.
RecordDigestCompressorData(anyDigest digest.Digest, data DigestCompressorData)
@@ -52,6 +55,9 @@ type BlobInfoCache2 interface {
// (This is worded generically, but basically targeted at the zstd / zstd:chunked situation.)
type DigestCompressorData struct {
BaseVariantCompressor string // A compressor’s base variant name, or Uncompressed or UnknownCompression.
// The following fields are only valid if the base variant is neither Uncompressed nor UnknownCompression:
SpecificVariantCompressor string // A non-base variant compressor (or UnknownCompression if the true format is just the base variant)
SpecificVariantAnnotations map[string]string // Annotations required to benefit from the specific variant.
}

// CandidateLocations2Options are used in CandidateLocations2.
@@ -66,9 +72,10 @@ type CandidateLocations2Options struct {

// BICReplacementCandidate2 is an item returned by BlobInfoCache2.CandidateLocations2.
type BICReplacementCandidate2 struct {
Digest digest.Digest
CompressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed
CompressionAlgorithm *compressiontypes.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed
CompressionAnnotations map[string]string // If necessary, annotations required to use CompressionAlgorithm
UnknownLocation bool // is true when `Location` for this blob is not set
Location types.BICLocationReference // not set if UnknownLocation is set to `true`
}
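To make the new fields concrete, here are the three shapes a DigestCompressorData record can take (a sketch, assuming the package-qualified names from internal/blobinfocache and pkg/compression/types; verifiedTOCAnnotations stands in for a hypothetical, locally verified map):

// Plain zstd blob — only the base variant is known:
plain := blobinfocache.DigestCompressorData{
	BaseVariantCompressor:      compressiontypes.ZstdAlgorithmName,
	SpecificVariantCompressor:  blobinfocache.UnknownCompression,
	SpecificVariantAnnotations: nil,
}

// Verified zstd:chunked blob — the specific variant and its annotations are recorded;
// a consumer that only wants zstd can still use the blob via the base variant:
chunked := blobinfocache.DigestCompressorData{
	BaseVariantCompressor:      compressiontypes.ZstdAlgorithmName,
	SpecificVariantCompressor:  compressiontypes.ZstdChunkedAlgorithmName,
	SpecificVariantAnnotations: verifiedTOCAnnotations,
}

// Nothing known — per the comment above, the specific fields are only valid
// when the base variant is known:
unknown := blobinfocache.DigestCompressorData{BaseVariantCompressor: blobinfocache.UnknownCompression}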
5 changes: 0 additions & 5 deletions internal/manifest/manifest.go
@@ -205,11 +205,6 @@ type ReuseConditions struct {
// (which can be nil to represent uncompressed or unknown) matches reuseConditions.
func CandidateCompressionMatchesReuseConditions(c ReuseConditions, candidateCompression *compressiontypes.Algorithm) bool {
if c.RequiredCompression != nil {
if c.RequiredCompression.Name() == compressiontypes.ZstdChunkedAlgorithmName {
// HACK: Never match when the caller asks for zstd:chunked, because we don’t record the annotations required to use the chunked blobs.
// The caller must re-compress to build those annotations.
return false
}
if candidateCompression == nil ||
(c.RequiredCompression.Name() != candidateCompression.Name() && c.RequiredCompression.Name() != candidateCompression.BaseVariantName()) {
return false
3 changes: 3 additions & 0 deletions internal/manifest/manifest_test.go
@@ -192,6 +192,9 @@ func TestCandidateCompressionMatchesReuseConditions(t *testing.T) {
}{
// RequiredCompression restrictions
{&compression.Zstd, nil, &compression.Zstd, true},
{&compression.Zstd, nil, &compression.ZstdChunked, true},
{&compression.ZstdChunked, nil, &compression.Zstd, false},
{&compression.ZstdChunked, nil, &compression.ZstdChunked, true},
{&compression.Gzip, nil, &compression.Zstd, false},
{&compression.Zstd, nil, nil, false},
{nil, nil, &compression.Zstd, true},
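The new rows encode the asymmetry that replaces the removed HACK: a zstd:chunked candidate satisfies a zstd requirement because its base variant is zstd, while a plain zstd candidate cannot satisfy a zstd:chunked requirement, since the annotations needed to consume it as chunked may not exist. A simplified sketch of the rule, restricted to the RequiredCompression check (not the exact implementation; assumes the compression and compressiontypes imports used in these files):

// matches is a simplified sketch of CandidateCompressionMatchesReuseConditions.
func matches(required, candidate *compressiontypes.Algorithm) bool {
	if required == nil {
		return true // no restriction
	}
	if candidate == nil {
		return false // uncompressed or unknown cannot satisfy a requirement
	}
	return required.Name() == candidate.Name() ||
		required.Name() == candidate.BaseVariantName()
}

// matches(&compression.Zstd, &compression.ZstdChunked) == true:  zstd:chunked’s base variant is zstd.
// matches(&compression.ZstdChunked, &compression.Zstd) == false: chunked annotations cannot be synthesized.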
92 changes: 77 additions & 15 deletions pkg/blobinfocache/boltdb/boltdb.go
@@ -2,6 +2,8 @@
package boltdb

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/fs"
@@ -30,6 +32,10 @@ var (
// digestCompressorBucket stores a mapping from any digest to a compressor, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression).
// It may not exist in caches created by older versions, even if uncompressedDigestBucket is present.
digestCompressorBucket = []byte("digestCompressor")
// digestSpecificVariantCompressorBucket stores a mapping from any digest to a (compressor, NUL byte, annotations as JSON), valid
// only if digestCompressorBucket contains a value. The compressor is not `UnknownCompression`.
// It may not exist in caches created by older versions, even if digestCompressorBucket is present.
digestSpecificVariantCompressorBucket = []byte("digestSpecificVariantCompressor")
// digestByUncompressedBucket stores a bucket per uncompressed digest, with the bucket containing a set of digests for that uncompressed digest
// (as a set of key=digest, value="" pairs)
digestByUncompressedBucket = []byte("digestByUncompressed")
@@ -299,25 +305,68 @@ func (bdc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompresse
// WARNING: Only call this with LOCALLY VERIFIED data:
// - don’t record a compressor for a digest just because some remote author claims so
// (e.g. because a manifest says so);
// - don’t record the non-base variant or annotations if we are not _sure_ that the base variant
// and the blob’s digest match the non-base variant’s annotations (e.g. because we saw them
// in a manifest)
//
// otherwise the cache could be poisoned and cause us to make incorrect edits to type
// information in a manifest.
func (bdc *cache) RecordDigestCompressorData(anyDigest digest.Digest, data blobinfocache.DigestCompressorData) {
_ = bdc.update(func(tx *bolt.Tx) error {
key := []byte(anyDigest.String())

b, err := tx.CreateBucketIfNotExists(digestCompressorBucket)
if err != nil {
return err
}
warned := false
if previousBytes := b.Get(key); previousBytes != nil {
if string(previousBytes) != data.BaseVariantCompressor {
logrus.Warnf("Compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, string(previousBytes), data.BaseVariantCompressor)
warned = true
}
}
if data.BaseVariantCompressor == blobinfocache.UnknownCompression {
return b.Delete(key)
if err := b.Delete(key); err != nil {
return err
}
if b := tx.Bucket(digestSpecificVariantCompressorBucket); b != nil {
if err := b.Delete(key); err != nil {
return err
}
}
}
return b.Put(key, []byte(data.BaseVariantCompressor))
if err := b.Put(key, []byte(data.BaseVariantCompressor)); err != nil {
return err
}

if data.SpecificVariantCompressor != blobinfocache.UnknownCompression {
b, err := tx.CreateBucketIfNotExists(digestSpecificVariantCompressorBucket)
if err != nil {
return err
}
if !warned { // Don’t warn twice about the same digest
if previousBytes := b.Get(key); previousBytes != nil {
if prevSVCBytes, _, ok := bytes.Cut(previousBytes, []byte{0}); ok {
prevSVC := string(prevSVCBytes)
if data.SpecificVariantCompressor != prevSVC {
logrus.Warnf("Specific compressor for blob with digest %s previously recorded as %s, now %s", anyDigest, prevSVC, data.SpecificVariantCompressor)
}
}
}
}
annotations, err := json.Marshal(data.SpecificVariantAnnotations)
if err != nil {
return err
}
data := bytes.Clone([]byte(data.SpecificVariantCompressor))
data = append(data, 0)
data = append(data, annotations...)
if err := b.Put(key, data); err != nil {
return err
}
}
return nil
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
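The value stored in digestSpecificVariantCompressorBucket is the specific compressor name, a NUL separator, then the annotations as JSON; NUL cannot appear in a compressor name, so the split is unambiguous. A self-contained sketch of the round trip (helper names are illustrative, not from this commit):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// encodeSVC mirrors the value written to digestSpecificVariantCompressorBucket:
// the compressor name, a NUL byte, then the annotations as JSON.
func encodeSVC(compressor string, annotations map[string]string) ([]byte, error) {
	j, err := json.Marshal(annotations)
	if err != nil {
		return nil, err
	}
	v := append([]byte(compressor), 0) // []byte(compressor) is a fresh copy, safe to append to
	return append(v, j...), nil
}

// decodeSVC mirrors the read side in appendReplacementCandidates.
func decodeSVC(v []byte) (string, map[string]string, error) {
	name, j, ok := bytes.Cut(v, []byte{0})
	if !ok {
		return "", nil, fmt.Errorf("no NUL separator in value %q", v)
	}
	var annotations map[string]string
	if err := json.Unmarshal(j, &annotations); err != nil {
		return "", nil, err
	}
	return string(name), annotations, nil
}

func main() {
	v, _ := encodeSVC("zstd:chunked", map[string]string{"example": "annotation"})
	name, annotations, _ := decodeSVC(v)
	fmt.Println(name, annotations) // zstd:chunked map[example:annotation]
}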

@@ -354,24 +403,36 @@ func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope type

// appendReplacementCandidates creates prioritize.CandidateWithTime values for digest in scopeBucket
// (which might be nil) with corresponding compression
// info from compressionBucket (which might be nil), and returns the result of appending them
// info from compressionBucket and specificVariantCompressionBucket (which might be nil), and returns the result of appending them
// to candidates.
// v2Options is not nil if the caller is CandidateLocations2: this allows including candidates with unknown location, and filters out candidates
// with unknown compression.
func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket *bolt.Bucket, digest digest.Digest,
v2Options *blobinfocache.CandidateLocations2Options) []prioritize.CandidateWithTime {
func (bdc *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, scopeBucket, compressionBucket, specificVariantCompressionBucket *bolt.Bucket,
digest digest.Digest, v2Options *blobinfocache.CandidateLocations2Options) []prioritize.CandidateWithTime {
digestKey := []byte(digest.String())
compressorName := blobinfocache.UnknownCompression
compressionData := blobinfocache.DigestCompressorData{
BaseVariantCompressor: blobinfocache.UnknownCompression,
SpecificVariantCompressor: blobinfocache.UnknownCompression,
SpecificVariantAnnotations: nil,
}
if compressionBucket != nil {
// the bucket won't exist if the cache was created by a v1 implementation and
// hasn't yet been updated by a v2 implementation
if compressorNameValue := compressionBucket.Get(digestKey); len(compressorNameValue) > 0 {
compressorName = string(compressorNameValue)
compressionData.BaseVariantCompressor = string(compressorNameValue)
}
if specificVariantCompressionBucket != nil {
if svcData := specificVariantCompressionBucket.Get(digestKey); svcData != nil {
if compressorBytes, annotationBytes, ok := bytes.Cut(svcData, []byte{0}); ok {
compressionData.SpecificVariantCompressor = string(compressorBytes)
if err := json.Unmarshal(annotationBytes, &compressionData.SpecificVariantAnnotations); err != nil {
return candidates // FIXME? Log error (but throttle the log volume on repeated accesses)?
}
}
}
}
}
template := prioritize.CandidateTemplateWithCompression(v2Options, digest, blobinfocache.DigestCompressorData{
BaseVariantCompressor: compressorName,
})
template := prioritize.CandidateTemplateWithCompression(v2Options, digest, compressionData)
if template == nil {
return candidates
}
@@ -416,11 +477,12 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types
if scopeBucket != nil {
scopeBucket = scopeBucket.Bucket([]byte(scope.Opaque))
}
// compressionBucket won't have been created if previous writers never recorded info about compression,
// compressionBucket and svCompressionBucket won't have been created if previous writers never recorded info about compression,
// and we don't want to fail just because of that
compressionBucket := tx.Bucket(digestCompressorBucket)
specificVariantCompressionBucket := tx.Bucket(digestSpecificVariantCompressorBucket)

res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, primaryDigest, v2Options)
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, primaryDigest, v2Options)
if canSubstitute {
if uncompressedDigestValue = bdc.uncompressedDigest(tx, primaryDigest); uncompressedDigestValue != "" {
b := tx.Bucket(digestByUncompressedBucket)
@@ -433,7 +495,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types
return err
}
if d != primaryDigest && d != uncompressedDigestValue {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, d, v2Options)
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, d, v2Options)
}
return nil
}); err != nil {
@@ -442,7 +504,7 @@ func (bdc *cache) candidateLocations(transport types.ImageTransport, scope types
}
}
if uncompressedDigestValue != primaryDigest {
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, uncompressedDigestValue, v2Options)
res = bdc.appendReplacementCandidates(res, scopeBucket, compressionBucket, specificVariantCompressionBucket, uncompressedDigestValue, v2Options)
}
}
}
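From the consumer side, the net effect is that CandidateLocations2 can now return zstd:chunked candidates complete with the annotations a caller must copy into the manifest. A sketch of such a caller (assuming the BlobInfoCache2 interface and options from internal/blobinfocache/types.go; cache, transport, scope and primaryDigest are assumed to exist):

opts := blobinfocache.CandidateLocations2Options{
	CanSubstitute:       true,
	RequiredCompression: &compression.Zstd, // assumption: the caller wants zstd-family blobs
}
for _, c := range cache.CandidateLocations2(transport, scope, primaryDigest, opts) {
	if c.CompressionAlgorithm != nil && c.CompressionAlgorithm.Name() == compressiontypes.ZstdChunkedAlgorithmName {
		// c.CompressionAnnotations carries the verified TOC annotations;
		// the caller copies them into the layer descriptor it writes.
	}
}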
70 changes: 50 additions & 20 deletions pkg/blobinfocache/internal/prioritize/prioritize.go
Expand Up @@ -28,9 +28,10 @@ const replacementUnknownLocationAttempts = 2
// CandidateTemplate is a subset of BICReplacementCandidate2 with data related to a specific digest,
// which can be later combined with information about a location.
type CandidateTemplate struct {
digest digest.Digest
compressionOperation types.LayerCompression // Either types.Decompress for uncompressed, or types.Compress for compressed
compressionAlgorithm *compression.Algorithm // An algorithm when the candidate is compressed, or nil when it is uncompressed
compressionAnnotations map[string]string // If necessary, annotations necessary to use compressionAlgorithm
}

// CandidateTemplateWithCompression returns a CandidateTemplate if a blob with data is acceptable
Expand All @@ -40,7 +41,7 @@ type CandidateTemplate struct {
// if not nil, the call is assumed to be CandidateLocations2.
func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocations2Options, digest digest.Digest, data blobinfocache.DigestCompressorData) *CandidateTemplate {
if v2Options == nil {
return &CandidateTemplate{ // Anything goes. The compressionOperation, compressionAlgorithm values are not used.
return &CandidateTemplate{ // Anything goes. The compressionOperation, compressionAlgorithm and compressionAnnotations values are not used.
digest: digest,
}
}
@@ -60,14 +61,40 @@ func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocation
return nil
}
return &CandidateTemplate{
digest: digest,
compressionOperation: types.Decompress,
compressionAlgorithm: nil,
compressionAnnotations: nil,
}
case blobinfocache.UnknownCompression:
logrus.Debugf("Ignoring BlobInfoCache record of digest %q with unknown compression", digest.String())
return nil // Not allowed with CandidateLocations2
default:
// See if we can use the specific variant, first.
if data.SpecificVariantCompressor != blobinfocache.UnknownCompression {
algo, err := compression.AlgorithmByName(data.SpecificVariantCompressor)
if err != nil {
logrus.Debugf("Not considering unrecognized specific compression variant %q for BlobInfoCache record of digest %q: %v",
data.SpecificVariantCompressor, digest.String(), err)
} else {
if !manifest.CandidateCompressionMatchesReuseConditions(manifest.ReuseConditions{
PossibleManifestFormats: v2Options.PossibleManifestFormats,
RequiredCompression: v2Options.RequiredCompression,
}, &algo) {
logrus.Debugf("Ignoring specific compression variant %q for BlobInfoCache record of digest %q, it does not match required %s or MIME types %#v",
data.SpecificVariantCompressor, digest.String(), requiredCompression, v2Options.PossibleManifestFormats)
} else {
return &CandidateTemplate{
digest: digest,
compressionOperation: types.Compress,
compressionAlgorithm: &algo,
compressionAnnotations: data.SpecificVariantAnnotations,
}
}
}
}

// Try the base variant.
algo, err := compression.AlgorithmByName(data.BaseVariantCompressor)
if err != nil {
logrus.Debugf("Ignoring BlobInfoCache record of digest %q with unrecognized compression %q: %v",
@@ -83,9 +110,10 @@ func CandidateTemplateWithCompression(v2Options *blobinfocache.CandidateLocation
return nil
}
return &CandidateTemplate{
digest: digest,
compressionOperation: types.Compress,
compressionAlgorithm: &algo,
compressionAnnotations: nil,
}
}
}
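Putting the selection order together: the specific variant is tried first and, when it matches the reuse conditions, wins along with its annotations; otherwise the code falls back to the base variant. A usage sketch (tocAnnotations and blobDigest are assumed to exist; behavior follows from the matching rules and the test table above):

data := blobinfocache.DigestCompressorData{
	BaseVariantCompressor:      compressiontypes.ZstdAlgorithmName,
	SpecificVariantCompressor:  compressiontypes.ZstdChunkedAlgorithmName,
	SpecificVariantAnnotations: tocAnnotations, // verified earlier
}

// RequiredCompression == zstd:chunked: the specific variant matches, so the template
// carries ZstdChunked plus the annotations.
// RequiredCompression == zstd: the specific variant still matches (its base variant is
// zstd), so the chunked template is preferred and the annotations ride along.
// Unrecognized or absent specific variant: the base-variant fallback above applies.
template := prioritize.CandidateTemplateWithCompression(
	&blobinfocache.CandidateLocations2Options{RequiredCompression: &compression.ZstdChunked},
	blobDigest, data)
_ = template // later combined with a location via CandidateWithLocation.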
@@ -100,11 +128,12 @@ type CandidateWithTime struct {
func (template CandidateTemplate) CandidateWithLocation(location types.BICLocationReference, lastSeen time.Time) CandidateWithTime {
return CandidateWithTime{
candidate: blobinfocache.BICReplacementCandidate2{
Digest: template.digest,
CompressionOperation: template.compressionOperation,
CompressionAlgorithm: template.compressionAlgorithm,
CompressionAnnotations: template.compressionAnnotations,
UnknownLocation: false,
Location: location,
},
lastSeen: lastSeen,
}
@@ -114,11 +143,12 @@ func (template CandidateTemplate) CandidateWithLocation(location types.BICLocati
func (template CandidateTemplate) CandidateWithUnknownLocation() CandidateWithTime {
return CandidateWithTime{
candidate: blobinfocache.BICReplacementCandidate2{
Digest: template.digest,
CompressionOperation: template.compressionOperation,
CompressionAlgorithm: template.compressionAlgorithm,
CompressionAnnotations: template.compressionAnnotations,
UnknownLocation: true,
Location: types.BICLocationReference{Opaque: ""},
},
lastSeen: time.Time{},
}
