Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide large file progress reporting hooks to copy operations #219

Merged
merged 1 commit into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"reflect"
"time"

pb "gopkg.in/cheggaaa/pb.v1"

Expand Down Expand Up @@ -46,6 +47,8 @@ type imageCopier struct {
diffIDsAreNeeded bool
canModifyManifest bool
reportWriter io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
Expand Down Expand Up @@ -93,14 +96,22 @@ type Options struct {
ReportWriter io.Writer
SourceCtx *types.SystemContext
DestinationCtx *types.SystemContext
ProgressInterval time.Duration // time to wait between reports to signal the progress channel
Progress chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is a receive only channel, could you strongly type it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work because we do send to the channel eventually from our code, not the foreign code.

Unless I misunderstood you, when I implemented this, as soon as the send is hit by the compiler in copy/progress_reader.go it aborts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mmm ok not blocking this PR I guess

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS this should actually be a send-only channel. The code calling copy.Image needs a bidirectional channel, but copy.Image will only be sending to it, and the rest of the code calling copy.Image will only be receiving from it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so yeah, my point was: let's strongly type the direction when we can in order to allow the go compiler (or runtime?) to be more efficient (I read this somewhere, I don't remember where, I just remember strongly typing the direction of the channel helped). Again, not blocking this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ACK, strongly typing is useful enough just for the documentation value; but not blocking for me either.

}

// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageReference, options *Options) error {
if options == nil {
options = &Options{}
}

reportWriter := ioutil.Discard
if options != nil && options.ReportWriter != nil {

if options.ReportWriter != nil {
reportWriter = options.ReportWriter
}

writeReport := func(f string, a ...interface{}) {
fmt.Fprintf(reportWriter, f, a...)
}
Expand Down Expand Up @@ -139,7 +150,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
}

var sigs [][]byte
if options != nil && options.RemoveSignatures {
if options.RemoveSignatures {
sigs = [][]byte{}
} else {
writeReport("Getting image source signatures\n")
Expand Down Expand Up @@ -174,6 +185,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
progressInterval: options.ProgressInterval,
progress: options.Progress,
}

if err := ic.copyLayers(); err != nil {
Expand All @@ -200,7 +213,7 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
return err
}

if options != nil && options.SignBy != "" {
if options.SignBy != "" {
mech, err := signature.NewGPGSigningMechanism()
if err != nil {
return errors.Wrap(err, "Error initializing GPG")
Expand Down Expand Up @@ -490,6 +503,17 @@ func (ic *imageCopier) copyBlobFromStream(srcStream io.Reader, srcInfo types.Blo
inputInfo.Size = -1
}

// === Report progress using the ic.progress channel, if required.
if ic.progress != nil && ic.progressInterval > 0 {
destStream = &progressReader{
source: destStream,
channel: ic.progress,
interval: ic.progressInterval,
artifact: srcInfo,
lastTime: time.Now(),
}
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions copy/progress_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package copy

import (
"io"
"time"

"github.com/containers/image/types"
)

// progressReader is a reader that reports its progress on an interval.
type progressReader struct {
source io.Reader
channel chan types.ProgressProperties
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for the receive only channel I guess

interval time.Duration
artifact types.BlobInfo
lastTime time.Time
offset uint64
}

func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.offset += uint64(n)
if time.Since(r.lastTime) > r.interval {
r.channel <- types.ProgressProperties{Artifact: r.artifact, Offset: r.offset}
r.lastTime = time.Now()
}
return n, err
}
7 changes: 7 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ type SystemContext struct {
DockerDisableV1Ping bool
}

// ProgressProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type ProgressProperties struct {
Artifact BlobInfo
Offset uint64
}

var (
// ErrBlobNotFound can be returned by an ImageDestination's HasBlob() method
ErrBlobNotFound = errors.New("no such blob present")
Expand Down