diff --git a/pkg/handlers/ar.go b/pkg/handlers/ar.go
index 4155ad41bcdf..410df34ee2a2 100644
--- a/pkg/handlers/ar.go
+++ b/pkg/handlers/ar.go
@@ -22,56 +22,55 @@ func newARHandler() *arHandler {
 // HandleFile processes AR formatted files. This function needs to be implemented to extract or
 // manage data from AR files according to specific requirements.
-func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    archiveChan := make(chan []byte, defaultBufferSize)
+func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

     if feature.ForceSkipArchives.Load() {
-        close(archiveChan)
-        return archiveChan, nil
+        close(dataOrErrChan)
+        return dataOrErrChan
     }

     go func() {
-        ctx, cancel := logContext.WithTimeout(ctx, maxTimeout)
-        defer cancel()
-        defer close(archiveChan)
+        defer close(dataOrErrChan)

-        // Update the metrics for the file processing.
         start := time.Now()
-        var err error
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()

         // Defer a panic recovery to handle any panics that occur during the AR processing.
         defer func() {
             if r := recover(); r != nil {
-                // Return the panic as an error.
+                var panicErr error
                 if e, ok := r.(error); ok {
-                    err = e
+                    panicErr = e
                 } else {
-                    err = fmt.Errorf("panic occurred: %v", r)
+                    panicErr = fmt.Errorf("panic occurred: %v", r)
+                }
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
                 }
-                ctx.Logger().Error(err, "Panic occurred when reading ar archive")
             }
         }()

-        var arReader *deb.Ar
-        arReader, err = deb.LoadAr(input)
+        arReader, err := deb.LoadAr(input)
         if err != nil {
-            ctx.Logger().Error(err, "error reading AR")
+            dataOrErrChan <- DataOrErr{
+                Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, err),
+            }
             return
         }

-        if err = h.processARFiles(ctx, arReader, archiveChan); err != nil {
-            ctx.Logger().Error(err, "error processing AR files")
+        err = h.processARFiles(ctx, arReader, dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle any errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()

-    return archiveChan, nil
+    return dataOrErrChan
 }

-func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archiveChan chan []byte) error {
+func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataOrErrChan chan DataOrErr) error {
     for {
         select {
         case <-ctx.Done():
@@ -91,12 +90,19 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archi
             rdr, err := newMimeTypeReader(arEntry.Data)
             if err != nil {
-                return fmt.Errorf("error creating mime-type reader: %w", err)
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: error creating AR mime-type reader: %v", ErrProcessingWarning, err),
+                }
+                h.metrics.incErrors()
+                continue
             }

-            if err := h.handleNonArchiveContent(fileCtx, rdr, archiveChan); err != nil {
-                fileCtx.Logger().Error(err, "error handling archive content in AR")
+            if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: error handling archive content in AR: %v", ErrProcessingWarning, err),
+                }
                 h.metrics.incErrors()
+                continue
             }

             h.metrics.incFilesProcessed()
diff --git a/pkg/handlers/ar_test.go b/pkg/handlers/ar_test.go
index 59285ca47efd..8658fdafcdff 100644
--- a/pkg/handlers/ar_test.go
+++ b/pkg/handlers/ar_test.go
@@ -23,12 +23,12 @@ func TestHandleARFile(t *testing.T) {
     defer rdr.Close()

     handler := newARHandler()
-    archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+    dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
     assert.NoError(t, err)

     wantChunkCount := 102
     count := 0
-    for range archiveChan {
+    for range dataOrErrChan {
         count++
     }

diff --git a/pkg/handlers/archive.go b/pkg/handlers/archive.go
index f5777fa52b6c..2b0c90a379ce 100644
--- a/pkg/handlers/archive.go
+++ b/pkg/handlers/archive.go
@@ -44,46 +44,46 @@ func newArchiveHandler() *archiveHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    dataChan := make(chan []byte, defaultBufferSize)
+func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

     if feature.ForceSkipArchives.Load() {
-        close(dataChan)
-        return dataChan, nil
+        close(dataOrErrChan)
+        return dataOrErrChan
     }

     go func() {
-        var err error
-        defer close(dataChan)
+        defer close(dataOrErrChan)

         // The underlying 7zip library may panic when attempting to open an archive.
         // This is due to an Index Out Of Range (IOOR) error when reading the archive header.
         // See: https://github.com/bodgit/sevenzip/blob/74bff0da9b233317e4ea7dd8c184a315db71af2a/types.go#L846
         defer func() {
             if r := recover(); r != nil {
-                // Return the panic as an error.
+                var panicErr error
                 if e, ok := r.(error); ok {
-                    err = e
+                    panicErr = e
                 } else {
-                    err = fmt.Errorf("panic occurred: %v", r)
+                    panicErr = fmt.Errorf("panic occurred: %v", r)
+                }
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
                 }
-                ctx.Logger().Error(err, "Panic occurred when attempting to open archive")
             }
         }()

-        // Update the metrics for the file processing.
         start := time.Now()
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()

-        if err = h.openArchive(ctx, 0, input, dataChan); err != nil {
-            ctx.Logger().Error(err, "error unarchiving chunk.")
+        err := h.openArchive(ctx, 0, input, dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle any errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()

-    return dataChan, nil
+    return dataOrErrChan
 }

 var ErrMaxDepthReached = errors.New("max archive depth reached")

@@ -92,7 +92,12 @@
 // It takes a reader from which it attempts to identify and process the archive format. Depending on the archive type,
 // it either decompresses or extracts the contents directly, sending data to the provided channel.
 // Returns an error if the archive cannot be processed due to issues like exceeding maximum depth or unsupported formats.
-func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
+func (h *archiveHandler) openArchive(
+    ctx logContext.Context,
+    depth int,
+    reader fileReader,
+    dataOrErrChan chan DataOrErr,
+) error {
     ctx.Logger().V(4).Info("Starting archive processing", "depth", depth)
     defer ctx.Logger().V(4).Info("Finished archive processing", "depth", depth)

@@ -107,9 +112,9 @@
     if reader.format == nil {
         if depth > 0 {
-            return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), archiveChan)
+            return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), dataOrErrChan)
         }
-        return fmt.Errorf("unknown archive format")
+        return errors.New("unknown archive format")
     }

     switch archive := reader.format.(type) {
@@ -131,9 +136,9 @@
         }
         defer rdr.Close()

-        return h.openArchive(ctx, depth+1, rdr, archiveChan)
+        return h.openArchive(ctx, depth+1, rdr, dataOrErrChan)
     case archiver.Extractor:
-        err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(archiveChan))
+        err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(dataOrErrChan))
         if err != nil {
             return fmt.Errorf("error extracting archive with format: %s: %w", reader.format.Name(), err)
         }
@@ -147,7 +152,7 @@
 // It logs the extraction, checks for cancellation, and decides whether to skip the file based on its name or type,
 // particularly for binary files if configured to skip. If the file is not skipped, it recursively calls openArchive
 // to handle nested archives or to continue processing based on the file's content and depth in the archive structure.
-func (h *archiveHandler) extractorHandler(archiveChan chan []byte) func(context.Context, archiver.File) error {
+func (h *archiveHandler) extractorHandler(dataOrErrChan chan DataOrErr) func(context.Context, archiver.File) error {
     return func(ctx context.Context, file archiver.File) error {
         lCtx := logContext.WithValues(
             logContext.AddLogger(ctx),
@@ -219,6 +224,6 @@
         h.metrics.observeFileSize(fileSize)

         lCtx.Logger().V(4).Info("Processed file successfully", "filename", file.Name(), "size", file.Size())

-        return h.openArchive(lCtx, depth, rdr, archiveChan)
+        return h.openArchive(lCtx, depth, rdr, dataOrErrChan)
     }
 }
diff --git a/pkg/handlers/archive_test.go b/pkg/handlers/archive_test.go
index a24ab98a1775..3463b6220c90 100644
--- a/pkg/handlers/archive_test.go
+++ b/pkg/handlers/archive_test.go
@@ -91,7 +91,7 @@ func TestArchiveHandler(t *testing.T) {
             }
             defer newReader.Close()

-            archiveChan, err := handler.HandleFile(logContext.Background(), newReader)
+            dataOrErrChan := handler.HandleFile(logContext.Background(), newReader)
             if testCase.expectErr {
                 assert.NoError(t, err)
                 return
@@ -100,9 +100,9 @@
             count := 0
             re := regexp.MustCompile(testCase.matchString)
             matched := false
-            for chunk := range archiveChan {
+            for chunk := range dataOrErrChan {
                 count++
-                if re.Match(chunk) {
+                if re.Match(chunk.Data) {
                     matched = true
                 }
             }
@@ -123,8 +123,8 @@ func TestOpenInvalidArchive(t *testing.T) {
     assert.NoError(t, err)
     defer rdr.Close()

-    archiveChan := make(chan []byte)
+    dataOrErrChan := make(chan DataOrErr)

-    err = handler.openArchive(ctx, 0, rdr, archiveChan)
+    err = handler.openArchive(ctx, 0, rdr, dataOrErrChan)
     assert.Error(t, err)
 }
diff --git a/pkg/handlers/default.go b/pkg/handlers/default.go
index 31ebbf8b4162..5baa3f8f0675 100644
--- a/pkg/handlers/default.go
+++ b/pkg/handlers/default.go
@@ -3,6 +3,7 @@ package handlers
 import (
     "context"
     "errors"
+    "fmt"
     "io"
     "time"
@@ -30,49 +31,67 @@ func newDefaultHandler(handlerType handlerType) *defaultHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    // Shared channel for both archive and non-archive content.
-    dataChan := make(chan []byte, defaultBufferSize)
+func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    // Shared channel for archived data, non-archive data, and errors.
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

     go func() {
-        defer close(dataChan)
+        defer close(dataOrErrChan)

-        // Update the metrics for the file processing.
         start := time.Now()
-        var err error
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()

-        if err = h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataChan); err != nil {
-            ctx.Logger().Error(err, "error handling non-archive content.")
+        err := h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()

-    return dataChan, nil
+    return dataOrErrChan
 }

 // measureLatencyAndHandleErrors measures the latency of the file processing and updates the metrics accordingly.
 // It also records errors and timeouts in the metrics.
-func (h *defaultHandler) measureLatencyAndHandleErrors(start time.Time, err error) {
+func (h *defaultHandler) measureLatencyAndHandleErrors(
+    ctx logContext.Context,
+    start time.Time,
+    err error,
+    dataErrChan chan DataOrErr,
+) {
     if err == nil {
         h.metrics.observeHandleFileLatency(time.Since(start).Milliseconds())
         return
     }
+    dataOrErr := DataOrErr{}

     h.metrics.incErrors()
     if errors.Is(err, context.DeadlineExceeded) {
         h.metrics.incFileProcessingTimeouts()
+        dataOrErr.Err = fmt.Errorf("%w: error processing chunk: %v", ErrProcessingFatal, err)
+        if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+            ctx.Logger().Error(err, "error writing to data channel")
+        }
+        return
+    }
+
+    dataOrErr.Err = err
+    if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+        ctx.Logger().Error(err, "error writing to data channel")
     }
 }

-// handleNonArchiveContent processes files that do not contain nested archives, serving as the final stage in the
-// extraction/decompression process. It reads the content to detect its MIME type and decides whether to skip based
-// on the type, particularly for binary files. It manages reading file chunks and writing them to the archive channel,
-// effectively collecting the final bytes for further processing. This function is a key component in ensuring that all
-// file content, regardless of being an archive or not, is handled appropriately.
-func (h *defaultHandler) handleNonArchiveContent(ctx logContext.Context, reader mimeTypeReader, archiveChan chan []byte) error {
+// handleNonArchiveContent processes files that are not archives,
+// serving as the final stage in the extraction/decompression pipeline.
+// It manages MIME type detection, file skipping logic, and chunk-based
+// reading and writing to the data channel. This function ensures
+// all non-archive content is properly handled for subsequent processing.
+func (h *defaultHandler) handleNonArchiveContent(
+    ctx logContext.Context,
+    reader mimeTypeReader,
+    dataOrErrChan chan DataOrErr,
+) error {
     mimeExt := reader.mimeExt

     if common.SkipFile(mimeExt) || common.IsBinary(mimeExt) {
@@ -85,13 +104,18 @@ func (h *defaultHandler) handleNonArchiveContent(ctx logContext.Context, reader
     chunkReader := sources.NewChunkReader()
     for data := range chunkReader(ctx, reader) {
+        dataOrErr := DataOrErr{}
         if err := data.Error(); err != nil {
-            ctx.Logger().Error(err, "error reading chunk")
             h.metrics.incErrors()
+            dataOrErr.Err = fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, err)
+            if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
+                return fmt.Errorf("%w: error writing to data channel: %v", ErrProcessingFatal, writeErr)
+            }
             continue
         }

-        if err := common.CancellableWrite(ctx, archiveChan, data.Bytes()); err != nil {
+        dataOrErr.Data = data.Bytes()
+        if err := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); err != nil {
             return err
         }
         h.metrics.incBytesProcessed(len(data.Bytes()))
diff --git a/pkg/handlers/default_test.go b/pkg/handlers/default_test.go
index 3d071ad6f382..613ce9dffdea 100644
--- a/pkg/handlers/default_test.go
+++ b/pkg/handlers/default_test.go
@@ -23,12 +23,12 @@ func TestHandleNonArchiveFile(t *testing.T) {
     defer rdr.Close()

     handler := newDefaultHandler(defaultHandlerType)
-    archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+    dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
     assert.NoError(t, err)

     wantChunkCount := 6
     count := 0
-    for range archiveChan {
+    for range dataOrErrChan {
         count++
     }

diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go
index 1561d826d0d5..4cfc7fcf48f0 100644
--- a/pkg/handlers/handlers.go
+++ b/pkg/handlers/handlers.go
@@ -2,6 +2,7 @@ package handlers
 import (
     "bufio"
+    "context"
     "errors"
     "fmt"
     "io"
@@ -38,7 +39,17 @@ type fileReader struct {
     *iobuf.BufferedReadSeeker
 }

-var ErrEmptyReader = errors.New("reader is empty")
+var (
+    ErrEmptyReader = errors.New("reader is empty")
+
+    // ErrProcessingFatal indicates a severe error that requires stopping the file processing.
+    ErrProcessingFatal = errors.New("fatal error processing file")
+
+    // ErrProcessingWarning indicates a recoverable error that can be logged,
+    // allowing processing to continue.
+    ErrProcessingWarning = errors.New("error processing file")
+)

 // mimeTypeReader wraps an io.Reader with MIME type information.
 // This type is used to pass content through the processing pipeline
@@ -80,19 +91,32 @@ func newMimeTypeReader(r io.Reader) (mimeTypeReader, error) {
 }

 // newFileReader creates a fileReader from an io.Reader, optionally using BufferedFileWriter for certain formats.
-func newFileReader(r io.Reader) (fileReader, error) {
-    var fReader fileReader
-
-    fReader.BufferedReadSeeker = iobuf.NewBufferedReaderSeeker(r)
+// The caller is responsible for closing the reader when it is no longer needed.
+func newFileReader(r io.Reader) (fReader fileReader, err error) {
+    // To detect the MIME type of the input data, we need a reader that supports seeking.
+    // This allows us to read the data multiple times if necessary without losing the original position.
+    // We use a BufferedReaderSeeker to wrap the original reader, enabling this functionality.
+    fReader.BufferedReadSeeker = iobuf.NewBufferedReadSeeker(r)
+
+    // If an error occurs during MIME type detection, it is important we close the BufferedReaderSeeker
+    // to release any resources it holds (checked out buffers or temp file).
+    defer func() {
+        if err != nil {
+            if closeErr := fReader.Close(); closeErr != nil {
+                err = fmt.Errorf("%w; error closing reader: %w", err, closeErr)
+            }
+        }
+    }()

-    mime, err := mimetype.DetectReader(fReader)
+    var mime *mimetype.MIME
+    mime, err = mimetype.DetectReader(fReader)
     if err != nil {
         return fReader, fmt.Errorf("unable to detect MIME type: %w", err)
     }
     fReader.mime = mime

     // Reset the reader to the beginning because DetectReader consumes the reader.
-    if _, err := fReader.Seek(0, io.SeekStart); err != nil {
+    if _, err = fReader.Seek(0, io.SeekStart); err != nil {
         return fReader, fmt.Errorf("error resetting reader after MIME detection: %w", err)
     }

@@ -102,7 +126,8 @@ func newFileReader(r io.Reader) (fileReader, error) {
         return fReader, nil
     }

-    format, _, err := archiver.Identify("", fReader)
+    var format archiver.Format
+    format, _, err = archiver.Identify("", fReader)
     switch {
     case err == nil:
         fReader.isGenericArchive = true
@@ -117,18 +142,29 @@ func newFileReader(r io.Reader) (fileReader, error) {
     // Reset the reader to the beginning again to allow the handler to read from the start.
     // This is necessary because Identify consumes the reader.
-    if _, err := fReader.Seek(0, io.SeekStart); err != nil {
+    if _, err = fReader.Seek(0, io.SeekStart); err != nil {
         return fReader, fmt.Errorf("error resetting reader after archive identification: %w", err)
     }

     return fReader, nil
 }

+// DataOrErr represents a result that can either contain data or an error.
+// The Data field holds the byte slice of data, and the Err field holds any error that occurred.
+// This structure is used to handle asynchronous file processing where each chunk of data
+// or potential error needs to be communicated back to the caller. It allows for
+// efficient streaming of file contents while also providing a way to propagate errors
+// that may occur during the file handling process.
+type DataOrErr struct {
+    Data []byte
+    Err  error
+}
+
 // FileHandler represents a handler for files.
 // It has a single method, HandleFile, which takes a context and a fileReader as input,
 // and returns a channel of byte slices and an error.
 type FileHandler interface {
-    HandleFile(ctx logContext.Context, reader fileReader) (chan []byte, error)
+    HandleFile(ctx logContext.Context, reader fileReader) chan DataOrErr
 }

 // fileHandlingConfig encapsulates configuration settings that control the behavior of file processing.
@@ -256,14 +292,25 @@ var maxTimeout = time.Duration(60) * time.Second
 func SetArchiveMaxTimeout(timeout time.Duration) { maxTimeout = timeout }

 // HandleFile orchestrates the complete file handling process for a given file.
-// It determines the MIME type of the file, selects the appropriate handler based on this type, and processes the file.
-// This function initializes the handling process and delegates to the specific handler to manage file
-// extraction or processing. Errors at any stage result in an error return value.
-// Successful handling passes the file content through a channel to be chunked and reported.
-// The function will close the reader when it has consumed all the data.
+// It determines the MIME type of the file,
+// This function initializes the handling process and delegates to the specific +// handler to manage file extraction or processing. +// +// The function will return nil (success) in the following cases: +// - If the reader is empty (ErrEmptyReader) +// - If skipArchives option is true and the file is detected as an archive +// - If all chunks are processed successfully without critical errors // -// If the skipArchives option is set to true and the detected MIME type is a known archive type, -// the function will skip processing the file and return nil. +// The function will return an error in the following cases: +// - If the reader is nil +// - If there's an error creating the file reader +// - If there's an error closing the reader +// - If a critical error occurs during chunk processing (context cancellation, deadline exceeded, or ErrProcessingFatal) +// - If there's an error reporting a chunk +// +// Non-critical errors during chunk processing are logged +// but do not cause the function to return an error. func HandleFile( ctx logContext.Context, reader io.Reader, @@ -272,7 +319,7 @@ func HandleFile( options ...func(*fileHandlingConfig), ) error { if reader == nil { - return fmt.Errorf("reader is nil") + return errors.New("reader is nil") } rdr, err := newFileReader(reader) @@ -281,7 +328,7 @@ func HandleFile( ctx.Logger().V(5).Info("empty reader, skipping file") return nil } - return fmt.Errorf("error creating custom reader: %w", err) + return fmt.Errorf("unable to HandleFile, error creating file reader: %w", err) } defer func() { // Ensure all data is read to prevent broken pipe. @@ -307,42 +354,65 @@ func HandleFile( defer cancel() handler := selectHandler(mimeT, rdr.isGenericArchive) - archiveChan, err := handler.HandleFile(processingCtx, rdr) // Delegate to the specific handler to process the file. - if err != nil { - return fmt.Errorf("error handling file: %w", err) - } + dataOrErrChan := handler.HandleFile(processingCtx, rdr) // Delegate to the specific handler to process the file. - return handleChunks(processingCtx, archiveChan, chunkSkel, reporter) + return handleChunksWithError(processingCtx, dataOrErrChan, chunkSkel, reporter) } -// handleChunks reads data from the handlerChan and uses it to fill chunks according to a predefined skeleton (chunkSkel). -// Each filled chunk is reported using the provided reporter. This function manages the lifecycle of the channel, -// handling the termination condition when the channel closes and ensuring the cancellation of the operation if the context -// is done. It returns true if all chunks are processed successfully, otherwise returns false on errors or cancellation. -func handleChunks( +// handleChunksWithError processes data and errors received from the dataErrChan channel. +// For each DataOrErr received: +// - If it contains data, the function creates a chunk based on chunkSkel and reports it through the reporter. +// - If it contains an error, the function returns the error immediately. +// The function also listens for context cancellation to gracefully terminate processing if the context is done. +// It returns nil upon successful processing of all data, or the first encountered error. 
+func handleChunksWithError(
     ctx logContext.Context,
-    handlerChan chan []byte,
+    dataErrChan chan DataOrErr,
     chunkSkel *sources.Chunk,
     reporter sources.ChunkReporter,
 ) error {
-    if handlerChan == nil {
-        return fmt.Errorf("handler channel is nil")
-    }
-
     for {
         select {
-        case data, open := <-handlerChan:
-            if !open {
-                ctx.Logger().V(5).Info("handler channel closed, all chunks processed")
+        case dataOrErr, ok := <-dataErrChan:
+            if !ok {
+                // Channel closed, processing complete.
+                ctx.Logger().V(5).Info("dataErrChan closed, all chunks processed")
                 return nil
             }
-            chunk := *chunkSkel
-            chunk.Data = data
-            if err := reporter.ChunkOk(ctx, chunk); err != nil {
-                return fmt.Errorf("error reporting chunk: %w", err)
+            if dataOrErr.Err != nil {
+                if isFatal(dataOrErr.Err) {
+                    return dataOrErr.Err
+                }
+                ctx.Logger().Error(dataOrErr.Err, "non-critical error processing chunk")
+                continue
+            }
+            if len(dataOrErr.Data) > 0 {
+                chunk := *chunkSkel
+                chunk.Data = dataOrErr.Data
+                if err := reporter.ChunkOk(ctx, chunk); err != nil {
+                    return fmt.Errorf("error reporting chunk: %w", err)
+                }
             }
         case <-ctx.Done():
             return ctx.Err()
         }
     }
 }
+
+// isFatal determines whether the given error is a fatal error that should
+// terminate processing the current file, or a non-critical error that can be logged and ignored.
+// "Fatal" errors include context cancellation, deadline exceeded, and the
+// ErrProcessingFatal error. Non-fatal errors include the ErrProcessingWarning
+// error as well as any other error that is not one of the fatal errors.
+func isFatal(err error) bool {
+    switch {
+    case errors.Is(err, context.Canceled) ||
+        errors.Is(err, context.DeadlineExceeded) ||
+        errors.Is(err, ErrProcessingFatal):
+        return true
+    case errors.Is(err, ErrProcessingWarning):
+        return false
+    default:
+        return false
+    }
+}
diff --git a/pkg/handlers/handlers_test.go b/pkg/handlers/handlers_test.go
index f4c3c54f26c8..45c2ff5c43f7 100644
--- a/pkg/handlers/handlers_test.go
+++ b/pkg/handlers/handlers_test.go
@@ -3,6 +3,8 @@ package handlers
 import (
     "archive/zip"
     "bytes"
+    stdctx "context"
+    "errors"
     "fmt"
     "io"
     "net/http"
@@ -11,6 +13,7 @@ import (
     "path/filepath"
     "strings"
     "testing"
+    "testing/iotest"
     "time"

     "github.com/stretchr/testify/assert"
@@ -517,13 +520,10 @@ func TestHandleGitCatFile(t *testing.T) {
             }
             defer os.RemoveAll(gitDir)

-            cmd := exec.Command("git", "-C", gitDir, "rev-parse", "HEAD")
-            hashBytes, err := cmd.Output()
-            assert.NoError(t, err, "Failed to get commit hash")
-            commitHash := strings.TrimSpace(string(hashBytes))
+            commitHash := getGitCommitHash(t, gitDir)

             // Create a pipe to simulate the git cat-file stdout.
-            cmd = exec.Command("git", "-C", gitDir, "cat-file", "blob", fmt.Sprintf("%s:%s", commitHash, tt.fileName))
+            cmd := exec.Command("git", "-C", gitDir, "cat-file", "blob", fmt.Sprintf("%s:%s", commitHash, tt.fileName))
             var stderr bytes.Buffer
             cmd.Stderr = &stderr
@@ -684,3 +684,175 @@ func setupTempGitRepoCommon(t *testing.T, fileName string, fileSize int, isUnsup
     return tempDir
 }
+
+func TestHandleFileNewFileReaderFailure(t *testing.T) {
+    customReader := iotest.ErrReader(errors.New("simulated newFileReader error"))
+
+    chunkSkel := &sources.Chunk{}
+    chunkCh := make(chan *sources.Chunk)
+    reporter := sources.ChanReporter{Ch: chunkCh}
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    err := HandleFile(ctx, customReader, chunkSkel, reporter)
+
+    assert.Error(t, err, "HandleFile should return an error when newFileReader fails")
+}
+
+// errorInjectingReader is a custom io.Reader that injects an error after reading a certain number of bytes.
+type errorInjectingReader struct {
+    reader        io.Reader
+    injectAfter   int64 // Number of bytes after which to inject the error
+    injected      bool
+    bytesRead     int64
+    errorToInject error
+}
+
+func (eir *errorInjectingReader) Read(p []byte) (int, error) {
+    if eir.injectAfter > 0 && eir.bytesRead >= eir.injectAfter && !eir.injected {
+        eir.injected = true
+        return 0, eir.errorToInject
+    }
+
+    n, err := eir.reader.Read(p)
+    eir.bytesRead += int64(n)
+    return n, err
+}
+
+// TestHandleGitCatFileWithPipeError tests that when an error is injected during the HandleFile processing,
+// the error is reported and the git cat-file command completes successfully.
+func TestHandleGitCatFileWithPipeError(t *testing.T) {
+    fileName := "largefile_with_error.bin"
+    fileSize := 100 * 1024              // 100 KB
+    injectErrorAfter := int64(50 * 1024) // Inject error after 50 KB
+    simulatedError := errors.New("simulated error during newFileReader")
+
+    gitDir := setupTempGitRepo(t, fileName, fileSize)
+    defer os.RemoveAll(gitDir)
+
+    commitHash := getGitCommitHash(t, gitDir)
+
+    cmd := exec.Command("git", "-C", gitDir, "cat-file", "blob", fmt.Sprintf("%s:%s", commitHash, fileName))
+
+    var stderr bytes.Buffer
+    cmd.Stderr = &stderr
+
+    stdout, err := cmd.StdoutPipe()
+    assert.NoError(t, err, "Failed to create stdout pipe")
+
+    err = cmd.Start()
+    assert.NoError(t, err, "Failed to start git cat-file command")
+
+    // Wrap the stdout with errorInjectingReader to simulate an error after reading injectErrorAfter bytes.
+    wrappedReader := &errorInjectingReader{
+        reader:        stdout,
+        injectAfter:   injectErrorAfter,
+        injected:      false,
+        bytesRead:     0,
+        errorToInject: simulatedError,
+    }
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    chunkCh := make(chan *sources.Chunk, 1000)
+
+    go func() {
+        defer close(chunkCh)
+        err = HandleFile(ctx, wrappedReader, &sources.Chunk{}, sources.ChanReporter{Ch: chunkCh}, WithSkipArchives(false))
+        assert.NoError(t, err, "HandleFile should not return an error")
+    }()
+
+    for range chunkCh {
+    }
+
+    err = cmd.Wait()
+    assert.NoError(t, err, "git cat-file command should complete without error")
+}
+
+// getGitCommitHash retrieves the current commit hash of the Git repository.
+func getGitCommitHash(t *testing.T, gitDir string) string {
+    t.Helper()
+    cmd := exec.Command("git", "-C", gitDir, "rev-parse", "HEAD")
+    hashBytes, err := cmd.Output()
+    assert.NoError(t, err, "Failed to get commit hash")
+    commitHash := strings.TrimSpace(string(hashBytes))
+    return commitHash
+}
+
+type mockReporter struct{ reportedChunks int }
+
+func (m *mockReporter) ChunkOk(logContext.Context, sources.Chunk) error {
+    m.reportedChunks++
+    return nil
+}
+
+func (m *mockReporter) ChunkErr(logContext.Context, error) error { return nil }
+
+func TestHandleChunksWithError(t *testing.T) {
+    tests := []struct {
+        name                   string
+        input                  []DataOrErr
+        expectedErr            error
+        expectedReportedChunks int
+    }{
+        {
+            name:  "Non-Critical Error",
+            input: []DataOrErr{{Err: ErrProcessingWarning}},
+        },
+        {
+            name:        "Critical Error",
+            input:       []DataOrErr{{Err: ErrProcessingFatal}},
+            expectedErr: ErrProcessingFatal,
+        },
+        {
+            name: "No Error",
+            input: []DataOrErr{
+                {Data: []byte("test data")},
+                {Data: []byte("more data")},
+            },
+            expectedReportedChunks: 2,
+        },
+        {
+            name:        "Context Canceled",
+            input:       []DataOrErr{{Err: stdctx.Canceled}},
+            expectedErr: stdctx.Canceled,
+        },
+        {
+            name:        "Context Deadline Exceeded",
+            input:       []DataOrErr{{Err: stdctx.DeadlineExceeded}},
+            expectedErr: stdctx.DeadlineExceeded,
+        },
+        {
+            name:  "EOF Error",
+            input: []DataOrErr{{Err: io.EOF}},
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            t.Parallel()
+
+            ctx := context.Background()
+            chunkSkel := &sources.Chunk{}
+            reporter := new(mockReporter)
+
+            dataErrChan := make(chan DataOrErr, len(tc.input))
+            for _, de := range tc.input {
+                dataErrChan <- de
+            }
+            close(dataErrChan)
+
+            err := handleChunksWithError(ctx, dataErrChan, chunkSkel, reporter)
+
+            if tc.expectedErr != nil {
+                assert.ErrorIs(t, err, tc.expectedErr, "handleChunksWithError should return the expected error")
+            } else {
+                assert.NoError(t, err, "handleChunksWithError should not return an error for non-critical errors")
+            }
+
+            assert.Equal(t, tc.expectedReportedChunks, reporter.reportedChunks, "should have reported the expected number of chunks")
+        })
+    }
+}
diff --git a/pkg/handlers/rpm.go b/pkg/handlers/rpm.go
index 00142d15ff41..dc3bf000156f 100644
--- a/pkg/handlers/rpm.go
+++ b/pkg/handlers/rpm.go
@@ -22,63 +22,67 @@ func newRPMHandler() *rpmHandler {
 // HandleFile processes RPM formatted files. Further implementation is required to appropriately
 // handle RPM specific archive operations.
-func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-    archiveChan := make(chan []byte, defaultBufferSize)
+func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+    dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

     if feature.ForceSkipArchives.Load() {
-        close(archiveChan)
-        return archiveChan, nil
+        close(dataOrErrChan)
+        return dataOrErrChan
     }

     go func() {
-        ctx, cancel := logContext.WithTimeout(ctx, maxTimeout)
-        defer cancel()
-        defer close(archiveChan)
+        defer close(dataOrErrChan)

-        // Update the metrics for the file processing.
         start := time.Now()
-        var err error
-        defer func() {
-            h.measureLatencyAndHandleErrors(start, err)
-            h.metrics.incFilesProcessed()
-        }()

         // Defer a panic recovery to handle any panics that occur during the RPM processing.
         defer func() {
             if r := recover(); r != nil {
-                // Return the panic as an error.
+                var panicErr error
                 if e, ok := r.(error); ok {
-                    err = e
+                    panicErr = e
                 } else {
-                    err = fmt.Errorf("panic occurred: %v", r)
+                    panicErr = fmt.Errorf("panic occurred: %v", r)
+                }
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
                 }
-                ctx.Logger().Error(err, "Panic occurred when reading rpm archive")
             }
         }()

-        var rpm *rpmutils.Rpm
-        rpm, err = rpmutils.ReadRpm(input)
+        rpm, err := rpmutils.ReadRpm(input)
         if err != nil {
-            ctx.Logger().Error(err, "error reading RPM")
+            dataOrErrChan <- DataOrErr{
+                Err: fmt.Errorf("%w: reading rpm error: %v", ErrProcessingFatal, err),
+            }
             return
         }

-        var reader rpmutils.PayloadReader
-        reader, err = rpm.PayloadReaderExtended()
+        reader, err := rpm.PayloadReaderExtended()
         if err != nil {
-            ctx.Logger().Error(err, "error getting RPM payload reader")
+            dataOrErrChan <- DataOrErr{
+                Err: fmt.Errorf("%w: uncompressing rpm error: %v", ErrProcessingFatal, err),
+            }
             return
         }

-        if err = h.processRPMFiles(ctx, reader, archiveChan); err != nil {
-            ctx.Logger().Error(err, "error processing RPM files")
+        err = h.processRPMFiles(ctx, reader, dataOrErrChan)
+        if err == nil {
+            h.metrics.incFilesProcessed()
         }
+
+        // Update the metrics for the file processing and handle any errors.
+        h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
     }()

-    return archiveChan, nil
+    return dataOrErrChan
 }

-func (h *rpmHandler) processRPMFiles(ctx logContext.Context, reader rpmutils.PayloadReader, archiveChan chan []byte) error {
+func (h *rpmHandler) processRPMFiles(
+    ctx logContext.Context,
+    reader rpmutils.PayloadReader,
+    dataOrErrChan chan DataOrErr,
+) error {
     for {
         select {
         case <-ctx.Done():
@@ -101,8 +105,10 @@ func (h *rpmHandler) processRPMFiles(ctx logContext.Context, reader rpmutils.Pay
                 return fmt.Errorf("error creating mime-type reader: %w", err)
             }

-            if err := h.handleNonArchiveContent(fileCtx, rdr, archiveChan); err != nil {
-                fileCtx.Logger().Error(err, "error handling archive content in RPM")
+            if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
+                dataOrErrChan <- DataOrErr{
+                    Err: fmt.Errorf("%w: error processing RPM archive: %v", ErrProcessingWarning, err),
+                }
                 h.metrics.incErrors()
             }

diff --git a/pkg/handlers/rpm_test.go b/pkg/handlers/rpm_test.go
index f90d7b672fa0..7ed0e7ad7f6e 100644
--- a/pkg/handlers/rpm_test.go
+++ b/pkg/handlers/rpm_test.go
@@ -23,12 +23,12 @@ func TestHandleRPMFile(t *testing.T) {
     defer rdr.Close()

     handler := newRPMHandler()
-    archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+    dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
     assert.NoError(t, err)

     wantChunkCount := 179
     count := 0
-    for range archiveChan {
+    for range dataOrErrChan {
         count++
     }

diff --git a/pkg/iobuf/bufferedreaderseeker.go b/pkg/iobuf/bufferedreaderseeker.go
index 47ba5119e5ba..8993d99fee80 100644
--- a/pkg/iobuf/bufferedreaderseeker.go
+++ b/pkg/iobuf/bufferedreaderseeker.go
@@ -70,10 +70,10 @@ func asSeeker(r io.Reader) io.Seeker {
     return seeker
 }

-// NewBufferedReaderSeeker creates and initializes a BufferedReadSeeker.
+// NewBufferedReadSeeker creates and initializes a BufferedReadSeeker.
 // It takes an io.Reader and checks if it supports seeking.
 // If the reader supports seeking, it is stored in the seeker field.
-func NewBufferedReaderSeeker(r io.Reader) *BufferedReadSeeker {
+func NewBufferedReadSeeker(r io.Reader) *BufferedReadSeeker {
     const defaultThreshold = 1 << 24 // 16MB threshold for switching to file buffering

     var (
@@ -82,9 +82,6 @@ func NewBufferedReaderSeeker(r io.Reader) *BufferedReadSeeker {
     )

     seeker = asSeeker(r)
-    if seeker == nil {
-        buf = defaultBufferPool.Get()
-    }

     return &BufferedReadSeeker{
         reader: r,
@@ -266,6 +263,10 @@ func (br *BufferedReadSeeker) readToEnd() error {
 }

 func (br *BufferedReadSeeker) writeData(data []byte) error {
+    if br.buf == nil {
+        br.buf = br.bufPool.Get()
+    }
+
     _, err := br.buf.Write(data)
     if err != nil {
         return err
diff --git a/pkg/iobuf/bufferedreaderseeker_test.go b/pkg/iobuf/bufferedreaderseeker_test.go
index 1f65f2dd33b1..148af2e368a6 100644
--- a/pkg/iobuf/bufferedreaderseeker_test.go
+++ b/pkg/iobuf/bufferedreaderseeker_test.go
@@ -121,7 +121,7 @@ func TestBufferedReaderSeekerRead(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             t.Parallel()

-            brs := NewBufferedReaderSeeker(tt.reader)
+            brs := NewBufferedReadSeeker(tt.reader)

             for i, readSize := range tt.reads {
                 buf := make([]byte, readSize)
@@ -241,7 +241,7 @@ func TestBufferedReaderSeekerSeek(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             t.Parallel()

-            brs := NewBufferedReaderSeeker(tt.reader)
+            brs := NewBufferedReadSeeker(tt.reader)

             pos, err := brs.Seek(tt.offset, tt.whence)
             if tt.expectedErr {
                 assert.Error(t, err)
@@ -336,7 +336,7 @@ func TestBufferedReaderSeekerReadAt(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             t.Parallel()

-            brs := NewBufferedReaderSeeker(tt.reader)
+            brs := NewBufferedReadSeeker(tt.reader)

             out := make([]byte, tt.length)
             n, err := brs.ReadAt(out, tt.offset)
@@ -436,7 +436,7 @@ func TestBufferedReadSeekerSize(t *testing.T) {
         t.Run(tt.name, func(t *testing.T) {
             t.Parallel()

-            brs := NewBufferedReaderSeeker(tt.reader)
+            brs := NewBufferedReadSeeker(tt.reader)

             if tt.setup != nil {
                 tt.setup(brs)
diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go
index c18c93879a9c..a859f33b5998 100644
--- a/pkg/sources/git/git.go
+++ b/pkg/sources/git/git.go
@@ -1256,7 +1256,15 @@ func (s *Git) handleBinary(
     catFileCtx, cancel := context.WithTimeoutCause(fileCtx, cmdTimeout, errors.New("git cat-file timeout"))
     defer cancel()

-    cmd := exec.CommandContext(catFileCtx, "git", "-C", gitDir, "cat-file", "blob", commitHash.String()+":"+path)
+    cmd := exec.CommandContext(
+        catFileCtx,
+        "git",
+        "-C",
+        gitDir,
+        "cat-file",
+        "blob",
+        commitHash.String()+":"+path,
+    )
     var stderr bytes.Buffer
     cmd.Stderr = &stderr
     cmd.WaitDelay = waitDelay // give the command a chance to finish before the timeout :)
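
Reviewer note (not part of the patch): the sketch below shows how a caller might drain the chan DataOrErr that HandleFile now returns, mirroring the fatal/warning split applied by handleChunksWithError and isFatal in pkg/handlers/handlers.go. It is a self-contained illustration only; the DataOrErr type and the two sentinel errors are copied from the patch, while drain and process are hypothetical names introduced just for this example.

package example

import (
    "context"
    "errors"
    "log"
)

// DataOrErr mirrors the struct added in pkg/handlers/handlers.go.
type DataOrErr struct {
    Data []byte
    Err  error
}

// Sentinel errors mirroring the ones added in pkg/handlers/handlers.go.
var (
    ErrProcessingFatal   = errors.New("fatal error processing file")
    ErrProcessingWarning = errors.New("error processing file")
)

// drain consumes a DataOrErr channel until it is closed, stopping on the
// first fatal error and logging (then skipping) non-critical ones.
func drain(ctx context.Context, dataOrErrChan <-chan DataOrErr, process func([]byte) error) error {
    for {
        select {
        case dataOrErr, ok := <-dataOrErrChan:
            if !ok {
                // Channel closed: every chunk has been delivered.
                return nil
            }
            if dataOrErr.Err != nil {
                if errors.Is(dataOrErr.Err, ErrProcessingFatal) ||
                    errors.Is(dataOrErr.Err, context.Canceled) ||
                    errors.Is(dataOrErr.Err, context.DeadlineExceeded) {
                    return dataOrErr.Err
                }
                // ErrProcessingWarning and any other non-fatal error is logged and skipped.
                log.Printf("non-critical error processing chunk: %v", dataOrErr.Err)
                continue
            }
            if len(dataOrErr.Data) > 0 {
                if err := process(dataOrErr.Data); err != nil {
                    return err
                }
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

The single shared channel keeps the HandleFile signature free of a separate error return while still letting per-chunk failures propagate; the caller decides whether a given failure aborts the file or is merely logged.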