Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,13 +1154,13 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr

// === Report progress using the c.progress channel, if required.
if c.progress != nil && c.progressInterval > 0 {
destStream = &progressReader{
source: destStream,
channel: c.progress,
interval: c.progressInterval,
artifact: srcInfo,
lastTime: time.Now(),
}
destStream := newProgressReader(
destStream,
c.progress,
c.progressInterval,
srcInfo,
)
defer destStream.reportDone()
}

// === Finally, send the layer stream to dest.
Expand Down
69 changes: 60 additions & 9 deletions copy/progress_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,71 @@ import (

// progressReader is a reader that reports its progress on an interval.
type progressReader struct {
source io.Reader
channel chan types.ProgressProperties
interval time.Duration
artifact types.BlobInfo
lastTime time.Time
offset uint64
source io.Reader
channel chan<- types.ProgressProperties
interval time.Duration
artifact types.BlobInfo
lastUpdate time.Time
offset uint64
offsetUpdate uint64
}

// newProgressReader creates a new progress reader for:
// `source`: The source when internally reading bytes
// `channel`: The reporter channel to which the progress will be sent
// `interval`: The update interval to indicate how often the progress should update
// `artifact`: The blob metadata which is currently being progressed
func newProgressReader(
source io.Reader,
channel chan<- types.ProgressProperties,
interval time.Duration,
artifact types.BlobInfo,
) *progressReader {
// The progress reader constructor informs the progress channel
// that a new artifact will be read
channel <- types.ProgressProperties{
Event: types.ProgressEventNewArtifact,
Artifact: artifact,
}
return &progressReader{
source: source,
channel: channel,
interval: interval,
artifact: artifact,
lastUpdate: time.Now(),
offset: 0,
offsetUpdate: 0,
}
}

// reportDone indicates to the internal channel that the progress has been
// finished
func (r *progressReader) reportDone() {
r.channel <- types.ProgressProperties{
Event: types.ProgressEventDone,
Artifact: r.artifact,
Offset: r.offset,
OffsetUpdate: r.offsetUpdate,
}
}

// Read continuously reads bytes into the progress reader and reports the
// status via the internal channel
func (r *progressReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.offset += uint64(n)
if time.Since(r.lastTime) > r.interval {
r.channel <- types.ProgressProperties{Artifact: r.artifact, Offset: r.offset}
r.lastTime = time.Now()
r.offsetUpdate += uint64(n)

// Fire the progress reader in the provided interval
if time.Since(r.lastUpdate) > r.interval {
r.channel <- types.ProgressProperties{
Event: types.ProgressEventRead,
Artifact: r.artifact,
Offset: r.offset,
OffsetUpdate: r.offsetUpdate,
}
r.lastUpdate = time.Now()
r.offsetUpdate = 0
}
return n, err
}
80 changes: 80 additions & 0 deletions copy/progress_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package copy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay, tests. Great!


import (
"bytes"
"io"
"testing"
"time"

"github.com/containers/image/v5/types"
"github.com/stretchr/testify/assert"
)

func newSUT(
t *testing.T,
reader io.Reader,
duration time.Duration,
channel chan types.ProgressProperties,
) *progressReader {
artifact := types.BlobInfo{Size: 10}

go func() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: does the testing framework know it has to wait for all spawned goroutines?

res := <-channel
assert.Equal(t, res.Event, types.ProgressEventNewArtifact)
assert.Equal(t, res.Artifact, artifact)
}()
res := newProgressReader(reader, channel, duration, artifact)

return res
}

func TestNewProgressReader(t *testing.T) {
// Given
channel := make(chan types.ProgressProperties)
sut := newSUT(t, nil, time.Second, channel)
assert.NotNil(t, sut)

// When/Then
go func() {
res := <-channel
assert.Equal(t, res.Event, types.ProgressEventDone)
}()
sut.reportDone()
}

func TestReadWithoutEvent(t *testing.T) {
// Given
channel := make(chan types.ProgressProperties)
reader := bytes.NewReader([]byte{0, 1, 2})
sut := newSUT(t, reader, time.Second, channel)
assert.NotNil(t, sut)

// When
b := []byte{0, 1, 2, 3, 4}
read, err := reader.Read(b)

// Then
assert.Nil(t, err)
assert.Equal(t, read, 3)
}

func TestReadWithEvent(t *testing.T) {
// Given
channel := make(chan types.ProgressProperties)
reader := bytes.NewReader([]byte{0, 1, 2, 3, 4, 5, 6})
sut := newSUT(t, reader, time.Nanosecond, channel)
assert.NotNil(t, sut)
b := []byte{0, 1, 2, 3, 4}

// When/Then
go func() {
res := <-channel
assert.Equal(t, res.Event, types.ProgressEventRead)
assert.Equal(t, res.Offset, uint64(5))
assert.Equal(t, res.OffsetUpdate, uint64(5))
}()
read, err := reader.Read(b)
assert.Equal(t, read, 5)
assert.Nil(t, err)

}
30 changes: 29 additions & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,37 @@ type SystemContext struct {
CompressionLevel *int
}

// ProgressEvent is the type of events a progress reader can produce
// Warning: new event types may be added any time.
type ProgressEvent uint

const (
// ProgressEventNewArtifact will be fired on progress reader setup
ProgressEventNewArtifact ProgressEvent = iota

// ProgressEventRead indicates that the artifact download is currently in
// progress
ProgressEventRead

// ProgressEventDone is fired when the data transfer has been finished for
// the specific artifact
ProgressEventDone
)

// ProgressProperties is used to pass information from the copy code to a monitor which
// can use the real-time information to produce output or react to changes.
type ProgressProperties struct {
// The event indicating what
Event ProgressEvent

// The artifact which has been updated in this interval
Artifact BlobInfo
Offset uint64

// The currently downloaded size in bytes
// Increases from 0 to the final Artifact size
Offset uint64

// The additional offset which has been downloaded inside the last update
// interval. Will be reset after each ProgressEventRead event.
OffsetUpdate uint64
}