2 changes: 2 additions & 0 deletions .travis.yml
@@ -13,4 +13,6 @@
addons:
apt:
packages:
- btrfs-tools
- libdevmapper-dev
- libgpgme11-dev
9 changes: 5 additions & 4 deletions Makefile
@@ -5,16 +5,17 @@ SKOPEO_REPO = projectatomic/skopeo
SKOPEO_BRANCH = master
# Set SUDO=sudo to run container integration tests using sudo.
SUDO =
BUILDFLAGS = -tags "btrfs_noversion libdm_no_deferred_remove"

all: deps .gitvalidation test validate

deps:
go get -t ./...
go get -u github.com/golang/lint/golint
go get github.com/vbatts/git-validation
go get -t $(BUILDFLAGS) ./...
go get -u $(BUILDFLAGS) github.com/golang/lint/golint
go get $(BUILDFLAGS) github.com/vbatts/git-validation

test:
@go test -cover ./...
@go test $(BUILDFLAGS) -cover ./...

# This is not run as part of (make all), but Travis CI does run this.
# Demonstrating a working version of skopeo (possibly with modified SKOPEO_REPO/SKOPEO_BRANCH, e.g.
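
The new BUILDFLAGS value passes Go build tags through every go get / go test invocation; files guarded by those tags are only compiled when the tag is listed. As a hedged illustration (the file below is hypothetical, not part of this change or the vendored code), a tag such as btrfs_noversion gates a file like this:

// +build btrfs_noversion

package storage

// btrfsBuildVersion is a stub used when the btrfs_noversion tag is set, so the
// package builds against btrfs-progs headers that lack version macros.
func btrfsBuildVersion() string {
	return "-"
}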
164 changes: 100 additions & 64 deletions copy/copy.go
@@ -32,6 +32,21 @@ type digestingReader struct {
validationFailed bool
}

// imageCopier keeps track of the diffID values for the blobs we are copying
// between images, along with other per-copy state, and caches information
// that might let us take some shortcuts.
type imageCopier struct {
copiedBlobs map[digest.Digest]digest.Digest
cachedDiffIDs map[digest.Digest]digest.Digest
manifestUpdates *types.ManifestUpdateOptions
dest types.ImageDestination
src types.Image
rawSource types.ImageSource
diffIDsAreNeeded bool
canModifyManifest bool
reportWriter io.Writer
}
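
The two maps are the cache that lets later layers skip work already done for an identical blob. A minimal sketch of the lookup-before-recompute pattern they enable (diffIDForBlob is an illustrative helper, not part of this change):

// diffIDForBlob returns a cached DiffID when one exists; otherwise it runs
// compute once and records the result for any later layer with the same digest.
func (ic *imageCopier) diffIDForBlob(blob digest.Digest, compute func() (digest.Digest, error)) (digest.Digest, error) {
	if d, ok := ic.cachedDiffIDs[blob]; ok {
		return d, nil
	}
	d, err := compute()
	if err != nil {
		return "", err
	}
	ic.cachedDiffIDs[blob] = d
	return d, nil
}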

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// and set validationFailed to true if the source stream does not match expectedDigest.
func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digestingReader, error) {
@@ -147,7 +162,20 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return err
}

if err := copyLayers(&manifestUpdates, dest, src, rawSource, canModifyManifest, reportWriter); err != nil {
// If src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates) is ever going to return true, it must already return true by the time we get here.
ic := imageCopier{
copiedBlobs: make(map[digest.Digest]digest.Digest),
cachedDiffIDs: make(map[digest.Digest]digest.Digest),
manifestUpdates: &manifestUpdates,
dest: dest,
src: src,
rawSource: rawSource,
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
}

if err := ic.copyLayers(); err != nil {
return err
}

@@ -167,7 +195,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return fmt.Errorf("Error reading manifest: %v", err)
}

if err := copyConfig(dest, pendingImage, reportWriter); err != nil {
if err := ic.copyConfig(pendingImage); err != nil {
return err
}

@@ -206,57 +234,41 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return nil
}

// copyLayers copies layers from src/rawSource to dest, using and updating manifestUpdates if necessary and canModifyManifest.
// If src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates) will be true, it needs to be true by the time this function is called.
func copyLayers(manifestUpdates *types.ManifestUpdateOptions, dest types.ImageDestination, src types.Image, rawSource types.ImageSource,
canModifyManifest bool, reportWriter io.Writer) error {
type copiedLayer struct {
blobInfo types.BlobInfo
diffID digest.Digest
}

diffIDsAreNeeded := src.UpdatedImageNeedsLayerDiffIDs(*manifestUpdates)

srcInfos := src.LayerInfos()
// copyLayers copies layers from src/rawSource to dest, using and updating ic.manifestUpdates if necessary and ic.canModifyManifest.
func (ic *imageCopier) copyLayers() error {
srcInfos := ic.src.LayerInfos()
destInfos := []types.BlobInfo{}
diffIDs := []digest.Digest{}
copiedLayers := map[digest.Digest]copiedLayer{}
for _, srcLayer := range srcInfos {
cl, ok := copiedLayers[srcLayer.Digest]
if !ok {
var (
destInfo types.BlobInfo
diffID digest.Digest
err error
)
if dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if diffIDsAreNeeded {
return errors.New("getting DiffID for foreign layers is unimplemented")
}
destInfo = srcLayer
fmt.Fprintf(reportWriter, "Skipping foreign layer %q copy to %s\n", destInfo.Digest, dest.Reference().Transport().Name())
} else {
fmt.Fprintf(reportWriter, "Copying blob %s\n", srcLayer.Digest)
destInfo, diffID, err = copyLayer(dest, rawSource, srcLayer, diffIDsAreNeeded, canModifyManifest, reportWriter)
if err != nil {
return err
}
var (
destInfo types.BlobInfo
diffID digest.Digest
err error
)
if ic.dest.AcceptsForeignLayerURLs() && len(srcLayer.URLs) != 0 {
// DiffIDs are, currently, needed only when converting from schema1.
// In which case src.LayerInfos will not have URLs because schema1
// does not support them.
if ic.diffIDsAreNeeded {
return errors.New("getting DiffID for foreign layers is unimplemented")
}
destInfo = srcLayer
fmt.Fprintf(ic.reportWriter, "Skipping foreign layer %q copy to %s\n", destInfo.Digest, ic.dest.Reference().Transport().Name())
} else {
destInfo, diffID, err = ic.copyLayer(srcLayer)
if err != nil {
return err
}
cl = copiedLayer{blobInfo: destInfo, diffID: diffID}
copiedLayers[srcLayer.Digest] = cl
}
destInfos = append(destInfos, cl.blobInfo)
diffIDs = append(diffIDs, cl.diffID)
destInfos = append(destInfos, destInfo)
diffIDs = append(diffIDs, diffID)
}
manifestUpdates.InformationOnly.LayerInfos = destInfos
if diffIDsAreNeeded {
manifestUpdates.InformationOnly.LayerDiffIDs = diffIDs
ic.manifestUpdates.InformationOnly.LayerInfos = destInfos
if ic.diffIDsAreNeeded {
ic.manifestUpdates.InformationOnly.LayerDiffIDs = diffIDs
}
if layerDigestsDiffer(srcInfos, destInfos) {
manifestUpdates.LayerInfos = destInfos
ic.manifestUpdates.LayerInfos = destInfos
}
return nil
}
@@ -275,15 +287,15 @@ func layerDigestsDiffer(a, b []types.BlobInfo) bool {
}

// copyConfig copies config.json, if any, from src to dest.
func copyConfig(dest types.ImageDestination, src types.Image, reportWriter io.Writer) error {
func (ic *imageCopier) copyConfig(src types.Image) error {
srcInfo := src.ConfigInfo()
if srcInfo.Digest != "" {
fmt.Fprintf(reportWriter, "Copying config %s\n", srcInfo.Digest)
fmt.Fprintf(ic.reportWriter, "Copying config %s\n", srcInfo.Digest)
configBlob, err := src.ConfigBlob()
if err != nil {
return fmt.Errorf("Error reading config blob %s: %v", srcInfo.Digest, err)
}
destInfo, err := copyBlobFromStream(dest, bytes.NewReader(configBlob), srcInfo, nil, false, reportWriter)
destInfo, err := ic.copyBlobFromStream(bytes.NewReader(configBlob), srcInfo, nil, false)
if err != nil {
return err
}
@@ -303,16 +315,40 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func copyLayer(dest types.ImageDestination, src types.ImageSource, srcInfo types.BlobInfo,
diffIDIsNeeded bool, canCompress bool, reportWriter io.Writer) (types.BlobInfo, digest.Digest, error) {
srcStream, srcBlobSize, err := src.GetBlob(srcInfo)
func (ic *imageCopier) copyLayer(srcInfo types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
// Check if we already have a blob with this digest
haveBlob, extantBlobSize, err := ic.dest.HasBlob(srcInfo)
if err != nil && err != types.ErrBlobNotFound {
return types.BlobInfo{}, "", fmt.Errorf("Error checking for blob %s at destination: %v", srcInfo.Digest, err)
}
// If we already have a cached diffID for this blob, we don't need to compute it
diffIDIsNeeded := ic.diffIDsAreNeeded && (ic.cachedDiffIDs[srcInfo.Digest] == "")
// If we already have the blob, and we don't need to recompute the diffID, then we might be able to avoid reading it again
if haveBlob && !diffIDIsNeeded {
// Check the blob sizes match, if we were given a size this time
if srcInfo.Size != -1 && srcInfo.Size != extantBlobSize {
return types.BlobInfo{}, "", fmt.Errorf("Error: blob %s is already present, but with size %d instead of %d", srcInfo.Digest, extantBlobSize, srcInfo.Size)
}
srcInfo.Size = extantBlobSize
// Tell the image destination that this blob's delta is being applied again. For some image destinations, this can be faster than using GetBlob/PutBlob
blobinfo, err := ic.dest.ReapplyBlob(srcInfo)
if err != nil {
return types.BlobInfo{}, "", fmt.Errorf("Error reapplying blob %s at destination: %v", srcInfo.Digest, err)
}
fmt.Fprintf(ic.reportWriter, "Skipping fetch of repeat blob %s\n", srcInfo.Digest)
return blobinfo, ic.cachedDiffIDs[srcInfo.Digest], err
}

// Fallback: copy the layer, computing the diffID if we need to do so
fmt.Fprintf(ic.reportWriter, "Copying blob %s\n", srcInfo.Digest)
srcStream, srcBlobSize, err := ic.rawSource.GetBlob(srcInfo)
if err != nil {
return types.BlobInfo{}, "", fmt.Errorf("Error reading blob %s: %v", srcInfo.Digest, err)
}
defer srcStream.Close()

blobInfo, diffIDChan, err := copyLayerFromStream(dest, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize},
diffIDIsNeeded, canCompress, reportWriter)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize},
diffIDIsNeeded)
if err != nil {
return types.BlobInfo{}, "", err
}
@@ -323,6 +359,7 @@ func copyLayer(dest types.ImageDestination, src types.ImageSource, srcInfo types
return types.BlobInfo{}, "", fmt.Errorf("Error computing layer DiffID: %v", diffIDResult.err)
}
logrus.Debugf("Computed DiffID %s for layer %s", diffIDResult.digest, srcInfo.Digest)
ic.cachedDiffIDs[srcInfo.Digest] = diffIDResult.digest
}
return blobInfo, diffIDResult.digest, nil
}
@@ -331,8 +368,8 @@ func copyLayer(dest types.ImageDestination, src types.ImageSource, srcInfo types
// it copies a blob with srcInfo (with known Digest and possibly known Size) from srcStream to dest,
// perhaps compressing the stream if canCompress,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func copyLayerFromStream(dest types.ImageDestination, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, canCompress bool, reportWriter io.Writer) (types.BlobInfo, <-chan diffIDResult, error) {
func (ic *imageCopier) copyLayerFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(decompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

@@ -356,8 +393,7 @@ func copyLayerFromStream(dest types.ImageDestination, srcStream io.Reader, srcIn
return pipeWriter
}
}
blobInfo, err := copyBlobFromStream(dest, srcStream, srcInfo,
getDiffIDRecorder, canCompress, reportWriter) // Sets err to nil on success
blobInfo, err := ic.copyBlobFromStream(srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
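
The DiffID is computed concurrently with the upload: the decompressed bytes are teed into a pipe, a goroutine digests them, and the result arrives on diffIDChan, which is why the pipe may only be closed once the caller is able to read from the channel. A self-contained sketch of that pattern, with names simplified and not taken from this file:

package main

import (
	"crypto/sha256"
	"fmt"
	"io"
	"strings"
)

type digestResult struct {
	digest string
	err    error
}

func main() {
	src := strings.NewReader("layer contents")

	pr, pw := io.Pipe()
	results := make(chan digestResult, 1) // buffered: the goroutine never blocks on send

	go func() {
		h := sha256.New()
		_, err := io.Copy(h, pr) // reads until the writer side of the pipe is closed
		results <- digestResult{digest: fmt.Sprintf("sha256:%x", h.Sum(nil)), err: err}
	}()

	// Tee the stream so the same bytes feed both the upload path and the digester.
	upload := io.TeeReader(src, pw)
	_, err := io.Copy(io.Discard, upload) // stand-in for PutBlob
	pw.CloseWithError(err)                // a nil error closes the pipe cleanly

	r := <-results
	fmt.Println(r.digest, r.err)
}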
@@ -391,9 +427,9 @@ func computeDiffID(stream io.Reader, decompressor decompressorFunc) (digest.Dige
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied blob.
func copyBlobFromStream(dest types.ImageDestination, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor decompressorFunc) io.Writer, canCompress bool,
reportWriter io.Writer) (types.BlobInfo, error) {
func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor decompressorFunc) io.Writer,
canCompress bool) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers.
// === Input: srcStream

@@ -419,13 +455,13 @@ func copyBlobFromStream(dest types.ImageDestination, srcStream io.Reader, srcInf

// === Report progress using a pb.Reader.
bar := pb.New(int(srcInfo.Size)).SetUnits(pb.U_BYTES)
bar.Output = reportWriter
bar.Output = ic.reportWriter
bar.SetMaxWidth(80)
bar.ShowTimeLeft = false
bar.ShowPercent = false
bar.Start()
destStream = bar.NewProxyReader(destStream)
defer fmt.Fprint(reportWriter, "\n")
defer fmt.Fprint(ic.reportWriter, "\n")

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
@@ -436,7 +472,7 @@ func copyBlobFromStream(dest types.ImageDestination, srcStream io.Reader, srcInf

// === Compress the layer if it is uncompressed and compression is desired
var inputInfo types.BlobInfo
if !canCompress || isCompressed || !dest.ShouldCompressLayers() {
if !canCompress || isCompressed || !ic.dest.ShouldCompressLayers() {
logrus.Debugf("Using original blob without modification")
inputInfo = srcInfo
} else {
Expand All @@ -454,7 +490,7 @@ func copyBlobFromStream(dest types.ImageDestination, srcStream io.Reader, srcInf
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := dest.PutBlob(destStream, inputInfo)
uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
if err != nil {
return types.BlobInfo{}, fmt.Errorf("Error writing blob: %v", err)
}
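
When the destination wants compressed layers and the input is detected as uncompressed, the recompression also happens through a pipe: a goroutine gzips into the write end while PutBlob consumes the read end, so nothing is spooled to disk. A reduced, self-contained sketch of that idea (error propagation via CloseWithError matches the technique, not the exact code here):

package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"strings"
)

// compressOnTheFly returns a reader that yields gzip-compressed bytes of src,
// produced by a background goroutine writing into a pipe.
func compressOnTheFly(src io.Reader) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		gz := gzip.NewWriter(pw)
		_, err := io.Copy(gz, src)
		if cerr := gz.Close(); err == nil {
			err = cerr
		}
		pw.CloseWithError(err) // a nil error closes the pipe cleanly
	}()
	return pr
}

func main() {
	compressed, err := io.ReadAll(compressOnTheFly(strings.NewReader("uncompressed layer bytes")))
	fmt.Println(len(compressed), err)
}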
19 changes: 19 additions & 0 deletions directory/directory_dest.go
@@ -94,6 +94,25 @@ func (d *dirImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobInfo
return types.BlobInfo{Digest: computedDigest, Size: size}, nil
}

func (d *dirImageDestination) HasBlob(info types.BlobInfo) (bool, int64, error) {
if info.Digest == "" {
return false, -1, fmt.Errorf(`Can not check for a blob with unknown digest`)
}
blobPath := d.ref.layerPath(info.Digest)
finfo, err := os.Stat(blobPath)
if err != nil && os.IsNotExist(err) {
return false, -1, types.ErrBlobNotFound
}
if err != nil {
return false, -1, err
}
return true, finfo.Size(), nil
}

func (d *dirImageDestination) ReapplyBlob(info types.BlobInfo) (types.BlobInfo, error) {
return info, nil
}

func (d *dirImageDestination) PutManifest(manifest []byte) error {
return ioutil.WriteFile(d.ref.manifestPath(), manifest, 0644)
}
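
Callers are expected to treat types.ErrBlobNotFound as the normal "not there yet" answer rather than a failure. A hedged sketch of the calling convention the new methods support (checkAndReuse is an illustrative helper, not code from this change):

// checkAndReuse asks the destination whether a blob is already present and,
// if so, records the reuse via ReapplyBlob; otherwise the caller falls back
// to the usual GetBlob/PutBlob path.
func checkAndReuse(dest types.ImageDestination, info types.BlobInfo) (types.BlobInfo, bool, error) {
	ok, size, err := dest.HasBlob(info)
	if err != nil && err != types.ErrBlobNotFound {
		return types.BlobInfo{}, false, err
	}
	if !ok {
		return types.BlobInfo{}, false, nil // not present: copy it normally
	}
	info.Size = size
	reused, err := dest.ReapplyBlob(info)
	if err != nil {
		return types.BlobInfo{}, false, err
	}
	return reused, true, nil
}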
23 changes: 20 additions & 3 deletions docker/daemon/daemon_dest.go
@@ -29,7 +29,8 @@ type daemonImageDestination struct {
writer *io.PipeWriter
tar *tar.Writer
// Other state
committed bool // writer has been closed
committed bool // writer has been closed
blobs map[digest.Digest]types.BlobInfo // list of already-sent blobs
}

// newImageDestination returns a types.ImageDestination for the specified image reference.
@@ -62,6 +63,7 @@ func newImageDestination(systemCtx *types.SystemContext, ref daemonReference) (t
writer: writer,
tar: tar.NewWriter(writer),
committed: false,
blobs: make(map[digest.Digest]types.BlobInfo),
}, nil
}

@@ -142,8 +144,8 @@ func (d *daemonImageDestination) AcceptsForeignLayerURLs() bool {
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *daemonImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobInfo) (types.BlobInfo, error) {
if inputInfo.Digest.String() == "" {
return types.BlobInfo{}, fmt.Errorf(`"Can not stream a blob with unknown digest to "docker-daemon:"`)
if ok, size, err := d.HasBlob(inputInfo); err == nil && ok {
return types.BlobInfo{Digest: inputInfo.Digest, Size: size}, nil
}

if inputInfo.Size == -1 { // Ouch, we need to stream the blob into a temporary file just to determine the size.
@@ -173,9 +175,24 @@ func (d *daemonImageDestination) PutBlob(stream io.Reader, inputInfo types.BlobI
if err := d.sendFile(inputInfo.Digest.String(), inputInfo.Size, tee); err != nil {
return types.BlobInfo{}, err
}
d.blobs[inputInfo.Digest] = types.BlobInfo{Digest: digester.Digest(), Size: inputInfo.Size}
return types.BlobInfo{Digest: digester.Digest(), Size: inputInfo.Size}, nil
}

func (d *daemonImageDestination) HasBlob(info types.BlobInfo) (bool, int64, error) {
if info.Digest == "" {
return false, -1, fmt.Errorf(`Can not check for a blob with unknown digest`)
}
if blob, ok := d.blobs[info.Digest]; ok {
return true, blob.Size, nil
}
return false, -1, types.ErrBlobNotFound
}

func (d *daemonImageDestination) ReapplyBlob(info types.BlobInfo) (types.BlobInfo, error) {
return info, nil
}

func (d *daemonImageDestination) PutManifest(m []byte) error {
var man schema2Manifest
if err := json.Unmarshal(m, &man); err != nil {
2 changes: 1 addition & 1 deletion docker/docker_client.go
@@ -17,7 +17,7 @@ import (

"github.com/Sirupsen/logrus"
"github.com/containers/image/types"
"github.com/docker/docker/pkg/homedir"
"github.com/containers/storage/pkg/homedir"
"github.com/docker/go-connections/sockets"
"github.com/docker/go-connections/tlsconfig"
)