diff --git a/copy/copy.go b/copy/copy.go index 3ed8a2b824..d2db17e84c 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -43,6 +43,11 @@ type digestingReader struct { // downloads. Let's follow Firefox by limiting it to 6. var maxParallelDownloads = 6 +// progressBarRefreshRate defines the refresh rate of the mpb progress bars. +// 120 ms matches the upstream default. Defining the rate here avoids a magic +// number in the code below. +var progressBarRefreshRate = 120 * time.Millisecond + // newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error // or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest. // (neither is set if EOF is never reached). @@ -467,11 +472,9 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { copySemaphore = semaphore.NewWeighted(int64(1)) } - data := make([]copyLayerData, numLayers) - copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress) { + copyLayerHelper := func(index int, srcLayer types.BlobInfo, pool *mpb.Progress, cld *copyLayerData) { defer copySemaphore.Release(1) defer copyGroup.Done() - cld := copyLayerData{} if ic.c.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 { // DiffIDs are, currently, needed only when converting from schema1. // In which case src.LayerInfos will not have URLs because schema1 @@ -485,16 +488,26 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { } else { cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, pool) } - data[index] = cld } + // digestToCopyData maps a digest to its corresponding copyLayerData and + // avoids redundantly copying the same layer. + digestToCopyData := make(map[digest.Digest]*copyLayerData) func() { // A scope for defer progressPool, progressCleanup := ic.c.newProgressPool(ctx) defer progressCleanup() for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if ok { + // this digest is already being copied + copyGroup.Done() + continue + } copySemaphore.Acquire(ctx, 1) - go copyLayerHelper(i, srcLayer, progressPool) + cld = &copyLayerData{} + digestToCopyData[srcLayer.Digest] = cld + go copyLayerHelper(i, srcLayer, progressPool, cld) } // Wait for all layers to be copied @@ -503,7 +516,11 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { destInfos := make([]types.BlobInfo, numLayers) diffIDs := make([]digest.Digest, numLayers) - for i, cld := range data { + for i, srcLayer := range srcInfos { + cld, ok := digestToCopyData[srcLayer.Digest] + if !ok { + return fmt.Errorf("no copy data found for layer %q", srcLayer.Digest) + } if cld.err != nil { return cld.err } @@ -577,16 +594,25 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context) ([]byte // The caller must eventually call the returned cleanup function after the pool will no longer be updated. func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { ctx, cancel := context.WithCancel(ctx) - pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx)) + pool := mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput), mpb.WithContext(ctx), mpb.WithRefreshRate(progressBarRefreshRate)) return pool, func() { cancel() pool.Wait() } } +// createReplacementProgressBar is a wrapper around createProgressBar that +// creates a progress bar to replace the `toReplace` bar. 
+func (c *copier) createReplacementProgressBar(pool *mpb.Progress, toReplace *mpb.Bar, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar { + barOpts = append(barOpts, mpb.BarReplaceOnComplete(toReplace)) + bar := c.createProgressBar(pool, info, kind, filler, barOpts...) + toReplace.SetTotal(0, true) + return bar +} + // createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter // is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { +func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, filler string, barOpts ...mpb.BarOption) *mpb.Bar { // shortDigestLen is the length of the digest used for blobs. const shortDigestLen = 12 @@ -597,15 +623,35 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind prefix = prefix[:maxPrefixLen] } - bar := pool.AddBar(info.Size, - mpb.BarClearOnComplete(), mpb.PrependDecorators( decor.Name(prefix), ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " "+onComplete), - ), - ) + options := []mpb.BarOption{ + mpb.PrependDecorators( + decor.Name(prefix), + ), + } + options = append(options, barOpts...) + + // Fillers are used for placeholder bars that are later replaced by a + // regular progress bar. Hence, only append a counter decorator when + // filler == "". + if filler == "" { + options = append(options, + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), " done"), + )) + options = append(options, mpb.BarClearOnComplete()) + } + + var bar *mpb.Bar + if filler != "" { + barFiller := mpb.FillerFunc( + func(w io.Writer, width int, st *decor.Statistics) { + fmt.Fprint(w, filler) + }) + bar = pool.Add(0, barFiller, options...) + } else { + bar = pool.AddBar(info.Size, options...) + } + if c.progressOutput == ioutil.Discard { c.Printf("Copying %s %s\n", kind, info.Digest) } @@ -624,7 +670,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { destInfo, err := func() (types.BlobInfo, error) { // A scope for defer progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") + bar := c.createProgressBar(progressPool, srcInfo, "config", "") destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, bar) if err != nil { return types.BlobInfo{}, err @@ -649,23 +695,119 @@ type diffIDResult struct { err error } +// tryReusingBlob is a wrapper around the destination's TryReusingBlob() method. +// If the blob is present, it returns immediately. Otherwise, if the +// destination supports blob locking, it will acquire the blob lock and perform +// another call to TryReusingBlob() to see if another process has already +// copied the blob. 
+func (ic *imageCopier) tryReusingBlob(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress, bar *mpb.Bar) (bool, types.BlobInfo, error) { + reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + if reused { + logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) + newBar := ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "skipped: already exists") + newBar.SetTotal(0, true) + return true, blobInfo, nil + } + + if !ic.c.dest.SupportsBlobLocks() { + return false, types.BlobInfo{}, nil + } + + // Check if another process might be copying the layer at the moment. + // If so, wait for its completion and try to reuse the blob again. + logrus.Debugf("Checking if blob %s is already being copied by another process", srcInfo.Digest) + + // LockBlob() does not support try-lock semantics, which would be a + // straightforward mechanism to detect if another process is currently + // copying a specific layer. + // + // The trick we are using here is to acquire the lock in a separate + // goroutine and use a channel for signaling when the lock has been + // acquired. To detect if this goroutine is blocking on the lock, we + // spin up a second one sending a message via another channel after a + // timeout. + chanDone := make(chan bool, 1) + chanBlockedByAnotherProcess := make(chan bool, 1) + + var lockError error + // Acquire the blob lock + go func() { + lockError = ic.c.dest.LockBlob(srcInfo) + chanDone <- true + }() + // Spin up the timer + go func() { + // The initial sleep time is `progressBarRefreshRate` + // to make sure the bar is properly rendered before + // replacing it. + time.Sleep(progressBarRefreshRate) + chanBlockedByAnotherProcess <- true + }() + + copiedByAnotherProcess := false + select { + case <-chanDone: + break + case <-chanBlockedByAnotherProcess: + logrus.Debugf("Blob %s is already being copied by another process", srcInfo.Digest) + copiedByAnotherProcess = true + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "paused: being copied by another process", mpb.BarRemoveOnComplete()) + // Wait until we have acquired the blob lock + <-chanDone + } + + if lockError != nil { + return false, types.BlobInfo{}, errors.Wrapf(lockError, "Error acquiring lock for blob %s", srcInfo.Digest) + } + + // If the blob has not been copied by another process, we need to copy + // it on our own and can return immediately. + if !copiedByAnotherProcess { + logrus.Debugf("Blob %s is not being copied by another process", srcInfo.Digest) + return false, types.BlobInfo{}, nil + } + + // If the blob has been copied by another process, it + // must be present in the destination and be + // available for reuse. 
+ reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + if err != nil { + return false, types.BlobInfo{}, errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + } + + if reused { + ic.c.dest.UnlockBlob(srcInfo) + logrus.Debugf("Blob %s has been successfully copied by another process", srcInfo.Digest) + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "done: copied by another process") + bar.SetTotal(0, true) + return true, blobInfo, nil + } + + logrus.Debugf("Blob %s is not present: copy operation of other process must have failed", srcInfo.Digest) + return false, types.BlobInfo{}, nil +} + // copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) { cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be "" diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" + // Create the initial placeholder bar which is later being replaced by + // another one. + bar := ic.c.createProgressBar(pool, srcInfo, "blob", " ", mpb.BarRemoveOnComplete()) + // If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source. if !diffIDIsNeeded { - reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs) + reused, blobInfo, err := ic.tryReusingBlob(ctx, srcInfo, pool, bar) if err != nil { - return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest) + return types.BlobInfo{}, "", err } if reused { - logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") - bar.SetTotal(0, true) - return blobInfo, cachedDiffID, nil + return blobInfo, cachedDiffID, err } } @@ -676,7 +818,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, po } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + bar = ic.c.createReplacementProgressBar(pool, bar, srcInfo, "blob", "") blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize}, diffIDIsNeeded, bar) if err != nil { diff --git a/directory/directory_dest.go b/directory/directory_dest.go index 4b2ab022e2..955759d97a 100644 --- a/directory/directory_dest.go +++ b/directory/directory_dest.go @@ -129,6 +129,27 @@ func (d *dirImageDestination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *dirImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *dirImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *dirImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. 
+ return nil +} + // PutBlob writes contents of stream and returns data representing the result (with all data filled in). // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. diff --git a/docker/docker_image_dest.go b/docker/docker_image_dest.go index c116cbec32..b49c203f5f 100644 --- a/docker/docker_image_dest.go +++ b/docker/docker_image_dest.go @@ -117,6 +117,27 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool { return true } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *dockerImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *dockerImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *dockerImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result (with all data filled in). // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. diff --git a/docker/tarfile/dest.go b/docker/tarfile/dest.go index 5f30eddbc7..78368ed3e7 100644 --- a/docker/tarfile/dest.go +++ b/docker/tarfile/dest.go @@ -87,6 +87,27 @@ func (d *Destination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *Destination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *Destination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *Destination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result (with all data filled in). // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. 
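The SupportsBlobLocks/LockBlob/UnlockBlob stubs above are repeated verbatim for every transport that does not support locking (and again for the remaining transports below). As a sketch only — not part of this patch, with the `stubs` package name and `NoBlobLocks` type invented for illustration — the same contract could be expressed once with an embeddable helper:

```go
package stubs // hypothetical package, for illustration only

import "github.com/containers/image/types"

// NoBlobLocks can be embedded by an ImageDestination that does not support
// cross-process blob locking; it provides the three no-op methods.
type NoBlobLocks struct{}

// SupportsBlobLocks reports that blob locking is not supported.
func (NoBlobLocks) SupportsBlobLocks() bool { return false }

// LockBlob is a no-op for destinations without blob locking.
func (NoBlobLocks) LockBlob(types.BlobInfo) error { return nil }

// UnlockBlob is a no-op; calling it multiple times is safe.
func (NoBlobLocks) UnlockBlob(types.BlobInfo) error { return nil }
```

A destination such as dirImageDestination could then embed NoBlobLocks instead of re-declaring the three methods.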
diff --git a/image/docker_schema2_test.go b/image/docker_schema2_test.go index 9d3f96fc6d..86ea5f31b0 100644 --- a/image/docker_schema2_test.go +++ b/image/docker_schema2_test.go @@ -401,6 +401,15 @@ func (d *memoryImageDest) IgnoresEmbeddedDockerReference() bool { func (d *memoryImageDest) HasThreadSafePutBlob() bool { panic("Unexpected call to a mock function") } +func (d *memoryImageDest) SupportsBlobLocks() bool { + panic("Unexpected call to a mock function") +} +func (d *memoryImageDest) LockBlob(b types.BlobInfo) error { + panic("Unexpected call to a mock function") +} +func (d *memoryImageDest) UnlockBlob(b types.BlobInfo) error { + panic("Unexpected call to a mock function") +} func (d *memoryImageDest) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { if d.storedBlobs == nil { d.storedBlobs = make(map[digest.Digest][]byte) diff --git a/oci/archive/oci_dest.go b/oci/archive/oci_dest.go index 9571c37e2b..8435f91451 100644 --- a/oci/archive/oci_dest.go +++ b/oci/archive/oci_dest.go @@ -82,6 +82,27 @@ func (d *ociArchiveImageDestination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *ociArchiveImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *ociArchiveImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *ociArchiveImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. diff --git a/oci/layout/oci_dest.go b/oci/layout/oci_dest.go index db102184db..99f7e2f703 100644 --- a/oci/layout/oci_dest.go +++ b/oci/layout/oci_dest.go @@ -112,6 +112,27 @@ func (d *ociImageDestination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *ociImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *ociImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *ociImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. 
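The lock-acquisition trick documented in tryReusingBlob() above (copy/copy.go) is easier to see in isolation. Below is a self-contained sketch of the pattern, with a plain sync.Mutex standing in for the destination's blob lock and made-up timings; it is illustrative only, not code from this patch:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var blobLock sync.Mutex // stands in for ImageDestination.LockBlob
	blobLock.Lock()         // simulate another process already holding the lock ...
	go func() {
		time.Sleep(500 * time.Millisecond) // ... and releasing it later
		blobLock.Unlock()
	}()

	done := make(chan error, 1)
	blocked := make(chan struct{}, 1)

	// Acquire the lock in a separate goroutine so we can observe whether it blocks.
	go func() {
		blobLock.Lock()
		done <- nil
	}()
	// If the lock is not acquired within one progress-bar refresh interval,
	// assume another copier currently holds it.
	go func() {
		time.Sleep(120 * time.Millisecond)
		blocked <- struct{}{}
	}()

	select {
	case err := <-done:
		fmt.Println("lock acquired immediately, err:", err)
	case <-blocked:
		fmt.Println("blob appears to be copied by another process; waiting ...")
		<-done // block until we eventually own the lock
		fmt.Println("the other copier finished; lock acquired")
	}
	blobLock.Unlock()
}
```

The same two-channel select is what lets copy.go swap the placeholder bar to "paused: being copied by another process" without needing a TryLock primitive on the destination.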
diff --git a/openshift/openshift.go b/openshift/openshift.go index 814c3eea1d..b775edfb9a 100644 --- a/openshift/openshift.go +++ b/openshift/openshift.go @@ -388,6 +388,27 @@ func (d *openshiftImageDestination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *openshiftImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *openshiftImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *openshiftImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result (with all data filled in). // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. diff --git a/ostree/ostree_dest.go b/ostree/ostree_dest.go index d69f4fa331..deb770a660 100644 --- a/ostree/ostree_dest.go +++ b/ostree/ostree_dest.go @@ -137,6 +137,27 @@ func (d *ostreeImageDestination) HasThreadSafePutBlob() bool { return false } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (d *ostreeImageDestination) SupportsBlobLocks() bool { + return false +} + +// LockBlob can be used to synchronize operations on it (e.g., copying). +func (d *ostreeImageDestination) LockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + +// UnlockBlob unlocks the blob. Note that it is safe to call UnlockBlob() +// multiple times. Only the first call is unlocking the blob. This is +// required to unlock a blob in the presence of errors or panics during copy +// operations. +func (d *ostreeImageDestination) UnlockBlob(b types.BlobInfo) error { + // NOOP for this type. + return nil +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. diff --git a/storage/storage_image.go b/storage/storage_image.go index b39d2bcc04..93967a741f 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -64,6 +64,11 @@ type storageImageDestination struct { fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice + + // data for {Lock,Unlock}Blob + lockedDigests map[digest.Digest]storage.Locker // Bookkeeping of digests that have been locked + unlockedDigests map[digest.Digest]bool // Bookkeeping of digests that have been unlocked. 
Required to unlock any still-locked digests in Close() + digestLockMutex sync.Mutex // Serializes accesses to lockedDigests } type storageImageCloser struct { @@ -323,12 +328,14 @@ func newImageDestination(imageRef storageReference) (*storageImageDestination, e return nil, errors.Wrapf(err, "error creating a temporary directory") } image := &storageImageDestination{ - imageRef: imageRef, - directory: directory, - blobDiffIDs: make(map[digest.Digest]digest.Digest), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, + imageRef: imageRef, + directory: directory, + blobDiffIDs: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + lockedDigests: make(map[digest.Digest]storage.Locker), + unlockedDigests: make(map[digest.Digest]bool), + SignatureSizes: []int{}, } return image, nil } @@ -339,8 +346,14 @@ func (s *storageImageDestination) Reference() types.ImageReference { return s.imageRef } -// Close cleans up the temporary directory. +// Close cleans up the temporary directory and unlocks any digest locks that have not been unlocked yet. func (s *storageImageDestination) Close() error { + // Make sure that all locks are unlocked to avoid potential deadlocks. + for d, unlocked := range s.unlockedDigests { + if !unlocked { + s.unlockDigest(d) + } + } return os.RemoveAll(s.directory) } @@ -360,6 +373,58 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool { return true } +// SupportsBlobLocks indicates whether the ImageDestination supports blob +// locking. +func (s *storageImageDestination) SupportsBlobLocks() bool { + return true +} + +// LockBlob can be used to synchronize operations on the blob (e.g., copying). +// Note that any lock that is still held will be unlocked in Close(). +func (s *storageImageDestination) LockBlob(b types.BlobInfo) error { + locker, err := func() (storage.Locker, error) { + // use an anonymous function so we can defer the Unlock() and + // not worry about error paths + s.digestLockMutex.Lock() + defer s.digestLockMutex.Unlock() + if locker, ok := s.lockedDigests[b.Digest]; ok { + return locker, nil + } + locker, err := s.imageRef.transport.store.GetDigestLock(b.Digest) + if err != nil { + return nil, err + } + s.lockedDigests[b.Digest] = locker + return locker, nil + }() + if err != nil { + return err + } + locker.Lock() + s.digestLockMutex.Lock() + s.unlockedDigests[b.Digest] = false + s.digestLockMutex.Unlock() + return nil +} + +func (s *storageImageDestination) unlockDigest(d digest.Digest) error { + s.digestLockMutex.Lock() + defer s.digestLockMutex.Unlock() + + locker, ok := s.lockedDigests[d] + if !ok { + return errors.Errorf("trying to unlock non-existent lock for digest %q", d) + } + locker.Unlock() + s.unlockedDigests[d] = true + return nil +} + +// UnlockBlob unlocks the blob. +func (s *storageImageDestination) UnlockBlob(b types.BlobInfo) error { + return s.unlockDigest(b.Digest) +} + // PutBlob writes contents of stream and returns data representing the result. // inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it. // inputInfo.Size is the expected length of stream, if known. 
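The storage destination's bookkeeping — remember every acquired lock, record when it is unlocked, and release whatever is still held in Close() — boils down to a small pattern. Here is a sketch with invented names (`lockBook`), using plain sync.Mutex values instead of storage.Locker; it illustrates the invariant only, not the actual implementation:

```go
package example

import "sync"

// lockBook mirrors the lockedDigests/unlockedDigests bookkeeping above.
type lockBook struct {
	mu       sync.Mutex
	locks    map[string]*sync.Mutex // acquired locks, keyed by digest
	unlocked map[string]bool        // true once a digest has been unlocked
}

func newLockBook() *lockBook {
	return &lockBook{locks: map[string]*sync.Mutex{}, unlocked: map[string]bool{}}
}

// lock acquires the per-digest lock; the bookkeeping mutex is never held
// while blocking on the per-digest lock itself.
func (b *lockBook) lock(digest string) {
	b.mu.Lock()
	l, ok := b.locks[digest]
	if !ok {
		l = &sync.Mutex{}
		b.locks[digest] = l
	}
	b.mu.Unlock()
	l.Lock()
	b.mu.Lock()
	b.unlocked[digest] = false
	b.mu.Unlock()
}

// unlock is safe to call more than once per digest; only the first call
// releases the lock.
func (b *lockBook) unlock(digest string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if l, ok := b.locks[digest]; ok && !b.unlocked[digest] {
		l.Unlock()
		b.unlocked[digest] = true
	}
}

// close releases every lock that was never explicitly unlocked; it assumes no
// other goroutine is still calling lock or unlock.
func (b *lockBook) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for digest, done := range b.unlocked {
		if !done {
			b.locks[digest].Unlock()
			b.unlocked[digest] = true
		}
	}
}
```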
diff --git a/types/types.go b/types/types.go index 9fdab2314a..7114ae116c 100644 --- a/types/types.go +++ b/types/types.go @@ -287,6 +287,16 @@ type ImageDestination interface { // - Uploaded data MAY be visible to others before Commit() is called // - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) Commit(ctx context.Context) error + // SupportsBlobLocks indicates whether the ImageDestination supports blob locking. + SupportsBlobLocks() bool + // LockBlob is used to synchronize copy operations and is currently + // only implemented by `storage.storageImageDestination`. A locked + // BlobInfo implies that its data is currently being copied by the + // lock owner. This allows callers to avoid redundantly downloading + // the same layer from a registry. + LockBlob(BlobInfo) error + // UnlockBlob unlocks the blob. + UnlockBlob(BlobInfo) error } // ManifestTypeRejectedError is returned by ImageDestination.PutManifest if the destination is in principle available, diff --git a/vendor.conf b/vendor.conf index 89b29722b9..d61c954653 100644 --- a/vendor.conf +++ b/vendor.conf @@ -1,7 +1,7 @@ github.com/containers/image github.com/sirupsen/logrus v1.0.0 -github.com/containers/storage v1.12.1 +github.com/containers/storage c574f6cdd9805955143a82a5326ca2134c95bc18 github.com/davecgh/go-spew 346938d642f2ec3594ed81d874461961cd0faa76 github.com/docker/docker-credential-helpers d68f9aeca33f5fd3f08eeae5e9d175edf4e731d1 github.com/docker/distribution 5f6282db7d65e6d72ad7c2cc66310724a57be716 @@ -47,6 +47,6 @@ github.com/boltdb/bolt master github.com/klauspost/pgzip v1.2.1 github.com/klauspost/compress v1.4.1 github.com/klauspost/cpuid v1.2.0 -github.com/vbauerster/mpb v3.3.4 +github.com/vbauerster/mpb v3.4.0 github.com/mattn/go-isatty v0.0.4 github.com/VividCortex/ewma v1.1.1
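Putting the new interface methods together, the calling protocol a client of types.ImageDestination is expected to follow mirrors what copy.(*imageCopier).tryReusingBlob does above: try to reuse, lock, try to reuse again, and only then upload. A sketch under those assumptions — copyMissingBlob and the hard-coded canSubstitute value are invented for illustration:

```go
package example

import (
	"context"
	"fmt"

	"github.com/containers/image/types"
)

// copyMissingBlob shows the intended use of SupportsBlobLocks/LockBlob/UnlockBlob.
func copyMissingBlob(ctx context.Context, dest types.ImageDestination, cache types.BlobInfoCache, blob types.BlobInfo) error {
	// Fast path: the blob may already be present at the destination.
	if reused, _, err := dest.TryReusingBlob(ctx, blob, cache, true); err != nil || reused {
		return err
	}
	if dest.SupportsBlobLocks() {
		if err := dest.LockBlob(blob); err != nil {
			return err
		}
		// UnlockBlob may be called multiple times; Close() releases anything still held.
		defer dest.UnlockBlob(blob)
		// Another process may have finished copying the blob while we waited for the lock.
		if reused, _, err := dest.TryReusingBlob(ctx, blob, cache, true); err != nil || reused {
			return err
		}
	}
	fmt.Println("blob still missing, upload it via PutBlob:", blob.Digest)
	// ... stream the blob contents with dest.PutBlob(...) ...
	return nil
}
```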