[fix] - Propagate Async File Handling Errors #3403

Open · wants to merge 11 commits into base: main
60 changes: 33 additions & 27 deletions pkg/handlers/ar.go
@@ -22,56 +22,55 @@ func newARHandler() *arHandler {
 
 // HandleFile processes AR formatted files. This function needs to be implemented to extract or
 // manage data from AR files according to specific requirements.
-func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    archiveChan := make(chan []byte, defaultBufferSize)
+func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)
 
     if feature.ForceSkipArchives.Load() {
-        close(archiveChan)
-        return archiveChan, nil
+        close(dataOrErrChan)
+        return dataOrErrChan
     }
 
     go func() {
-        ctx, cancel := logContext.WithTimeout(ctx, maxTimeout)
[Inline review thread on the removed logContext.WithTimeout line]

Collaborator: Why did you remove this?

Collaborator (Author): I thought I left a comment, but it was probably on the closed PR. This was removed because the context timeout is set at the call site, so setting it here has no effect since the goroutine inherits its context from HandleFile.
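For reference, a minimal sketch of the call-site pattern the author describes; parentCtx, handler, reader, and the loop body are illustrative, while maxTimeout is the handlers package constant referenced in the removed line:

    // The timeout is applied once, before HandleFile is called, so the
    // goroutine spawned inside the handler inherits the same deadline.
    ctx, cancel := logContext.WithTimeout(parentCtx, maxTimeout)
    defer cancel()

    for chunk := range handler.HandleFile(ctx, reader) {
        _ = chunk // consume extracted chunks (and, after this PR, errors) here
    }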
-        defer cancel()
-        defer close(archiveChan)
+        defer close(dataOrErrChan)
 
         // Update the metrics for the file processing.
         start := time.Now()
         var err error
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()
 
         // Defer a panic recovery to handle any panics that occur during the AR processing.
         defer func() {
             if r := recover(); r != nil {
                 // Return the panic as an error.
+                var panicErr error
                 if e, ok := r.(error); ok {
-                    err = e
+                    panicErr = e
                 } else {
-                    err = fmt.Errorf("panic occurred: %v", r)
+                    panicErr = fmt.Errorf("panic occurred: %v", r)
                 }
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
+                }
                 ctx.Logger().Error(err, "Panic occurred when reading ar archive")
             }
         }()
 
-        var arReader *deb.Ar
-        arReader, err = deb.LoadAr(input)
+        arReader, err := deb.LoadAr(input)
         if err != nil {
-            ctx.Logger().Error(err, "error reading AR")
+            dataOrErrChan <- DataOrErr{
+                Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, err),
+            }
             return
         }
 
-        if err = h.processARFiles(ctx, arReader, archiveChan); err != nil {
-            ctx.Logger().Error(err, "error processing AR files")
+        err = h.processARFiles(ctx, arReader, dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle any errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()
 
-    return archiveChan, nil
+    return dataOrErrChan
 }
 
-func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archiveChan chan []byte) error {
+func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataOrErrChan chan DataOrErr) error {
     for {
         select {
         case <-ctx.Done():
@@ -91,12 +90,19 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archiveChan chan []byte) error {
 
             rdr, err := newMimeTypeReader(arEntry.Data)
             if err != nil {
-                return fmt.Errorf("error creating mime-type reader: %w", err)
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: error creating AR mime-type reader: %v", ErrProcessingWarning, err),
+                }
+                h.metrics.incErrors()
+                continue
             }
 
-            if err := h.handleNonArchiveContent(fileCtx, rdr, archiveChan); err != nil {
-                fileCtx.Logger().Error(err, "error handling archive content in AR")
+            if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: error handling archive content in AR: %v", ErrProcessingWarning, err),
+                }
+                h.metrics.incErrors()
+                continue
             }
 
             h.metrics.incFilesProcessed()
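The hunks above send DataOrErr values tagged with ErrProcessingFatal and ErrProcessingWarning; their definitions live elsewhere in pkg/handlers and are not part of this diff. A sketch of the assumed contract, with illustrative field comments and messages:

    // DataOrErr carries either one chunk of extracted data or an error,
    // letting a single channel deliver both to the consumer.
    type DataOrErr struct {
        Data []byte // one extracted chunk; nil when Err is set
        Err  error  // wraps ErrProcessingFatal or ErrProcessingWarning
    }

    var (
        // ErrProcessingFatal: processing stopped and no further data follows.
        ErrProcessingFatal = errors.New("fatal error processing file")
        // ErrProcessingWarning: a single entry failed; the stream continues.
        ErrProcessingWarning = errors.New("error processing file")
    )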
4 changes: 2 additions & 2 deletions pkg/handlers/ar_test.go
@@ -23,12 +23,12 @@ func TestHandleARFile(t *testing.T) {
     defer rdr.Close()
 
     handler := newARHandler()
-    archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+    dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
     assert.NoError(t, err)
 
     wantChunkCount := 102
     count := 0
-    for range archiveChan {
+    for range dataOrErrChan {
         count++
     }
 
55 changes: 30 additions & 25 deletions pkg/handlers/archive.go
@@ -44,46 +44,46 @@ func newArchiveHandler() *archiveHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    dataChan := make(chan []byte, defaultBufferSize)
+func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)
 
     if feature.ForceSkipArchives.Load() {
-        close(dataChan)
-        return dataChan, nil
+        close(dataOrErrChan)
+        return dataOrErrChan
     }
 
     go func() {
-        var err error
-        defer close(dataChan)
+        defer close(dataOrErrChan)
 
         // The underlying 7zip library may panic when attempting to open an archive.
         // This is due to an Index Out Of Range (IOOR) error when reading the archive header.
         // See: https://github.com/bodgit/sevenzip/blob/74bff0da9b233317e4ea7dd8c184a315db71af2a/types.go#L846
         defer func() {
             if r := recover(); r != nil {
                 // Return the panic as an error.
+                var panicErr error
                 if e, ok := r.(error); ok {
-                    err = e
+                    panicErr = e
                 } else {
-                    err = fmt.Errorf("panic occurred: %v", r)
+                    panicErr = fmt.Errorf("panic occurred: %v", r)
                 }
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
+                }
                 ctx.Logger().Error(err, "Panic occurred when attempting to open archive")
             }
         }()
 
         // Update the metrics for the file processing.
         start := time.Now()
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()
 
-        if err = h.openArchive(ctx, 0, input, dataChan); err != nil {
-            ctx.Logger().Error(err, "error unarchiving chunk.")
+        err := h.openArchive(ctx, 0, input, dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle any errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()
 
-    return dataChan, nil
+    return dataOrErrChan
 }
 
 var ErrMaxDepthReached = errors.New("max archive depth reached")
@@ -92,7 +92,12 @@ var ErrMaxDepthReached = errors.New("max archive depth reached")
 // It takes a reader from which it attempts to identify and process the archive format. Depending on the archive type,
 // it either decompresses or extracts the contents directly, sending data to the provided channel.
 // Returns an error if the archive cannot be processed due to issues like exceeding maximum depth or unsupported formats.
-func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
+func (h *archiveHandler) openArchive(
+    ctx logContext.Context,
+    depth int,
+    reader fileReader,
+    dataOrErrChan chan DataOrErr,
+) error {
     ctx.Logger().V(4).Info("Starting archive processing", "depth", depth)
     defer ctx.Logger().V(4).Info("Finished archive processing", "depth", depth)
 
@@ -107,9 +112,9 @@ func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
 
     if reader.format == nil {
         if depth > 0 {
-            return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), archiveChan)
+            return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), dataOrErrChan)
         }
-        return fmt.Errorf("unknown archive format")
+        return errors.New("unknown archive format")
     }
 
     switch archive := reader.format.(type) {
@@ -131,9 +136,9 @@ func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
         }
         defer rdr.Close()
 
-        return h.openArchive(ctx, depth+1, rdr, archiveChan)
+        return h.openArchive(ctx, depth+1, rdr, dataOrErrChan)
     case archiver.Extractor:
-        err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(archiveChan))
+        err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(dataOrErrChan))
         if err != nil {
             return fmt.Errorf("error extracting archive with format: %s: %w", reader.format.Name(), err)
         }
@@ -147,7 +152,7 @@ func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
 // It logs the extraction, checks for cancellation, and decides whether to skip the file based on its name or type,
 // particularly for binary files if configured to skip. If the file is not skipped, it recursively calls openArchive
 // to handle nested archives or to continue processing based on the file's content and depth in the archive structure.
-func (h *archiveHandler) extractorHandler(archiveChan chan []byte) func(context.Context, archiver.File) error {
+func (h *archiveHandler) extractorHandler(dataOrErrChan chan DataOrErr) func(context.Context, archiver.File) error {
     return func(ctx context.Context, file archiver.File) error {
         lCtx := logContext.WithValues(
             logContext.AddLogger(ctx),
@@ -219,6 +224,6 @@ func (h *archiveHandler) extractorHandler(archiveChan chan []byte) func(context.Context, archiver.File) error {
         h.metrics.observeFileSize(fileSize)
 
         lCtx.Logger().V(4).Info("Processed file successfully", "filename", file.Name(), "size", file.Size())
-        return h.openArchive(lCtx, depth, rdr, archiveChan)
+        return h.openArchive(lCtx, depth, rdr, dataOrErrChan)
     }
 }
10 changes: 5 additions & 5 deletions pkg/handlers/archive_test.go
@@ -91,7 +91,7 @@ func TestArchiveHandler(t *testing.T) {
         }
         defer newReader.Close()
 
-        archiveChan, err := handler.HandleFile(logContext.Background(), newReader)
+        dataOrErrChan := handler.HandleFile(logContext.Background(), newReader)
         if testCase.expectErr {
             assert.NoError(t, err)
             return
@@ -100,9 +100,9 @@ func TestArchiveHandler(t *testing.T) {
         count := 0
         re := regexp.MustCompile(testCase.matchString)
         matched := false
-        for chunk := range archiveChan {
+        for chunk := range dataOrErrChan {
             count++
-            if re.Match(chunk) {
+            if re.Match(chunk.Data) {
                 matched = true
             }
         }
@@ -123,8 +123,8 @@ func TestOpenInvalidArchive(t *testing.T) {
     assert.NoError(t, err)
    defer rdr.Close()
 
-    archiveChan := make(chan []byte)
+    dataOrErrChan := make(chan DataOrErr)
 
-    err = handler.openArchive(ctx, 0, rdr, archiveChan)
+    err = handler.openArchive(ctx, 0, rdr, dataOrErrChan)
     assert.Error(t, err)
 }
68 changes: 46 additions & 22 deletions pkg/handlers/default.go
@@ -3,6 +3,7 @@ package handlers
 import (
     "context"
     "errors"
+    "fmt"
     "io"
     "time"
 
@@ -30,49 +31,67 @@ func newDefaultHandler(handlerType handlerType) *defaultHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    // Shared channel for both archive and non-archive content.
-    dataChan := make(chan []byte, defaultBufferSize)
+func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    // Shared channel for archived data, non-archive data, and errors.
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)
 
     go func() {
-        defer close(dataChan)
+        defer close(dataOrErrChan)
 
         // Update the metrics for the file processing.
         start := time.Now()
-        var err error
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()
 
-        if err = h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataChan); err != nil {
-            ctx.Logger().Error(err, "error handling non-archive content.")
+        err := h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()
 
-    return dataChan, nil
+    return dataOrErrChan
 }
 
 // measureLatencyAndHandleErrors measures the latency of the file processing and updates the metrics accordingly.
 // It also records errors and timeouts in the metrics.
-func (h *defaultHandler) measureLatencyAndHandleErrors(start time.Time, err error) {
+func (h *defaultHandler) measureLatencyAndHandleErrors(
+    ctx logContext.Context,
+    start time.Time,
+    err error,
+    dataErrChan chan DataOrErr,
+) {
     if err == nil {
         h.metrics.observeHandleFileLatency(time.Since(start).Milliseconds())
         return
     }
+    dataOrErr := DataOrErr{}
 
     h.metrics.incErrors()
     if errors.Is(err, context.DeadlineExceeded) {
         h.metrics.incFileProcessingTimeouts()
+        dataOrErr.Err = fmt.Errorf("%w: error processing chunk: %v", ErrProcessingFatal, err)
+        if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+            ctx.Logger().Error(err, "error writing to data channel")
+        }
         return
     }
+
+    dataOrErr.Err = err
+    if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+        ctx.Logger().Error(err, "error writing to data channel")
+    }
 }
 
-// handleNonArchiveContent processes files that do not contain nested archives, serving as the final stage in the
-// extraction/decompression process. It reads the content to detect its MIME type and decides whether to skip based
-// on the type, particularly for binary files. It manages reading file chunks and writing them to the archive channel,
-// effectively collecting the final bytes for further processing. This function is a key component in ensuring that all
-// file content, regardless of being an archive or not, is handled appropriately.
-func (h *defaultHandler) handleNonArchiveContent(ctx logContext.Context, reader mimeTypeReader, archiveChan chan []byte) error {
+// handleNonArchiveContent processes files that are not archives,
+// serving as the final stage in the extraction/decompression pipeline.
+// It manages MIME type detection, file skipping logic, and chunk-based
+// reading and writing to the data channel. This function ensures
+// all non-archive content is properly handled for subsequent processing.
+func (h *defaultHandler) handleNonArchiveContent(
+    ctx logContext.Context,
+    reader mimeTypeReader,
+    dataOrErrChan chan DataOrErr,
+) error {
     mimeExt := reader.mimeExt
 
     if common.SkipFile(mimeExt) || common.IsBinary(mimeExt) {
@@ -85,13 +104,18 @@ func (h *defaultHandler) handleNonArchiveContent(ctx logContext.Context, reader mimeTypeReader, archiveChan chan []byte) error {
 
     chunkReader := sources.NewChunkReader()
     for data := range chunkReader(ctx, reader) {
+        dataOrErr := DataOrErr{}
         if err := data.Error(); err != nil {
-            ctx.Logger().Error(err, "error reading chunk")
             h.metrics.incErrors()
+            dataOrErr.Err = fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, err)
+            if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
+                return fmt.Errorf("%w: error writing to data channel: %v", ErrProcessingFatal, writeErr)
+            }
             continue
         }
 
-        if err := common.CancellableWrite(ctx, archiveChan, data.Bytes()); err != nil {
+        dataOrErr.Data = data.Bytes()
+        if err := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); err != nil {
            return err
         }
         h.metrics.incBytesProcessed(len(data.Bytes()))
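With these changes, HandleFile no longer returns an error alongside the channel: failures travel through the same channel as data. A hypothetical consumer (handler, reader, and process are placeholders) would classify them with errors.Is:

    for chunk := range handler.HandleFile(ctx, reader) {
        if chunk.Err != nil {
            if errors.Is(chunk.Err, ErrProcessingFatal) {
                // Fatal: the handler goroutine has stopped; nothing more arrives.
                return chunk.Err
            }
            // Warning: one entry failed; keep draining the channel.
            ctx.Logger().Error(chunk.Err, "error while handling file")
            continue
        }
        process(chunk.Data)
    }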