Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convertor: support multi-arch images #266

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 228 additions & 46 deletions cmd/convertor/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,33 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/containerd/accelerated-container-image/cmd/convertor/database"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

type Builder interface {
Build(ctx context.Context) error
}

type BuilderOptions struct {
Ref string
TargetRef string
Expand All @@ -57,18 +62,214 @@ type BuilderOptions struct {
Reserve bool
NoUpload bool
DumpManifest bool

// ConcurrencyLimit limits the number of manifests that can be built at once
// 0 means no limit
ConcurrencyLimit int
}

type overlaybdBuilder struct {
layers int
config v1.Image
engine builderEngine
type graphBuilder struct {
// required
Resolver remotes.Resolver

// options
BuilderOptions

// private
fetcher remotes.Fetcher
pusher remotes.Pusher
tagPusher remotes.Pusher
group *errgroup.Group
sem chan struct{}
id atomic.Int32
}

func (b *graphBuilder) Build(ctx context.Context) error {
fetcher, err := b.Resolver.Fetcher(ctx, b.Ref)
if err != nil {
return fmt.Errorf("failed to obtain new fetcher: %w", err)
}
pusher, err := b.Resolver.Pusher(ctx, b.TargetRef+"@") // append '@' to avoid tag
if err != nil {
return fmt.Errorf("failed to obtain new pusher: %w", err)
}
tagPusher, err := b.Resolver.Pusher(ctx, b.TargetRef) // append '@' to avoid tag
if err != nil {
return fmt.Errorf("failed to obtain new tag pusher: %w", err)
}
b.fetcher = fetcher
b.pusher = pusher
b.tagPusher = tagPusher
_, src, err := b.Resolver.Resolve(ctx, b.Ref)
if err != nil {
return fmt.Errorf("failed to resolve: %w", err)
}

g, gctx := errgroup.WithContext(ctx)
b.group = g
if b.ConcurrencyLimit > 0 {
b.sem = make(chan struct{}, b.ConcurrencyLimit)
}
g.Go(func() error {
target, err := b.process(gctx, src, true)
if err != nil {
return fmt.Errorf("failed to build %q: %w", src.Digest, err)
}
log.G(gctx).Infof("converted to %q, digest: %q", b.TargetRef, target.Digest)
return nil
})
return g.Wait()
}

func NewOverlayBDBuilder(ctx context.Context, opt BuilderOptions) (Builder, error) {
func (b *graphBuilder) process(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) {
switch src.MediaType {
case v1.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
return b.buildOne(ctx, src, tag)
case v1.MediaTypeImageIndex, images.MediaTypeDockerSchema2ManifestList:
var index v1.Index
rc, err := b.fetcher.Fetch(ctx, src)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to fetch index: %w", err)
}
defer rc.Close()
indexBytes, err := io.ReadAll(rc)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to read index: %w", err)
}
if err := json.Unmarshal(indexBytes, &index); err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to unmarshal index: %w", err)
}
var wg sync.WaitGroup
for _i, _m := range index.Manifests {
i := _i
m := _m
wg.Add(1)
b.group.Go(func() error {
defer wg.Done()
target, err := b.process(ctx, m, false)
if err != nil {
return fmt.Errorf("failed to build %q: %w", m.Digest, err)
}
index.Manifests[i] = target
return nil
})
}
wg.Wait()
if ctx.Err() != nil {
return v1.Descriptor{}, ctx.Err()
}

// upload index
indexBytes, err = json.Marshal(index)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to marshal index: %w", err)
}
expected := src
expected.Digest = digest.FromBytes(indexBytes)
expected.Size = int64(len(indexBytes))
var pusher remotes.Pusher
if tag {
pusher = b.tagPusher
} else {
pusher = b.pusher
}
if err := uploadBytes(ctx, pusher, expected, indexBytes); err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to upload index: %w", err)
}
return expected, nil
default:
return v1.Descriptor{}, fmt.Errorf("unsupported media type %q", src.MediaType)
}
}

