diff --git a/pkg/cli/admin/catalog/mirror.go b/pkg/cli/admin/catalog/mirror.go
index d929a0efd3..a18f273eca 100644
--- a/pkg/cli/admin/catalog/mirror.go
+++ b/pkg/cli/admin/catalog/mirror.go
@@ -283,6 +283,7 @@ func (o *MirrorCatalogOptions) Complete(cmd *cobra.Command, args []string) error
 	a.ParallelOptions = o.ParallelOptions
 	a.KeepManifestList = true
 	a.Mappings = mappings
+	a.SkipMultipleScopes = true
 	if err := a.Validate(); err != nil {
 		fmt.Fprintf(o.IOStreams.ErrOut, "error configuring image mirroring: %v\n", err)
 	}
diff --git a/pkg/cli/image/mirror/mappings.go b/pkg/cli/image/mirror/mappings.go
index e6ee36984a..3c29587ce3 100644
--- a/pkg/cli/image/mirror/mappings.go
+++ b/pkg/cli/image/mirror/mappings.go
@@ -17,10 +17,6 @@ import (
 // ErrAlreadyExists may be returned by the blob Create function to indicate that the blob already exists.
 var ErrAlreadyExists = fmt.Errorf("blob already exists in the target location")
 
-// As length of mappings increases, so does Authorization Header size, and this causes upload failures with
-// registries that have a header size limit (quay.io)
-var maxLenMappings = 10
-
 type Mapping struct {
 	Source      imagesource.TypedImageReference
 	Destination imagesource.TypedImageReference
@@ -173,69 +169,47 @@ func (d *destinations) mergeIntoDigests(srcDigest digest.Digest, target pushTarg
 
 type targetTree map[key]*destinations
 
-func buildTargetTrees(mappings []Mapping) []targetTree {
-	var trees []targetTree
-	// split targetTrees into groups of 10
-	splitMappings := split(mappings, maxLenMappings)
-	for _, splitm := range splitMappings {
-		tree := make(targetTree)
-		for _, m := range splitm {
-			srcKey := key{t: m.Source.Type, registry: m.Source.Ref.Registry, repository: m.Source.Ref.RepositoryName()}
-			dstKey := key{t: m.Destination.Type, registry: m.Destination.Ref.Registry, repository: m.Destination.Ref.RepositoryName()}
-
-			src, ok := tree[srcKey]
-			if !ok {
-				src = &destinations{}
-				src.ref = imagesource.TypedImageReference{Ref: m.Source.Ref.AsRepository(), Type: m.Source.Type}
-				src.digests = make(map[string]pushTargets)
-				src.tags = make(map[string]pushTargets)
-				tree[srcKey] = src
-			}
+func buildTargetTree(mappings []Mapping) targetTree {
+	tree := make(targetTree)
+	for _, m := range mappings {
+		srcKey := key{t: m.Source.Type, registry: m.Source.Ref.Registry, repository: m.Source.Ref.RepositoryName()}
+		dstKey := key{t: m.Destination.Type, registry: m.Destination.Ref.Registry, repository: m.Destination.Ref.RepositoryName()}
 
-			var current pushTargets
-			if id := m.Source.Ref.ID; len(id) > 0 {
-				current = src.digests[m.Source.Ref.ID]
-				if current == nil {
-					current = make(pushTargets)
-					src.digests[m.Source.Ref.ID] = current
-				}
-			} else {
-				tag := m.Source.Ref.Tag
-				current = src.tags[tag]
-				if current == nil {
-					current = make(pushTargets)
-					src.tags[tag] = current
-				}
-			}
+		src, ok := tree[srcKey]
+		if !ok {
+			src = &destinations{}
+			src.ref = imagesource.TypedImageReference{Ref: m.Source.Ref.AsRepository(), Type: m.Source.Type}
+			src.digests = make(map[string]pushTargets)
+			src.tags = make(map[string]pushTargets)
+			tree[srcKey] = src
+		}
 
-			dst, ok := current[dstKey]
-			if !ok {
-				dst.ref = imagesource.TypedImageReference{Ref: m.Destination.Ref.AsRepository(), Type: m.Destination.Type}
+		var current pushTargets
+		if id := m.Source.Ref.ID; len(id) > 0 {
+			current = src.digests[m.Source.Ref.ID]
+			if current == nil {
+				current = make(pushTargets)
+				src.digests[m.Source.Ref.ID] = current
 			}
-			if len(m.Destination.Ref.Tag) > 0 {
-				dst.tags = append(dst.tags, m.Destination.Ref.Tag)
+		} else {
+			tag := m.Source.Ref.Tag
+			current = src.tags[tag]
+			if current == nil {
+				current = make(pushTargets)
+				src.tags[tag] = current
 			}
-			current[dstKey] = dst
 		}
-		trees = append(trees, tree)
-	}
-	return trees
-}
-
-func split(mappings []Mapping, size int) [][]Mapping {
-	var splitMappings [][]Mapping
-	for i := 0; i < len(mappings); i += size {
-		m := mappings[i:min(i+size, len(mappings))]
-		splitMappings = append(splitMappings, m)
-	}
-	return splitMappings
-}
-func min(a, b int) int {
-	if a <= b {
-		return a
+		dst, ok := current[dstKey]
+		if !ok {
+			dst.ref = imagesource.TypedImageReference{Ref: m.Destination.Ref.AsRepository(), Type: m.Destination.Type}
+		}
+		if len(m.Destination.Ref.Tag) > 0 {
+			dst.tags = append(dst.tags, m.Destination.Ref.Tag)
+		}
+		current[dstKey] = dst
 	}
-	return b
+	return tree
 }
 
 func addDockerRegistryScopes(scopes map[contextKey]map[string]bool, targets map[string]pushTargets, srcKey key) {
diff --git a/pkg/cli/image/mirror/mirror.go b/pkg/cli/image/mirror/mirror.go
index 32400a9f9f..3bef81c3ae 100644
--- a/pkg/cli/image/mirror/mirror.go
+++ b/pkg/cli/image/mirror/mirror.go
@@ -268,175 +268,173 @@ func (o *MirrorImageOptions) Validate() error {
 func (o *MirrorImageOptions) Run() error {
 	var continuedOnFailure bool
 	start := time.Now()
-	plans, err := o.plan()
+	p, err := o.plan()
 	if err != nil {
 		return err
 	}
-	for _, p := range plans {
-		p.Print(o.ErrOut)
-		fmt.Fprintln(o.ErrOut)
+	p.Print(o.ErrOut)
+	fmt.Fprintln(o.ErrOut)
 
-		if errs := p.Errors(); len(errs) > 0 {
-			for _, err := range errs {
-				fmt.Fprintf(o.ErrOut, "error: %v\n", err)
-			}
-			if !o.ContinueOnError {
-				return fmt.Errorf("an error occurred during planning")
-			}
-			continuedOnFailure = true
+	if errs := p.Errors(); len(errs) > 0 {
+		for _, err := range errs {
+			fmt.Fprintf(o.ErrOut, "error: %v\n", err)
 		}
+		if !o.ContinueOnError {
+			return fmt.Errorf("an error occurred during planning")
+		}
+		continuedOnFailure = true
+	}
 
-		work := Greedy(p)
-		work.Print(o.ErrOut)
-		fmt.Fprintln(o.ErrOut)
+	work := Greedy(p)
+	work.Print(o.ErrOut)
+	fmt.Fprintln(o.ErrOut)
 
-		fmt.Fprintf(o.ErrOut, "info: Planning completed in %s\n", time.Now().Sub(start).Round(10*time.Millisecond))
+	fmt.Fprintf(o.ErrOut, "info: Planning completed in %s\n", time.Now().Sub(start).Round(10*time.Millisecond))
 
-		if o.DryRun {
-			fmt.Fprintf(o.ErrOut, "info: Dry run complete\n")
-			return nil
-		}
+	if o.DryRun {
+		fmt.Fprintf(o.ErrOut, "info: Dry run complete\n")
+		return nil
+	}
 
-		// we must have a client available for accessing referential URLs
-		referentialClient, err := o.SecurityOptions.ReferentialHTTPClient()
-		if err != nil {
-			return err
-		}
+	// we must have a client available for accessing referential URLs
+	referentialClient, err := o.SecurityOptions.ReferentialHTTPClient()
+	if err != nil {
+		return err
+	}
 
-		stopCh := make(chan struct{})
-		defer close(stopCh)
-		q := workqueue.New(o.MaxRegistry, stopCh)
-		registryWorkers := make(map[string]workqueue.Interface)
-		for name := range p.RegistryNames() {
-			registryWorkers[name] = workqueue.New(o.ParallelOptions.MaxPerRegistry, stopCh)
-		}
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	q := workqueue.New(o.MaxRegistry, stopCh)
+	registryWorkers := make(map[string]workqueue.Interface)
+	for name := range p.RegistryNames() {
+		registryWorkers[name] = workqueue.New(o.ParallelOptions.MaxPerRegistry, stopCh)
+	}
 
-		next := time.Now()
-		defer func() {
-			d := time.Now().Sub(next)
-			fmt.Fprintf(o.ErrOut, "info: Mirroring completed in %s (%s/s)\n", d.Truncate(10*time.Millisecond), units.HumanSize(float64(work.stats.bytes)/d.Seconds()))
-		}()
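+	// track elapsed time and bytes copied so the deferred summary can report throughput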
+	next := time.Now()
+	defer func() {
+		d := time.Now().Sub(next)
+		fmt.Fprintf(o.ErrOut, "info: Mirroring completed in %s (%s/s)\n", d.Truncate(10*time.Millisecond), units.HumanSize(float64(work.stats.bytes)/d.Seconds()))
+	}()
 
-		ctx := apirequest.NewContext()
-		for j := range work.phases {
-			phase := &work.phases[j]
-			q.Batch(func(w workqueue.Work) {
-				for i := range phase.independent {
-					unit := phase.independent[i]
-					w.Parallel(func() {
-						// upload blobs
-						registryWorkers[unit.registry.name].Batch(func(w workqueue.Work) {
-							for i := range unit.repository.blobs {
-								op := unit.repository.blobs[i]
-								for digestString := range op.blobs {
-									digest := godigest.Digest(digestString)
-									blob := op.parent.parent.parent.GetBlob(digest)
-									w.Parallel(func() {
-										if err := copyBlob(ctx, work, op, blob, referentialClient, o.Force, o.SkipMount, o.ErrOut); err != nil {
-											phase.ExecutionFailure(err)
-											return
-										}
-										op.parent.parent.AssociateBlob(unit.repository.name, blob)
-									})
-								}
-							}
-						})
-						if phase.IsFailed() {
-							if !o.ContinueOnError {
-								return
-							}
-							continuedOnFailure = true
-						}
-						// upload manifests in batches by their prerequisites
-						op := unit.repository.manifests
-						dependencies := make(map[godigest.Digest]godigest.Digest)
-						for from, to := range op.prerequisites {
-							dependencies[from] = to
-						}
-						marked := sets.NewString()
-						for {
-							waiting := sets.NewString()
-							for _, to := range dependencies {
-								waiting.Insert(string(to))
-							}
-							uploaded := 0
-							registryWorkers[unit.registry.name].Batch(func(w workqueue.Work) {
-								ref, err := reference.WithName(op.toRef.Ref.RepositoryName())
-								if err != nil {
-									phase.ExecutionFailure(fmt.Errorf("unable to create reference to repository %s: %v", op.toRef, err))
-									return
-								}
-								// upload and tag the manifest
-								for digest := range op.digestsToTags {
-									if waiting.Has(string(digest)) || marked.Has(string(digest)) {
-										continue
-									}
-									delete(dependencies, digest)
-									marked.Insert(string(digest))
-									uploaded++
-
-									srcDigest := digest
-									tags := op.digestsToTags[srcDigest].List()
-									w.Parallel(func() {
-										if errs := copyManifestToTags(ctx, ref, srcDigest, tags, op, o.Out, o.ErrOut); len(errs) > 0 {
-											phase.ExecutionFailure(errs...)
-										}
-									})
-								}
-								// this is a pure manifest move, put the manifest by its id
-								for digest := range op.digestCopies {
-									if waiting.Has(string(digest)) || marked.Has(string(digest)) {
-										continue
-									}
-									delete(dependencies, godigest.Digest(digest))
-									marked.Insert(string(digest))
-									uploaded++
-
-									srcDigest := godigest.Digest(digest)
-									w.Parallel(func() {
-										if err := copyManifest(ctx, ref, srcDigest, op, o.Out, o.ErrOut); err != nil {
-											phase.ExecutionFailure(err)
-										}
-									})
-								}
-							})
-							if len(op.prerequisites) > 0 && uploaded == 0 {
-								phase.ExecutionFailure(fmt.Errorf("circular dependency in manifest lists, unable to upload all: %#v", dependencies))
-								break
-							}
-							if waiting.Len() == 0 {
-								break
-							}
-						}
-					})
-				}
-			})
-			if phase.IsFailed() {
-				for _, err := range phase.ExecutionFailures() {
-					fmt.Fprintf(o.ErrOut, "error: %v\n", err)
-				}
-				if !o.ContinueOnError {
-					return fmt.Errorf("one or more errors occurred while uploading images")
-				}
-				continuedOnFailure = true
-			}
-		}
+	ctx := apirequest.NewContext()
+	for j := range work.phases {
+		phase := &work.phases[j]
+		q.Batch(func(w workqueue.Work) {
+			for i := range phase.independent {
+				unit := phase.independent[i]
+				w.Parallel(func() {
+					// upload blobs
+					registryWorkers[unit.registry.name].Batch(func(w workqueue.Work) {
+						for i := range unit.repository.blobs {
+							op := unit.repository.blobs[i]
+							for digestString := range op.blobs {
+								digest := godigest.Digest(digestString)
+								blob := op.parent.parent.parent.GetBlob(digest)
+								w.Parallel(func() {
+									if err := copyBlob(ctx, work, op, blob, referentialClient, o.Force, o.SkipMount, o.ErrOut); err != nil {
+										phase.ExecutionFailure(err)
+										return
+									}
+									op.parent.parent.AssociateBlob(unit.repository.name, blob)
+								})
+							}
+						}
+					})
+					if phase.IsFailed() {
+						if !o.ContinueOnError {
+							return
+						}
+						continuedOnFailure = true
+					}
+					// upload manifests in batches by their prerequisites
+					op := unit.repository.manifests
+					dependencies := make(map[godigest.Digest]godigest.Digest)
+					for from, to := range op.prerequisites {
+						dependencies[from] = to
+					}
+					marked := sets.NewString()
+					for {
+						waiting := sets.NewString()
+						for _, to := range dependencies {
+							waiting.Insert(string(to))
+						}
+						uploaded := 0
+						registryWorkers[unit.registry.name].Batch(func(w workqueue.Work) {
+							ref, err := reference.WithName(op.toRef.Ref.RepositoryName())
+							if err != nil {
+								phase.ExecutionFailure(fmt.Errorf("unable to create reference to repository %s: %v", op.toRef, err))
+								return
+							}
+							// upload and tag the manifest
+							for digest := range op.digestsToTags {
+								if waiting.Has(string(digest)) || marked.Has(string(digest)) {
+									continue
+								}
+								delete(dependencies, digest)
+								marked.Insert(string(digest))
+								uploaded++
+
+								srcDigest := digest
+								tags := op.digestsToTags[srcDigest].List()
+								w.Parallel(func() {
+									if errs := copyManifestToTags(ctx, ref, srcDigest, tags, op, o.Out, o.ErrOut); len(errs) > 0 {
+										phase.ExecutionFailure(errs...)
+									}
+								})
+							}
+							// this is a pure manifest move, put the manifest by its id
+							for digest := range op.digestCopies {
+								if waiting.Has(string(digest)) || marked.Has(string(digest)) {
+									continue
+								}
+								delete(dependencies, godigest.Digest(digest))
+								marked.Insert(string(digest))
+								uploaded++
+
+								srcDigest := godigest.Digest(digest)
+								w.Parallel(func() {
+									if err := copyManifest(ctx, ref, srcDigest, op, o.Out, o.ErrOut); err != nil {
+										phase.ExecutionFailure(err)
+									}
+								})
+							}
+						})
+						if len(op.prerequisites) > 0 && uploaded == 0 {
+							phase.ExecutionFailure(fmt.Errorf("circular dependency in manifest lists, unable to upload all: %#v", dependencies))
+							break
+						}
+						if waiting.Len() == 0 {
+							break
+						}
+					}
+				})
+			}
+		})
+		if phase.IsFailed() {
+			for _, err := range phase.ExecutionFailures() {
+				fmt.Fprintf(o.ErrOut, "error: %v\n", err)
+			}
+			if !o.ContinueOnError {
+				return fmt.Errorf("one or more errors occurred while uploading images")
+			}
+			continuedOnFailure = true
+		}
+	}
 
-		if o.ManifestUpdateCallback != nil {
-			for _, reg := range p.registries {
-				klog.V(4).Infof("Manifests mapped %#v", reg.manifestConversions)
-				if err := o.ManifestUpdateCallback(reg.name, reg.manifestConversions); err != nil {
-					if !o.ContinueOnError {
-						return err
-					}
-					continuedOnFailure = true
-					fmt.Fprintf(o.ErrOut, "error: %v\n", err)
+	if o.ManifestUpdateCallback != nil {
+		for _, reg := range p.registries {
+			klog.V(4).Infof("Manifests mapped %#v", reg.manifestConversions)
+			if err := o.ManifestUpdateCallback(reg.name, reg.manifestConversions); err != nil {
+				if !o.ContinueOnError {
+					return err
 				}
+				continuedOnFailure = true
+				fmt.Fprintf(o.ErrOut, "error: %v\n", err)
 			}
 		}
-		if continuedOnFailure {
-			return fmt.Errorf("one or more errors occurred")
-		}
+	}
+	if continuedOnFailure {
+		return fmt.Errorf("one or more errors occurred")
 	}
 	return nil
 }
@@ -446,8 +444,7 @@ type contextKey struct {
 	registry string
 }
 
-func (o *MirrorImageOptions) plan() ([]*plan, error) {
-	var plans []*plan
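+// plan walks every source image once and produces a single mirror plan
+// covering all mappings.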
+func (o *MirrorImageOptions) plan() (*plan, error) {
 	ctx := apirequest.NewContext()
 	context, err := o.SecurityOptions.Context()
 	if err != nil {
@@ -457,197 +454,194 @@ func (o *MirrorImageOptions) plan() ([]*plan, error) {
 	toContext := context.Copy().WithActions("pull", "push")
 	toContexts := make(map[contextKey]*registryclient.Context)
 
-	trees := buildTargetTrees(o.Mappings)
-	for _, tree := range trees {
-		for registry, scopes := range calculateDockerRegistryScopes(tree) {
-			klog.V(5).Infof("Using scopes for registry %s: %v", registry, scopes)
-			if o.SkipMultipleScopes {
-				toContexts[registry] = toContext.Copy()
-			} else {
-				toContexts[registry] = toContext.Copy().WithScopes(scopes...)
-			}
+	tree := buildTargetTree(o.Mappings)
+	for registry, scopes := range calculateDockerRegistryScopes(tree) {
+		klog.V(5).Infof("Using scopes for registry %s: %v", registry, scopes)
+		if o.SkipMultipleScopes {
+			toContexts[registry] = toContext.Copy()
+		} else {
+			toContexts[registry] = toContext.Copy().WithScopes(scopes...)
 		}
+	}
 
-		stopCh := make(chan struct{})
-		defer close(stopCh)
-		q := workqueue.New(o.MaxRegistry, stopCh)
-		registryWorkers := make(map[string]workqueue.Interface)
-		for name := range tree {
-			if _, ok := registryWorkers[name.registry]; !ok {
-				registryWorkers[name.registry] = workqueue.New(o.ParallelOptions.MaxPerRegistry, stopCh)
-			}
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	q := workqueue.New(o.MaxRegistry, stopCh)
+	registryWorkers := make(map[string]workqueue.Interface)
+	for name := range tree {
+		if _, ok := registryWorkers[name.registry]; !ok {
+			registryWorkers[name.registry] = workqueue.New(o.ParallelOptions.MaxPerRegistry, stopCh)
 		}
+	}
 
-		plan := newPlan()
+	plan := newPlan()
 
-		for name := range tree {
-			src := tree[name]
-			q.Queue(func(_ workqueue.Work) {
-				srcRepo, err := o.Repository(ctx, fromContext, src.ref, true)
-				if err != nil {
-					plan.AddError(retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
-					return
-				}
-				manifests, err := srcRepo.Manifests(ctx)
-				if err != nil {
-					plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
-					return
-				}
-				rq := registryWorkers[name.registry]
-				rq.Batch(func(w workqueue.Work) {
-					// convert source tags to digests
-					for tag := range src.tags {
-						srcTag, pushTargets := tag, src.tags[tag]
-						w.Parallel(func() {
-							desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
-							if err != nil {
-								if o.SkipMissing && imagemanifest.IsImageNotFound(err) {
-									ref := src.ref
-									ref.Ref.Tag = srcTag
-									fmt.Fprintf(o.ErrOut, "warning: Image %s does not exist and will not be mirrored\n", ref)
-									return
-								}
-								plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag %s: %v", src.ref, srcTag, err)})
-								return
-							}
-							srcDigest := desc.Digest
-							klog.V(3).Infof("Resolved source image %s:%s to %s\n", src.ref, srcTag, srcDigest)
-							src.mergeIntoDigests(srcDigest, pushTargets)
-						})
-					}
-				})
-
-				canonicalFrom := srcRepo.Named()
-
-				rq.Queue(func(w workqueue.Work) {
-					for key := range src.digests {
-						srcDigestString, pushTargets := key, src.digests[key]
-						w.Parallel(func() {
-							// load the manifest
-							srcDigest := godigest.Digest(srcDigestString)
-							srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), imagemanifest.PreferManifestList)
-							if err != nil {
-								plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest %s: %v", src.ref, srcDigest, err)})
-								return
-							}
-							klog.V(5).Infof("Found manifest %s with type %T", srcDigest, srcManifest)
-
-							// filter or load manifest list as appropriate
-							originalSrcDigest := srcDigest
-							srcManifests, srcManifest, srcDigest, err := imagemanifest.ProcessManifestList(ctx, srcDigest, srcManifest, manifests, src.ref.Ref, o.FilterOptions.IncludeAll, o.KeepManifestList)
-							if err != nil {
-								plan.AddError(retrieverError{src: src.ref, err: err})
-								return
-							}
-							if len(srcManifests) == 0 {
-								fmt.Fprintf(o.ErrOut, "info: Filtered all images from %s, skipping\n", src.ref)
-								return
-							}
-
-							var location string
-							if srcDigest == originalSrcDigest {
-								location = fmt.Sprintf("manifest %s", srcDigest)
-							} else {
-								location = fmt.Sprintf("manifest %s in manifest list %s", srcDigest, originalSrcDigest)
-							}
-
-							for _, dst := range pushTargets {
-								var toRepo distribution.Repository
-								var err error
-								if o.DryRun {
-									toRepo, err = imagesource.NewDryRun(dst.ref)
-								} else {
-									toRepo, err = o.Repository(ctx, toContexts[contextKeyForReference(dst.ref)], dst.ref, false)
-								}
-								if err != nil {
-									plan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
-									continue
-								}
-
-								canonicalTo := toRepo.Named()
-
-								registryPlan := plan.RegistryPlan(dst.ref)
-								repoPlan := registryPlan.RepositoryPlan(canonicalTo.String())
-								blobPlan := repoPlan.Blobs(src.ref, location)
-
-								toManifests, err := toRepo.Manifests(ctx)
-								if err != nil {
-									repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
-									continue
-								}
-
-								var mustCopyLayers bool
-								switch {
-								case o.Force:
-									mustCopyLayers = true
-								case src.ref.EqualRegistry(dst.ref) && canonicalFrom.String() == canonicalTo.String():
-									// if the source and destination repos are the same, we don't need to copy layers unless forced
-								default:
-									if _, err := toManifests.Get(ctx, srcDigest); err != nil {
-										mustCopyLayers = true
-										blobPlan.AlreadyExists(distribution.Descriptor{Digest: srcDigest})
-									} else {
-										klog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
-									}
-								}
-
-								toBlobs := toRepo.Blobs(ctx)
-
-								if mustCopyLayers {
-									// upload all the blobs
-									srcBlobs := srcRepo.Blobs(ctx)
-
-									// upload each manifest
-									for _, srcManifest := range srcManifests {
-										switch srcManifest.(type) {
-										case *schema2.DeserializedManifest:
-										case *schema1.SignedManifest:
-										case *ocischema.DeserializedManifest:
-										case *manifestlist.DeserializedManifestList:
-											// we do not need to upload layers in a manifestlist
-											continue
-										default:
-											repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)})
-											continue
-										}
-										for _, blob := range srcManifest.References() {
-											if src.ref.EqualRegistry(dst.ref) {
-												registryPlan.AssociateBlob(canonicalFrom.String(), blob)
-											}
-											blobPlan.Copy(blob, srcBlobs, toBlobs)
-										}
-									}
-								}
-
-								if len(srcManifests) > 1 {
-									for _, srcManifest := range srcManifests {
-										manifestDigest, err := registryclient.ContentDigestForManifest(srcManifest, srcDigest.Algorithm())
-										if err != nil {
-											repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("could not create manifesnt for %T", srcManifest)})
-											continue
-										}
-										repoPlan.Manifests().Copy(manifestDigest, srcManifest, nil, toManifests, toBlobs)
-									}
-								}
-
-								repoPlan.Manifests().Copy(srcDigest, srcManifest, dst.tags, toManifests, toBlobs)
-							}
-						})
-					}
-				})
-			})
-		}
-		for _, q := range registryWorkers {
-			q.Done()
-		}
-
-		plan.trim()
-		plan.calculateStats()
-
-		plans = append(plans, plan)
-	}
-	return plans, nil
+	for name := range tree {
+		src := tree[name]
+		q.Queue(func(_ workqueue.Work) {
+			srcRepo, err := o.Repository(ctx, fromContext, src.ref, true)
+			if err != nil {
+				plan.AddError(retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref})
+				return
+			}
+			manifests, err := srcRepo.Manifests(ctx)
+			if err != nil {
+				plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)})
+				return
+			}
+			rq := registryWorkers[name.registry]
+			rq.Batch(func(w workqueue.Work) {
+				// convert source tags to digests
+				for tag := range src.tags {
+					srcTag, pushTargets := tag, src.tags[tag]
+					w.Parallel(func() {
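+						// resolve each source tag to a digest; later stages plan by digest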
+						desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag)
+						if err != nil {
+							if o.SkipMissing && imagemanifest.IsImageNotFound(err) {
+								ref := src.ref
+								ref.Ref.Tag = srcTag
+								fmt.Fprintf(o.ErrOut, "warning: Image %s does not exist and will not be mirrored\n", ref)
+								return
+							}
+							plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag %s: %v", src.ref, srcTag, err)})
+							return
+						}
+						srcDigest := desc.Digest
+						klog.V(3).Infof("Resolved source image %s:%s to %s\n", src.ref, srcTag, srcDigest)
+						src.mergeIntoDigests(srcDigest, pushTargets)
+					})
+				}
+			})
+
+			canonicalFrom := srcRepo.Named()
+
+			rq.Queue(func(w workqueue.Work) {
+				for key := range src.digests {
+					srcDigestString, pushTargets := key, src.digests[key]
+					w.Parallel(func() {
+						// load the manifest
+						srcDigest := godigest.Digest(srcDigestString)
+						srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), imagemanifest.PreferManifestList)
+						if err != nil {
+							plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest %s: %v", src.ref, srcDigest, err)})
+							return
+						}
+						klog.V(5).Infof("Found manifest %s with type %T", srcDigest, srcManifest)
+
+						// filter or load manifest list as appropriate
+						originalSrcDigest := srcDigest
+						srcManifests, srcManifest, srcDigest, err := imagemanifest.ProcessManifestList(ctx, srcDigest, srcManifest, manifests, src.ref.Ref, o.FilterOptions.IncludeAll, o.KeepManifestList)
+						if err != nil {
+							plan.AddError(retrieverError{src: src.ref, err: err})
+							return
+						}
+						if len(srcManifests) == 0 {
+							fmt.Fprintf(o.ErrOut, "info: Filtered all images from %s, skipping\n", src.ref)
+							return
+						}
+
+						var location string
+						if srcDigest == originalSrcDigest {
+							location = fmt.Sprintf("manifest %s", srcDigest)
+						} else {
+							location = fmt.Sprintf("manifest %s in manifest list %s", srcDigest, originalSrcDigest)
+						}
+
+						for _, dst := range pushTargets {
+							var toRepo distribution.Repository
+							var err error
+							if o.DryRun {
+								toRepo, err = imagesource.NewDryRun(dst.ref)
+							} else {
+								toRepo, err = o.Repository(ctx, toContexts[contextKeyForReference(dst.ref)], dst.ref, false)
+							}
+							if err != nil {
+								plan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)})
+								continue
+							}
+
+							canonicalTo := toRepo.Named()
+
+							registryPlan := plan.RegistryPlan(dst.ref)
+							repoPlan := registryPlan.RepositoryPlan(canonicalTo.String())
+							blobPlan := repoPlan.Blobs(src.ref, location)
+
+							toManifests, err := toRepo.Manifests(ctx)
+							if err != nil {
+								repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)})
+								continue
+							}
+
+							var mustCopyLayers bool
+							switch {
+							case o.Force:
+								mustCopyLayers = true
+							case src.ref.EqualRegistry(dst.ref) && canonicalFrom.String() == canonicalTo.String():
+								// if the source and destination repos are the same, we don't need to copy layers unless forced
+							default:
+								if _, err := toManifests.Get(ctx, srcDigest); err != nil {
+									mustCopyLayers = true
+									blobPlan.AlreadyExists(distribution.Descriptor{Digest: srcDigest})
+								} else {
+									klog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref)
+								}
+							}
+
+							toBlobs := toRepo.Blobs(ctx)
+
+							if mustCopyLayers {
+								// upload all the blobs
+								srcBlobs := srcRepo.Blobs(ctx)
+
+								// upload each manifest
+								for _, srcManifest := range srcManifests {
+									switch srcManifest.(type) {
+									case *schema2.DeserializedManifest:
+									case *schema1.SignedManifest:
+									case *ocischema.DeserializedManifest:
+									case *manifestlist.DeserializedManifestList:
+										// we do not need to upload layers in a manifestlist
+										continue
+									default:
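+										// reject manifest schemas we do not know how to copy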
+										repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)})
+										continue
+									}
+									for _, blob := range srcManifest.References() {
+										if src.ref.EqualRegistry(dst.ref) {
+											registryPlan.AssociateBlob(canonicalFrom.String(), blob)
+										}
+										blobPlan.Copy(blob, srcBlobs, toBlobs)
+									}
+								}
+							}
+
+							if len(srcManifests) > 1 {
+								for _, srcManifest := range srcManifests {
+									manifestDigest, err := registryclient.ContentDigestForManifest(srcManifest, srcDigest.Algorithm())
+									if err != nil {
+										repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("could not create manifest for %T", srcManifest)})
+										continue
+									}
+									repoPlan.Manifests().Copy(manifestDigest, srcManifest, nil, toManifests, toBlobs)
+								}
+							}
+
+							repoPlan.Manifests().Copy(srcDigest, srcManifest, dst.tags, toManifests, toBlobs)
+						}
+					})
+				}
+			})
+		})
+	}
+	for _, q := range registryWorkers {
+		q.Done()
+	}
+	q.Done()
+
+	plan.trim()
+	plan.calculateStats()
+
+	return plan, nil
 }
 
 func copyBlob(ctx context.Context, plan *workPlan, c *repositoryBlobCopy, blob distribution.Descriptor, referentialClient *http.Client, force, skipMount bool, errOut io.Writer) error {