-
Notifications
You must be signed in to change notification settings - Fork 379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Provide large file progress reporting hooks to copy operations #219
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There already is a progress reporting mechanism in imageCopier.copyBlobFromStream
. Having every source/destination implement this individually seems wasteful.
Would it work for you to instead somehow parametrize the progress reporting in imageCopier.copyBlobFromStream
, so that a caller could provide its own reporting object / callback / something in copy.Options
, and if that is defined, it would be used instead of writing a progress bar to copy.Options.ReportWriter
?
@runcom , you probably know much more about the available options and the commonly used interfaces in this area.
docker/daemon/daemon_dest.go
Outdated
} | ||
}() | ||
|
||
resp, err := c.ImageLoad(ctx, reader, true) | ||
if sysctx.CopyHook != nil && sysctx.CopyHookInterval > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sysctx
is allowed to be nil
docker/daemon/daemon_src.go
Outdated
@@ -73,8 +74,14 @@ func newImageSource(ctx *types.SystemContext, ref daemonReference) (types.ImageS | |||
} | |||
}() | |||
|
|||
if _, err := io.Copy(tarCopyFile, inputStream); err != nil { | |||
return nil, err | |||
if ctx == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ctx != nil && ctx.CopyHookInterval > 0 && …
?
Yeah this could have been an oversight on my part, I was just adding things
I needed until I could start porting. :)
I'll review this and close tonight if so.
…On Thu, Jan 19, 2017 at 10:46 AM, Miloslav Trmač ***@***.***> wrote:
***@***.**** requested changes on this pull request.
There *already* is a progress reporting mechanism in
imageCopier.copyBlobFromStream. Having every source/destination implement
this individually seems wasteful.
Would it work for you to instead somehow parametrize the progress
reporting in imageCopier.copyBlobFromStream, so that a caller could
provide its own reporting object / callback / something in copy.Options,
and if that is defined, it would be used instead of writing a progress bar
to copy.Options.ReportWriter?
@runcom <https://github.com/runcom> , you probably know much more about
the available options and the commonly used interfaces in this area.
------------------------------
In docker/daemon/daemon_dest.go
<#219 (review)>:
> }
}()
- resp, err := c.ImageLoad(ctx, reader, true)
+ if sysctx.CopyHook != nil && sysctx.CopyHookInterval > 0 {
sysctx is allowed to be nil
------------------------------
In docker/daemon/daemon_src.go
<#219 (review)>:
> @@ -73,8 +74,14 @@ func newImageSource(ctx *types.SystemContext, ref daemonReference) (types.ImageS
}
}()
- if _, err := io.Copy(tarCopyFile, inputStream); err != nil {
- return nil, err
+ if ctx == nil {
ctx != nil && ctx.CopyHookInterval > 0 && … ?
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#219 (review)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AABJ63ZdqR2HvVARrGA0jPe5Lr4AZwrCks5rT6-FgaJpZM4LoK3o>
.
|
64293b8
to
672557f
Compare
I've rebased this to the point tests should pass; I will leave it alone until we decide what to do. In the meantime, I'm going to port box to use containers/image so I can get a better feel for what I actually need, but I'd like to keep my progress meters :) |
I have made an attempt to pretty up the API; let me know what you think. This |
|
Not exactly; if you pass the copyfunc into the options it is directly added to
This is for flexibility, it can be useful when doing it unidirectionally. package main
import (
"fmt"
"time"
"github.com/containers/image/copy"
"github.com/containers/image/docker/daemon"
"github.com/containers/image/docker/reference"
"github.com/containers/image/signature"
)
func main() {
ref, err := daemon.ParseReference("docker.io/library/golang:latest")
if err != nil {
panic(err)
}
// you can do it this way too
img, err := ref.NewImage(&types.SystemContext{
})
CopyHookInterval: 100 * time.Millisecond,
CopyHook: func(offset uint64) {
fmt.Println(offset)
},
if err != nil {
panic(err)
}
defer img.Close()
tgtRef, _ := reference.ParseNamed("docker.io/erikh/test:latest")
tgt, err := daemon.NewReference("", tgtRef)
if err != nil {
panic(err)
}
b, _, err := img.Manifest()
if err != nil {
panic(err)
}
fmt.Println(string(b))
pc, err := signature.NewPolicyContext(&signature.Policy{
Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
})
if err != nil {
panic(err)
}
err = copy.Image(
pc,
tgt,
ref,
©.Options{
CopyHookInterval: 100 * time.Millisecond,
CopyHook: func(offset uint64) {
fmt.Println(offset)
},
RemoveSignatures: true,
})
if err != nil {
panic(err)
}
}
> - As it is, `CopyHook` has no information at all about which stream it is
> being called for, it just receives a successive series of completely
> context-less integers which have no obvious relation to each other. (This is
> already true when hooking both the source and destination, and it will get
> much worse if #105 ever happens). Hence my question to @runcom about
> existence of a commonly accepted interface which does provide sufficient
> information and context. E.g., I _guess_ (without knowing the field) that
> `docker/docker/pkg/progress` might be an option—it is still a series of
> unconnected data points, with the recipient having to sort them out, but at
> least they have a disambiguating `ID` for each of the parallel operations.
Perhaps I need to comment the source; the successive series is a increasing
value based on the progress of the download. It has context, and can be
leveraged with closures to get flexibility while processing it.
I really would prefer a hook if possible. Dealing with docker's progress meters
is pretty terrible and I have a nice, colorized/tty-aware UI with clear progress
information that only triggers when necessary (e.g, for copying large files and
layers only). I really don't want to ditch that. Basically, I'd really like to
avoid using a canned progress meter. It's really not the responsibility of this
library to implement progress meters if you ask me, but I realize it's not my
say too. :) |
I agree the docker progress package isn't exactly what we're looking for there (and I'm not in favor of it here!). |
I had another idea: what if we pass a send-only channel that passes a struct, e.g., the struct could have the byte offset, but also the percentage if know it. The channel is optional. They can do whatever the heck they want with it then. :) |
That’s not what I mean; what you say allows the caller to set a single hook, but the implementations are not shared. We have 6 transports, * 2 for source/destination, that’s 12 places which would need to now care about progress reporting, and 12 places which would have to be kept in sync. Whereas a progress reporting in
I can’t see that this code demonstrates anything; it doesn’t treat sources/destinations differently at all if I am not mistaken (though it also appears somehow truncated?!). Sure, a hook in
But which download? With the existing code, copying a 3-layer image with layers A, B, C, the hook could very well receive
Sure, |
Well, a
For all of this, a typed interface with appropriate method names would probably be a better fit than a channel (which could be used, but every consumer would then have to do a And yes, this seems fairly complex, which is why I keep asking whether there is a nice existing interface definition we could just use without having to invent this. |
Yeah see I don't think that's necessary at all -- nor will we have the capability of determining things like network bandwidth consumed without a lot of extra tooling. However, one can derive that pretty trivially with the implementation above -- just divide the number by the time transpired at any reporting interval. What I really just want -- and literally nothing else -- is something that has nothing to do with progress bars itself, but the reporting of the progress. Error states, etc can come from different sources. I can build progress meters from that. |
I can buy that for the extra details and errors, but not the start/stop indications. How can anyone consume the one-value "here’s an integer” data when |
Here's the latest program and patch: package main
import (
"fmt"
"time"
"github.com/containers/image/copy"
"github.com/containers/image/docker/daemon"
"github.com/containers/image/docker/reference"
"github.com/containers/image/signature"
"github.com/containers/image/types"
)
func main() {
ref, err := daemon.ParseReference("docker.io/library/golang:latest")
if err != nil {
panic(err)
}
signal := make(chan types.SignalProperties)
go func() {
for prop := range signal {
fmt.Println(prop.Artifact, prop.Delta)
}
}()
// you can do it this way too
img, err := ref.NewImage(&types.SystemContext{
SignalInterval: 100 * time.Millisecond,
Signal: signal,
})
if err != nil {
panic(err)
}
defer img.Close()
close(signal)
tgtRef, _ := reference.ParseNamed("docker.io/erikh/test:latest")
tgt, err := daemon.NewReference("", tgtRef)
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
b, _, err := img.Manifest()
if err != nil {
panic(err)
}
fmt.Println(string(b))
pc, err := signature.NewPolicyContext(&signature.Policy{
Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
})
if err != nil {
panic(err)
}
signal = make(chan types.SignalProperties)
go func() {
for prop := range signal {
fmt.Println(prop.Artifact, prop.Delta)
}
}()
err = copy.Image(
pc,
tgt,
ref,
©.Options{
SignalInterval: 100 * time.Millisecond,
Signal: signal,
RemoveSignatures: true,
})
close(signal)
if err != nil {
panic(err)
}
} As you can see I've moved it to use a channel with a communicated struct that |
Thanks.
|
Hmm. I disagree on the interface but I can implement it NP. I just think a
struct is simple enough and can accept methods if absolutely necessary; I
don't see how polymorphism is a benefit here.
I'll try to get you something in the next day or two.
…On Mon, Feb 6, 2017 at 7:51 AM, Miloslav Trmač ***@***.***> wrote:
Thanks.
- I still think that the progress hook should only be in copy.Image,
not in the individual transport implementations; the underlying
implementation can be made available to low-level users who want to call
PutBlob/GetBlob directly.
- Also, an interface would be more suitable than a channel, because it
allows “starting an artifact copy” / “ending an artifact copy”
notifications in a clean, type-safe way. At least the latter is very
valuable.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#219 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AABJ6x1cICUduIhGmMdJRfvRIX87tJ5aks5rZ0F9gaJpZM4LoK3o>
.
|
@runcom , opinions on the channel vs. interface issue? |
Hey @mtrmac, I updated to use copy.Image and I think you will find the API |
The last posted example should still work. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! This broadly makes sense to me, these are just implementation-level comments.
copy/copy.go
Outdated
@@ -173,6 +179,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe | |||
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates), | |||
canModifyManifest: canModifyManifest, | |||
reportWriter: reportWriter, | |||
signalInterval: options.SignalInterval, | |||
signal: options.Signal, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
options
may be nil
copy/copy.go
Outdated
stream = r | ||
} else { | ||
stream = destStream | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the same stage description structure as the rest of the steps, this function is pretty complex and the regularity makes it easier to understand:
- Create a separate newline-separated section for this stage in the pipeline, starting with a
// ===
-prefixed comment - Use
destStream
as both the input and output of that pipeline stage (i.e.if ic.signal != nil … { … destStream = r }
with noelse
branch)
streamcopy/copy.go
Outdated
"github.com/containers/image/types" | ||
) | ||
|
||
const megaByte = float64(1024 * 1024) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not used.
streamcopy/copy.go
Outdated
// WithProgress implements io.Copy with a buffered reader, then measures | ||
// reports to the offsetFunc with the count processed at the interval. | ||
// If io.EOF is not returned then the error is returned. Otherwise, it is nil. | ||
// WithProgress closes the writer after finishing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment needs updating (e.g. there is no offsetFunc
)
streamcopy/copy.go
Outdated
// reports to the offsetFunc with the count processed at the interval. | ||
// If io.EOF is not returned then the error is returned. Otherwise, it is nil. | ||
// WithProgress closes the writer after finishing. | ||
func WithProgress(writer io.WriteCloser, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.SignalProperties) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning a value from a goroutine is not really useful.
(It might well make sense to encapsulate a bit more, i.e. to have a public function here which accepts an io.Reader
and returns an io.Reader
, making the pipe and the goroutine entirely invisible to the rest of the code.)
streamcopy/copy.go
Outdated
} else { | ||
return nil | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This goto
seems unnecessary; AFAICS this guarantees that there will not be a “signal” with the final 100% count
value. But…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you just objecting to it becuase it uses goto?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I am not dogmatic about goto
, though it does seem to me that a goto
-less equivalent would be a bit shorter. The objection was really about skipping the final update with 100% progress.
Though, with the point about io.Reader
returning rn >0 && err != nil
, I expect this to be restructured so much that nothing about these particular lines will remain applicable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I'll review and adjust. Thanks.
streamcopy/copy.go
Outdated
} | ||
if err != nil { | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is documented that io.Reader
can return rn >0 && err != nil
at the same time; i.e. the code should first write any outstanding data if any, and then check err
. (See the implementation of io.Copy
)
streamcopy/copy.go
Outdated
|
||
if interval > 0 && signal != nil { | ||
if time.Since(t) > interval { | ||
signal <- types.SignalProperties{Delta: count, Artifact: artifact} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So Delta
is not a “delta” (change) since the last update, but the total offset? It would be more natural to call the member Offset
then.
copy/copy.go
Outdated
// intermediary to signal progress information to any registered channel. | ||
if ic.signal != nil && ic.signalInterval > 0 { | ||
r, w := io.Pipe() | ||
go streamcopy.WithProgress(w, destStream, inputInfo, ic.signalInterval, ic.signal) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn’t this use srcInfo
instead of inputInfo
? inputInfo
may be {Digest: "", Size:-1}
in the “compressing on the fly” case.
types/types.go
Outdated
|
||
// SignalProperties is used to pass information from the copy code to a monitor which | ||
// can use the real-time information to produce output or react to changes. | ||
type SignalProperties struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
”Signal” is pretty generic. s/Signal/Progress/g
?
copy/copy.go
Outdated
// the pipe is here to facilitate the stream copier which will act as an | ||
// intermediary to signal progress information to any registered channel. | ||
if ic.signal != nil && ic.signalInterval > 0 { | ||
r, w := io.Pipe() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
defer r.Close()
somewhere, otherwise we could exit with an unreferenced unclosed PipeReader
and a PipeWriter
still blocked on trying to write to it.
ok I'll make these changes; some of this was just churn due to the design
adjustments fwiw.
Gimme a few hours.
…On Tue, Feb 7, 2017 at 6:56 AM, Miloslav Trmač ***@***.***> wrote:
***@***.**** commented on this pull request.
Thanks! This broadly makes sense, these are just implementation-level
comments.
------------------------------
In copy/copy.go
<#219 (review)>:
> @@ -173,6 +179,8 @@ func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageRe
diffIDsAreNeeded: src.UpdatedImageNeedsLayerDiffIDs(manifestUpdates),
canModifyManifest: canModifyManifest,
reportWriter: reportWriter,
+ signalInterval: options.SignalInterval,
+ signal: options.Signal,
options may be nil
------------------------------
In copy/copy.go
<#219 (review)>:
> // === Finally, send the layer stream to dest.
- uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
+ // the pipe is here to facilitate the stream copier which will act as an
+ // intermediary to signal progress information to any registered channel.
+ if ic.signal != nil && ic.signalInterval > 0 {
+ r, w := io.Pipe()
+ go streamcopy.WithProgress(w, destStream, inputInfo, ic.signalInterval, ic.signal)
+ stream = r
+ } else {
+ stream = destStream
+ }
Please use the same stage description structure as the rest of the steps,
this function is pretty complex and the regularity makes it easier to
understand:
- Create a separate newline-separated section for this stage in the
pipeline, starting with a // ===-prefixed comment
- Use destStream as both the input and output of that pipeline stage
(i.e. if ic.signal != nil … { … destStream = r } with no else branch)
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> @@ -0,0 +1,57 @@
+package streamcopy
+
+import (
+ "bufio"
+ "io"
+ "time"
+
+ "github.com/containers/image/types"
+)
+
+const megaByte = float64(1024 * 1024)
This is not used.
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> +
+import (
+ "bufio"
+ "io"
+ "time"
+
+ "github.com/containers/image/types"
+)
+
+const megaByte = float64(1024 * 1024)
+const readerSize = 32768
+
+// WithProgress implements io.Copy with a buffered reader, then measures
+// reports to the offsetFunc with the count processed at the interval.
+// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
+// WithProgress closes the writer after finishing.
The comment needs updating (e.g. there is no offsetFunc)
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> +import (
+ "bufio"
+ "io"
+ "time"
+
+ "github.com/containers/image/types"
+)
+
+const megaByte = float64(1024 * 1024)
+const readerSize = 32768
+
+// WithProgress implements io.Copy with a buffered reader, then measures
+// reports to the offsetFunc with the count processed at the interval.
+// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
+// WithProgress closes the writer after finishing.
+func WithProgress(writer io.WriteCloser, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.SignalProperties) error {
Returning a value from a goroutine is not really useful.
(It might well make sense to encapsulate a bit more, i.e. to have a public
function here which accepts an io.Reader and returns an io.Reader, making
the pipe and the goroutine entirely invisible to the rest of the code.)
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> + "bufio"
+ "io"
+ "time"
+
+ "github.com/containers/image/types"
+)
+
+const megaByte = float64(1024 * 1024)
+const readerSize = 32768
+
+// WithProgress implements io.Copy with a buffered reader, then measures
+// reports to the offsetFunc with the count processed at the interval.
+// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
+// WithProgress closes the writer after finishing.
+func WithProgress(writer io.WriteCloser, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.SignalProperties) error {
+ defer writer.Close()
The writer’s CloseWithError is the only real error reporting mechanism of
this goroutine, so we need to take advantage of it.
See how compressGoroutine uses an err variable and defer to report errors
from the goroutine into the PipeWriter.
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> + "io"
+ "time"
+
+ "github.com/containers/image/types"
+)
+
+const megaByte = float64(1024 * 1024)
+const readerSize = 32768
+
+// WithProgress implements io.Copy with a buffered reader, then measures
+// reports to the offsetFunc with the count processed at the interval.
+// If io.EOF is not returned then the error is returned. Otherwise, it is nil.
+// WithProgress closes the writer after finishing.
+func WithProgress(writer io.WriteCloser, reader io.Reader, artifact types.BlobInfo, interval time.Duration, signal chan types.SignalProperties) error {
+ defer writer.Close()
+ rd := bufio.NewReaderSize(reader, readerSize)
Why is the buffering useful in here? Immediately below we are creating our
own buffer.
(I’m not saying it isn’t, I just can’t immediately see that it is.)
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> + rd := bufio.NewReaderSize(reader, readerSize)
+
+ count := uint64(0)
+ buf := make([]byte, readerSize)
+ t := time.Now()
+ for {
+ rn, err := rd.Read(buf)
+ count += uint64(rn)
+
+ if err == io.EOF {
+ if rn > 0 {
+ goto write
+ } else {
+ return nil
+ }
+ }
This goto seems unnecessary; AFAICS this *guarantees* that there will not
be a “signal” with the final 100% count value. But…
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> + buf := make([]byte, readerSize)
+ t := time.Now()
+ for {
+ rn, err := rd.Read(buf)
+ count += uint64(rn)
+
+ if err == io.EOF {
+ if rn > 0 {
+ goto write
+ } else {
+ return nil
+ }
+ }
+ if err != nil {
+ return err
+ }
It is documented that io.Reader can return rn >0 && err != nil at the
same time; i.e. the code should first write any outstanding data if any,
and *then* check err. (See the implementation of io.Copy)
------------------------------
In streamcopy/copy.go
<#219 (review)>:
> + count += uint64(rn)
+
+ if err == io.EOF {
+ if rn > 0 {
+ goto write
+ } else {
+ return nil
+ }
+ }
+ if err != nil {
+ return err
+ }
+
+ if interval > 0 && signal != nil {
+ if time.Since(t) > interval {
+ signal <- types.SignalProperties{Delta: count, Artifact: artifact}
So Delta is not a “delta” (change) since the last update, but the total
offset? It would be more natural to call the member Offset then.
------------------------------
In copy/copy.go
<#219 (review)>:
> // === Finally, send the layer stream to dest.
- uploadedInfo, err := ic.dest.PutBlob(destStream, inputInfo)
+ // the pipe is here to facilitate the stream copier which will act as an
+ // intermediary to signal progress information to any registered channel.
+ if ic.signal != nil && ic.signalInterval > 0 {
+ r, w := io.Pipe()
+ go streamcopy.WithProgress(w, destStream, inputInfo, ic.signalInterval, ic.signal)
Shouldn’t this use srcInfo instead of inputInfo? inputInfo may be {Digest:
"", Size:-1} in the “compressing on the fly” case.
------------------------------
In types/types.go
<#219 (review)>:
> @@ -293,6 +293,16 @@ type SystemContext struct {
// Note that this field is used mainly to integrate containers/image into projectatomic/docker
// in order to not break any existing docker's integration tests.
DockerDisableV1Ping bool
+
+ SignalInterval time.Duration // time to wait between reports to the Signal
+ Signal chan SignalProperties // Reported to when SignalInterval has arrived
+}
+
+// SignalProperties is used to pass information from the copy code to a monitor which
+// can use the real-time information to produce output or react to changes.
+type SignalProperties struct {
”Signal” is pretty generic. s/Signal/Progress/g?
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#219 (review)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AABJ6w6ooz0XrV8IXGq5omX9ShXJip3Kks5raIYmgaJpZM4LoK3o>
.
|
9374e25
to
c0cb039
Compare
Not finshed with review comments, but I did add a test if you'd like to peek. |
I've updated to use your new pattern but the tests aren't passing. Not sure how to best test this yet, so I'm going to tinker for a while. Let me know if you'd like additional changes. |
749d70e
to
a553ec7
Compare
giving up on a test unless you have a better idea :) Let me know if you'd like anything changed, but I believe this is merge-worthy at this point. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! ACK.
It might be useful to have a NewProgressReader
as a public API (it would be very easy to combine with GetBlob
/PutBlob
for those who don’t use copy.Image
), but that is not at all essential for this PR and it can be done at any later time if needed.
WRT testing, it seems to me that the difficult question is what behavior should the test check for. It is possible to create an artificial test expecting exactly the implementation artifacts of a particular implementation; for a generic test, there’s not that much I suppose (artifact ID matches, offset is in the expected interval and doesn’t go backwards, perhaps with a delayingReader
as an input the reported offset is never larger than what we know the reader has provided.)
We do have test-skopeo
which ensures that this has not broken anything in the default case, at least.
copy/copy.go
Outdated
options = &Options{} | ||
} | ||
|
||
if options.ReportWriter != nil { | ||
reportWriter = options.ReportWriter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last thing, please move the reportWriter := ioutil.Discard
initialization near this if
.
Ok. I will make the get/putblob patch in a second one. I could probably use
it as well.
…On Fri, Feb 10, 2017 at 8:17 AM, Miloslav Trmač ***@***.***> wrote:
***@***.**** approved this pull request.
Thanks! ACK.
It might be useful to have a NewProgressReader as a public API (it would
be very easy to combine with GetBlob/PutBlob for those who don’t use
copy.Image), but that is not at all essential for this PR and it can be
done at any later time if needed.
WRT testing, it seems to me that the difficult question is what behavior
should the test check for. It is possible to create an artificial test
expecting *exactly* the implementation artifacts of a particular
implementation; for a generic test, there’s not that much I suppose
(artifact ID matches, offset is in the expected interval and doesn’t go
backwards, perhaps with a delayingReader as an input the reported offset
is never larger than what we know the reader has provided.)
We do have test-skopeo which ensures that this has not broken anything in
the default case, at least.
------------------------------
In copy/copy.go
<#219 (review)>:
> }
// Image copies image from srcRef to destRef, using policyContext to validate source image admissibility.
func Image(policyContext *signature.PolicyContext, destRef, srcRef types.ImageReference, options *Options) error {
reportWriter := ioutil.Discard
- if options != nil && options.ReportWriter != nil {
+
+ if options == nil {
+ options = &Options{}
+ }
+
+ if options.ReportWriter != nil {
reportWriter = options.ReportWriter
One last thing, please move the reportWriter := ioutil.Discard
initialization near this if.
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<#219 (review)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AABJ6z_AJlwmm_vCCGK4qJ9X20l_BW9uks5rbI2egaJpZM4LoK3o>
.
|
This should be ready now. |
Just kicked the tests, sorry this took so long. |
tests are passing, looks like it needs another approval to get through |
@runcom PTAL |
@@ -92,14 +95,22 @@ type Options struct { | |||
ReportWriter io.Writer | |||
SourceCtx *types.SystemContext | |||
DestinationCtx *types.SystemContext | |||
ProgressInterval time.Duration // time to wait between reports to signal the progress channel | |||
Progress chan types.ProgressProperties // Reported to when ProgressInterval has arrived for a single artifact+offset. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this is a receive only channel, could you strongly type it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work because we do send to the channel eventually from our code, not the foreign code.
Unless I misunderstood you, when I implemented this, as soon as the send is hit by the compiler in copy/progress_reader.go
it aborts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmm ok not blocking this PR I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAICS this should actually be a send-only channel. The code calling copy.Image
needs a bidirectional channel, but copy.Image
will only be sending to it, and the rest of the code calling copy.Image
will only be receiving from it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so yeah, my point was: let's strongly type the direction when we can in order to allow the go compiler (or runtime?) to be more efficient (I read this somewhere, I don't remember where, I just remember strongly typing the direction of the channel helped). Again, not blocking this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ACK, strongly typing is useful enough just for the documentation value; but not blocking for me either.
// progressReader is a reader that reports its progress on an interval. | ||
type progressReader struct { | ||
source io.Reader | ||
channel chan types.ProgressProperties |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto for the receive only channel I guess
Could you rebase this @erikh ? |
Signed-off-by: Erik Hollensbe <[email protected]>
done |
@erikh one last check before I merge, is there any skopeo PR which takes care of the test failure here? |
(Um, it would still have been useful to merge #240 first and see the tests pass. But, well, it’s done, not worth reverting at this point just for insiting on the format process.) |
Again, including the other commits for ease of getting things to compile; there
is an example at the end. Only the last commit is relevant in this PR.
This provides a hook and an interval timer for progressed copy operations, such
as image packaging. It only covers the docker endpoints for now and is
untested, again, as a proof of concept for approval.
Users can specify a function which is called at each interval as long as the
copy is progressing. This function is passed the current byte offset of the
copy operation. They can use this to calculate sizes, or percentages, or the
phase of the moon at the time the current byte is downloading. Resolution of
the timer is completely independent.
Here's some code that works with this: it reports byte offsets every 100ms
during both download and upload operations during this layer edit.
Thank you again for the consideration.