diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..941a3837c9 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -18,15 +18,15 @@ import ( "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache" "github.com/containers/image/pkg/compression" + "github.com/containers/image/pkg/progress" "github.com/containers/image/signature" + "github.com/containers/image/storage" "github.com/containers/image/transports" "github.com/containers/image/types" "github.com/klauspost/pgzip" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/vbauerster/mpb" - "github.com/vbauerster/mpb/decor" "golang.org/x/crypto/ssh/terminal" "golang.org/x/sync/semaphore" ) @@ -160,7 +160,6 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef, // If reportWriter is not a TTY (e.g., when piping to a file), do not // print the progress bars to avoid long and hard to parse output. - // createProgressBar() will print a single line instead. progressOutput := reportWriter if !isTTY(reportWriter) { progressOutput = ioutil.Discard @@ -448,16 +447,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { srcInfosUpdated = true } + // Create a map from the manifest to check if a srcLayer may be empty. + mblob, mtype, err := ic.src.Manifest(ctx) + if err != nil { + return err + } + man, err := manifest.FromBlob(mblob, mtype) + if err != nil { + return err + } + emptyLayerMap := make(map[digest.Digest]bool) + for _, info := range man.LayerInfos() { + emptyLayerMap[info.Digest] = info.EmptyLayer + } + type copyLayerData struct { destInfo types.BlobInfo diffID digest.Digest err error } - // copyGroup is used to determine if all layers are copied - copyGroup := sync.WaitGroup{} - copyGroup.Add(numLayers) - // copySemaphore is used to limit the number of parallel downloads to // avoid malicious images causing troubles and to be nice to servers. var copySemaphore *semaphore.Weighted @@ -467,44 +476,72 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { - defer copySemaphore.Release(1) - defer copyGroup.Done() - cld := copyLayerData{} + // copyGroup is used to determine if all layers are copied + copyGroup := sync.WaitGroup{} + + // progressPool for creating progress bars + progressPool := progress.NewPool(ctx, ic.c.progressOutput) + defer progressPool.CleanUp() + + // A context.WithCancel is needed when encountering an error while + // copying/downloading layers in parallel. + cancelCtx, cancelCopyLayer := context.WithCancel(ctx) + defer cancelCopyLayer() + + copyData := make([]copyLayerData, numLayers) + layerIndex := 0 // some layers might be skipped, so we need a dedicated counter + for i, srcLayer := range srcInfos { if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 // does not support them. if ic.diffIDsAreNeeded { - cld.err = errors.New("getting DiffID for foreign layers is unimplemented") - } else { - cld.destInfo = srcLayer - logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) + return errors.New("getting DiffID for foreign layers is unimplemented") } - } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) - } - data[index] = cld - } - - func() { // A scope for defer - progressPool, progressCleanup := ic.c.newProgressPool(ctx) - defer progressCleanup() + logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name()) + copyData[i].destInfo = srcLayer + continue // skip copying + } + + copySemaphore.Acquire(cancelCtx, 1) // limits parallel copy operations + copyGroup.Add(1) // allows the main routine to wait for all copy operations to finish + + index := layerIndex + emptyLayer := false + if ok, empty := emptyLayerMap[srcLayer.Digest]; ok && empty { + emptyLayer = true + index = -1 + } + // Copy the layer. + go func(dataIndex, layerIndex int, srcLayer types.BlobInfo) { + defer copySemaphore.Release(1) + defer copyGroup.Done() + cld := copyLayerData{} + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, layerIndex, progressPool) + if cld.err != nil { + // Stop all other running goroutines as we can't recover from an error + cancelCopyLayer() + } + copyData[dataIndex] = cld + }(i, index, srcLayer) - for i, srcLayer := range srcInfos { - copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + if !emptyLayer { + layerIndex++ } + } - // Wait for all layers to be copied - copyGroup.Wait() - }() + // Wait for all layer-copy operations to finish + copyGroup.Wait() destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i := range copyData { + cld := copyData[i] if cld.err != nil { + // Skip context.Canceled errors to determine the real error. + if cld.err == context.Canceled { + continue + } return cld.err } destInfos[i] = cld.destInfo @@ -573,45 +610,6 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte return manifest, nil } -// newProgressPool creates a *mpb.Progress and a cleanup function. -// The caller must eventually call the returned cleanup function after the pool will no longer be updated. -func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { - ctx, cancel := context.WithCancel(ctx) - pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx)) - return pool, func() { - cancel() - pool.Wait() - } -} - -// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter -// is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { - // shortDigestLen is the length of the digest used for blobs. - const shortDigestLen = 12 - - prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded()) - // Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column. - maxPrefixLen := len("Copying blob ") + shortDigestLen - if len(prefix) > maxPrefixLen { - prefix = prefix[:maxPrefixLen] - } - - bar := pool.AddBar(info.Size, - mpb.BarClearOnComplete(), - mpb.PrependDecorators( - decor.Name(prefix), - ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete), - ), - ) - if c.progressOutput == ioutil.Discard { - c.Printf("Copying %s %s\n", kind, info.Digest) - } - return bar -} - // copyConfig copies config.json, if any, from src to dest. func (c *copier) copyConfig(ctx context.Context, src types.Image) error { srcInfo := src.ConfigInfo() @@ -622,14 +620,21 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { } destInfo, err := func() (types.BlobInfo, error) { // A scope for defer - progressPool, progressCleanup := c.newProgressPool(ctx) - defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + pool := progress.NewPool(ctx, c.progressOutput) + defer pool.CleanUp() + + bar := pool.AddBar( + progress.DigestToCopyAction(srcInfo.Digest, "config"), + 0, + progress.BarOptions{ + RemoveOnCompletion: true, + StaticMessage: " ", + }) + + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } - bar.SetTotal(int64(len(configBlob)), true) return destInfo, nil }() if err != nil { @@ -651,20 +656,37 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *progress.Pool) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" + progressBarAction := progress.DigestToCopyAction(srcInfo.Digest, "blob") + bar := pool.AddBar( + progressBarAction, + 0, + progress.BarOptions{ + RemoveOnCompletion: true, + StaticMessage: " ", + }) // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs, bar) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } if reused { logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") - bar.SetTotal(0, true) + // Instead of adding boilerplate code to ALL TryReusingBlob()s, just + // special case the storage transport, which is the only transport + // where we need control over the progress.Bar. + if ic.c.dest.Reference().Transport().Name() != storage.Transport.Name() { + bar.ReplaceBar( + progressBarAction, + 0, + progress.BarOptions{ + StaticMessage: "skipped: already exists", + }) + } return blobInfo, cachedDiffID, nil } } @@ -676,9 +698,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -700,7 +720,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } } - bar.SetTotal(srcInfo.Size, true) return blobInfo, diffID, nil } @@ -709,7 +728,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *progress.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +752,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,9 +787,9 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, - canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { + canModifyBlob bool, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. // === Input: srcStream @@ -793,7 +812,23 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) } isCompressed := decompressor != nil - destStream = bar.ProxyReader(destStream) + + // Instead of adding boilerplate code to ALL PutBlob()s, just special case + // the storage transport, which is the only transport where we need control + // over the progress.Bar. + if c.dest.Reference().Transport().Name() != storage.Transport.Name() { + kind := "blob" + if isConfig { + kind = "config" + } + bar = bar.ReplaceBar( + progress.DigestToCopyAction(srcInfo.Digest, kind), + srcInfo.Size, + progress.BarOptions{ + OnCompletionMessage: "done", + }) + destStream = bar.ProxyReader(destStream) + } // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. @@ -847,7 +882,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. - uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig, bar) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/directory/directory_dest.go b/directory/directory_dest.go index 4b2ab022e2..cfb346c639 100644 --- a/directory/directory_dest.go +++ b/directory/directory_dest.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -129,14 +130,20 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -178,11 +185,13 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/directory/directory_test.go b/directory/directory_test.go index 121b1cf964..223d003932 100644 --- a/directory/directory_test.go +++ b/directory/directory_test.go @@ -65,7 +65,7 @@ func TestGetPutBlob(t *testing.T) { require.NoError(t, err) defer dest.Close() assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression()) - info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false) + info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, 0, cache, false, nil) assert.NoError(t, err) err = dest.Commit(context.Background()) assert.NoError(t, err) @@ -123,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false, nil) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index c116cbec32..6ea3f60459 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -17,9 +17,10 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache/none" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -117,19 +118,25 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { return true } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. - haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false, nil) if err != nil { return types.BlobInfo{}, err } @@ -267,11 +274,13 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/docker/tarfile/dest.go b/docker/tarfile/dest.go index 5f30eddbc7..bcf0241fe0 100644 --- a/docker/tarfile/dest.go +++ b/docker/tarfile/dest.go @@ -15,6 +15,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -87,14 +88,20 @@ func (d *Destination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +133,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false, nil) if err != nil { return types.BlobInfo{}, err } @@ -160,11 +167,13 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/image/docker_schema2.go b/image/docker_schema2.go index 351e73ea1d..924ec3671a 100644 --- a/image/docker_schema2.go +++ b/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. - info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false, nil) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/image/docker_schema2_test.go b/image/docker_schema2_test.go index 9d3f96fc6d..4b5668ca4f 100644 --- a/image/docker_schema2_test.go +++ b/image/docker_schema2_test.go @@ -12,6 +12,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -401,7 +402,7 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool { func (d *memoryImageDest) HasThreadSafePutBlob() bool { panic("Unexpected call to a mock function") } -func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { if d.storedBlobs == nil { d.storedBlobs = make(map[digest.Digest][]byte) } @@ -415,7 +416,7 @@ func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputIn d.storedBlobs[inputInfo.Digest] = contents return types.BlobInfo{Digest: inputInfo.Digest, Size: int64(len(contents))}, nil } -func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, types.BlobInfoCache, bool) (bool, types.BlobInfo, error) { +func (d *memoryImageDest) TryReusingBlob(context.Context, types.BlobInfo, int, types.BlobInfoCache, bool, *progress.Bar) (bool, types.BlobInfo, error) { panic("Unexpected call to a mock function") } func (d *memoryImageDest) PutManifest(ctx context.Context, m []byte) error { diff --git a/oci/archive/oci_dest.go b/oci/archive/oci_dest.go index 9571c37e2b..7a505a35e0 100644 --- a/oci/archive/oci_dest.go +++ b/oci/archive/oci_dest.go @@ -5,6 +5,7 @@ import ( "io" "os" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage/pkg/archive" "github.com/pkg/errors" @@ -87,22 +88,29 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig, bar) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute, bar) } // PutManifest writes manifest to the destination diff --git a/oci/layout/oci_dest.go b/oci/layout/oci_dest.go index db102184db..ad1afd44b5 100644 --- a/oci/layout/oci_dest.go +++ b/oci/layout/oci_dest.go @@ -10,6 +10,7 @@ import ( "runtime" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" digest "github.com/opencontainers/go-digest" imgspec "github.com/opencontainers/image-spec/specs-go" @@ -117,10 +118,15 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -183,11 +189,13 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/oci/layout/oci_dest_test.go b/oci/layout/oci_dest_test.go index 44316fabb4..901eb50a26 100644 --- a/oci/layout/oci_dest_test.go +++ b/oci/layout/oci_dest_test.go @@ -53,7 +53,7 @@ func TestPutBlobDigestFailure(t *testing.T) { dest, err := ref.NewImageDestination(context.Background(), nil) require.NoError(t, err) defer dest.Close() - _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false) + _, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, 0, cache, false, nil) assert.Error(t, err) assert.Contains(t, digestErrorString, err.Error()) err = dest.Commit(context.Background()) diff --git a/openshift/openshift.go b/openshift/openshift.go index 814c3eea1d..71a03ce0f3 100644 --- a/openshift/openshift.go +++ b/openshift/openshift.go @@ -15,6 +15,7 @@ import ( "github.com/containers/image/docker" "github.com/containers/image/docker/reference" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/image/version" "github.com/opencontainers/go-digest" @@ -388,26 +389,34 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig, bar) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute, bar) } // PutManifest writes manifest to the destination. diff --git a/ostree/ostree_dest.go b/ostree/ostree_dest.go index c926e2b4b9..52586b1721 100644 --- a/ostree/ostree_dest.go +++ b/ostree/ostree_dest.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage/pkg/archive" "github.com/klauspost/pgzip" @@ -142,10 +143,15 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -338,11 +344,13 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/pkg/progress/progress.go b/pkg/progress/progress.go new file mode 100644 index 0000000000..d5dd20f56d --- /dev/null +++ b/pkg/progress/progress.go @@ -0,0 +1,149 @@ +// Package progress exposes a convenience API and abstractions for creating and +// managing pools of multi-progress bars. +package progress + +import ( + "context" + "fmt" + "io" + + "github.com/opencontainers/go-digest" + "github.com/vbauerster/mpb" + "github.com/vbauerster/mpb/decor" +) + +// Pool allows managing progress bars. +type Pool struct { + pool *mpb.Progress + cancel context.CancelFunc + writer io.Writer +} + +// Bar represents a progress bar. +type Bar struct { + bar *mpb.Bar + pool *Pool +} + +// BarOptions includes various options to control AddBar. +type BarOptions struct { + // RemoveOnCompletion the bar on completion. This must be true if the bar + // will be replaced by another one. + RemoveOnCompletion bool + // OnCompletionMessage will be shown on completion and replace the progress + // bar. Note that setting OnCompletionMessage will cause the progress bar + // (or the static message) to be cleared on completion. + OnCompletionMessage string + // ReplaceBar is the bar to replace. + ReplaceBar *Bar + // StaticMessage, if set, will replace displaying the progress bar. Use this + // field for static progress bars that do not show progress. + StaticMessage string +} + +// NewPool returns a Pool. The caller must eventually call ProgressPool.CleanUp() +// when the pool will no longer be updated. +func NewPool(ctx context.Context, writer io.Writer) *Pool { + ctx, cancelFunc := context.WithCancel(ctx) + return &Pool{ + pool: mpb.New(mpb.WithWidth(40), mpb.WithOutput(writer), mpb.WithContext(ctx)), + cancel: cancelFunc, + writer: writer, + } +} + +// CleanUp cleans up resources, such as remaining progress bars, and stops them +// if necessary. +func (p *Pool) CleanUp() { + p.cancel() + p.pool.Wait() +} + +// DigestToCopyAction returns a string based on the blobinfo and kind. +// It's a convenience function for the c/image library when copying images. +func DigestToCopyAction(digest digest.Digest, kind string) string { + // shortDigestLen is the length of the digest used for blobs. + const shortDigestLen = 12 + const maxLen = len("Copying blob ") + shortDigestLen + // Truncate the string (chopping of some part of the digest) to make all + // progress bars aligned in a column. + copyAction := fmt.Sprintf("Copying %s %s", kind, digest.Encoded()) + if len(copyAction) > maxLen { + copyAction = copyAction[:maxLen] + } + return copyAction +} + +// AddBar adds a new Bar to the Pool. Use options to control the behavior and +// appearance of the bar. +func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar { + var bar *mpb.Bar + + // First decorator showing action (e.g., "Copying blob 123456abcd") + mpbOptions := []mpb.BarOption{ + mpb.PrependDecorators( + decor.Name(action), + ), + } + + if options.RemoveOnCompletion { + mpbOptions = append(mpbOptions, mpb.BarRemoveOnComplete()) + } + + if options.ReplaceBar != nil { + mpbOptions = append(mpbOptions, mpb.BarReplaceOnComplete(options.ReplaceBar.bar)) + // bar.SetTotal(0, true) will make sure that the bar is stopped + defer options.ReplaceBar.bar.SetTotal(0, true) + } + + // If no static message is set, we display the progress bar. Otherwise, + // we'll display the message only. + if options.StaticMessage == "" { + mpbOptions = append(mpbOptions, + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+options.OnCompletionMessage), + ), + ) + mpbOptions = append(mpbOptions, mpb.BarClearOnComplete()) + bar = p.pool.AddBar(size, mpbOptions...) + return &Bar{ + bar: bar, + pool: p, + } + } + + barFiller := mpb.FillerFunc( + func(w io.Writer, width int, st *decor.Statistics) { + fmt.Fprint(w, options.StaticMessage) + }) + + // If OnCompletionMessage is set, we need to add the decorator and clear + // the bar on completion. + if options.OnCompletionMessage != "" { + mpbOptions = append(mpbOptions, + mpb.AppendDecorators( + decor.OnComplete(decor.Name(""), " "+options.OnCompletionMessage), + ), + ) + mpbOptions = append(mpbOptions, mpb.BarClearOnComplete()) + } + + bar = p.pool.Add(size, barFiller, mpbOptions...) + return &Bar{ + bar: bar, + pool: p, + } +} + +// ReplaceBar is like Pool.AddBar but replace the bar in it's pool. Note that +// the bar be terminated and should have been created with +// options.RemoveOnCompletion. +func (b *Bar) ReplaceBar(action string, size int64, options BarOptions) *Bar { + options.ReplaceBar = b + return b.pool.AddBar(action, size, options) +} + +// ProxyReader wraps the reader with metrics for progress tracking. +func (b *Bar) ProxyReader(reader io.Reader) io.ReadCloser { + return b.bar.ProxyReader(reader) +} diff --git a/storage/storage_image.go b/storage/storage_image.go index b39d2bcc04..c09ae8fb42 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -13,12 +13,14 @@ import ( "path/filepath" "sync" "sync/atomic" + "time" "github.com/containers/image/docker/reference" "github.com/containers/image/image" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache/none" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage" "github.com/containers/storage/pkg/archive" @@ -54,16 +56,18 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage. Only valid after receiving `true` from the corresponding `indexToDoneChannel`. + indexToDoneChannel map[int]chan bool // Mapping from layer index to a channel to indicate the layer has been written to storage. True is written when the corresponding index/layer has successfully been written to the storage. + SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice } type storageImageCloser struct { @@ -323,16 +327,30 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToStorageID: make(map[int]string), + indexToDoneChannel: make(map[int]chan bool), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) chan bool { + s.putBlobMutex.Lock() + defer s.putBlobMutex.Unlock() + channel, ok := s.indexToDoneChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan bool, 1) + s.indexToDoneChannel[layerIndexInImage] = channel + } + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -360,15 +378,197 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { return true } +// tryReusingBlobFromOtherProcess is implementing a mechanism to detect if +// another process is already copying the blobinfo by making use of the +// blob-digest locks from containers/storage. The caller is expected to send a +// message through the done channel once the lock has been acquired. If we +// detect that another process is copying the blob, we wait until we own the +// lockfile and call tryReusingBlob() to check if we can reuse the committed +// layer. +func (s *storageImageDestination) tryReusingBlobFromOtherProcess(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, done chan bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + copiedByAnotherProcess := false + select { + case <-done: + case <-time.After(200 * time.Millisecond): + logrus.Debugf("blob %q is being copied by another process", blobinfo.Digest) + copiedByAnotherProcess = true + if bar != nil { + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: "paused: being copied by another process", + RemoveOnCompletion: true, + }) + } + // Wait until we acquired the lock or encountered an error. + <-done + } + + // Now, we own the lock. + // + // In case another process copied the blob, we should attempt reusing the + // blob. If it's not available, either the copy-detection heuristic failed + // (i.e., waiting 200 ms was not enough) or the other process failed to copy + // the blob. + if copiedByAnotherProcess { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, false) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another Goroutine is potentially + // waiting for it. If we can't resuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). + if reusable { + logrus.Debugf("another process successfully copied blob %q", blobinfo.Digest) + if bar != nil { + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: "done: copied by another process", + }) + } + } else { + logrus.Debugf("another process must have failed copying blob %q", blobinfo.Digest) + } + return reusable, blob, err + } + + return false, types.BlobInfo{}, nil +} + +// getPreviousLayerID returns the layer ID of the previous layer of +// layerIndexInImage. It might wait until the previous layer ID has been +// computed by another goroutine. +// Note that it returns "" if layerIndexInImage <= 0. +func (s *storageImageDestination) getPreviousLayerID(layerIndexInImage int) (string, error) { + if layerIndexInImage <= 0 { + return "", nil + } + + channel := s.getChannelForLayer(layerIndexInImage - 1) + if committed := <-channel; !committed { + err := fmt.Errorf("committing parent layer %d failed", layerIndexInImage-1) + return "", err + } + + s.putBlobMutex.Lock() + previousID, ok := s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return "", fmt.Errorf("could not find parent layer ID for layer %d", layerIndexInImage) + } + + return previousID, nil +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (blob types.BlobInfo, err error) { + // Deferred call to an anonymous func to signal potentially waiting + // goroutines via the index-specific channel. + if layerIndexInImage >= 0 { + defer func() { + // It's a buffered channel, so we don't wait for the message to be + // received + channel := s.getChannelForLayer(layerIndexInImage) + channel <- err == nil + if err != nil { + logrus.Debugf("error while committing blob %d: %v", layerIndexInImage, err) + } + + }() + } + + // Check if another process is already copying the blob. Please refer to the + // doc of tryReusingBlobFromOtherProcess for further information. + if layerIndexInImage >= 0 { + // The reasoning for getting the locker here is to make the code paths + // of locking and unlocking it more obvious and simple. + locker, err := s.imageRef.transport.store.GetDigestLock(blobinfo.Digest) + if err != nil { + return types.BlobInfo{}, errors.Wrapf(err, "error acquiring lock for blob %q", blobinfo.Digest) + } + done := make(chan bool, 1) + defer locker.Unlock() + go func() { + locker.RecursiveLock() + done <- true + }() + reusable, blob, err := s.tryReusingBlobFromOtherProcess(ctx, stream, blobinfo, layerIndexInImage, cache, done, bar) + if err != nil { + return blob, err + } + if reusable { + return blob, nil + } + } + + if bar != nil { + kind := "blob" + message := "" + if isConfig { + kind = "config" + // Setting the StaticMessage for the config avoids displaying a + // progress bar. Configs are comparatively small and quickly + // downloaded, such that displaying a progress is more a distraction + // than an indicator. + message = " " + } + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, kind), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: message, + OnCompletionMessage: "done", + }) + stream = bar.ProxyReader(stream) + } + + blob, err = s.putBlob(ctx, stream, blobinfo, layerIndexInImage, cache, isConfig) + if err != nil { + return types.BlobInfo{}, err + } + + if isConfig { + return blob, err + } + + // First, wait for the previous layer to be committed + previousID := "" + if layerIndexInImage > 0 { + var err error + previousID, err = s.getPreviousLayerID(layerIndexInImage) + if err != nil { + return types.BlobInfo{}, err + } + } + + // Commit the blob + id, err := s.commitBlob(ctx, blob, previousID) + if err != nil { + return types.BlobInfo{}, err + } + if layerIndexInImage >= 0 { + s.putBlobMutex.Lock() + s.indexToStorageID[layerIndexInImage] = id + s.putBlobMutex.Unlock() + } + + return blob, nil +} + +func (s *storageImageDestination) putBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,86 +625,160 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - // lock the entire method as it executes fairly quickly - s.putBlobMutex.Lock() - defer s.putBlobMutex.Unlock() - if blobinfo.Digest == "" { - return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`) +// tryCopyingLayer looks up a matching layer for the blob in the storage and +// creates a new layer for it. It returns false if no matching layer has been +// found in the storage. Callers are expected to decide if that's an error or +// not. Note that layerID and previousID must either be left empty or both be +// specified. +func (s *storageImageDestination) tryCopyingLayer(ctx context.Context, blob types.BlobInfo, layerIndexInImage int, layerID, previousID string, byCompressed, byUncompressed bool) (string, types.BlobInfo, error) { + if !byCompressed && !byUncompressed { + return "", types.BlobInfo{}, errors.New("Internal error: copyLayer without compressed and uncompressed argument") } - if err := blobinfo.Digest.Validate(); err != nil { - return false, types.BlobInfo{}, errors.Wrapf(err, `Can not check for a blob with invalid digest`) + + // Check if we previously cached a file with that blob's contents. If we + // didn't, then we need to read the desired contents from a layer. + // + // TODO: the lookup here is only required when being called for an entirely + // new layer or reused ones. Maybe we should split that into separate + // functions?bool + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + + id := "" + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + if byUncompressed { + layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err == nil && len(layers) > 0 { + layer = layers[0].ID + id = layers[0].UncompressedDigest.Hex() + } + } + if byCompressed && layer == "" { + layers, err := s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err == nil && len(layers) > 0 { + layer = layers[0].ID + id = layers[0].UncompressedDigest.Hex() + } + } + // No layer with a matching digest found, so there's nothing to copy. + if layer == "" { + logrus.Debugf("no matching layer found for blob %q", blob.Digest.String()) + return "", types.BlobInfo{}, nil + } + + // Read the layer's contents. + noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", types.BlobInfo{}, errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error storing blob to file %q", filename) + } } - // Check if we've already cached it in a file. - if size, ok := s.fileSizes[blobinfo.Digest]; ok { - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: size, - MediaType: blobinfo.MediaType, - }, nil + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error opening file %q", filename) } + defer file.Close() - // Check if we have a wasn't-compressed layer in storage that's based on that blob. - layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blobinfo.Digest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + // If layerID is provided, use it. + if layerID != "" { + id = layerID + } + // If no previousID is provided, wait for it to be computed. + if previousID == "" { + var err error + // Wait for the previous layer to be computed. + previousID, err = s.getPreviousLayerID(layerIndexInImage) + if err != nil { + return "", types.BlobInfo{}, err + } + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + id)).Hex() + } } - if len(layers) > 0 { - // Save this for completeness. - s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: layers[0].UncompressedSize, - MediaType: blobinfo.MediaType, - }, nil + + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", types.BlobInfo{}, errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) } - // Check if we have a was-compressed layer in storage that's based on that blob. - layers, err = s.imageRef.transport.store.LayersByCompressedDigest(blobinfo.Digest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for compressed layers with digest %q`, blobinfo.Digest) + // Record all data at once to avoid taking the lock too many times. + s.putBlobMutex.Lock() + s.indexToStorageID[layerIndexInImage] = layer.ID + s.blobDiffIDs[blob.Digest] = layer.UncompressedDigest + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + + return layer.ID, blob, nil +} + +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + // lock the entire method as it executes fairly quickly + if blobinfo.Digest == "" { + return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`) } - if len(layers) > 0 { - // Record the uncompressed value so that we can use it to calculate layer IDs. - s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: layers[0].CompressedSize, - MediaType: blobinfo.MediaType, - }, nil + if err := blobinfo.Digest.Validate(); err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, `Can not check for a blob with invalid digest`) + } + + // Check if we can copy an already existing {un}compressed layer. + if layerID, blob, err := s.tryCopyingLayer(ctx, blobinfo, layerIndexInImage, "", "", true, true); err != nil { + return false, blob, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + } else if layerID != "" { + return true, blob, nil } - // Does the blob correspond to a known DiffID which we already have available? - // Because we must return the size, which is unknown for unavailable compressed blobs, the returned BlobInfo refers to the + // Does the blob correspond to a known DiffID which we already have + // available? Because we must return the size, which is unknown for + // unavailable compressed blobs, the returned BlobInfo refers to the // uncompressed layer, and that can happen only if canSubstitute. if canSubstitute { - if uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest { - layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, uncompressedDigest) - } - if len(layers) > 0 { - s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: uncompressedDigest, - Size: layers[0].UncompressedSize, - MediaType: blobinfo.MediaType, - }, nil + uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest) + if uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest { + // Check if we can copy an existing uncompressed layer. + blobinfo.Digest = uncompressedDigest + if layerID, blob, err := s.tryCopyingLayer(ctx, blobinfo, layerIndexInImage, "", "", false, true); err != nil { + return false, blob, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + } else if layerID != "" { + return true, blob, nil } } } @@ -513,6 +787,40 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. +// Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. +func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + reused, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, canSubstitute) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another Goroutine is potentially + // waiting for it. If we can't reuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). + if (layerIndexInImage >= 0) && (err != nil || reused) { + defer func() { // Defer to be **very** sure to always execute the code. + channel := s.getChannelForLayer(layerIndexInImage) + channel <- (err == nil) + }() + } + if bar != nil && reused { + bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + 0, + progress.BarOptions{ + StaticMessage: "skipped: already exists", + }) + } + return reused, blob, err +} + // computeID computes a recommended image ID based on information we have so far. If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +880,42 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlobs commits the specified blob to the storage. +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, previousID string) (string, error) { + logrus.Debugf("committing blob %q", blob.Digest) + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + logrus.Debugf("layer for blob %q already found in storage", blob.Digest.String()) + return layer.ID, nil + } + + layerID, _, err := s.tryCopyingLayer(ctx, blob, -1, id, previousID, true, true) + if err != nil { + return "", err + } + if layerID == "" { + return "", fmt.Errorf("Internal error: could not locate layer for blob %q", blob.Digest.String()) + } + + return layerID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -581,6 +925,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error parsing manifest") } + layerBlobs := man.LayerInfos() // Extract or find the layers. lastLayer := "" @@ -588,102 +933,36 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if blob.EmptyLayer { continue } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + _, haveDiffID := s.blobDiffIDs[blob.Digest] if !haveDiffID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller - // that relies on using a blob digest that has never been seeen by the store had better call + // that relies on using a blob digest that has never been seen by the store had better call // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) + has, _, err := s.tryReusingBlob(ctx, blob.BlobInfo, -1, none.NoCache, false) if err != nil { return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) } if !has { return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] + _, haveDiffID = s.blobDiffIDs[blob.Digest] if !haveDiffID { return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) } } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. - noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) + newID, err := s.commitBlob(ctx, blob.BlobInfo, lastLayer) if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). - layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + return err } - lastLayer = layer.ID + lastLayer = newID + } + + if lastLayer == "" { + return fmt.Errorf("image does not contain a non-empty or non-throwaway layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/storage/storage_test.go b/storage/storage_test.go index d91d7aaa03..98141ca8ee 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -380,11 +380,11 @@ func TestWriteRead(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination: %v", err) } t.Logf("Wrote randomly-generated layer %q (%d/%d bytes) to destination", digest, size, decompressedSize) - if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, -1, cache, true, nil); err != nil { t.Fatalf("Error saving config to destination: %v", err) } manifest := strings.Replace(manifestFmt, "%lh", digest.String(), -1) @@ -541,7 +541,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -576,7 +576,7 @@ func TestDuplicateName(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -628,7 +628,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -663,7 +663,7 @@ func TestDuplicateID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -718,7 +718,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size, Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, first pass: %v", err) } manifest := fmt.Sprintf(` @@ -753,7 +753,7 @@ func TestDuplicateNameID(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: int64(size), Digest: digest, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer to destination, second pass: %v", err) } manifest = fmt.Sprintf(` @@ -850,21 +850,21 @@ func TestSize(t *testing.T) { if dest == nil { t.Fatalf("NewImageDestination(%q) returned no destination", ref.StringWithinTransport()) } - if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, cache, false); err != nil { + if _, err := dest.PutBlob(context.Background(), bytes.NewBufferString(config), configInfo, 0, cache, true, nil); err != nil { t.Fatalf("Error saving config to destination: %v", err) } digest1, usize1, size1, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination: %v", err) } digest2, usize2, size2, blob := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 1, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination: %v", err) } manifest := fmt.Sprintf(` @@ -946,26 +946,26 @@ func TestDuplicateBlob(t *testing.T) { if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 0, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (first copy): %v", err) } digest2, _, size2, blob2 := makeLayer(t, archive.Gzip) if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 1, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (first copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob1), types.BlobInfo{ Size: size1, Digest: digest1, - }, cache, false); err != nil { + }, 2, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 1 to destination (second copy): %v", err) } if _, err := dest.PutBlob(context.Background(), bytes.NewBuffer(blob2), types.BlobInfo{ Size: size2, Digest: digest2, - }, cache, false); err != nil { + }, 3, cache, false, nil); err != nil { t.Fatalf("Error saving randomly-generated layer 2 to destination (second copy): %v", err) } manifest := fmt.Sprintf(` diff --git a/types/types.go b/types/types.go index 7895043481..7cf4c38141 100644 --- a/types/types.go +++ b/types/types.go @@ -6,8 +6,9 @@ import ( "time" "github.com/containers/image/docker/reference" + "github.com/containers/image/pkg/progress" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -262,20 +263,27 @@ type ImageDestination interface { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. + // layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. + // The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. + // Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. - PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool, bar *progress.Bar) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. - // If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. - // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. + // layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. + // The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. + // If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. - TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. it does not recognize the schema), diff --git a/vendor.conf b/vendor.conf index 438cab17ae..f8ad0f2496 100644 --- a/vendor.conf +++ b/vendor.conf @@ -46,6 +46,6 @@ github.com/etcd-io/bbolt v1.3.2 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/compress v1.4.1 github.com/klauspost/cpuid v1.2.0 -github.com/vbauerster/mpb v3.3.4 +github.com/vbauerster/mpb v3.4.0 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1