Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Docker archive auto compression #481

Merged
merged 8 commits into from
Jul 17, 2018
5 changes: 4 additions & 1 deletion copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc)
if err != nil {
return "", err
}
defer s.Close()
stream = s
}

Expand Down Expand Up @@ -673,10 +674,12 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
inputInfo.Size = -1
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Decompress && isCompressed {
logrus.Debugf("Blob will be decompressed")
destStream, err = decompressor(destStream)
s, err := decompressor(destStream)
if err != nil {
return types.BlobInfo{}, err
}
defer s.Close()
destStream = s
inputInfo.Digest = ""
inputInfo.Size = -1
} else {
Expand Down
102 changes: 71 additions & 31 deletions docker/tarfile/src.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tarfile
import (
"archive/tar"
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
Expand Down Expand Up @@ -43,28 +42,32 @@ type layerInfo struct {
// the source of an image.
// To do for both the NewSourceFromFile and NewSourceFromStream functions

// NewSourceFromFile returns a tarfile.Source for the specified path.
// The file may be either compressed or uncompressed.
func NewSourceFromFile(path string) (*Source, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening file %q", path)
	}
	defer file.Close()

	// If the file is not compressed we can just return the file itself
	// as a source. Otherwise we pass the stream to NewSourceFromStream,
	// which copies the decompressed contents to a temporary file.
	stream, isCompressed, err := compression.AutoDecompress(file)
	if err != nil {
		return nil, errors.Wrapf(err, "Error detecting compression for file %q", path)
	}
	defer stream.Close()
	if !isCompressed {
		return &Source{
			tarPath: path,
		}, nil
	}
	return NewSourceFromStream(stream)
}

// NewSourceFromStream returns a tarfile.Source for the specified inputStream, which must be uncompressed.
// The caller can close the inputStream immediately after NewSourceFromFile returns.
// NewSourceFromStream returns a tarfile.Source for the specified inputStream,
// which can be either compressed or uncompressed. The caller can close the
// inputStream immediately after NewSourceFromStream returns.
func NewSourceFromStream(inputStream io.Reader) (*Source, error) {
// FIXME: use SystemContext here.
// Save inputStream to a temporary file
Expand All @@ -81,8 +84,20 @@ func NewSourceFromStream(inputStream io.Reader) (*Source, error) {
}
}()

// TODO: This can take quite some time, and should ideally be cancellable using a context.Context.
if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
// In order to be compatible with docker-load, we need to support
// auto-decompression (it's also a nice quality-of-life thing to avoid
// giving users really confusing "invalid tar header" errors).
uncompressedStream, _, err := compression.AutoDecompress(inputStream)
if err != nil {
return nil, errors.Wrap(err, "Error auto-decompressing input")
}
defer uncompressedStream.Close()

// Copy the plain archive to the temporary file.
//
// TODO: This can take quite some time, and should ideally be cancellable
// using a context.Context.
if _, err := io.Copy(tarCopyFile, uncompressedStream); err != nil {
return nil, errors.Wrapf(err, "error copying contents to temporary file %q", tarCopyFile.Name())
}
succeeded = true
Expand Down Expand Up @@ -292,7 +307,25 @@ func (s *Source) prepareLayerData(tarManifest *ManifestItem, parsedConfig *manif
return nil, err
}
if li, ok := unknownLayerSizes[h.Name]; ok {
li.size = h.Size
// Since GetBlob will decompress layers that are compressed we need
// to do the decompression here as well, otherwise we will
// incorrectly report the size. Pretty critical, since tools like
// umoci always compress layer blobs. We only fall back to the slower
// decompress-and-count method when the blob is actually compressed.
uncompressedStream, isCompressed, err := compression.AutoDecompress(t)
if err != nil {
return nil, errors.Wrapf(err, "Error auto-decompressing %s to determine its size", h.Name)
}
defer uncompressedStream.Close()

uncompressedSize := h.Size
if isCompressed {
uncompressedSize, err = io.Copy(ioutil.Discard, uncompressedStream)
if err != nil {
return nil, errors.Wrapf(err, "Error reading %s to find its size", h.Name)
}
}
li.size = uncompressedSize
delete(unknownLayerSizes, h.Name)
}
}
Expand Down Expand Up @@ -346,16 +379,22 @@ func (s *Source) GetManifest(ctx context.Context, instanceDigest *digest.Digest)
return s.generatedManifest, manifest.DockerV2Schema2MediaType, nil
}

