Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions pkg/image/registryclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
"github.com/docker/distribution/registry/client/auth/challenge"
"github.com/docker/distribution/registry/client/transport"
"github.com/opencontainers/go-digest"

imagereference "github.com/openshift/library-go/pkg/image/reference"
)

// RepositoryRetriever fetches a Docker distribution.Repository.
type RepositoryRetriever interface {
// Repository returns a properly authenticated distribution.Repository for the given registry, repository
// name, and insecure toleration behavior.
Repository(ctx context.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error)
Repository(ctx context.Context, registry *url.URL, repoName string, insecure bool) (RepositoryWithLocation, error)
}

// ErrNotV2Registry is returned when the server does not report itself as a V2 Docker registry
Expand Down Expand Up @@ -80,6 +82,7 @@ type Context struct {
Credentials auth.CredentialStore
RequestModifiers []transport.RequestModifier
Limiter *rate.Limiter
Alternates AlternateBlobSourceStrategy

DisableDigestVerification bool

Expand Down Expand Up @@ -138,6 +141,11 @@ func (c *Context) WithCredentials(credentials auth.CredentialStore) *Context {
return c
}

func (c *Context) WithAlternateBlobSourceStrategy(alternateStrategy AlternateBlobSourceStrategy) *Context {
c.Alternates = alternateStrategy
return c
}

// Reset clears any cached repository info for this context.
func (c *Context) Reset() {
c.lock.Lock()
Expand Down Expand Up @@ -200,18 +208,67 @@ func (c *Context) Ping(ctx context.Context, registry *url.URL, insecure bool) (h
return t, &src, nil
}

func (c *Context) Repository(ctx context.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) {
// RepositoryForRef returns a distribution.Repository against the provided image reference. If insecure
// is true, HTTP connections are allowed and HTTPS certificate verification errors will be ignored. The returned
// Repository instance is threadsafe but the ManifestService, TagService, or BlobService are not.
func (c *Context) RepositoryForRef(ctx context.Context, ref imagereference.DockerImageReference, insecure bool) (distribution.Repository, error) {
return c.connectToRegistry(ctx, repositoryLocator{ref: ref}, insecure)
}

// Repository returns a distribution.Repository against the provided registry and repository name. If insecure
// is true, HTTP connections are allowed and HTTPS certificate verification errors will be ignored. The returned
// Repository instance is threadsafe but the ManifestService, TagService, or BlobService are not. Note - the caller
// is responsible for providing a valid registry url for docker.io - use RepositoryForRef() to avoid that.
func (c *Context) Repository(ctx context.Context, registry *url.URL, repoName string, insecure bool) (RepositoryWithLocation, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change made *Context to stop implementing RepositoryRetriever interface. Was that expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you currently using that interface? I'm wondering if anybody uses it. I can update the interface though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, image registry repository leverages this interface in a few places.

named, err := reference.WithName(repoName)
if err != nil {
return nil, err
}
ref, err := imagereference.Parse(repoName)
if err != nil {
return nil, err
}
ref.Registry = registry.Host
locator := repositoryLocator{
named: named,
ref: ref,
url: registry,
}
return &blobMirroredRepository{
locator: locator,
strategy: c.Alternates,
retriever: c,
}, nil
}

// connectToRegistry is private and returns a non-wrapped, non-mirrorable repository.
func (c *Context) connectToRegistry(ctx context.Context, locator repositoryLocator, insecure bool) (RepositoryWithLocation, error) {
var named reference.Named = locator.named
var registryURL *url.URL = locator.url
var path string

// ensure the values needed from the locator are defaulted
if named == nil {
path = locator.ref.RepositoryName()
var err error
named, err = reference.WithName(path)
if err != nil {
return nil, err
}
} else {
path = reference.Path(named)
}
if registryURL == nil {
registryURL = locator.ref.RegistryURL()
}

rt, src, err := c.Ping(ctx, registry, insecure)
// attempt to connect to the registry to get auth instructions
rt, src, err := c.Ping(ctx, registryURL, insecure)
if err != nil {
return nil, err
}

rt = c.repositoryTransport(rt, src, repoName)
rt = c.repositoryTransport(rt, src, path)

repo, err := registryclient.NewRepository(named, src.String(), rt)
if err != nil {
Expand All @@ -224,7 +281,7 @@ func (c *Context) Repository(ctx context.Context, registry *url.URL, repoName st
if limiter == nil {
limiter = rate.NewLimiter(rate.Limit(5), 5)
}
return NewLimitedRetryRepository(repo, c.Retries, limiter), nil
return NewLimitedRetryRepository(locator.ref, repo, c.Retries, limiter), nil
}

func (c *Context) ping(registry url.URL, insecure bool, transport http.RoundTripper) (*url.URL, error) {
Expand Down Expand Up @@ -359,23 +416,29 @@ var nowFn = time.Now
type retryRepository struct {
distribution.Repository

ref imagereference.DockerImageReference
limiter *rate.Limiter
retries int
sleepFn func(time.Duration)
}

// NewLimitedRetryRepository wraps a distribution.Repository with helpers that will retry temporary failures
// over a limited time window and duration, and also obeys a rate limit.
func NewLimitedRetryRepository(repo distribution.Repository, retries int, limiter *rate.Limiter) distribution.Repository {
func NewLimitedRetryRepository(ref imagereference.DockerImageReference, repo distribution.Repository, retries int, limiter *rate.Limiter) RepositoryWithLocation {
return &retryRepository{
Repository: repo,

ref: ref,
limiter: limiter,
retries: retries,
sleepFn: time.Sleep,
}
}

func (r *retryRepository) Ref() imagereference.DockerImageReference {
return r.ref
}

// isTemporaryHTTPError returns true if the error indicates a temporary or partial HTTP failure
func isTemporaryHTTPError(err error) (time.Duration, bool) {
if err == nil {
Expand Down
Loading