diff --git a/formats.go b/formats.go index e59c9c93..db22811b 100644 --- a/formats.go +++ b/formats.go @@ -235,6 +235,23 @@ func (caf CompressedArchive) Archive(ctx context.Context, output io.Writer, file return caf.Archival.Archive(ctx, output, files) } +// ArchiveAsync adds files to the output archive while compressing the result asynchronously. +func (caf CompressedArchive) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { + do, ok := caf.Archival.(ArchiverAsync) + if !ok { + return fmt.Errorf("%s archive does not support async writing", caf.Name()) + } + if caf.Compression != nil { + wc, err := caf.Compression.OpenWriter(output) + if err != nil { + return err + } + defer wc.Close() + output = wc + } + return do.ArchiveAsync(ctx, output, jobs) +} + // Extract reads files out of an archive while decompressing the results. func (caf CompressedArchive) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchive []string, handleFile FileHandler) error { if caf.Compression != nil { @@ -358,7 +375,8 @@ var formats = make(map[string]Format) // Interface guards var ( - _ Format = (*CompressedArchive)(nil) - _ Archiver = (*CompressedArchive)(nil) - _ Extractor = (*CompressedArchive)(nil) + _ Format = (*CompressedArchive)(nil) + _ Archiver = (*CompressedArchive)(nil) + _ ArchiverAsync = (*CompressedArchive)(nil) + _ Extractor = (*CompressedArchive)(nil) ) diff --git a/interfaces.go b/interfaces.go index 782cae64..bfc53163 100644 --- a/interfaces.go +++ b/interfaces.go @@ -60,6 +60,13 @@ type Archiver interface { Archive(ctx context.Context, output io.Writer, files []File) error } +// ArchiveAsyncJob contains a File to be archived and a channel that +// the result of the archiving should be returned on. +type ArchiveAsyncJob struct { + File File + Result chan<- error +} + // ArchiverAsync is an Archiver that can also create archives // asynchronously by pumping files into a channel as they are // discovered. @@ -67,9 +74,11 @@ type ArchiverAsync interface { Archiver // Use ArchiveAsync if you can't pre-assemble a list of all - // the files for the archive. Close the files channel after + // the files for the archive. Close the jobs channel after // all the files have been sent. - ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error + // + // This won't return until the channel is closed. + ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error } // Extractor can extract files from an archive. diff --git a/tar.go b/tar.go index 81f958b1..ce719695 100644 --- a/tar.go +++ b/tar.go @@ -60,18 +60,12 @@ func (t Tar) Archive(ctx context.Context, output io.Writer, files []File) error return nil } -func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error { +func (t Tar) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { tw := tar.NewWriter(output) defer tw.Close() - for file := range files { - if err := t.writeFileToArchive(ctx, tw, file); err != nil { - if t.ContinueOnError && ctx.Err() == nil { // context errors should always abort - log.Printf("[ERROR] %v", err) - continue - } - return err - } + for job := range jobs { + job.Result <- t.writeFileToArchive(ctx, tw, job.File) } return nil @@ -234,7 +228,8 @@ func (t Tar) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv // Interface guards var ( - _ Archiver = (*Tar)(nil) - _ Extractor = (*Tar)(nil) - _ Inserter = (*Tar)(nil) + _ Archiver = (*Tar)(nil) + _ ArchiverAsync = (*Tar)(nil) + _ Extractor = (*Tar)(nil) + _ Inserter = (*Tar)(nil) ) diff --git a/zip.go b/zip.go index 30e7b5b1..421fe6ec 100644 --- a/zip.go +++ b/zip.go @@ -114,19 +114,13 @@ func (z Zip) Archive(ctx context.Context, output io.Writer, files []File) error return nil } -func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, files <-chan File) error { +func (z Zip) ArchiveAsync(ctx context.Context, output io.Writer, jobs <-chan ArchiveAsyncJob) error { zw := zip.NewWriter(output) defer zw.Close() var i int - for file := range files { - if err := z.archiveOneFile(ctx, zw, i, file); err != nil { - if z.ContinueOnError && ctx.Err() == nil { // context errors should always abort - log.Printf("[ERROR] %v", err) - continue - } - return err - } + for job := range jobs { + job.Result <- z.archiveOneFile(ctx, zw, i, job.File) i++ } @@ -202,7 +196,7 @@ func (z Zip) Extract(ctx context.Context, sourceArchive io.Reader, pathsInArchiv skipDirs := skipList{} for i, f := range zr.File { - f := f // make a copy for the Open closure + f := f // make a copy for the Open closure if err := ctx.Err(); err != nil { return err // honor context cancellation } @@ -383,3 +377,10 @@ func decodeText(input, charset string) (string, error) { } var zipHeader = []byte("PK\x03\x04") // NOTE: headers of empty zip files might end with 0x05,0x06 or 0x06,0x06 instead of 0x03,0x04 + +// Interface guards +var ( + _ Archiver = Zip{} + _ ArchiverAsync = Zip{} + _ Extractor = Zip{} +)