type readCloseWrapper struct {
// uncompressedReadCloser is an io.ReadCloser that closes both the uncompressed stream and the underlying input.
type uncompressedReadCloser struct {
io.Reader
closeFunc func() error
underlyingCloser func() error
uncompressedCloser func() error
}

func (r readCloseWrapper) Close() error {
if r.closeFunc != nil {
return r.closeFunc()
func (r uncompressedReadCloser) Close() error {
var res error
if err := r.uncompressedCloser(); err != nil {
res = err
}
return nil
if err := r.underlyingCloser(); err != nil && res == nil {
res = err
}
return res
}

// GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown).
Expand All @@ -369,10 +408,16 @@ func (s *Source) GetBlob(ctx context.Context, info types.BlobInfo) (io.ReadClose
}

if li, ok := s.knownLayers[info.Digest]; ok { // diffID is a digest of the uncompressed tarball,
stream, err := s.openTarComponent(li.path)
underlyingStream, err := s.openTarComponent(li.path)
if err != nil {
return nil, 0, err
}
closeUnderlyingStream := true
defer func() {
if closeUnderlyingStream {
underlyingStream.Close()
}
}()

// In order to handle the fact that digests != diffIDs (and thus that a
// caller which is trying to verify the blob will run into problems),
Expand All @@ -386,22 +431,17 @@ func (s *Source) GetBlob(ctx context.Context, info types.BlobInfo) (io.ReadClose
// be verifying a "digest" which is not the actual layer's digest (but
// is instead the DiffID).

decompressFunc, reader, err := compression.DetectCompression(stream)
uncompressedStream, _, err := compression.AutoDecompress(underlyingStream)
if err != nil {
return nil, 0, errors.Wrapf(err, "Detecting compression in blob %s", info.Digest)
}

if decompressFunc != nil {
reader, err = decompressFunc(reader)
if err != nil {
return nil, 0, errors.Wrapf(err, "Decompressing blob %s stream", info.Digest)
}
return nil, 0, errors.Wrapf(err, "Error auto-decompressing blob %s", info.Digest)
}

newStream := readCloseWrapper{
Reader: reader,
closeFunc: stream.Close,
newStream := uncompressedReadCloser{
Reader: uncompressedStream,
underlyingCloser: underlyingStream.Close,
uncompressedCloser: uncompressedStream.Close,
}
closeUnderlyingStream = false

return newStream, li.size, nil
}
Expand Down
41 changes: 34 additions & 7 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,34 @@ import (
"compress/bzip2"
"compress/gzip"
"io"
"io/ioutil"

"github.com/pkg/errors"

"github.com/sirupsen/logrus"
"github.com/ulikunitz/xz"
)

// DecompressorFunc returns the decompressed stream, given a compressed stream.
// The caller must call Close() on the decompressed stream (even if the
// compressed input stream does not need closing!).
type DecompressorFunc func(io.Reader) (io.ReadCloser, error)

// GzipDecompressor is a DecompressorFunc for the gzip compression algorithm.
func GzipDecompressor(r io.Reader) (io.Reader, error) {
func GzipDecompressor(r io.Reader) (io.ReadCloser, error) {
return gzip.NewReader(r)
}

// Bzip2Decompressor is a DecompressorFunc for the bzip2 compression algorithm.
func Bzip2Decompressor(r io.Reader) (io.Reader, error) {
return bzip2.NewReader(r), nil
func Bzip2Decompressor(r io.Reader) (io.ReadCloser, error) {
return ioutil.NopCloser(bzip2.NewReader(r)), nil
}

// XzDecompressor is a DecompressorFunc for the xz compression algorithm.
func XzDecompressor(r io.Reader) (io.Reader, error) {
return nil, errors.New("Decompressing xz streams is not supported")
func XzDecompressor(r io.Reader) (io.ReadCloser, error) {
r, err := xz.NewReader(r)
if err != nil {
return nil, err
}
return ioutil.NopCloser(r), nil
}

// compressionAlgos is an internal implementation detail of DetectCompression
Expand Down Expand Up @@ -65,3 +71,24 @@ func DetectCompression(input io.Reader) (DecompressorFunc, io.Reader, error) {

return decompressor, io.MultiReader(bytes.NewReader(buffer[:n]), input), nil
}

// AutoDecompress takes a stream and returns an uncompressed version of the
// same stream, plus a bool reporting whether the input was compressed.
// The caller must call Close() on the returned stream (even if the input does not need,
// or does not even support, closing!).
func AutoDecompress(stream io.Reader) (io.ReadCloser, bool, error) {
	decompressor, stream, err := DetectCompression(stream)
	if err != nil {
		// errors.Wrap, not Wrapf: there are no format arguments.
		return nil, false, errors.Wrap(err, "Error detecting compression")
	}
	var res io.ReadCloser
	if decompressor != nil {
		res, err = decompressor(stream)
		if err != nil {
			return nil, false, errors.Wrap(err, "Error initializing decompression")
		}
	} else {
		// Uncompressed input: hand back the (possibly prefix-rewound)
		// stream unchanged, wrapped to satisfy the ReadCloser contract.
		res = ioutil.NopCloser(stream)
	}
	return res, decompressor != nil, nil
}
95 changes: 68 additions & 27 deletions pkg/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,52 @@ import (
)

func TestDetectCompression(t *testing.T) {
cases := []struct {
filename string
unimplemented bool
}{
{"fixtures/Hello.uncompressed", false},
{"fixtures/Hello.gz", false},
{"fixtures/Hello.bz2", false},
{"fixtures/Hello.xz", true},
cases := []string{
"fixtures/Hello.uncompressed",
"fixtures/Hello.gz",
"fixtures/Hello.bz2",
"fixtures/Hello.xz",
}

// The original stream is preserved.
for _, c := range cases {
originalContents, err := ioutil.ReadFile(c.filename)
require.NoError(t, err, c.filename)
originalContents, err := ioutil.ReadFile(c)
require.NoError(t, err, c)

stream, err := os.Open(c.filename)
require.NoError(t, err, c.filename)
stream, err := os.Open(c)
require.NoError(t, err, c)
defer stream.Close()

_, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)
require.NoError(t, err, c)

updatedContents, err := ioutil.ReadAll(updatedStream)
require.NoError(t, err, c.filename)
assert.Equal(t, originalContents, updatedContents, c.filename)
require.NoError(t, err, c)
assert.Equal(t, originalContents, updatedContents, c)
}

// The correct decompressor is chosen, and the result is as expected.
for _, c := range cases {
stream, err := os.Open(c.filename)
require.NoError(t, err, c.filename)
stream, err := os.Open(c)
require.NoError(t, err, c)
defer stream.Close()

decompressor, updatedStream, err := DetectCompression(stream)
require.NoError(t, err, c.filename)
require.NoError(t, err, c)

var uncompressedStream io.Reader
switch {
case decompressor == nil:
if decompressor == nil {
uncompressedStream = updatedStream
case c.unimplemented:
_, err := decompressor(updatedStream)
assert.Error(t, err)
continue
default:
} else {
s, err := decompressor(updatedStream)
require.NoError(t, err)
defer s.Close()
uncompressedStream = s
}

uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
require.NoError(t, err, c.filename)
assert.Equal(t, []byte("Hello"), uncompressedContents, c.filename)
require.NoError(t, err, c)
assert.Equal(t, []byte("Hello"), uncompressedContents, c)
}

// Empty input is handled reasonably.
Expand All @@ -84,3 +77,51 @@ func TestDetectCompression(t *testing.T) {
_, _, err = DetectCompression(reader)
assert.Error(t, err)
}

func TestAutoDecompress(t *testing.T) {
	cases := []struct {
		filename     string
		isCompressed bool
	}{
		{"fixtures/Hello.uncompressed", false},
		{"fixtures/Hello.gz", true},
		{"fixtures/Hello.bz2", true},
		{"fixtures/Hello.xz", true},
	}

	// The correct decompressor is chosen, and the result is as expected.
	for _, c := range cases {
		stream, err := os.Open(c.filename)
		require.NoError(t, err, c.filename)
		defer stream.Close() // runs at test end; fine for a handful of fixtures

		uncompressedStream, isCompressed, err := AutoDecompress(stream)
		require.NoError(t, err, c.filename)
		defer uncompressedStream.Close()

		assert.Equal(t, c.isCompressed, isCompressed, c.filename)

		uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
		require.NoError(t, err, c.filename)
		assert.Equal(t, []byte("Hello"), uncompressedContents, c.filename)
	}

	// Empty input is handled reasonably.
	uncompressedStream, isCompressed, err := AutoDecompress(bytes.NewReader([]byte{}))
	require.NoError(t, err)
	assert.False(t, isCompressed)
	uncompressedContents, err := ioutil.ReadAll(uncompressedStream)
	require.NoError(t, err)
	assert.Equal(t, []byte{}, uncompressedContents)
	require.NoError(t, uncompressedStream.Close())

	// Error initializing a decompressor (for a detected format): a valid xz
	// magic number with no further data is detected as xz but fails to
	// initialize. Only err is meaningful here, so discard the other results
	// instead of leaving dead assignments.
	_, _, err = AutoDecompress(bytes.NewReader([]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}))
	assert.Error(t, err)

	// Error reading input
	reader, writer := io.Pipe()
	defer reader.Close()
	writer.CloseWithError(errors.New("Expected error reading input in AutoDecompress"))
	_, _, err = AutoDecompress(reader)
	assert.Error(t, err)
}
Loading