Skip to content

Commit

Permalink
[feat] preheat supports authentication and rich format range
Browse files Browse the repository at this point in the history
Signed-off-by: yxxhero <[email protected]>
  • Loading branch information
yxxhero committed Apr 17, 2022
1 parent 0649293 commit e117638
Show file tree
Hide file tree
Showing 9 changed files with 787 additions and 95 deletions.
7 changes: 5 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ linters-settings:
gocyclo:
min-complexity: 42
gci:
local-prefixes: d7y.io/dragonfly/v2
sections:
- standard
- default
- prefix(d7y.io/dragonfly/v2)

linters:
disable-all: true
enable:
- gci
- gofmt
- golint
- golint
- misspell
- govet
- goconst
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/casbin/casbin/v2 v2.34.1
github.com/casbin/gorm-adapter/v3 v3.3.2
github.com/colinmarc/hdfs/v2 v2.2.0
github.com/containerd/containerd v1.6.2
github.com/distribution/distribution/v3 v3.0.0-20210804104954-38ab4c606ee3
github.com/docker/go-connections v0.4.0
github.com/docker/go-units v0.4.0
Expand Down Expand Up @@ -46,6 +47,7 @@ require (
github.com/onsi/ginkgo/v2 v2.1.0
github.com/onsi/gomega v1.18.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.0.2
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
Expand Down Expand Up @@ -123,7 +125,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down Expand Up @@ -156,16 +158,16 @@ require (
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
github.com/moby/locker v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.28.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/prometheus/common v0.30.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rs/cors v1.7.0 // indirect
Expand All @@ -190,6 +192,7 @@ require (
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel/internal/metric v0.27.0 // indirect
go.opentelemetry.io/otel/metric v0.27.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/exp v0.0.0-20201221025956-e89b829e73ea // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
Expand Down
551 changes: 547 additions & 4 deletions go.sum

Large diffs are not rendered by default.

228 changes: 158 additions & 70 deletions manager/job/preheat.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ import (
"time"

machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/manifest/schema2"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"

Expand All @@ -49,10 +52,7 @@ type PreheatType string
const (
PreheatImageType PreheatType = "image"
PreheatFileType PreheatType = "file"
)

const (
timeout = 1 * time.Minute
timeout = 1 * time.Minute
)

var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")
Expand Down Expand Up @@ -104,7 +104,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
return nil, err
}

files, err = p.getLayers(ctx, url, filter, httputils.MapToHeader(rawheader), image)
files, err = p.getLayers(ctx, image, json)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,93 +168,64 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe
}, nil
}

func (p *preheat) getLayers(ctx context.Context, url string, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
func (p *preheat) getLayers(ctx context.Context, image *preheatImage, preheatArgs types.PreheatArgs) ([]*internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()

resp, err := p.getManifests(ctx, url, header)
ocispecManifest, err := p.getManifests(ctx, image, preheatArgs.Username, preheatArgs.Password)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
if resp.StatusCode == http.StatusUnauthorized {
token, err := getAuthToken(ctx, resp.Header)
if err != nil {
return nil, err
}

bearer := "Bearer " + token
header.Add("Authorization", bearer)

resp, err = p.getManifests(ctx, url, header)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("request registry %d", resp.StatusCode)
}
}

layers, err := p.parseLayers(resp, url, filter, header, image)
layers, err := p.parseLayers(ocispecManifest, preheatArgs.URL, preheatArgs.Filter, httputils.MapToHeader(preheatArgs.Headers), image)
if err != nil {
return nil, err
}

return layers, nil
}

func (p *preheat) getManifests(ctx context.Context, url string, header http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
func Fetch(ctx context.Context, f remotes.Fetcher, desc ocispec.Descriptor, dest io.Writer) error {
r, err := f.Fetch(ctx, desc)
defer func() {
closeErr := r.Close()
logger.Warnf("close fetch reader: %v", closeErr)
}()
if err != nil {
return nil, err
return err
}
dgstr := desc.Digest.Algorithm().Digester()
_, err = io.Copy(io.MultiWriter(dest, dgstr.Hash()), r)
if err != nil {
return err
}
if dgstr.Digest() != desc.Digest {
return fmt.Errorf("content mismatch: %s != %s", dgstr.Digest(), desc.Digest)
}

req.Header = header
req.Header.Add("Accept", schema2.MediaTypeManifest)
return nil
}

// getResolver returns a resolver.
func (p *preheat) getResolver(ctx context.Context, username, password string) remotes.Resolver {
creds := func(string) (string, string, error) {
return username, password, nil
}
client := &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
options := docker.ResolverOptions{}
options.Hosts = docker.ConfigureDefaultRegistries(
docker.WithClient(client),
docker.WithAuthorizer(docker.NewDockerAuthorizer(
docker.WithAuthClient(client),
docker.WithAuthCreds(creds),
)),
)
return docker.NewResolver(options)

resp, err := client.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}

func (p *preheat) parseLayers(resp *http.Response, url, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

manifest, _, err := distribution.UnmarshalManifest(schema2.MediaTypeManifest, body)
if err != nil {
return nil, err
}

var layers []*internaljob.PreheatRequest
for _, v := range manifest.References() {
digest := v.Digest.String()
layer := &internaljob.PreheatRequest{
URL: layerURL(image.protocol, image.domain, image.name, digest),
Tag: p.bizTag,
Filter: filter,
Digest: digest,
Headers: httputils.HeaderToMap(header),
}

layers = append(layers, layer)
}

return layers, nil
}

func getAuthToken(ctx context.Context, header http.Header) (string, error) {
Expand Down Expand Up @@ -314,6 +285,123 @@ func authURL(wwwAuth []string) string {
return fmt.Sprintf("%s?%s", host, query)
}

func (p *preheat) getManifests(ctx context.Context, image *preheatImage, username, password string) (ocispec.Manifest, error) {

resolver := p.getResolver(ctx, username, password)

// combine image url and tag
i := fmt.Sprintf("%s/%s:%s", image.domain, image.name, image.tag)

_, imageDesc, err := resolver.Resolve(ctx, i)
if err != nil {
return ocispec.Manifest{}, err
}
f, err := resolver.Fetcher(ctx, i)
if err != nil {
return ocispec.Manifest{}, err
}
r, err := f.Fetch(ctx, imageDesc)
if err != nil {
return ocispec.Manifest{}, fmt.Errorf("failed to fetch %s: %w", imageDesc.Digest, err)
}
defer r.Close()

descResponse, err := io.ReadAll(r)
if err != nil {
return ocispec.Manifest{}, fmt.Errorf("failed to read %s: %w", imageDesc.Digest, err)
}

platform := platforms.Only(platforms.DefaultSpec())
switch imageDesc.MediaType {
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:

var manifest ocispec.Manifest
if err := json.Unmarshal(descResponse, &manifest); err != nil {
return ocispec.Manifest{}, err
}

if manifest.Config.Digest == imageDesc.Digest && (!platform.Match(*manifest.Config.Platform)) {
return ocispec.Manifest{}, fmt.Errorf("manifest: invalid platform %s: %w", manifest.Config.Platform, err)
}

return manifest, nil
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
var idx ocispec.Index
if err := json.Unmarshal(descResponse, &idx); err != nil {
return ocispec.Manifest{}, err
}
return ocispec.Manifest{
Versioned: idx.Versioned,
MediaType: idx.MediaType,
Layers: idx.Manifests,
Annotations: idx.Annotations,
}, nil
default:
return ocispec.Manifest{}, fmt.Errorf("unsupported manifest type %s", imageDesc.MediaType)
}
}

func references(om ocispec.Manifest) []ocispec.Descriptor {
references := make([]ocispec.Descriptor, 0, 1+len(om.Layers))
references = append(references, om.Config)
references = append(references, om.Layers...)
return references
}

func (p *preheat) parseLayers(om ocispec.Manifest, url, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {

var layers []*internaljob.PreheatRequest

req, err := http.NewRequestWithContext(context.Background(), "GET", url, nil)
if err != nil {
return nil, err
}
client := &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}

resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 && resp.StatusCode != http.StatusUnauthorized {
return nil, fmt.Errorf("request registry %d", resp.StatusCode)
}

layerHeader := header.Clone()
if resp.StatusCode == http.StatusUnauthorized {
token, err := getAuthToken(context.Background(), resp.Header)
if err != nil {
return nil, err
}

layerHeader.Set("Authorization", fmt.Sprintf("Bearer %s", token))
}

for _, v := range references(om) {
digest := v.Digest.String()
if len(digest) == 0 {
continue
}
layer := &internaljob.PreheatRequest{
URL: layerURL(image.protocol, image.domain, image.name, digest),
Tag: p.bizTag,
Filter: filter,
Digest: digest,
Headers: httputils.HeaderToMap(layerHeader),
}

layers = append(layers, layer)
}

return layers, nil
}

func layerURL(protocol string, domain string, name string, digest string) string {
return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", protocol, domain, name, digest)
}
Expand Down
Loading

0 comments on commit e117638

Please sign in to comment.