func (b *graphBuilder) buildOne(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) {
if b.sem != nil {
select {
case <-ctx.Done():
return v1.Descriptor{}, ctx.Err()
case b.sem <- struct{}{}:
}
}
defer func() {
if b.sem != nil {
select {
case <-ctx.Done():
case <-b.sem:
}
}
}()
id := b.id.Add(1)

var platform string
if src.Platform == nil {
platform = ""
} else {
platform = platforms.Format(*src.Platform)
ctx = log.WithLogger(ctx, log.G(ctx).WithField("platform", platform))
}
workdir := filepath.Join(b.WorkDir, fmt.Sprintf("%d-%s-%s", id, strings.ReplaceAll(platform, "/", "_"), src.Digest.Encoded()))
log.G(ctx).Infof("building %s ...", workdir)

// init build engine
manifest, config, err := fetchManifestAndConfig(ctx, b.fetcher, src)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to fetch manifest and config: %w", err)
}
var pusher remotes.Pusher
if tag {
pusher = b.tagPusher
} else {
pusher = b.pusher
}
engineBase := &builderEngineBase{
resolver: b.Resolver,
fetcher: b.fetcher,
pusher: pusher,
manifest: *manifest,
config: *config,
inputDesc: src,
}
engineBase.workDir = workdir
engineBase.oci = b.OCI
engineBase.mkfs = b.Mkfs
engineBase.vsize = b.Vsize
engineBase.db = b.DB
refspec, err := reference.Parse(b.Ref)
if err != nil {
return v1.Descriptor{}, err
}
engineBase.host = refspec.Hostname()
engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/")
engineBase.reserve = b.Reserve
engineBase.noUpload = b.NoUpload
engineBase.dumpManifest = b.DumpManifest

var engine builderEngine
switch b.Engine {
case Overlaybd:
engine = NewOverlayBDBuilderEngine(engineBase)
case TurboOCI:
engine = NewTurboOCIBuilderEngine(engineBase)
}

// build
builder := &overlaybdBuilder{
layers: len(engineBase.manifest.Layers),
engine: engine,
}
desc, err := builder.Build(ctx)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to build %s: %w", workdir, err)
}
src.Digest = desc.Digest
src.Size = desc.Size
return src, nil
}

func Build(ctx context.Context, opt BuilderOptions) error {
tlsConfig, err := loadTLSConfig(opt.CertOption)
if err != nil {
return nil, fmt.Errorf("failed to load certifications: %w", err)
return fmt.Errorf("failed to load certifications: %w", err)
}
transport := &http.Transport{
DialContext: (&net.Dialer{
Expand Down Expand Up @@ -106,41 +307,21 @@ func NewOverlayBDBuilder(ctx context.Context, opt BuilderOptions) (Builder, erro
}),
),
})
engineBase, err := getBuilderEngineBase(ctx, resolver, opt.Ref, opt.TargetRef)
if err != nil {
return nil, err
}
engineBase.workDir = opt.WorkDir
engineBase.oci = opt.OCI
engineBase.mkfs = opt.Mkfs
engineBase.vsize = opt.Vsize
engineBase.db = opt.DB

refspec, err := reference.Parse(opt.Ref)
if err != nil {
return nil, err
}
engineBase.host = refspec.Hostname()
engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/")
engineBase.reserve = opt.Reserve
engineBase.noUpload = opt.NoUpload
engineBase.dumpManifest = opt.DumpManifest
return (&graphBuilder{
Resolver: resolver,
BuilderOptions: opt,
}).Build(ctx)
}

var engine builderEngine
switch opt.Engine {
case Overlaybd:
engine = NewOverlayBDBuilderEngine(engineBase)
case TurboOCI:
engine = NewTurboOCIBuilderEngine(engineBase)
}
return &overlaybdBuilder{
layers: len(engineBase.manifest.Layers),
engine: engine,
config: engineBase.config,
}, nil
type overlaybdBuilder struct {
layers int
engine builderEngine
}

func (b *overlaybdBuilder) Build(ctx context.Context) error {
// Build return a descriptor of the converted target, as the caller may need it
// to tag or compose an index
func (b *overlaybdBuilder) Build(ctx context.Context) (v1.Descriptor, error) {
defer b.engine.Cleanup()
alreadyConverted := make([]chan *v1.Descriptor, b.layers)
downloaded := make([]chan error, b.layers)
Expand All @@ -150,7 +331,7 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
// when errors are encountered fallback to regular conversion
if convertedDesc, err := b.engine.CheckForConvertedManifest(ctx); err == nil && convertedDesc.Digest != "" {
logrus.Infof("Image found already converted in registry with digest %s", convertedDesc.Digest)
return nil
return convertedDesc, nil
}

// Errgroups will close the context after wait returns so the operations need their own
Expand Down Expand Up @@ -244,15 +425,16 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
})
}
if err := g.Wait(); err != nil {
return err
return v1.Descriptor{}, err
}

if err := b.engine.UploadImage(ctx); err != nil {
return errors.Wrap(err, "failed to upload manifest or config")
targetDesc, err := b.engine.UploadImage(ctx)
if err != nil {
return v1.Descriptor{}, errors.Wrap(err, "failed to upload manifest or config")
}
b.engine.StoreConvertedManifestDetails(ctx)
logrus.Info("convert finished")
return nil
return targetDesc, nil
}

// block until ctx.Done() or sent
Expand Down
Loading
Loading