diff --git a/pkg/blobcache/blobcache.go b/pkg/blobcache/blobcache.go index 31e6a428c38..c50141db39c 100644 --- a/pkg/blobcache/blobcache.go +++ b/pkg/blobcache/blobcache.go @@ -13,12 +13,13 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/image" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/transports" "github.com/containers/image/types" "github.com/containers/storage/pkg/archive" "github.com/containers/storage/pkg/ioutils" digest "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -416,7 +417,7 @@ func (s *blobCacheDestination) HasThreadSafePutBlob() bool { return s.destination.HasThreadSafePutBlob() } -func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { var tempfile *os.File var err error var n int @@ -479,7 +480,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in } } } - newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) + newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig, bar) if err != nil { return newBlobInfo, errors.Wrapf(err, "error storing blob to image destination for cache %q", transports.ImageName(d.reference)) } @@ -491,8 +492,8 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in return newBlobInfo, nil } -func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, 
canSubstitute bool) (bool, types.BlobInfo, error) { - present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute, bar) if err != nil || present { return present, reusedInfo, err } @@ -502,7 +503,7 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl f, err := os.Open(filename) if err == nil { defer f.Close() - uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) + uploadedInfo, err := d.destination.PutBlob(ctx, f, info, layerIndexInImage, cache, isConfig, bar) if err != nil { return false, types.BlobInfo{}, err } diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index 7000050a295..72fd73e7824 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -21,7 +21,7 @@ import ( "github.com/containers/storage/pkg/archive" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -128,11 +128,11 @@ func TestBlobCache(t *testing.T) { if err != nil { t.Fatalf("error opening source image for writing: %v", err) } - _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(blobBytes), blobInfo, none.NoCache, false) + _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(blobBytes), blobInfo, 0, none.NoCache, false, nil) if err != nil { t.Fatalf("error writing layer blob to source image: %v", err) } - _, err = destImage.PutBlob(context.TODO(), bytes.NewReader(configBytes), configInfo, none.NoCache, true) + _, err = 
destImage.PutBlob(context.TODO(), bytes.NewReader(configBytes), configInfo, 0, none.NoCache, true, nil) if err != nil { t.Fatalf("error writing config blob to source image: %v", err) } diff --git a/vendor.conf b/vendor.conf index 0c982626a4f..9c20c6028de 100644 --- a/vendor.conf +++ b/vendor.conf @@ -3,12 +3,12 @@ github.com/blang/semver v3.5.0 github.com/BurntSushi/toml v0.2.0 github.com/containerd/continuity 004b46473808b3e7a4a3049c20e4376c91eb966d github.com/containernetworking/cni v0.7.0-rc2 -github.com/containers/image 9467ac9cfd92c545aa389f22f27e552de053c0f2 +github.com/containers/image commit https://github.com/vrothberg/image github.com/cyphar/filepath-securejoin v0.2.1 -github.com/vbauerster/mpb v3.3.4 +github.com/vbauerster/mpb v3.4.0 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 -github.com/containers/storage v1.12.7 +github.com/containers/storage recursive-locks https://github.com/vrothberg/storage github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716 github.com/docker/docker 54dddadc7d5d89fe0be88f76979f6f6ab0dede83 github.com/docker/docker-credential-helpers v0.6.1 diff --git a/vendor/github.com/containers/image/copy/copy.go b/vendor/github.com/containers/image/copy/copy.go index 3ed8a2b8243..941a3837c90 100644 --- a/vendor/github.com/containers/image/copy/copy.go +++ b/vendor/github.com/containers/image/copy/copy.go @@ -18,15 +18,15 @@ import ( "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache" "github.com/containers/image/pkg/compression" + "github.com/containers/image/pkg/progress" "github.com/containers/image/signature" + "github.com/containers/image/storage" "github.com/containers/image/transports" "github.com/containers/image/types" "github.com/klauspost/pgzip" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/vbauerster/mpb" - "github.com/vbauerster/mpb/decor" "golang.org/x/crypto/ssh/terminal" 
"golang.org/x/sync/semaphore" ) @@ -160,7 +160,6 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef, // If reportWriter is not a TTY (e.g., when piping to a file), do not // print the progress bars to avoid long and hard to parse output. - // createProgressBar() will print a single line instead. progressOutput := reportWriter if !isTTY(reportWriter) { progressOutput = ioutil.Discard @@ -448,16 +447,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { srcInfosUpdated = true } + // Create a map from the manifest to check if a srcLayer may be empty. + mblob, mtype, err := ic.src.Manifest(ctx) + if err != nil { + return err + } + man, err := manifest.FromBlob(mblob, mtype) + if err != nil { + return err + } + emptyLayerMap := make(map[digest.Digest]bool) + for _, info := range man.LayerInfos() { + emptyLayerMap[info.Digest] = info.EmptyLayer + } + type copyLayerData struct { destInfo types.BlobInfo diffID digest.Digest err error } - // copyGroup is used to determine if all layers are copied - copyGroup := sync.WaitGroup{} - copyGroup.Add(numLayers) - // copySemaphore is used to limit the number of parallel downloads to // avoid malicious images causing troubles and to be nice to servers. var copySemaphore *semaphore.Weighted @@ -467,44 +476,72 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { - defer copySemaphore.Release(1) - defer copyGroup.Done() - cld := copyLayerData{} + // copyGroup is used to determine if all layers are copied + copyGroup := sync.WaitGroup{} + + // progressPool for creating progress bars + progressPool := progress.NewPool(ctx, ic.c.progressOutput) + defer progressPool.CleanUp() + + // A context.WithCancel is needed when encountering an error while + // copying/downloading layers in parallel. 
+ cancelCtx, cancelCopyLayer := context.WithCancel(ctx) + defer cancelCopyLayer() + + copyData := make([]copyLayerData, numLayers) + layerIndex := 0 // some layers might be skipped, so we need a dedicated counter + for i, srcLayer := range srcInfos { if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 // does not support them. if ic.diffIDsAreNeeded { - cld.err = errors.New("getting DiffID for foreign layers is unimplemented") - } else { - cld.destInfo = srcLayer - logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) + return errors.New("getting DiffID for foreign layers is unimplemented") } - } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) - } - data[index] = cld - } - - func() { // A scope for defer - progressPool, progressCleanup := ic.c.newProgressPool(ctx) - defer progressCleanup() + logrus.Debugf("Skipping foreign layer %q copy to %s", srcLayer.Digest, ic.c.dest.Reference().Transport().Name()) + copyData[i].destInfo = srcLayer + continue // skip copying + } + + copySemaphore.Acquire(cancelCtx, 1) // limits parallel copy operations + copyGroup.Add(1) // allows the main routine to wait for all copy operations to finish + + index := layerIndex + emptyLayer := false + if ok, empty := emptyLayerMap[srcLayer.Digest]; ok && empty { + emptyLayer = true + index = -1 + } + // Copy the layer. 
+ go func(dataIndex, layerIndex int, srcLayer types.BlobInfo) { + defer copySemaphore.Release(1) + defer copyGroup.Done() + cld := copyLayerData{} + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(cancelCtx, srcLayer, layerIndex, progressPool) + if cld.err != nil { + // Stop all other running goroutines as we can't recover from an error + cancelCopyLayer() + } + copyData[dataIndex] = cld + }(i, index, srcLayer) - for i, srcLayer := range srcInfos { - copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + if !emptyLayer { + layerIndex++ } + } - // Wait for all layers to be copied - copyGroup.Wait() - }() + // Wait for all layer-copy operations to finish + copyGroup.Wait() destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i := range copyData { + cld := copyData[i] if cld.err != nil { + // Skip context.Canceled errors to determine the real error. + if cld.err == context.Canceled { + continue + } return cld.err } destInfos[i] = cld.destInfo @@ -573,45 +610,6 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte return manifest, nil } -// newProgressPool creates a *mpb.Progress and a cleanup function. -// The caller must eventually call the returned cleanup function after the pool will no longer be updated. -func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { - ctx, cancel := context.WithCancel(ctx) - pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx)) - return pool, func() { - cancel() - pool.Wait() - } -} - -// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter -// is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { - // shortDigestLen is the length of the digest used for blobs. 
- const shortDigestLen = 12 - - prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded()) - // Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column. - maxPrefixLen := len("Copying blob ") + shortDigestLen - if len(prefix) > maxPrefixLen { - prefix = prefix[:maxPrefixLen] - } - - bar := pool.AddBar(info.Size, - mpb.BarClearOnComplete(), - mpb.PrependDecorators( - decor.Name(prefix), - ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete), - ), - ) - if c.progressOutput == ioutil.Discard { - c.Printf("Copying %s %s\n", kind, info.Digest) - } - return bar -} - // copyConfig copies config.json, if any, from src to dest. func (c *copier) copyConfig(ctx context.Context, src types.Image) error { srcInfo := src.ConfigInfo() @@ -622,14 +620,21 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { } destInfo, err := func() (types.BlobInfo, error) { // A scope for defer - progressPool, progressCleanup := c.newProgressPool(ctx) - defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") - destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) + pool := progress.NewPool(ctx, c.progressOutput) + defer pool.CleanUp() + + bar := pool.AddBar( + progress.DigestToCopyAction(srcInfo.Digest, "config"), + 0, + progress.BarOptions{ + RemoveOnCompletion: true, + StaticMessage: " ", + }) + + destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, -1, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err } - bar.SetTotal(int64(len(configBlob)), true) return destInfo, nil }() if err != nil { @@ -651,20 +656,37 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a 
value for LayerDiffIDs if diffIDIsNeeded -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, layerIndexInImage int, pool *progress.Pool) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" + progressBarAction := progress.DigestToCopyAction(srcInfo.Digest, "blob") + bar := pool.AddBar( + progressBarAction, + 0, + progress.BarOptions{ + RemoveOnCompletion: true, + StaticMessage: " ", + }) // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, layerIndexInImage, ic.c.blobInfoCache, ic.canSubstituteBlobs, bar) if err != nil { return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) } if reused { logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") - bar.SetTotal(0, true) + // Instead of adding boilerplate code to ALL TryReusingBlob()s, just + // special case the storage transport, which is the only transport + // where we need control over the progress.Bar. 
+ if ic.c.dest.Reference().Transport().Name() != storage.Transport.Name() { + bar.ReplaceBar( + progressBarAction, + 0, + progress.BarOptions{ + StaticMessage: "skipped: already exists", + }) + } return blobInfo, cachedDiffID, nil } } @@ -676,9 +698,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") - - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, layerIndexInImage, diffIDIsNeeded, bar) if err != nil { return types.BlobInfo{}, "", err } @@ -700,7 +720,6 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } } - bar.SetTotal(srcInfo.Size, true) return blobInfo, diffID, nil } @@ -709,7 +728,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po // perhaps compressing the stream if canCompress, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. 
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) { + layerIndexInImage int, diffIDIsNeeded bool, bar *progress.Bar) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -733,7 +752,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea return pipeWriter } } - blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success + blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, layerIndexInImage, getDiffIDRecorder, ic.canModifyManifest, false, bar) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan } @@ -768,9 +787,9 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) // perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, // perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, +func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, layerIndexInImage int, getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer, - canModifyBlob bool, isConfig bool, bar *mpb.Bar) (types.BlobInfo, error) { + canModifyBlob bool, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers. 
// === Input: srcStream @@ -793,7 +812,23 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr return types.BlobInfo{}, errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest) } isCompressed := decompressor != nil - destStream = bar.ProxyReader(destStream) + + // Instead of adding boilerplate code to ALL PutBlob()s, just special case + // the storage transport, which is the only transport where we need control + // over the progress.Bar. + if c.dest.Reference().Transport().Name() != storage.Transport.Name() { + kind := "blob" + if isConfig { + kind = "config" + } + bar = bar.ReplaceBar( + progress.DigestToCopyAction(srcInfo.Digest, kind), + srcInfo.Size, + progress.BarOptions{ + OnCompletionMessage: "done", + }) + destStream = bar.ProxyReader(destStream) + } // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. @@ -847,7 +882,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr } // === Finally, send the layer stream to dest. 
- uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig) + uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, layerIndexInImage, c.blobInfoCache, isConfig, bar) if err != nil { return types.BlobInfo{}, errors.Wrap(err, "Error writing blob") } diff --git a/vendor/github.com/containers/image/directory/directory_dest.go b/vendor/github.com/containers/image/directory/directory_dest.go index 4b2ab022e24..cfb346c6399 100644 --- a/vendor/github.com/containers/image/directory/directory_dest.go +++ b/vendor/github.com/containers/image/directory/directory_dest.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -129,14 +130,20 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. 
+// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob") if err != nil { return types.BlobInfo{}, err @@ -178,11 +185,13 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. 
A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/docker_image_dest.go b/vendor/github.com/containers/image/docker/docker_image_dest.go index c116cbec32b..6ea3f604592 100644 --- a/vendor/github.com/containers/image/docker/docker_image_dest.go +++ b/vendor/github.com/containers/image/docker/docker_image_dest.go @@ -17,9 +17,10 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache/none" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/docker/distribution/registry/api/errcode" - "github.com/docker/distribution/registry/api/v2" + v2 "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -117,19 +118,25 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { return true } -// PutBlob 
writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. 
-func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { if inputInfo.Digest.String() != "" { // This should not really be necessary, at least the copy code calls TryReusingBlob automatically. // Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value. // But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_. - haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, none.NoCache, false) + haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, none.NoCache, false, nil) if err != nil { return types.BlobInfo{}, err } @@ -267,11 +274,13 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. 
A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/docker/tarfile/dest.go b/vendor/github.com/containers/image/docker/tarfile/dest.go index 5f30eddbc72..bcf0241fe02 100644 --- a/vendor/github.com/containers/image/docker/tarfile/dest.go +++ b/vendor/github.com/containers/image/docker/tarfile/dest.go @@ -15,6 +15,7 @@ import ( "github.com/containers/image/docker/reference" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/opencontainers/go-digest" "github.com/pkg/errors" @@ -87,14 +88,20 @@ func (d *Destination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. 
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { // Ouch, we need to stream the blob into a temporary file just to determine the size. // When the layer is decompressed, we also have to generate the digest on uncompressed datas. 
if inputInfo.Size == -1 || inputInfo.Digest.String() == "" { @@ -126,7 +133,7 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t } // Maybe the blob has been already sent - ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, cache, false) + ok, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, layerIndexInImage, cache, false, nil) if err != nil { return types.BlobInfo{}, err } @@ -160,11 +167,13 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *Destination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf("Can not check for a blob with unknown digest") } diff --git a/vendor/github.com/containers/image/image/docker_schema2.go b/vendor/github.com/containers/image/image/docker_schema2.go index 351e73ea1da..924ec3671a3 100644 --- a/vendor/github.com/containers/image/image/docker_schema2.go +++ b/vendor/github.com/containers/image/image/docker_schema2.go @@ -252,7 +252,7 @@ func (m *manifestSchema2) convertToManifestSchema1(ctx context.Context, dest typ logrus.Debugf("Uploading empty layer during conversion to schema 1") // Ideally we should update the relevant BlobInfoCache about this layer, but that would require passing it down here, // and anyway this blob is so small that it’s easier to just copy it than to worry about figuring out another location where to get it. 
- info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, none.NoCache, false) + info, err := dest.PutBlob(ctx, bytes.NewReader(GzippedEmptyLayer), types.BlobInfo{Digest: GzippedEmptyLayerDigest, Size: int64(len(GzippedEmptyLayer))}, -1, none.NoCache, false, nil) if err != nil { return nil, errors.Wrap(err, "Error uploading empty layer") } diff --git a/vendor/github.com/containers/image/oci/archive/oci_dest.go b/vendor/github.com/containers/image/oci/archive/oci_dest.go index 9571c37e2ba..7a505a35e09 100644 --- a/vendor/github.com/containers/image/oci/archive/oci_dest.go +++ b/vendor/github.com/containers/image/oci/archive/oci_dest.go @@ -5,6 +5,7 @@ import ( "io" "os" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage/pkg/archive" "github.com/pkg/errors" @@ -87,22 +88,27 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. If bar is nil, it is ignored.
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.unpackedDest.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *ociArchiveImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { + return d.unpackedDest.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig, bar) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. 
+// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.unpackedDest.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *ociArchiveImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + return d.unpackedDest.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute, bar) } // PutManifest writes manifest to the destination diff --git a/vendor/github.com/containers/image/oci/layout/oci_dest.go b/vendor/github.com/containers/image/oci/layout/oci_dest.go index db102184dbc..ad1afd44b52 100644 --- a/vendor/github.com/containers/image/oci/layout/oci_dest.go +++ b/vendor/github.com/containers/image/oci/layout/oci_dest.go @@ -10,6 +10,7 @@ import ( "runtime" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" digest "github.com/opencontainers/go-digest" imgspec "github.com/opencontainers/image-spec/specs-go" @@ -117,10 +118,15 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. 
This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { blobFile, err := ioutil.TempFile(d.ref.dir, "oci-put-blob") if err != nil { return types.BlobInfo{}, err @@ -183,11 +189,13 @@ func (d *ociImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. 
-// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ociImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if info.Digest == "" { return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`) } diff --git a/vendor/github.com/containers/image/openshift/openshift.go b/vendor/github.com/containers/image/openshift/openshift.go index 814c3eea1de..71a03ce0f30 100644 --- a/vendor/github.com/containers/image/openshift/openshift.go +++ b/vendor/github.com/containers/image/openshift/openshift.go @@ -15,6 +15,7 @@ import ( "github.com/containers/image/docker" "github.com/containers/image/docker/reference" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/image/version" "github.com/opencontainers/go-digest" @@ -388,26 
+389,32 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { return false } -// PutBlob writes contents of stream and returns data representing the result (with all data filled in). +// PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. If bar is nil, it is ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
-func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { - return d.docker.PutBlob(ctx, stream, inputInfo, cache, isConfig) +func (d *openshiftImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { + return d.docker.PutBlob(ctx, stream, inputInfo, layerIndexInImage, cache, isConfig, bar) } // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. 
-func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - return d.docker.TryReusingBlob(ctx, info, cache, canSubstitute) +func (d *openshiftImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + return d.docker.TryReusingBlob(ctx, info, layerIndexInImage, cache, canSubstitute, bar) } // PutManifest writes manifest to the destination. diff --git a/vendor/github.com/containers/image/ostree/ostree_dest.go b/vendor/github.com/containers/image/ostree/ostree_dest.go index d69f4fa331a..c0b6eeb3399 100644 --- a/vendor/github.com/containers/image/ostree/ostree_dest.go +++ b/vendor/github.com/containers/image/ostree/ostree_dest.go @@ -21,6 +21,7 @@ import ( "unsafe" "github.com/containers/image/manifest" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage/pkg/archive" "github.com/klauspost/pgzip" @@ -142,10 +143,14 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it.
+// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (d *ostreeImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (types.BlobInfo, error) { tmpDir, err := ioutil.TempDir(d.tmpDirPath, "blob") if err != nil { return types.BlobInfo{}, err @@ -338,11 +344,13 @@ func (d *ostreeImageDestination) importConfig(repo *otbuiltin.Repo, blob *blobTo // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. +// layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. 
A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. -func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { +func (d *ostreeImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { if d.repo == nil { repo, err := openRepo(d.ref.repo) if err != nil { diff --git a/vendor/github.com/containers/image/pkg/progress/progress.go b/vendor/github.com/containers/image/pkg/progress/progress.go new file mode 100644 index 00000000000..d5dd20f56d3 --- /dev/null +++ b/vendor/github.com/containers/image/pkg/progress/progress.go @@ -0,0 +1,149 @@ +// Package progress exposes a convenience API and abstractions for creating and +// managing pools of multi-progress bars. +package progress + +import ( + "context" + "fmt" + "io" + + "github.com/opencontainers/go-digest" + "github.com/vbauerster/mpb" + "github.com/vbauerster/mpb/decor" +) + +// Pool allows managing progress bars. +type Pool struct { + pool *mpb.Progress + cancel context.CancelFunc + writer io.Writer +} + +// Bar represents a progress bar. +type Bar struct { + bar *mpb.Bar + pool *Pool +} + +// BarOptions includes various options to control AddBar. +type BarOptions struct { + // RemoveOnCompletion removes the bar on completion. This must be true if the bar + // will be replaced by another one.
+ RemoveOnCompletion bool + // OnCompletionMessage will be shown on completion and replace the progress + // bar. Note that setting OnCompletionMessage will cause the progress bar + // (or the static message) to be cleared on completion. + OnCompletionMessage string + // ReplaceBar is the bar to replace. + ReplaceBar *Bar + // StaticMessage, if set, will replace displaying the progress bar. Use this + // field for static progress bars that do not show progress. + StaticMessage string +} + +// NewPool returns a Pool. The caller must eventually call Pool.CleanUp() +// when the pool will no longer be updated. +func NewPool(ctx context.Context, writer io.Writer) *Pool { + ctx, cancelFunc := context.WithCancel(ctx) + return &Pool{ + pool: mpb.New(mpb.WithWidth(40), mpb.WithOutput(writer), mpb.WithContext(ctx)), + cancel: cancelFunc, + writer: writer, + } +} + +// CleanUp cleans up resources, such as remaining progress bars, and stops them +// if necessary. +func (p *Pool) CleanUp() { + p.cancel() + p.pool.Wait() +} + +// DigestToCopyAction returns a string based on the blobinfo and kind. +// It's a convenience function for the c/image library when copying images. +func DigestToCopyAction(digest digest.Digest, kind string) string { + // shortDigestLen is the length of the digest used for blobs. + const shortDigestLen = 12 + const maxLen = len("Copying blob ") + shortDigestLen + // Truncate the string (chopping off some part of the digest) to make all + // progress bars aligned in a column. + copyAction := fmt.Sprintf("Copying %s %s", kind, digest.Encoded()) + if len(copyAction) > maxLen { + copyAction = copyAction[:maxLen] + } + return copyAction +} + +// AddBar adds a new Bar to the Pool. Use options to control the behavior and +// appearance of the bar.
+func (p *Pool) AddBar(action string, size int64, options BarOptions) *Bar { + var bar *mpb.Bar + + // First decorator showing action (e.g., "Copying blob 123456abcd") + mpbOptions := []mpb.BarOption{ + mpb.PrependDecorators( + decor.Name(action), + ), + } + + if options.RemoveOnCompletion { + mpbOptions = append(mpbOptions, mpb.BarRemoveOnComplete()) + } + + if options.ReplaceBar != nil { + mpbOptions = append(mpbOptions, mpb.BarReplaceOnComplete(options.ReplaceBar.bar)) + // bar.SetTotal(0, true) will make sure that the bar is stopped + defer options.ReplaceBar.bar.SetTotal(0, true) + } + + // If no static message is set, we display the progress bar. Otherwise, + // we'll display the message only. + if options.StaticMessage == "" { + mpbOptions = append(mpbOptions, + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+options.OnCompletionMessage), + ), + ) + mpbOptions = append(mpbOptions, mpb.BarClearOnComplete()) + bar = p.pool.AddBar(size, mpbOptions...) + return &Bar{ + bar: bar, + pool: p, + } + } + + barFiller := mpb.FillerFunc( + func(w io.Writer, width int, st *decor.Statistics) { + fmt.Fprint(w, options.StaticMessage) + }) + + // If OnCompletionMessage is set, we need to add the decorator and clear + // the bar on completion. + if options.OnCompletionMessage != "" { + mpbOptions = append(mpbOptions, + mpb.AppendDecorators( + decor.OnComplete(decor.Name(""), " "+options.OnCompletionMessage), + ), + ) + mpbOptions = append(mpbOptions, mpb.BarClearOnComplete()) + } + + bar = p.pool.Add(size, barFiller, mpbOptions...) + return &Bar{ + bar: bar, + pool: p, + } +} + +// ReplaceBar is like Pool.AddBar but replaces the bar in its pool. Note that +// the bar will be terminated and should have been created with +// options.RemoveOnCompletion.
+func (b *Bar) ReplaceBar(action string, size int64, options BarOptions) *Bar { + options.ReplaceBar = b + return b.pool.AddBar(action, size, options) +} + +// ProxyReader wraps the reader with metrics for progress tracking. +func (b *Bar) ProxyReader(reader io.Reader) io.ReadCloser { + return b.bar.ProxyReader(reader) +} diff --git a/vendor/github.com/containers/image/storage/storage_image.go b/vendor/github.com/containers/image/storage/storage_image.go index b39d2bcc04a..c09ae8fb42a 100644 --- a/vendor/github.com/containers/image/storage/storage_image.go +++ b/vendor/github.com/containers/image/storage/storage_image.go @@ -13,12 +13,14 @@ import ( "path/filepath" "sync" "sync/atomic" + "time" "github.com/containers/image/docker/reference" "github.com/containers/image/image" "github.com/containers/image/internal/tmpdir" "github.com/containers/image/manifest" "github.com/containers/image/pkg/blobinfocache/none" + "github.com/containers/image/pkg/progress" "github.com/containers/image/types" "github.com/containers/storage" "github.com/containers/storage/pkg/archive" @@ -54,16 +56,18 @@ type storageImageSource struct { } type storageImageDestination struct { - imageRef storageReference - directory string // Temporary directory where we store blobs until Commit() time - nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs - manifest []byte // Manifest contents, temporary - signatures []byte // Signature contents, temporary - putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + imageRef storageReference + directory 
string // Temporary directory where we store blobs until Commit() time + nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs + manifest []byte // Manifest contents, temporary + signatures []byte // Signature contents, temporary + putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + indexToStorageID map[int]string // Mapping from layer index to the layer IDs in the storage. Only valid after receiving `true` from the corresponding `indexToDoneChannel`. + indexToDoneChannel map[int]chan bool // Mapping from layer index to a channel to indicate the layer has been written to storage. True is written when the corresponding index/layer has successfully been written to the storage. 
+ SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice } type storageImageCloser struct { @@ -323,16 +327,30 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToStorageID: make(map[int]string), + indexToDoneChannel: make(map[int]chan bool), + SignatureSizes: []int{}, } return image, nil } +func (s *storageImageDestination) getChannelForLayer(layerIndexInImage int) chan bool { + s.putBlobMutex.Lock() + defer s.putBlobMutex.Unlock() + channel, ok := s.indexToDoneChannel[layerIndexInImage] + if !ok { + // A buffered channel to allow non-blocking sends + channel = make(chan bool, 1) + s.indexToDoneChannel[layerIndexInImage] = channel + } + return channel +} + // Reference returns the reference used to set up this destination. Note that this should directly correspond to user's intent, // e.g. it should use the public hostname instead of the result of resolving CNAMEs or following redirects. func (s *storageImageDestination) Reference() types.ImageReference { @@ -360,15 +378,197 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { return true } +// tryReusingBlobFromOtherProcess is implementing a mechanism to detect if +// another process is already copying the blobinfo by making use of the +// blob-digest locks from containers/storage. The caller is expected to send a +// message through the done channel once the lock has been acquired. 
If we +// detect that another process is copying the blob, we wait until we own the +// lockfile and call tryReusingBlob() to check if we can reuse the committed +// layer. +func (s *storageImageDestination) tryReusingBlobFromOtherProcess(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, done chan bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + copiedByAnotherProcess := false + select { + case <-done: + case <-time.After(200 * time.Millisecond): + logrus.Debugf("blob %q is being copied by another process", blobinfo.Digest) + copiedByAnotherProcess = true + if bar != nil { + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: "paused: being copied by another process", + RemoveOnCompletion: true, + }) + } + // Wait until we acquired the lock or encountered an error. + <-done + } + + // Now, we own the lock. + // + // In case another process copied the blob, we should attempt reusing the + // blob. If it's not available, either the copy-detection heuristic failed + // (i.e., waiting 200 ms was not enough) or the other process failed to copy + // the blob. + if copiedByAnotherProcess { + reusable, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, false) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another Goroutine is potentially + // waiting for it. If we can't resuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). 
+ if reusable { + logrus.Debugf("another process successfully copied blob %q", blobinfo.Digest) + if bar != nil { + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: "done: copied by another process", + }) + } + } else { + logrus.Debugf("another process must have failed copying blob %q", blobinfo.Digest) + } + return reusable, blob, err + } + + return false, types.BlobInfo{}, nil +} + +// getPreviousLayerID returns the layer ID of the previous layer of +// layerIndexInImage. It might wait until the previous layer ID has been +// computed by another goroutine. +// Note that it returns "" if layerIndexInImage <= 0. +func (s *storageImageDestination) getPreviousLayerID(layerIndexInImage int) (string, error) { + if layerIndexInImage <= 0 { + return "", nil + } + + channel := s.getChannelForLayer(layerIndexInImage - 1) + if committed := <-channel; !committed { + err := fmt.Errorf("committing parent layer %d failed", layerIndexInImage-1) + return "", err + } + + s.putBlobMutex.Lock() + previousID, ok := s.indexToStorageID[layerIndexInImage-1] + s.putBlobMutex.Unlock() + if !ok { + return "", fmt.Errorf("could not find parent layer ID for layer %d", layerIndexInImage) + } + + return previousID, nil +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// layerIndexInImage is set to the layer index of the corresponding blob in the image. 
This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. +// The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. -func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool, bar *progress.Bar) (blob types.BlobInfo, err error) { + // Deferred call to an anonymous func to signal potentially waiting + // goroutines via the index-specific channel. + if layerIndexInImage >= 0 { + defer func() { + // It's a buffered channel, so we don't wait for the message to be + // received + channel := s.getChannelForLayer(layerIndexInImage) + channel <- err == nil + if err != nil { + logrus.Debugf("error while committing blob %d: %v", layerIndexInImage, err) + } + + }() + } + + // Check if another process is already copying the blob. Please refer to the + // doc of tryReusingBlobFromOtherProcess for further information. 
+ if layerIndexInImage >= 0 { + // The reasoning for getting the locker here is to make the code paths + // of locking and unlocking it more obvious and simple. + locker, err := s.imageRef.transport.store.GetDigestLock(blobinfo.Digest) + if err != nil { + return types.BlobInfo{}, errors.Wrapf(err, "error acquiring lock for blob %q", blobinfo.Digest) + } + done := make(chan bool, 1) + defer locker.Unlock() + go func() { + locker.RecursiveLock() + done <- true + }() + reusable, blob, err := s.tryReusingBlobFromOtherProcess(ctx, stream, blobinfo, layerIndexInImage, cache, done, bar) + if err != nil { + return blob, err + } + if reusable { + return blob, nil + } + } + + if bar != nil { + kind := "blob" + message := "" + if isConfig { + kind = "config" + // Setting the StaticMessage for the config avoids displaying a + // progress bar. Configs are comparatively small and quickly + // downloaded, such that displaying a progress is more a distraction + // than an indicator. + message = " " + } + bar = bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, kind), + blobinfo.Size, + progress.BarOptions{ + StaticMessage: message, + OnCompletionMessage: "done", + }) + stream = bar.ProxyReader(stream) + } + + blob, err = s.putBlob(ctx, stream, blobinfo, layerIndexInImage, cache, isConfig) + if err != nil { + return types.BlobInfo{}, err + } + + if isConfig { + return blob, err + } + + // First, wait for the previous layer to be committed + previousID := "" + if layerIndexInImage > 0 { + var err error + previousID, err = s.getPreviousLayerID(layerIndexInImage) + if err != nil { + return types.BlobInfo{}, err + } + } + + // Commit the blob + id, err := s.commitBlob(ctx, blob, previousID) + if err != nil { + return types.BlobInfo{}, err + } + if layerIndexInImage >= 0 { + s.putBlobMutex.Lock() + s.indexToStorageID[layerIndexInImage] = id + s.putBlobMutex.Unlock() + } + + return blob, nil +} + +func (s *storageImageDestination) putBlob(ctx context.Context, stream 
io.Reader, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { // Stores a layer or data blob in our temporary directory, checking that any information // in the blobinfo matches the incoming data. errorBlobInfo := types.BlobInfo{ @@ -425,86 +625,160 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, } // This is safe because we have just computed both values ourselves. cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest()) - return types.BlobInfo{ + blob := types.BlobInfo{ Digest: blobDigest, Size: blobSize, MediaType: blobinfo.MediaType, - }, nil + } + + return blob, nil } -// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination -// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). -// info.Digest must not be empty. -// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. -// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. -// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. -// May use and/or update cache. -func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - // lock the entire method as it executes fairly quickly - s.putBlobMutex.Lock() - defer s.putBlobMutex.Unlock() - if blobinfo.Digest == "" { - return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`) +// tryCopyingLayer looks up a matching layer for the blob in the storage and +// creates a new layer for it. 
It returns false if no matching layer has been +// found in the storage. Callers are expected to decide if that's an error or +// not. Note that layerID and previousID must either be left empty or both be +// specified. +func (s *storageImageDestination) tryCopyingLayer(ctx context.Context, blob types.BlobInfo, layerIndexInImage int, layerID, previousID string, byCompressed, byUncompressed bool) (string, types.BlobInfo, error) { + if !byCompressed && !byUncompressed { + return "", types.BlobInfo{}, errors.New("Internal error: copyLayer without compressed and uncompressed argument") } - if err := blobinfo.Digest.Validate(); err != nil { - return false, types.BlobInfo{}, errors.Wrapf(err, `Can not check for a blob with invalid digest`) + + // Check if we previously cached a file with that blob's contents. If we + // didn't, then we need to read the desired contents from a layer. + // + // TODO: the lookup here is only required when being called for an entirely + // new layer or reused ones. Maybe we should split that into separate + // functions?bool + s.putBlobMutex.Lock() + filename, ok := s.filenames[blob.Digest] + s.putBlobMutex.Unlock() + + id := "" + if !ok { + // Try to find the layer with contents matching that blobsum. + layer := "" + if byUncompressed { + layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) + if err == nil && len(layers) > 0 { + layer = layers[0].ID + id = layers[0].UncompressedDigest.Hex() + } + } + if byCompressed && layer == "" { + layers, err := s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) + if err == nil && len(layers) > 0 { + layer = layers[0].ID + id = layers[0].UncompressedDigest.Hex() + } + } + // No layer with a matching digest found, so there's nothing to copy. + if layer == "" { + logrus.Debugf("no matching layer found for blob %q", blob.Digest.String()) + return "", types.BlobInfo{}, nil + } + + // Read the layer's contents. 
+ noCompression := archive.Uncompressed + diffOptions := &storage.DiffOptions{ + Compression: &noCompression, + } + diff, err := s.imageRef.transport.store.Diff("", layer, diffOptions) + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error reading layer %q for blob %q", layer, blob.Digest) + } + // Copy the layer diff to a file. Diff() takes a lock that it holds + // until the ReadCloser that it returns is closed, and PutLayer() wants + // the same lock, so the diff can't just be directly streamed from one + // to the other. + filename = s.computeNextBlobCacheFile() + file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) + if err != nil { + diff.Close() + return "", types.BlobInfo{}, errors.Wrapf(err, "error creating temporary file %q", filename) + } + // Copy the data to the file. + // TODO: This can take quite some time, and should ideally be cancellable using + // ctx.Done(). + _, err = io.Copy(file, diff) + diff.Close() + file.Close() + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error storing blob to file %q", filename) + } } - // Check if we've already cached it in a file. - if size, ok := s.fileSizes[blobinfo.Digest]; ok { - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: size, - MediaType: blobinfo.MediaType, - }, nil + // Read the cached blob and use it as a diff. + file, err := os.Open(filename) + if err != nil { + return "", types.BlobInfo{}, errors.Wrapf(err, "error opening file %q", filename) } + defer file.Close() - // Check if we have a wasn't-compressed layer in storage that's based on that blob. - layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blobinfo.Digest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + // If layerID is provided, use it. 
+ if layerID != "" { + id = layerID + } + // If no previousID is provided, wait for it to be computed. + if previousID == "" { + var err error + // Wait for the previous layer to be computed. + previousID, err = s.getPreviousLayerID(layerIndexInImage) + if err != nil { + return "", types.BlobInfo{}, err + } + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + id)).Hex() + } } - if len(layers) > 0 { - // Save this for completeness. - s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: layers[0].UncompressedSize, - MediaType: blobinfo.MediaType, - }, nil + + // Build the new layer using the diff, regardless of where it came from. + // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). + layer, _, err := s.imageRef.transport.store.PutLayer(id, previousID, nil, "", false, nil, file) + if err != nil && errors.Cause(err) != storage.ErrDuplicateID { + return "", types.BlobInfo{}, errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) } - // Check if we have a was-compressed layer in storage that's based on that blob. - layers, err = s.imageRef.transport.store.LayersByCompressedDigest(blobinfo.Digest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for compressed layers with digest %q`, blobinfo.Digest) + // Record all data at once to avoid taking the lock too many times. 
+ s.putBlobMutex.Lock() + s.indexToStorageID[layerIndexInImage] = layer.ID + s.blobDiffIDs[blob.Digest] = layer.UncompressedDigest + s.filenames[blob.Digest] = filename + s.putBlobMutex.Unlock() + + return layer.ID, blob, nil +} + +// tryReusingBlob is a helper method for TryReusingBlob to wrap it +func (s *storageImageDestination) tryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { + // lock the entire method as it executes fairly quickly + if blobinfo.Digest == "" { + return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`) } - if len(layers) > 0 { - // Record the uncompressed value so that we can use it to calculate layer IDs. - s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: blobinfo.Digest, - Size: layers[0].CompressedSize, - MediaType: blobinfo.MediaType, - }, nil + if err := blobinfo.Digest.Validate(); err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, `Can not check for a blob with invalid digest`) + } + + // Check if we can copy an already existing {un}compressed layer. + if layerID, blob, err := s.tryCopyingLayer(ctx, blobinfo, layerIndexInImage, "", "", true, true); err != nil { + return false, blob, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + } else if layerID != "" { + return true, blob, nil } - // Does the blob correspond to a known DiffID which we already have available? - // Because we must return the size, which is unknown for unavailable compressed blobs, the returned BlobInfo refers to the + // Does the blob correspond to a known DiffID which we already have + // available? Because we must return the size, which is unknown for + // unavailable compressed blobs, the returned BlobInfo refers to the // uncompressed layer, and that can happen only if canSubstitute. 
if canSubstitute { - if uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest { - layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest) - if err != nil && errors.Cause(err) != storage.ErrLayerUnknown { - return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for layers with digest %q`, uncompressedDigest) - } - if len(layers) > 0 { - s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest - return true, types.BlobInfo{ - Digest: uncompressedDigest, - Size: layers[0].UncompressedSize, - MediaType: blobinfo.MediaType, - }, nil + uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest) + if uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest { + // Check if we can copy an existing uncompressed layer. + blobinfo.Digest = uncompressedDigest + if layerID, blob, err := s.tryCopyingLayer(ctx, blobinfo, layerIndexInImage, "", "", false, true); err != nil { + return false, blob, errors.Wrapf(err, `Error looking for layers with digest %q`, blobinfo.Digest) + } else if layerID != "" { + return true, blob, nil } } } @@ -513,6 +787,40 @@ func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo t return false, types.BlobInfo{}, nil } +// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of +// PutBlob() and TryReusingBlob() where the layers must be written to the backend storage in sequential order. A value >= indicates that the blob a layer. 
+// Note that only the containers-storage destination is sensitive to the layerIndexInImage parameter. Other transport destinations ignore it. +// Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. +// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +// May use and/or update cache. +func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, layerIndexInImage int, cache types.BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, types.BlobInfo, error) { + reused, blob, err := s.tryReusingBlob(ctx, blobinfo, layerIndexInImage, cache, canSubstitute) + // If we can reuse the blob or hit an error trying to do so, we need to + // signal the result through the channel as another Goroutine is potentially + // waiting for it. If we can't reuse the blob and encountered no error, we + // need to copy it and will send the signal in PutBlob(). + if (layerIndexInImage >= 0) && (err != nil || reused) { + defer func() { // Defer to be **very** sure to always execute the code. + channel := s.getChannelForLayer(layerIndexInImage) + channel <- (err == nil) + }() + } + if bar != nil && reused { + bar.ReplaceBar( + progress.DigestToCopyAction(blobinfo.Digest, "blob"), + 0, + progress.BarOptions{ + StaticMessage: "skipped: already exists", + }) + } + return reused, blob, err +} + // computeID computes a recommended image ID based on information we have so far. 
If // the manifest is not of a type that we recognize, we return an empty value, indicating // that since we don't have a recommendation, a random ID should be used if one needs @@ -572,6 +880,42 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, errors.New("blob not found") } +// commitBlobs commits the specified blob to the storage. +func (s *storageImageDestination) commitBlob(ctx context.Context, blob types.BlobInfo, previousID string) (string, error) { + logrus.Debugf("committing blob %q", blob.Digest) + // Check if there's already a layer with the ID that we'd give to the result of applying + // this layer blob to its parent, if it has one, or the blob's hex value otherwise. + s.putBlobMutex.Lock() + diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + s.putBlobMutex.Unlock() + if !haveDiffID { + return "", errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) + } + + id := diffID.Hex() + if previousID != "" { + id = digest.Canonical.FromBytes([]byte(previousID + "+" + diffID.Hex())).Hex() + } + if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { + logrus.Debugf("layer for blob %q already found in storage", blob.Digest.String()) + return layer.ID, nil + } + + layerID, _, err := s.tryCopyingLayer(ctx, blob, -1, id, previousID, true, true) + if err != nil { + return "", err + } + if layerID == "" { + return "", fmt.Errorf("Internal error: could not locate layer for blob %q", blob.Digest.String()) + } + + return layerID, nil +} + +// Commit marks the process of storing the image as successful and asks for the image to be persisted. +// WARNING: This does not have any transactional semantics: +// - Uploaded data MAY be visible to others before Commit() is called +// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. 
rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context) error { // Find the list of layer blobs. if len(s.manifest) == 0 { @@ -581,6 +925,7 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if err != nil { return errors.Wrapf(err, "error parsing manifest") } + layerBlobs := man.LayerInfos() // Extract or find the layers. lastLayer := "" @@ -588,102 +933,36 @@ func (s *storageImageDestination) Commit(ctx context.Context) error { if blob.EmptyLayer { continue } - - // Check if there's already a layer with the ID that we'd give to the result of applying - // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - diffID, haveDiffID := s.blobDiffIDs[blob.Digest] + _, haveDiffID := s.blobDiffIDs[blob.Digest] if !haveDiffID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller - // that relies on using a blob digest that has never been seeen by the store had better call + // that relies on using a blob digest that has never been seen by the store had better call // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). 
logrus.Debugf("looking for diffID for blob %+v", blob.Digest) - has, _, err := s.TryReusingBlob(ctx, blob.BlobInfo, none.NoCache, false) + has, _, err := s.tryReusingBlob(ctx, blob.BlobInfo, -1, none.NoCache, false) if err != nil { return errors.Wrapf(err, "error checking for a layer based on blob %q", blob.Digest.String()) } if !has { return errors.Errorf("error determining uncompressed digest for blob %q", blob.Digest.String()) } - diffID, haveDiffID = s.blobDiffIDs[blob.Digest] + _, haveDiffID = s.blobDiffIDs[blob.Digest] if !haveDiffID { return errors.Errorf("we have blob %q, but don't know its uncompressed digest", blob.Digest.String()) } } - id := diffID.Hex() - if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffID.Hex())).Hex() - } - if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { - // There's already a layer that should have the right contents, just reuse it. - lastLayer = layer.ID - continue - } - // Check if we previously cached a file with that blob's contents. If we didn't, - // then we need to read the desired contents from a layer. - filename, ok := s.filenames[blob.Digest] - if !ok { - // Try to find the layer with contents matching that blobsum. - layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } else { - layers, err2 = s.imageRef.transport.store.LayersByCompressedDigest(blob.Digest) - if err2 == nil && len(layers) > 0 { - layer = layers[0].ID - } - } - if layer == "" { - return errors.Wrapf(err2, "error locating layer for blob %q", blob.Digest) - } - // Read the layer's contents. 
- noCompression := archive.Uncompressed - diffOptions := &storage.DiffOptions{ - Compression: &noCompression, - } - diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) - if err2 != nil { - return errors.Wrapf(err2, "error reading layer %q for blob %q", layer, blob.Digest) - } - // Copy the layer diff to a file. Diff() takes a lock that it holds - // until the ReadCloser that it returns is closed, and PutLayer() wants - // the same lock, so the diff can't just be directly streamed from one - // to the other. - filename = s.computeNextBlobCacheFile() - file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0600) - if err != nil { - diff.Close() - return errors.Wrapf(err, "error creating temporary file %q", filename) - } - // Copy the data to the file. - // TODO: This can take quite some time, and should ideally be cancellable using - // ctx.Done(). - _, err = io.Copy(file, diff) - diff.Close() - file.Close() - if err != nil { - return errors.Wrapf(err, "error storing blob to file %q", filename) - } - // Make sure that we can find this file later, should we need the layer's - // contents again. - s.filenames[blob.Digest] = filename - } - // Read the cached blob and use it as a diff. - file, err := os.Open(filename) + newID, err := s.commitBlob(ctx, blob.BlobInfo, lastLayer) if err != nil { - return errors.Wrapf(err, "error opening file %q", filename) - } - defer file.Close() - // Build the new layer using the diff, regardless of where it came from. - // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). 
- layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, nil, file) - if err != nil && errors.Cause(err) != storage.ErrDuplicateID { - return errors.Wrapf(err, "error adding layer with blob %q", blob.Digest) + return err } - lastLayer = layer.ID + lastLayer = newID + } + + if lastLayer == "" { + return fmt.Errorf("image does not contain a non-empty or non-throwaway layer") } // If one of those blobs was a configuration blob, then we can try to dig out the date when the image diff --git a/vendor/github.com/containers/image/types/types.go b/vendor/github.com/containers/image/types/types.go index 78950434819..7cf4c381411 100644 --- a/vendor/github.com/containers/image/types/types.go +++ b/vendor/github.com/containers/image/types/types.go @@ -6,8 +6,9 @@ import ( "time" "github.com/containers/image/docker/reference" + "github.com/containers/image/pkg/progress" "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go/v1" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) // ImageTransport is a top-level namespace for ways to to store/load an image. @@ -262,20 +263,27 @@ type ImageDestination interface { // inputInfo.Size is the expected length of stream, if known. // inputInfo.MediaType describes the blob format, if known. // May update cache. + // layerIndexInImage must be properly set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. + // The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. 
+ // Same applies to bar, which is used in the containers-storage destination to update the progress bars displayed in the terminal. If it's nil, it will be ignored. // WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available // to any other readers for download using the supplied digest. // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. - PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, cache BlobInfoCache, isConfig bool) (BlobInfo, error) + PutBlob(ctx context.Context, stream io.Reader, inputInfo BlobInfo, layerIndexInImage int, cache BlobInfoCache, isConfig bool, bar *progress.Bar) (BlobInfo, error) // HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently. HasThreadSafePutBlob() bool // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. - // If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input. - // If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size. + // layerIndexInImage is set to the layer index of the corresponding blob in the image. This value is required to allow parallel executions of + // PutBlob() and TryReusingBlob() where the layers must be written to the destination in sequential order. A value >= 0 indicates that the blob is a layer. + // The bar can optionally be specified to allow replacing/updating it. Note that only the containers-storage transport updates the bar; other transports ignore it. 
+ // If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size. // If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. // May use and/or update cache. - TryReusingBlob(ctx context.Context, info BlobInfo, cache BlobInfoCache, canSubstitute bool) (bool, BlobInfo, error) + TryReusingBlob(ctx context.Context, info BlobInfo, layerIndexInImage int, cache BlobInfoCache, canSubstitute bool, bar *progress.Bar) (bool, BlobInfo, error) // PutManifest writes manifest to the destination. // FIXME? This should also receive a MIME type if known, to differentiate between schema versions. // If the destination is in principle available, refuses this manifest type (e.g. it does not recognize the schema), diff --git a/vendor/github.com/containers/image/vendor.conf b/vendor/github.com/containers/image/vendor.conf index 438cab17aec..f8ad0f2496c 100644 --- a/vendor/github.com/containers/image/vendor.conf +++ b/vendor/github.com/containers/image/vendor.conf @@ -46,6 +46,6 @@ github.com/etcd-io/bbolt v1.3.2 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/compress v1.4.1 github.com/klauspost/cpuid v1.2.0 -github.com/vbauerster/mpb v3.3.4 +github.com/vbauerster/mpb v3.4.0 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1 diff --git a/vendor/github.com/containers/storage/containers.go b/vendor/github.com/containers/storage/containers.go index bbac78b60b4..e695523613b 100644 --- a/vendor/github.com/containers/storage/containers.go +++ b/vendor/github.com/containers/storage/containers.go @@ -572,6 +572,10 @@ func (r *containerStore) Lock() { r.lockfile.Lock() } +func (r *containerStore) RecursiveLock() { + r.lockfile.RecursiveLock() +} + func (r *containerStore) RLock() { r.lockfile.RLock() } diff --git a/vendor/github.com/containers/storage/images.go b/vendor/github.com/containers/storage/images.go index 
38b5a3ef3b6..a19227f64e3 100644 --- a/vendor/github.com/containers/storage/images.go +++ b/vendor/github.com/containers/storage/images.go @@ -739,6 +739,10 @@ func (r *imageStore) Lock() { r.lockfile.Lock() } +func (r *imageStore) RecursiveLock() { + r.lockfile.RecursiveLock() +} + func (r *imageStore) RLock() { r.lockfile.RLock() } diff --git a/vendor/github.com/containers/storage/layers.go b/vendor/github.com/containers/storage/layers.go index a35dd476b8c..35a0bae2dd4 100644 --- a/vendor/github.com/containers/storage/layers.go +++ b/vendor/github.com/containers/storage/layers.go @@ -1304,6 +1304,10 @@ func (r *layerStore) Lock() { r.lockfile.Lock() } +func (r *layerStore) RecursiveLock() { + r.lockfile.RecursiveLock() +} + func (r *layerStore) RLock() { r.lockfile.RLock() } diff --git a/vendor/github.com/containers/storage/layers_ffjson.go b/vendor/github.com/containers/storage/layers_ffjson.go index 09b5d0f33e5..125b5d8c971 100644 --- a/vendor/github.com/containers/storage/layers_ffjson.go +++ b/vendor/github.com/containers/storage/layers_ffjson.go @@ -1,5 +1,5 @@ // Code generated by ffjson . DO NOT EDIT. -// source: ./layers.go +// source: layers.go package storage diff --git a/vendor/github.com/containers/storage/lockfile.go b/vendor/github.com/containers/storage/lockfile.go index ed8753337f1..c4f1b5549d0 100644 --- a/vendor/github.com/containers/storage/lockfile.go +++ b/vendor/github.com/containers/storage/lockfile.go @@ -15,6 +15,10 @@ type Locker interface { // Acquire a writer lock. Lock() + // Acquire a writer lock recursively, allowing for recursive acquisitions + // within the same process space. + RecursiveLock() + // Unlock the lock. 
Unlock() diff --git a/vendor/github.com/containers/storage/lockfile_unix.go b/vendor/github.com/containers/storage/lockfile_unix.go index 8e0f22cb5ca..0309f52d0ed 100644 --- a/vendor/github.com/containers/storage/lockfile_unix.go +++ b/vendor/github.com/containers/storage/lockfile_unix.go @@ -25,6 +25,7 @@ type lockfile struct { locktype int16 locked bool ro bool + recursive bool } // openLock opens the file at path and returns the corresponding file @@ -75,7 +76,7 @@ func createLockerForPath(path string, ro bool) (Locker, error) { // lock locks the lockfile via FCTNL(2) based on the specified type and // command. -func (l *lockfile) lock(l_type int16) { +func (l *lockfile) lock(l_type int16, recursive bool) { lk := unix.Flock_t{ Type: l_type, Whence: int16(os.SEEK_SET), @@ -86,7 +87,13 @@ func (l *lockfile) lock(l_type int16) { case unix.F_RDLCK: l.rwMutex.RLock() case unix.F_WRLCK: - l.rwMutex.Lock() + if recursive { + // NOTE: that's okay as recursive is only set in RecursiveLock(), so + // there's no need to protect against hypothetical RDLCK cases. + l.rwMutex.RLock() + } else { + l.rwMutex.Lock() + } default: panic(fmt.Sprintf("attempted to acquire a file lock of unrecognized type %d", l_type)) } @@ -110,6 +117,7 @@ func (l *lockfile) lock(l_type int16) { } l.locktype = l_type l.locked = true + l.recursive = recursive l.counter++ } @@ -119,13 +127,24 @@ func (l *lockfile) Lock() { if l.ro { l.RLock() } else { - l.lock(unix.F_WRLCK) + l.lock(unix.F_WRLCK, false) + } +} + +// RecursiveLock locks the lockfile as a writer but allows for recursive +// acquisitions within the same process space. Note that RLock() will be called +// if it's a lockTypReader lock. +func (l *lockfile) RecursiveLock() { + if l.ro { + l.RLock() + } else { + l.lock(unix.F_WRLCK, true) } } // LockRead locks the lockfile as a reader. func (l *lockfile) RLock() { - l.lock(unix.F_RDLCK) + l.lock(unix.F_RDLCK, false) } // Unlock unlocks the lockfile. 
@@ -161,7 +180,7 @@ func (l *lockfile) Unlock() { // Close the file descriptor on the last unlock. unix.Close(int(l.fd)) } - if l.locktype == unix.F_RDLCK { + if l.locktype == unix.F_RDLCK || l.recursive { l.rwMutex.RUnlock() } else { l.rwMutex.Unlock() diff --git a/vendor/github.com/containers/storage/lockfile_windows.go b/vendor/github.com/containers/storage/lockfile_windows.go index c02069495c2..caf7c184adb 100644 --- a/vendor/github.com/containers/storage/lockfile_windows.go +++ b/vendor/github.com/containers/storage/lockfile_windows.go @@ -36,6 +36,12 @@ func (l *lockfile) Lock() { l.locked = true } +func (l *lockfile) RecursiveLock() { + // We don't support Windows but a recursive writer-lock in one process-space + // is really a writer lock, so just panic. + panic("not supported") +} + func (l *lockfile) RLock() { l.mu.Lock() l.locked = true diff --git a/vendor/github.com/vbauerster/mpb/README.md b/vendor/github.com/vbauerster/mpb/README.md index 9b760647e56..f96857c4767 100644 --- a/vendor/github.com/vbauerster/mpb/README.md +++ b/vendor/github.com/vbauerster/mpb/README.md @@ -31,8 +31,6 @@ _Note:_ it is preferable to go get from github.com, rather than gopkg.in. See is p := mpb.New( // override default (80) width mpb.WithWidth(64), - // override default "[=>-]" format - mpb.WithFormat("╢▌▌░╟"), // override default 120ms refresh rate mpb.WithRefreshRate(180*time.Millisecond), ) @@ -41,6 +39,8 @@ _Note:_ it is preferable to go get from github.com, rather than gopkg.in. 
See is name := "Single Bar:" // adding a single bar bar := p.AddBar(int64(total), + // override default "[=>-]" style + mpb.BarStyle("╢▌▌░╟"), mpb.PrependDecorators( // display our name with one space on the right decor.Name(name, decor.WC{W: len(name) + 1, C: decor.DidentRight}), diff --git a/vendor/github.com/vbauerster/mpb/bar.go b/vendor/github.com/vbauerster/mpb/bar.go index 5a506fc84cf..a304a87cb7d 100644 --- a/vendor/github.com/vbauerster/mpb/bar.go +++ b/vendor/github.com/vbauerster/mpb/bar.go @@ -2,6 +2,7 @@ package mpb import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -11,21 +12,8 @@ import ( "unicode/utf8" "github.com/vbauerster/mpb/decor" - "github.com/vbauerster/mpb/internal" ) -const ( - rLeft = iota - rFill - rTip - rEmpty - rRight -) - -const formatLen = 5 - -type barRunes [formatLen]rune - // Bar represents a progress Bar type Bar struct { priority int @@ -45,15 +33,30 @@ type Bar struct { shutdown chan struct{} } +// Filler interface. +// Bar renders by calling Filler's Fill method. You can literally have +// any bar kind, by implementing this interface and passing it to the +// Add method. +type Filler interface { + Fill(w io.Writer, width int, s *decor.Statistics) +} + +// FillerFunc is function type adapter to convert function into Filler. 
+type FillerFunc func(w io.Writer, width int, stat *decor.Statistics) + +func (f FillerFunc) Fill(w io.Writer, width int, stat *decor.Statistics) { + f(w, width, stat) +} + type ( bState struct { + filler Filler id int width int + alignment int total int64 current int64 - runes barRunes - trimLeftSpace bool - trimRightSpace bool + trimSpace bool toComplete bool removeOnComplete bool barClearOnComplete bool @@ -73,8 +76,8 @@ type ( runningBar *Bar } refill struct { - char rune - till int64 + r rune + limit int64 } frameReader struct { io.Reader @@ -84,14 +87,20 @@ type ( } ) -func newBar(wg *sync.WaitGroup, id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar { - if total <= 0 { - total = time.Now().Unix() - } +func newBar( + ctx context.Context, + wg *sync.WaitGroup, + filler Filler, + id, width int, + total int64, + options ...BarOption, +) *Bar { s := &bState{ + filler: filler, id: id, priority: id, + width: width, total: total, } @@ -104,6 +113,9 @@ func newBar(wg *sync.WaitGroup, id int, total int64, cancel <-chan struct{}, opt s.bufP = bytes.NewBuffer(make([]byte, 0, s.width)) s.bufB = bytes.NewBuffer(make([]byte, 0, s.width)) s.bufA = bytes.NewBuffer(make([]byte, 0, s.width)) + if s.newLineExtendFn != nil { + s.bufNL = bytes.NewBuffer(make([]byte, 0, s.width)) + } b := &Bar{ priority: s.priority, @@ -121,11 +133,7 @@ func newBar(wg *sync.WaitGroup, id int, total int64, cancel <-chan struct{}, opt b.priority = b.runningBar.priority } - if s.newLineExtendFn != nil { - s.bufNL = bytes.NewBuffer(make([]byte, 0, s.width)) - } - - go b.serve(wg, s, cancel) + go b.serve(ctx, wg, s) return b } @@ -178,52 +186,42 @@ func (b *Bar) Current() int64 { } // SetTotal sets total dynamically. -// Set final to true, when total is known, it will trigger bar complete event. -func (b *Bar) SetTotal(total int64, final bool) bool { +// Set complete to true, to trigger bar complete event now. 
+func (b *Bar) SetTotal(total int64, complete bool) { select { case b.operateState <- func(s *bState) { - if total > 0 { - s.total = total - } - if final { + s.total = total + if complete && !s.toComplete { s.current = s.total s.toComplete = true } }: - return true case <-b.done: - return false } } -// SetRefill sets fill rune to r, up until n. -func (b *Bar) SetRefill(n int, r rune) { - if n <= 0 { - return - } +// SetRefill sets refill, if supported by underlying Filler. +func (b *Bar) SetRefill(amount int64) { b.operateState <- func(s *bState) { - s.refill = &refill{r, int64(n)} + if f, ok := s.filler.(interface{ SetRefill(int64) }); ok { + f.SetRefill(amount) + } } } -// RefillBy is deprecated, use SetRefill -func (b *Bar) RefillBy(n int, r rune) { - b.SetRefill(n, r) -} - // Increment is a shorthand for b.IncrBy(1). func (b *Bar) Increment() { b.IncrBy(1) } // IncrBy increments progress bar by amount of n. -// wdd is optional work duration i.e. time.Since(start), -// which expected to be provided, if any ewma based decorator is used. +// wdd is optional work duration i.e. time.Since(start), which expected +// to be provided, if any ewma based decorator is used. func (b *Bar) IncrBy(n int, wdd ...time.Duration) { select { case b.operateState <- func(s *bState) { s.current += int64(n) - if s.current >= s.total { + if s.total > 0 && s.current >= s.total { s.current = s.total s.toComplete = true } @@ -238,9 +236,9 @@ func (b *Bar) IncrBy(n int, wdd ...time.Duration) { // Completed reports whether the bar is in completed state. func (b *Bar) Completed() bool { // omit select here, because primary usage of the method is for loop - // condition, like for !bar.Completed() {...} - // so when toComplete=true it is called once (at which time, the bar is still alive), - // then quits the loop and never suppose to be called afterwards. 
+ // condition, like for !bar.Completed() {...} so when toComplete=true + // it is called once (at which time, the bar is still alive), then + // quits the loop and never suppose to be called afterwards. return <-b.boolCh } @@ -253,8 +251,9 @@ func (b *Bar) wSyncTable() [][]chan int { } } -func (b *Bar) serve(wg *sync.WaitGroup, s *bState, cancel <-chan struct{}) { +func (b *Bar) serve(ctx context.Context, wg *sync.WaitGroup, s *bState) { defer wg.Done() + cancel := ctx.Done() for { select { case op := <-b.operateState: @@ -322,8 +321,6 @@ func (b *Bar) render(debugOut io.Writer, tw int) { } func (s *bState) draw(termWidth int) io.Reader { - defer s.bufA.WriteByte('\n') - if s.panicMsg != "" { return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", termWidth), s.panicMsg)) } @@ -338,77 +335,32 @@ func (s *bState) draw(termWidth int) io.Reader { s.bufA.WriteString(d.Decor(stat)) } - prependCount := utf8.RuneCount(s.bufP.Bytes()) - appendCount := utf8.RuneCount(s.bufA.Bytes()) - if s.barClearOnComplete && s.completeFlushed { + s.bufA.WriteByte('\n') return io.MultiReader(s.bufP, s.bufA) } - s.fillBar(s.width) - barCount := utf8.RuneCount(s.bufB.Bytes()) - totalCount := prependCount + barCount + appendCount - if spaceCount := 0; totalCount > termWidth { - if !s.trimLeftSpace { - spaceCount++ - } - if !s.trimRightSpace { - spaceCount++ - } - s.fillBar(termWidth - prependCount - appendCount - spaceCount) - } - - return io.MultiReader(s.bufP, s.bufB, s.bufA) -} - -func (s *bState) fillBar(width int) { - defer func() { - s.bufB.WriteRune(s.runes[rRight]) - if !s.trimRightSpace { - s.bufB.WriteByte(' ') - } - }() + prependCount := utf8.RuneCount(s.bufP.Bytes()) + appendCount := utf8.RuneCount(s.bufA.Bytes()) - s.bufB.Reset() - if !s.trimLeftSpace { + if !s.trimSpace { + // reserve space for edge spaces + termWidth -= 2 s.bufB.WriteByte(' ') } - s.bufB.WriteRune(s.runes[rLeft]) - if width <= 2 { - return - } - - // bar s.width without leftEnd and rightEnd runes - 
barWidth := width - 2 - - completedWidth := internal.Percentage(s.total, s.current, int64(barWidth)) - if s.refill != nil { - till := internal.Percentage(s.total, s.refill.till, int64(barWidth)) - // append refill rune - var i int64 - for i = 0; i < till; i++ { - s.bufB.WriteRune(s.refill.char) - } - for i = till; i < completedWidth; i++ { - s.bufB.WriteRune(s.runes[rFill]) - } + if prependCount+s.width+appendCount > termWidth { + s.filler.Fill(s.bufB, termWidth-prependCount-appendCount, stat) } else { - var i int64 - for i = 0; i < completedWidth; i++ { - s.bufB.WriteRune(s.runes[rFill]) - } + s.filler.Fill(s.bufB, s.width, stat) } - if completedWidth < int64(barWidth) && completedWidth > 0 { - _, size := utf8.DecodeLastRune(s.bufB.Bytes()) - s.bufB.Truncate(s.bufB.Len() - size) - s.bufB.WriteRune(s.runes[rTip]) + if !s.trimSpace { + s.bufB.WriteByte(' ') } - for i := completedWidth; i < int64(barWidth); i++ { - s.bufB.WriteRune(s.runes[rEmpty]) - } + s.bufA.WriteByte('\n') + return io.MultiReader(s.bufP, s.bufB, s.bufA) } func (s *bState) wSyncTable() [][]chan int { @@ -442,14 +394,6 @@ func newStatistics(s *bState) *decor.Statistics { } } -func strToBarRunes(format string) (array barRunes) { - for i, n := 0, 0; len(format) > 0; i++ { - array[i], n = utf8.DecodeRuneInString(format) - format = format[n:] - } - return -} - func countLines(b []byte) int { return bytes.Count(b, []byte("\n")) } diff --git a/vendor/github.com/vbauerster/mpb/bar_filler.go b/vendor/github.com/vbauerster/mpb/bar_filler.go new file mode 100644 index 00000000000..4e9285ca526 --- /dev/null +++ b/vendor/github.com/vbauerster/mpb/bar_filler.go @@ -0,0 +1,111 @@ +package mpb + +import ( + "io" + "unicode/utf8" + + "github.com/vbauerster/mpb/decor" + "github.com/vbauerster/mpb/internal" +) + +const ( + rLeft = iota + rFill + rTip + rEmpty + rRight + rRevTip + rRefill +) + +var defaultBarStyle = "[=>-]<+" + +type barFiller struct { + format [][]byte + refillAmount int64 + reverse bool +} + +func 
newDefaultBarFiller() Filler { + bf := &barFiller{ + format: make([][]byte, utf8.RuneCountInString(defaultBarStyle)), + } + bf.setStyle(defaultBarStyle) + return bf +} + +func (s *barFiller) setStyle(style string) { + if !utf8.ValidString(style) { + return + } + src := make([][]byte, 0, utf8.RuneCountInString(style)) + for _, r := range style { + src = append(src, []byte(string(r))) + } + copy(s.format, src) +} + +func (s *barFiller) setReverse() { + s.reverse = true +} + +func (s *barFiller) SetRefill(amount int64) { + s.refillAmount = amount +} + +func (s *barFiller) Fill(w io.Writer, width int, stat *decor.Statistics) { + + // don't count rLeft and rRight [brackets] + width -= 2 + if width < 2 { + return + } + + w.Write(s.format[rLeft]) + if width == 2 { + w.Write(s.format[rRight]) + return + } + + bb := make([][]byte, width) + + cwidth := int(internal.Percentage(stat.Total, stat.Current, int64(width))) + + for i := 0; i < cwidth; i++ { + bb[i] = s.format[rFill] + } + + if s.refillAmount > 0 { + var rwidth int + if s.refillAmount > stat.Current { + rwidth = cwidth + } else { + rwidth = int(internal.Percentage(stat.Total, int64(s.refillAmount), int64(width))) + } + for i := 0; i < rwidth; i++ { + bb[i] = s.format[rRefill] + } + } + + if cwidth > 0 && cwidth < width { + bb[cwidth-1] = s.format[rTip] + } + + for i := cwidth; i < width; i++ { + bb[i] = s.format[rEmpty] + } + + if s.reverse { + if cwidth > 0 && cwidth < width { + bb[cwidth-1] = s.format[rRevTip] + } + for i := len(bb) - 1; i >= 0; i-- { + w.Write(bb[i]) + } + } else { + for i := 0; i < len(bb); i++ { + w.Write(bb[i]) + } + } + w.Write(s.format[rRight]) +} diff --git a/vendor/github.com/vbauerster/mpb/bar_option.go b/vendor/github.com/vbauerster/mpb/bar_option.go index e33bce4dabc..e9a4bd2a76a 100644 --- a/vendor/github.com/vbauerster/mpb/bar_option.go +++ b/vendor/github.com/vbauerster/mpb/bar_option.go @@ -6,11 +6,10 @@ import ( "github.com/vbauerster/mpb/decor" ) -// BarOption is a function option 
which changes the default behavior of a bar, -// if passed to p.AddBar(int64, ...BarOption) +// BarOption is a function option which changes the default behavior of a bar. type BarOption func(*bState) -// AppendDecorators let you inject decorators to the bar's right side +// AppendDecorators let you inject decorators to the bar's right side. func AppendDecorators(appenders ...decor.Decorator) BarOption { return func(s *bState) { for _, decorator := range appenders { @@ -25,7 +24,7 @@ func AppendDecorators(appenders ...decor.Decorator) BarOption { } } -// PrependDecorators let you inject decorators to the bar's left side +// PrependDecorators let you inject decorators to the bar's left side. func PrependDecorators(prependers ...decor.Decorator) BarOption { return func(s *bState) { for _, decorator := range prependers { @@ -40,85 +39,155 @@ func PrependDecorators(prependers ...decor.Decorator) BarOption { } } -// BarTrimLeft trims left side space of the bar -func BarTrimLeft() BarOption { - return func(s *bState) { - s.trimLeftSpace = true - } -} - -// BarTrimRight trims right space of the bar -func BarTrimRight() BarOption { - return func(s *bState) { - s.trimRightSpace = true - } -} - -// BarTrim trims both left and right spaces of the bar -func BarTrim() BarOption { +// BarID sets bar id. +func BarID(id int) BarOption { return func(s *bState) { - s.trimLeftSpace = true - s.trimRightSpace = true + s.id = id } } -// BarID overwrites internal bar id -func BarID(id int) BarOption { +// BarWidth sets bar width independent of the container. +func BarWidth(width int) BarOption { return func(s *bState) { - s.id = id + s.width = width } } -// BarRemoveOnComplete is a flag, if set whole bar line will be removed on complete event. -// If both BarRemoveOnComplete and BarClearOnComplete are set, first bar section gets cleared -// and then whole bar line gets removed completely. +// BarRemoveOnComplete is a flag, if set whole bar line will be removed +// on complete event. 
If both BarRemoveOnComplete and BarClearOnComplete +// are set, first bar section gets cleared and then whole bar line +// gets removed completely. func BarRemoveOnComplete() BarOption { return func(s *bState) { s.removeOnComplete = true } } -// BarReplaceOnComplete is indicator for delayed bar start, after the `runningBar` is complete. -// To achieve bar replacement effect, `runningBar` should has its `BarRemoveOnComplete` option set. +// BarReplaceOnComplete is indicator for delayed bar start, after the +// `runningBar` is complete. To achieve bar replacement effect, +// `runningBar` should has its `BarRemoveOnComplete` option set. func BarReplaceOnComplete(runningBar *Bar) BarOption { + return BarParkTo(runningBar) +} + +// BarParkTo same as BarReplaceOnComplete +func BarParkTo(runningBar *Bar) BarOption { return func(s *bState) { s.runningBar = runningBar } } -// BarClearOnComplete is a flag, if set will clear bar section on complete event. -// If you need to remove a whole bar line, refer to BarRemoveOnComplete. +// BarClearOnComplete is a flag, if set will clear bar section on +// complete event. If you need to remove a whole bar line, refer to +// BarRemoveOnComplete. func BarClearOnComplete() BarOption { return func(s *bState) { s.barClearOnComplete = true } } -// BarPriority sets bar's priority. -// Zero is highest priority, i.e. bar will be on top. -// If `BarReplaceOnComplete` option is supplied, this option is ignored. +// BarPriority sets bar's priority. Zero is highest priority, i.e. bar +// will be on top. If `BarReplaceOnComplete` option is supplied, this +// option is ignored. func BarPriority(priority int) BarOption { return func(s *bState) { s.priority = priority } } -// BarNewLineExtend takes user defined efn, which gets called each render cycle. -// Any write to provided writer of efn, will appear on new line of respective bar. +// BarNewLineExtend takes user defined efn, which gets called each +// render cycle. 
Any write to provided writer of efn, will appear on +// new line of respective bar. func BarNewLineExtend(efn func(io.Writer, *decor.Statistics)) BarOption { return func(s *bState) { s.newLineExtendFn = efn } } -func barWidth(w int) BarOption { +// TrimSpace trims bar's edge spaces. +func TrimSpace() BarOption { return func(s *bState) { - s.width = w + s.trimSpace = true + } +} + +// BarStyle sets custom bar style, default one is "[=>-]<+". +// +// '[' left bracket rune +// +// '=' fill rune +// +// '>' tip rune +// +// '-' empty rune +// +// ']' right bracket rune +// +// '<' reverse tip rune, used when BarReverse option is set +// +// '+' refill rune, used when *Bar.SetRefill(int64) is called +// +// It's ok to provide first five runes only, for example mpb.BarStyle("╢▌▌░╟") +func BarStyle(style string) BarOption { + chk := func(filler Filler) (interface{}, bool) { + if style == "" { + return nil, false + } + t, ok := filler.(*barFiller) + return t, ok + } + cb := func(t interface{}) { + t.(*barFiller).setStyle(style) + } + return MakeFillerTypeSpecificBarOption(chk, cb) +} + +// BarReverse reverse mode, bar will progress from right to left. +func BarReverse() BarOption { + chk := func(filler Filler) (interface{}, bool) { + t, ok := filler.(*barFiller) + return t, ok + } + cb := func(t interface{}) { + t.(*barFiller).setReverse() + } + return MakeFillerTypeSpecificBarOption(chk, cb) +} + +// SpinnerStyle sets custom spinner style. +// Effective when Filler type is spinner. +func SpinnerStyle(frames []string) BarOption { + chk := func(filler Filler) (interface{}, bool) { + if len(frames) == 0 { + return nil, false + } + t, ok := filler.(*spinnerFiller) + return t, ok + } + cb := func(t interface{}) { + t.(*spinnerFiller).frames = frames } + return MakeFillerTypeSpecificBarOption(chk, cb) } -func barFormat(format string) BarOption { +// MakeFillerTypeSpecificBarOption makes BarOption specific to Filler's +// actual type. 
If you implement your own Filler, so most probably +// you'll need this. See BarStyle or SpinnerStyle for example. +func MakeFillerTypeSpecificBarOption( + typeChecker func(Filler) (interface{}, bool), + cb func(interface{}), +) BarOption { return func(s *bState) { - s.runes = strToBarRunes(format) + if t, ok := typeChecker(s.filler); ok { + cb(t) + } + } +} + +// OptionOnCondition returns option when condition evaluates to true. +func OptionOnCondition(option BarOption, condition func() bool) BarOption { + if condition() { + return option } + return nil } diff --git a/vendor/github.com/vbauerster/mpb/cwriter/writer.go b/vendor/github.com/vbauerster/mpb/cwriter/writer.go index 0b1470d4c59..638237c181d 100644 --- a/vendor/github.com/vbauerster/mpb/cwriter/writer.go +++ b/vendor/github.com/vbauerster/mpb/cwriter/writer.go @@ -22,8 +22,8 @@ var ( clearCursorAndLine = cursorUp + clearLine ) -// Writer is a buffered the writer that updates the terminal. -// The contents of writer will be flushed when Flush is called. +// Writer is a buffered the writer that updates the terminal. The +// contents of writer will be flushed when Flush is called. type Writer struct { out io.Writer buf bytes.Buffer @@ -64,11 +64,13 @@ func (w *Writer) WriteString(s string) (n int, err error) { return w.buf.WriteString(s) } -// ReadFrom reads from the provided io.Reader and writes to the underlying buffer. +// ReadFrom reads from the provided io.Reader and writes to the +// underlying buffer. func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) { return w.buf.ReadFrom(r) } +// GetWidth returns width of underlying terminal. 
func (w *Writer) GetWidth() (int, error) { if w.isTerminal { tw, _, err := terminal.GetSize(w.fd) diff --git a/vendor/github.com/vbauerster/mpb/cwriter/writer_windows.go b/vendor/github.com/vbauerster/mpb/cwriter/writer_windows.go index dad7f50b20f..747a6348458 100644 --- a/vendor/github.com/vbauerster/mpb/cwriter/writer_windows.go +++ b/vendor/github.com/vbauerster/mpb/cwriter/writer_windows.go @@ -8,7 +8,7 @@ import ( "syscall" "unsafe" - "github.com/mattn/go-isatty" + isatty "github.com/mattn/go-isatty" ) var kernel32 = syscall.NewLazyDLL("kernel32.dll") diff --git a/vendor/github.com/vbauerster/mpb/decor/counters.go b/vendor/github.com/vbauerster/mpb/decor/counters.go index e4161dc4bab..7d581eefb07 100644 --- a/vendor/github.com/vbauerster/mpb/decor/counters.go +++ b/vendor/github.com/vbauerster/mpb/decor/counters.go @@ -141,12 +141,14 @@ func CountersNoUnit(pairFormat string, wcc ...WC) Decorator { return Counters(0, pairFormat, wcc...) } -// CountersKibiByte is a wrapper around Counters with predefined unit UnitKiB (bytes/1024). +// CountersKibiByte is a wrapper around Counters with predefined unit +// UnitKiB (bytes/1024). func CountersKibiByte(pairFormat string, wcc ...WC) Decorator { return Counters(UnitKiB, pairFormat, wcc...) } -// CountersKiloByte is a wrapper around Counters with predefined unit UnitKB (bytes/1000). +// CountersKiloByte is a wrapper around Counters with predefined unit +// UnitKB (bytes/1000). func CountersKiloByte(pairFormat string, wcc ...WC) Decorator { return Counters(UnitKB, pairFormat, wcc...) } diff --git a/vendor/github.com/vbauerster/mpb/decor/decorator.go b/vendor/github.com/vbauerster/mpb/decor/decorator.go index 6aaf6c8307a..2fe40aea6bf 100644 --- a/vendor/github.com/vbauerster/mpb/decor/decorator.go +++ b/vendor/github.com/vbauerster/mpb/decor/decorator.go @@ -31,8 +31,12 @@ const ( DSyncSpaceR = DSyncWidth | DextraSpace | DidentRight ) +// TimeStyle enum. +type TimeStyle int + +// TimeStyle kinds. 
const ( - ET_STYLE_GO = iota + ET_STYLE_GO TimeStyle = iota ET_STYLE_HHMMSS ET_STYLE_HHMM ET_STYLE_MMSS @@ -47,35 +51,37 @@ type Statistics struct { } // Decorator interface. -// A decorator must implement this interface, in order to be used with mpb library. +// A decorator must implement this interface, in order to be used with +// mpb library. type Decorator interface { Decor(*Statistics) string Syncable } // Syncable interface. -// All decorators implement this interface implicitly. -// Its Syncable method exposes width sync channel, if sync is enabled. +// All decorators implement this interface implicitly. Its Syncable +// method exposes width sync channel, if sync is enabled. type Syncable interface { Syncable() (bool, chan int) } // OnCompleteMessenger interface. -// Decorators implementing this interface suppose to return provided string on complete event. +// Decorators implementing this interface suppose to return provided +// string on complete event. type OnCompleteMessenger interface { OnCompleteMessage(string) } // AmountReceiver interface. -// If decorator needs to receive increment amount, -// so this is the right interface to implement. +// If decorator needs to receive increment amount, so this is the right +// interface to implement. type AmountReceiver interface { NextAmount(int, ...time.Duration) } // ShutdownListener interface. -// If decorator needs to be notified once upon bar shutdown event, -// so this is the right interface to implement. +// If decorator needs to be notified once upon bar shutdown event, so +// this is the right interface to implement. type ShutdownListener interface { Shutdown() } @@ -90,6 +96,7 @@ var ( // WC is a struct with two public fields W and C, both of int type. // W represents width and C represents bit set of width related config. +// A decorator should embed WC, in order to become Syncable. 
type WC struct { W int C int @@ -126,12 +133,13 @@ func (wc *WC) Init() { } } +// Syncable is implementation of Syncable interface. func (wc *WC) Syncable() (bool, chan int) { return (wc.C & DSyncWidth) != 0, wc.wsync } -// OnComplete returns decorator, which wraps provided decorator, with sole -// purpose to display provided message on complete event. +// OnComplete returns decorator, which wraps provided decorator, with +// sole purpose to display provided message on complete event. // // `decorator` Decorator to wrap // diff --git a/vendor/github.com/vbauerster/mpb/decor/elapsed.go b/vendor/github.com/vbauerster/mpb/decor/elapsed.go index 649d40a30bd..b2e75852c69 100644 --- a/vendor/github.com/vbauerster/mpb/decor/elapsed.go +++ b/vendor/github.com/vbauerster/mpb/decor/elapsed.go @@ -10,7 +10,7 @@ import ( // `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS] // // `wcc` optional WC config -func Elapsed(style int, wcc ...WC) Decorator { +func Elapsed(style TimeStyle, wcc ...WC) Decorator { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -26,7 +26,7 @@ func Elapsed(style int, wcc ...WC) Decorator { type elapsedDecorator struct { WC - style int + style TimeStyle startTime time.Time msg string completeMsg *string diff --git a/vendor/github.com/vbauerster/mpb/decor/eta.go b/vendor/github.com/vbauerster/mpb/decor/eta.go index 44a1f03eacc..e8dc979b433 100644 --- a/vendor/github.com/vbauerster/mpb/decor/eta.go +++ b/vendor/github.com/vbauerster/mpb/decor/eta.go @@ -6,7 +6,6 @@ import ( "time" "github.com/VividCortex/ewma" - "github.com/vbauerster/mpb/internal" ) type TimeNormalizer func(time.Duration) time.Duration @@ -18,7 +17,7 @@ type TimeNormalizer func(time.Duration) time.Duration // `age` is the previous N samples to average over. 
// // `wcc` optional WC config -func EwmaETA(style int, age float64, wcc ...WC) Decorator { +func EwmaETA(style TimeStyle, age float64, wcc ...WC) Decorator { return MovingAverageETA(style, ewma.NewMovingAverage(age), NopNormalizer(), wcc...) } @@ -31,7 +30,7 @@ func EwmaETA(style int, age float64, wcc ...WC) Decorator { // `normalizer` available implementations are [NopNormalizer|FixedIntervalTimeNormalizer|MaxTolerateTimeNormalizer] // // `wcc` optional WC config -func MovingAverageETA(style int, average MovingAverage, normalizer TimeNormalizer, wcc ...WC) Decorator { +func MovingAverageETA(style TimeStyle, average MovingAverage, normalizer TimeNormalizer, wcc ...WC) Decorator { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -48,7 +47,7 @@ func MovingAverageETA(style int, average MovingAverage, normalizer TimeNormalize type movingAverageETA struct { WC - style int + style TimeStyle average ewma.MovingAverage completeMsg *string normalizer TimeNormalizer @@ -59,7 +58,7 @@ func (d *movingAverageETA) Decor(st *Statistics) string { return d.FormatMsg(*d.completeMsg) } - v := internal.Round(d.average.Value()) + v := math.Round(d.average.Value()) remaining := d.normalizer(time.Duration((st.Total - st.Current) * int64(v))) hours := int64((remaining / time.Hour) % 60) minutes := int64((remaining / time.Minute) % 60) @@ -105,7 +104,7 @@ func (d *movingAverageETA) OnCompleteMessage(msg string) { // `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS] // // `wcc` optional WC config -func AverageETA(style int, wcc ...WC) Decorator { +func AverageETA(style TimeStyle, wcc ...WC) Decorator { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -121,7 +120,7 @@ func AverageETA(style int, wcc ...WC) Decorator { type averageETA struct { WC - style int + style TimeStyle startTime time.Time completeMsg *string } @@ -133,7 +132,7 @@ func (d *averageETA) Decor(st *Statistics) string { var str string timeElapsed := time.Since(d.startTime) - v := 
internal.Round(float64(timeElapsed) / float64(st.Current)) + v := math.Round(float64(timeElapsed) / float64(st.Current)) if math.IsInf(v, 0) || math.IsNaN(v) { v = 0 } diff --git a/vendor/github.com/vbauerster/mpb/decor/moving-average.go b/vendor/github.com/vbauerster/mpb/decor/moving-average.go index f9596a27fe8..fcd2689236b 100644 --- a/vendor/github.com/vbauerster/mpb/decor/moving-average.go +++ b/vendor/github.com/vbauerster/mpb/decor/moving-average.go @@ -6,9 +6,9 @@ import ( "github.com/VividCortex/ewma" ) -// MovingAverage is the interface that computes a moving average over a time- -// series stream of numbers. The average may be over a window or exponentially -// decaying. +// MovingAverage is the interface that computes a moving average over +// a time-series stream of numbers. The average may be over a window +// or exponentially decaying. type MovingAverage interface { Add(float64) Value() float64 @@ -57,7 +57,8 @@ func (s *medianEwma) Add(v float64) { s.count++ } -// NewMedianEwma is ewma based MovingAverage, which gets its values from median MovingAverage. +// NewMedianEwma is ewma based MovingAverage, which gets its values +// from median MovingAverage. func NewMedianEwma(age ...float64) MovingAverage { return &medianEwma{ MovingAverage: ewma.NewMovingAverage(age...), diff --git a/vendor/github.com/vbauerster/mpb/decor/speed.go b/vendor/github.com/vbauerster/mpb/decor/speed.go index 395e5d04d41..74658ce4155 100644 --- a/vendor/github.com/vbauerster/mpb/decor/speed.go +++ b/vendor/github.com/vbauerster/mpb/decor/speed.go @@ -137,7 +137,8 @@ func EwmaSpeed(unit int, unitFormat string, age float64, wcc ...WC) Decorator { return MovingAverageSpeed(unit, unitFormat, ewma.NewMovingAverage(age), wcc...) } -// MovingAverageSpeed decorator relies on MovingAverage implementation to calculate its average. +// MovingAverageSpeed decorator relies on MovingAverage implementation +// to calculate its average. 
// // `unit` one of [0|UnitKiB|UnitKB] zero for no unit // diff --git a/vendor/github.com/vbauerster/mpb/internal/percentage.go b/vendor/github.com/vbauerster/mpb/internal/percentage.go index 3c8defb7d17..0483d259865 100644 --- a/vendor/github.com/vbauerster/mpb/internal/percentage.go +++ b/vendor/github.com/vbauerster/mpb/internal/percentage.go @@ -1,10 +1,12 @@ package internal +import "math" + // Percentage is a helper function, to calculate percentage. func Percentage(total, current, width int64) int64 { if total <= 0 { return 0 } p := float64(width*current) / float64(total) - return int64(Round(p)) + return int64(math.Round(p)) } diff --git a/vendor/github.com/vbauerster/mpb/internal/round.go b/vendor/github.com/vbauerster/mpb/internal/round.go deleted file mode 100644 index c54a789d216..00000000000 --- a/vendor/github.com/vbauerster/mpb/internal/round.go +++ /dev/null @@ -1,49 +0,0 @@ -package internal - -import "math" - -const ( - uvone = 0x3FF0000000000000 - mask = 0x7FF - shift = 64 - 11 - 1 - bias = 1023 - signMask = 1 << 63 - fracMask = 1<= 0.5 { - // return t + Copysign(1, x) - // } - // return t - // } - bits := math.Float64bits(x) - e := uint(bits>>shift) & mask - if e < bias { - // Round abs(x) < 1 including denormals. - bits &= signMask // +-0 - if e == bias-1 { - bits |= uvone // +-1 - } - } else if e < bias+shift { - // Round any abs(x) >= 1 containing a fractional component [0,1). - // - // Numbers with larger exponents are returned unchanged since they - // must be either an integer, infinity, or NaN. 
- const half = 1 << (shift - 1) - e -= bias - bits += half >> e - bits &^= fracMask >> e - } - return math.Float64frombits(bits) -} diff --git a/vendor/github.com/vbauerster/mpb/options.go b/vendor/github.com/vbauerster/mpb/options.go index 05d2ecf1fc2..44a6ee3f358 100644 --- a/vendor/github.com/vbauerster/mpb/options.go +++ b/vendor/github.com/vbauerster/mpb/options.go @@ -1,29 +1,30 @@ package mpb import ( + "context" "io" "sync" "time" - "unicode/utf8" "github.com/vbauerster/mpb/cwriter" ) -// ProgressOption is a function option which changes the default behavior of -// progress pool, if passed to mpb.New(...ProgressOption) +// ProgressOption is a function option which changes the default +// behavior of progress pool, if passed to mpb.New(...ProgressOption). type ProgressOption func(*pState) -// WithWaitGroup provides means to have a single joint point. -// If *sync.WaitGroup is provided, you can safely call just p.Wait() -// without calling Wait() on provided *sync.WaitGroup. -// Makes sense when there are more than one bar to render. +// WithWaitGroup provides means to have a single joint point. If +// *sync.WaitGroup is provided, you can safely call just p.Wait() +// without calling Wait() on provided *sync.WaitGroup. Makes sense +// when there are more than one bar to render. func WithWaitGroup(wg *sync.WaitGroup) ProgressOption { return func(s *pState) { s.uwg = wg } } -// WithWidth overrides default width 80 +// WithWidth sets container width. Default is 80. Bars inherit this +// width, as long as no BarWidth is applied. 
func WithWidth(w int) ProgressOption { return func(s *pState) { if w >= 0 { @@ -32,16 +33,7 @@ func WithWidth(w int) ProgressOption { } } -// WithFormat overrides default bar format "[=>-]" -func WithFormat(format string) ProgressOption { - return func(s *pState) { - if utf8.RuneCountInString(format) == formatLen { - s.format = format - } - } -} - -// WithRefreshRate overrides default 120ms refresh rate +// WithRefreshRate overrides default 120ms refresh rate. func WithRefreshRate(d time.Duration) ProgressOption { return func(s *pState) { if d < 10*time.Millisecond { @@ -59,22 +51,25 @@ func WithManualRefresh(ch <-chan time.Time) ProgressOption { } } -// WithCancel provide your cancel channel, -// which you plan to close at some point. -func WithCancel(ch <-chan struct{}) ProgressOption { +// WithContext provided context will be used for cancellation purposes. func WithContext(ctx context.Context) ProgressOption { return func(s *pState) { - s.cancel = ch + if ctx == nil { + return + } + s.ctx = ctx } } -// WithShutdownNotifier provided chanel will be closed, after all bars have been rendered. +// WithShutdownNotifier provided channel will be closed, after all bars +// have been rendered. func WithShutdownNotifier(ch chan struct{}) ProgressOption { return func(s *pState) { s.shutdownNotifier = ch } } -// WithOutput overrides default output os.Stdout +// WithOutput overrides default output os.Stdout.
func WithOutput(w io.Writer) ProgressOption { return func(s *pState) { if w == nil { diff --git a/vendor/github.com/vbauerster/mpb/options_go1.7.go b/vendor/github.com/vbauerster/mpb/options_go1.7.go deleted file mode 100644 index ca9a5bad82f..00000000000 --- a/vendor/github.com/vbauerster/mpb/options_go1.7.go +++ /dev/null @@ -1,15 +0,0 @@ -//+build go1.7 - -package mpb - -import "context" - -// WithContext provided context will be used for cancellation purposes -func WithContext(ctx context.Context) ProgressOption { - return func(s *pState) { - if ctx == nil { - panic("ctx must not be nil") - } - s.cancel = ctx.Done() - } -} diff --git a/vendor/github.com/vbauerster/mpb/progress.go b/vendor/github.com/vbauerster/mpb/progress.go index d95fe45b7a6..f9e25af7970 100644 --- a/vendor/github.com/vbauerster/mpb/progress.go +++ b/vendor/github.com/vbauerster/mpb/progress.go @@ -2,6 +2,7 @@ package mpb import ( "container/heap" + "context" "fmt" "io" "io/ioutil" @@ -17,8 +18,6 @@ const ( prr = 120 * time.Millisecond // default width pwidth = 80 - // default format - pformat = "[=>-]" ) // Progress represents the container that renders Progress bars @@ -42,24 +41,24 @@ type pState struct { pMatrix map[int][]chan int aMatrix map[int][]chan int - // following are provided by user + // following are provided/overridden by user + ctx context.Context uwg *sync.WaitGroup manualRefreshCh <-chan time.Time - cancel <-chan struct{} shutdownNotifier chan struct{} waitBars map[*Bar]*Bar debugOut io.Writer } -// New creates new Progress instance, which orchestrates bars rendering process. -// Accepts mpb.ProgressOption funcs for customization. +// New creates new Progress instance, which orchestrates bars rendering +// process. Accepts mpb.ProgressOption funcs for customization.
func New(options ...ProgressOption) *Progress { pq := make(priorityQueue, 0) heap.Init(&pq) s := &pState{ + ctx: context.Background(), bHeap: &pq, width: pwidth, - format: pformat, cw: cwriter.New(os.Stdout), rr: prr, waitBars: make(map[*Bar]*Bar), @@ -84,12 +83,28 @@ func New(options ...ProgressOption) *Progress { // AddBar creates a new progress bar and adds to the container. func (p *Progress) AddBar(total int64, options ...BarOption) *Bar { + return p.Add(total, newDefaultBarFiller(), options...) +} + +// AddSpinner creates a new spinner bar and adds to the container. +func (p *Progress) AddSpinner(total int64, alignment SpinnerAlignment, options ...BarOption) *Bar { + filler := &spinnerFiller{ + frames: defaultSpinnerStyle, + alignment: alignment, + } + return p.Add(total, filler, options...) +} + +// Add creates a bar which renders itself by provided filler. +func (p *Progress) Add(total int64, filler Filler, options ...BarOption) *Bar { + if filler == nil { + filler = newDefaultBarFiller() + } p.wg.Add(1) result := make(chan *Bar) select { case p.operateState <- func(s *pState) { - options = append(options, barWidth(s.width), barFormat(s.format)) - b := newBar(p.wg, s.idCounter, total, s.cancel, options...) + b := newBar(s.ctx, p.wg, filler, s.idCounter, s.width, total, options...) if b.runningBar != nil { s.waitBars[b.runningBar] = b } else { @@ -106,10 +121,10 @@ func (p *Progress) AddBar(total int64, options ...BarOption) *Bar { } } -// Abort is only effective while bar progress is running, -// it means remove bar now without waiting for its completion. -// If bar is already completed, there is nothing to abort. -// If you need to remove bar after completion, use BarRemoveOnComplete BarOption. +// Abort is only effective while bar progress is running, it means +// remove bar now without waiting for its completion. If bar is already +// completed, there is nothing to abort. If you need to remove bar +// after completion, use BarRemoveOnComplete BarOption. 
func (p *Progress) Abort(b *Bar, remove bool) { select { case p.operateState <- func(s *pState) { @@ -145,9 +160,10 @@ func (p *Progress) BarCount() int { } } -// Wait first waits for user provided *sync.WaitGroup, if any, -// then waits far all bars to complete and finally shutdowns master goroutine. -// After this method has been called, there is no way to reuse *Progress instance. +// Wait first waits for user provided *sync.WaitGroup, if any, then +// waits for all bars to complete and finally shuts down master goroutine. +// After this method has been called, there is no way to reuse *Progress +// instance. func (p *Progress) Wait() { if p.uwg != nil { p.uwg.Wait() @@ -205,8 +221,8 @@ func (s *pState) flush(lineCount int) error { defer func() { if frameReader.toShutdown { // shutdown at next flush, in other words decrement underlying WaitGroup - // only after the bar with completed state has been flushed. - // this ensures no bar ends up with less than 100% rendered. + // only after the bar with completed state has been flushed. this + // ensures no bar ends up with less than 100% rendered. s.shutdownPending = append(s.shutdownPending, bar) if replacementBar, ok := s.waitBars[bar]; ok { heap.Push(s.bHeap, replacementBar) diff --git a/vendor/github.com/vbauerster/mpb/spinner_filler.go b/vendor/github.com/vbauerster/mpb/spinner_filler.go new file mode 100644 index 00000000000..36299fef0e5 --- /dev/null +++ b/vendor/github.com/vbauerster/mpb/spinner_filler.go @@ -0,0 +1,48 @@ +package mpb + +import ( + "io" + "strings" + "unicode/utf8" + + "github.com/vbauerster/mpb/decor" +) + +// SpinnerAlignment enum. +type SpinnerAlignment int + +// SpinnerAlignment kinds.
+const ( + SpinnerOnLeft SpinnerAlignment = iota + SpinnerOnMiddle + SpinnerOnRight +) + +var defaultSpinnerStyle = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"} + +type spinnerFiller struct { + frames []string + count uint + alignment SpinnerAlignment +} + +func (s *spinnerFiller) Fill(w io.Writer, width int, stat *decor.Statistics) { + + frame := s.frames[s.count%uint(len(s.frames))] + frameWidth := utf8.RuneCountInString(frame) + + if width < frameWidth { + return + } + + switch rest := width - frameWidth; s.alignment { + case SpinnerOnLeft: + io.WriteString(w, frame+strings.Repeat(" ", rest)) + case SpinnerOnMiddle: + str := strings.Repeat(" ", rest/2) + frame + strings.Repeat(" ", rest/2+rest%2) + io.WriteString(w, str) + case SpinnerOnRight: + io.WriteString(w, strings.Repeat(" ", rest)+frame) + } + s.count++ +}