Skip to content

Commit

Permalink
Merge pull request #369 from ncw/fix-async
Browse files Browse the repository at this point in the history
Rework ArchiveAsync interface to make it return any archiving errors
  • Loading branch information
mholt authored Feb 8, 2023
2 parents 70981de + ef94c8d commit 4c6dd98
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 27 deletions.
24 changes: 21 additions & 3 deletions formats.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,23 @@ func (caf CompressedArchive) Archive(ctx context.Context, output io.Writer, file
return caf.Archival.Archive(ctx, output, files)
}

// ArchiveAsync adds files to the output archive while compressing the result asynchronously.
func (caf CompressedArchive) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
do, ok := caf.Archival.(ArchiverAsync)
if !ok {
return fmt.Errorf("%s archive does not support async writing", caf.Name())
}
if caf.Compression != nil {
wc, err := caf.Compression.OpenWriter(output)
if err != nil {
return err
}
defer wc.Close()
output = wc
}
return do.ArchiveAsync(ctx, output, jobs)
}

// Extract reads files out of an archive while decompressing the results.
func (caf CompressedArchive) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchive []string, handleFile FileHandler) error {
if caf.Compression != nil {
Expand Down Expand Up @@ -358,7 +375,8 @@ var formats = make(map[string]Format)

// Interface guards
var (
_ Format = (*CompressedArchive)(nil)
_ Archiver = (*CompressedArchive)(nil)
_ Extractor = (*CompressedArchive)(nil)
_ Format = (*CompressedArchive)(nil)
_ Archiver = (*CompressedArchive)(nil)
_ ArchiverAsync = (*CompressedArchive)(nil)
_ Extractor = (*CompressedArchive)(nil)
)
13 changes: 11 additions & 2 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,25 @@ type Archiver interface {
Archive(ctx context.Context, output io.Writer, files []File) error
}

// ArchiveAsyncJob contains a File to be archived and a channel that
// the result of the archiving should be returned on.
type ArchiveAsyncJob struct {
File File
Result chan<- error
}

// ArchiverAsync is an Archiver that can also create archives
// asynchronously by pumping files into a channel as they are
// discovered.
type ArchiverAsync interface {
Archiver

// Use ArchiveAsync if you can't pre-assemble a list of all
// the files for the archive. Close the files channel after
// the files for the archive. Close the jobs channel after
// all the files have been sent.
ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error
//
// This won't return until the channel is closed.
ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error
}

// Extractor can extract files from an archive.
Expand Down
19 changes: 7 additions & 12 deletions tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,12 @@ func (t Tar) Archive(ctx context.Context, output io.Writer, files []File) error
return nil
}

func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error {
func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
tw := tar.NewWriter(output)
defer tw.Close()

for file := range files {
if err := t.writeFileToArchive(ctx, tw, file); err != nil {
if t.ContinueOnError && ctx.Err() == nil { // context errors should always abort
log.Printf("[ERROR] %v", err)
continue
}
return err
}
for job := range jobs {
job.Result <- t.writeFileToArchive(ctx, tw, job.File)
}

return nil
Expand Down Expand Up @@ -234,7 +228,8 @@ func (t Tar) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv

// Interface guards
var (
_ Archiver = (*Tar)(nil)
_ Extractor = (*Tar)(nil)
_ Inserter = (*Tar)(nil)
_ Archiver = (*Tar)(nil)
_ ArchiverAsync = (*Tar)(nil)
_ Extractor = (*Tar)(nil)
_ Inserter = (*Tar)(nil)
)
21 changes: 11 additions & 10 deletions zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,13 @@ func (z Zip) Archive(ctx context.Context, output io.Writer, files []File) error
return nil
}

func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error {
func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error {
zw := zip.NewWriter(output)
defer zw.Close()

var i int
for file := range files {
if err := z.archiveOneFile(ctx, zw, i, file); err != nil {
if z.ContinueOnError && ctx.Err() == nil { // context errors should always abort
log.Printf("[ERROR] %v", err)
continue
}
return err
}
for job := range jobs {
job.Result <- z.archiveOneFile(ctx, zw, i, job.File)
i++
}

Expand Down Expand Up @@ -202,7 +196,7 @@ func (z Zip) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv
skipDirs := skipList{}

for i, f := range zr.File {
f := f // make a copy for the Open closure
f := f // make a copy for the Open closure
if err := ctx.Err(); err != nil {
return err // honor context cancellation
}
Expand Down Expand Up @@ -383,3 +377,10 @@ func decodeText(input, charset string) (string, error) {
}

var zipHeader = []byte("PK\x03\x04") // NOTE: headers of empty zip files might end with 0x05,0x06 or 0x06,0x06 instead of 0x03,0x04

// Interface guards
var (
_ Archiver = Zip{}
_ ArchiverAsync = Zip{}
_ Extractor = Zip{}
)

0 comments on commit 4c6dd98

Please sign in to comment.