[refactor] - Add DataOrErr #3520

Merged on Nov 16, 2024 (8 commits). The view below shows changes from 3 commits.
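For context: the `DataOrErr` type itself is defined outside the files in this diff, so the following is only a sketch inferred from how the handlers use it (`dataOrErr.Err`, `dataOrErr.Data = data.Bytes()`, `chunk.Data`), not the PR's actual definition.

```go
// Presumed shape of DataOrErr; the real definition lives outside this diff.
// A value carries either a chunk of extracted data or an in-band error.
type DataOrErr struct {
	Data []byte // extracted file content, set on the happy path
	Err  error  // processing error surfaced to the consumer, set on failure
}
```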
48 changes: 21 additions & 27 deletions pkg/handlers/ar.go
```diff
@@ -22,56 +22,50 @@ func newARHandler() *arHandler {

 // HandleFile processes AR formatted files. This function needs to be implemented to extract or
 // manage data from AR files according to specific requirements.
-func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-	archiveChan := make(chan []byte, defaultBufferSize)
+func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+	dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

 	if feature.ForceSkipArchives.Load() {
-		close(archiveChan)
-		return archiveChan, nil
+		close(dataOrErrChan)
+		return dataOrErrChan
 	}

 	go func() {
 		ctx, cancel := logContext.WithTimeout(ctx, maxTimeout)
 		defer cancel()
-		defer close(archiveChan)
-
-		// Update the metrics for the file processing.
-		start := time.Now()
-		var err error
-		defer func() {
-			h.measureLatencyAndHandleErrors(start, err)
-			h.metrics.incFilesProcessed()
-		}()
+		defer close(dataOrErrChan)

 		// Defer a panic recovery to handle any panics that occur during the AR processing.
 		defer func() {
 			if r := recover(); r != nil {
-				// Return the panic as an error.
+				var panicErr error
 				if e, ok := r.(error); ok {
-					err = e
+					panicErr = e
 				} else {
-					err = fmt.Errorf("panic occurred: %v", r)
+					panicErr = fmt.Errorf("panic occurred: %v", r)
 				}
-				ctx.Logger().Error(err, "Panic occurred when reading ar archive")
+				ctx.Logger().Error(panicErr, "Panic occurred when attempting to open ar archive")
 			}
 		}()

-		var arReader *deb.Ar
-		arReader, err = deb.LoadAr(input)
+		start := time.Now()
+		arReader, err := deb.LoadAr(input)
 		if err != nil {
-			ctx.Logger().Error(err, "error reading AR")
+			ctx.Logger().Error(err, "Error loading AR file")
 			return
 		}

-		if err = h.processARFiles(ctx, arReader, archiveChan); err != nil {
-			ctx.Logger().Error(err, "error processing AR files")
+		err = h.processARFiles(ctx, arReader, dataOrErrChan)
+		if err == nil {
+			h.metrics.incFilesProcessed()
 		}
+
+		// Update the metrics for the file processing and handle any errors.
+		h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
 	}()

-	return archiveChan, nil
+	return dataOrErrChan
 }

-func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archiveChan chan []byte) error {
+func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataOrErrChan chan DataOrErr) error {
 	for {
 		select {
 		case <-ctx.Done():
@@ -94,7 +88,7 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, archiveChan chan []byte) error {
 			return fmt.Errorf("error creating mime-type reader: %w", err)
 		}

-		if err := h.handleNonArchiveContent(fileCtx, rdr, archiveChan); err != nil {
+		if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
 			fileCtx.Logger().Error(err, "error handling archive content in AR")
 			h.metrics.incErrors()
 		}
```
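With this change, `HandleFile` no longer returns an error alongside the channel; errors travel in-band as `DataOrErr` values. A minimal consumer sketch under that contract; the function name and abort-on-first-error policy are illustrative, not from the PR:

```go
// consumeFile drains a handler's DataOrErr channel, separating data chunks
// from in-band errors. handleFile is any function with the new signature,
// e.g. the method value newARHandler().HandleFile.
func consumeFile(
	ctx logContext.Context,
	handleFile func(logContext.Context, fileReader) chan DataOrErr,
	input fileReader,
) ([][]byte, error) {
	var chunks [][]byte
	for chunk := range handleFile(ctx, input) {
		if chunk.Err != nil {
			// Errors now arrive on the same channel; here the first one aborts,
			// but a caller could equally log and continue.
			return nil, chunk.Err
		}
		chunks = append(chunks, chunk.Data)
	}
	return chunks, nil
}
```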
4 changes: 2 additions & 2 deletions pkg/handlers/ar_test.go
```diff
@@ -23,12 +23,12 @@ func TestHandleARFile(t *testing.T) {
 	defer rdr.Close()

 	handler := newARHandler()
-	archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+	dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
 	assert.NoError(t, err)

 	wantChunkCount := 102
 	count := 0
-	for range archiveChan {
+	for range dataOrErrChan {
 		count++
 	}
```
50 changes: 26 additions & 24 deletions pkg/handlers/archive.go
```diff
@@ -44,46 +44,43 @@ func newArchiveHandler() *archiveHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
-	dataChan := make(chan []byte, defaultBufferSize)
+func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
+	dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

 	if feature.ForceSkipArchives.Load() {
-		close(dataChan)
-		return dataChan, nil
+		close(dataOrErrChan)
+		return dataOrErrChan
 	}

 	go func() {
-		var err error
-		defer close(dataChan)
+		defer close(dataOrErrChan)

 		// The underlying 7zip library may panic when attempting to open an archive.
 		// This is due to an Index Out Of Range (IOOR) error when reading the archive header.
 		// See: https://github.com/bodgit/sevenzip/blob/74bff0da9b233317e4ea7dd8c184a315db71af2a/types.go#L846
 		defer func() {
 			if r := recover(); r != nil {
-				// Return the panic as an error.
+				var panicErr error
 				if e, ok := r.(error); ok {
-					err = e
+					panicErr = e
 				} else {
-					err = fmt.Errorf("panic occurred: %v", r)
+					panicErr = fmt.Errorf("panic occurred: %v", r)
 				}
-				ctx.Logger().Error(err, "Panic occurred when attempting to open archive")
+				ctx.Logger().Error(panicErr, "Panic occurred when attempting to open archive")
 			}
 		}()

-		// Update the metrics for the file processing.
 		start := time.Now()
-		defer func() {
-			h.measureLatencyAndHandleErrors(start, err)
+		err := h.openArchive(ctx, 0, input, dataOrErrChan)
+		if err == nil {
 			h.metrics.incFilesProcessed()
-		}()
-
-		if err = h.openArchive(ctx, 0, input, dataChan); err != nil {
-			ctx.Logger().Error(err, "error unarchiving chunk.")
 		}
+
+		// Update the metrics for the file processing and handle any errors.
+		h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
 	}()

-	return dataChan, nil
+	return dataOrErrChan
 }

 var ErrMaxDepthReached = errors.New("max archive depth reached")
@@ -92,7 +89,12 @@ var ErrMaxDepthReached = errors.New("max archive depth reached")
 // It takes a reader from which it attempts to identify and process the archive format. Depending on the archive type,
 // it either decompresses or extracts the contents directly, sending data to the provided channel.
 // Returns an error if the archive cannot be processed due to issues like exceeding maximum depth or unsupported formats.
-func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
+func (h *archiveHandler) openArchive(
+	ctx logContext.Context,
+	depth int,
+	reader fileReader,
+	dataOrErrChan chan DataOrErr,
+) error {
 	ctx.Logger().V(4).Info("Starting archive processing", "depth", depth)
 	defer ctx.Logger().V(4).Info("Finished archive processing", "depth", depth)
@@ -107,7 +109,7 @@
 	if reader.format == nil {
 		if depth > 0 {
-			return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), archiveChan)
+			return h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(reader), dataOrErrChan)
 		}
 		return fmt.Errorf("unknown archive format")
 	}
@@ -135,9 +137,9 @@ func (h *archiveHandler) openArchive(ctx logContext.Context, depth int, reader fileReader, archiveChan chan []byte) error {
 		}
 		defer rdr.Close()

-		return h.openArchive(ctx, depth+1, rdr, archiveChan)
+		return h.openArchive(ctx, depth+1, rdr, dataOrErrChan)
 	case archiver.Extractor:
-		err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(archiveChan))
+		err := archive.Extract(logContext.WithValue(ctx, depthKey, depth+1), reader, nil, h.extractorHandler(dataOrErrChan))
 		if err != nil {
 			return fmt.Errorf("error extracting archive with format: %s: %w", reader.format.Name(), err)
 		}
@@ -151,7 +153,7 @@
 // It logs the extraction, checks for cancellation, and decides whether to skip the file based on its name or type,
 // particularly for binary files if configured to skip. If the file is not skipped, it recursively calls openArchive
 // to handle nested archives or to continue processing based on the file's content and depth in the archive structure.
-func (h *archiveHandler) extractorHandler(archiveChan chan []byte) func(context.Context, archiver.File) error {
+func (h *archiveHandler) extractorHandler(dataOrErrChan chan DataOrErr) func(context.Context, archiver.File) error {
 	return func(ctx context.Context, file archiver.File) error {
 		lCtx := logContext.WithValues(
 			logContext.AddLogger(ctx),
@@ -223,6 +225,6 @@ func (h *archiveHandler) extractorHandler(archiveChan chan []byte) func(context.Context, archiver.File) error {
 		h.metrics.observeFileSize(fileSize)

 		lCtx.Logger().V(4).Info("Processed file successfully", "filename", file.Name(), "size", file.Size())
-		return h.openArchive(lCtx, depth, rdr, archiveChan)
+		return h.openArchive(lCtx, depth, rdr, dataOrErrChan)
 	}
 }
```
10 changes: 5 additions & 5 deletions pkg/handlers/archive_test.go
```diff
@@ -91,7 +91,7 @@ func TestArchiveHandler(t *testing.T) {
 			}
 			defer newReader.Close()

-			archiveChan, err := handler.HandleFile(logContext.Background(), newReader)
+			dataOrErrChan := handler.HandleFile(logContext.Background(), newReader)
 			if testCase.expectErr {
 				assert.NoError(t, err)
 				return
@@ -100,9 +100,9 @@
 			count := 0
 			re := regexp.MustCompile(testCase.matchString)
 			matched := false
-			for chunk := range archiveChan {
+			for chunk := range dataOrErrChan {
 				count++
-				if re.Match(chunk) {
+				if re.Match(chunk.Data) {
 					matched = true
 				}
 			}
@@ -123,8 +123,8 @@ func TestOpenInvalidArchive(t *testing.T) {
 	assert.NoError(t, err)
 	defer rdr.Close()

-	archiveChan := make(chan []byte)
+	dataOrErrChan := make(chan DataOrErr)

-	err = handler.openArchive(ctx, 0, rdr, archiveChan)
+	err = handler.openArchive(ctx, 0, rdr, dataOrErrChan)
 	assert.Error(t, err)
 }
```
55 changes: 39 additions & 16 deletions pkg/handlers/default.go
```diff
@@ -3,6 +3,7 @@ package handlers
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 	"time"
```
```diff
@@ -30,40 +31,53 @@ func newDefaultHandler(handlerType handlerType) *defaultHandler {
 // utilizing a single output channel. It first tries to identify the input as an archive. If it is an archive,
 // it processes it accordingly; otherwise, it handles the input as non-archive content.
 // The function returns a channel that will receive the extracted data bytes and an error if the initial setup fails.
-func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) (chan []byte, error) {
+func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) chan DataOrErr {
 	// Shared channel for both archive and non-archive content.
-	dataChan := make(chan []byte, defaultBufferSize)
+	dataOrErrChan := make(chan DataOrErr, defaultBufferSize)

 	go func() {
-		defer close(dataChan)
+		defer close(dataOrErrChan)

-		// Update the metrics for the file processing.
 		start := time.Now()
-		var err error
-		defer func() {
-			h.measureLatencyAndHandleErrors(start, err)
+		err := h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataOrErrChan)
+		if err == nil {
 			h.metrics.incFilesProcessed()
-		}()
-
-		if err = h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataChan); err != nil {
-			ctx.Logger().Error(err, "error handling non-archive content.")
 		}
+
+		// Update the metrics for the file processing and handle errors.
+		h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
 	}()

-	return dataChan, nil
+	return dataOrErrChan
 }

 // measureLatencyAndHandleErrors measures the latency of the file processing and updates the metrics accordingly.
 // It also records errors and timeouts in the metrics.
-func (h *defaultHandler) measureLatencyAndHandleErrors(start time.Time, err error) {
+func (h *defaultHandler) measureLatencyAndHandleErrors(
+	ctx logContext.Context,
+	start time.Time,
+	err error,
+	dataErrChan chan DataOrErr,
+) {
```
> Collaborator commented on measureLatencyAndHandleErrors:
>
> I'm not super familiar with this area of the codebase, but it's starting to smell like this method is doing a little too much.
>
> Am I reading this right, that we're now writing to the dataErrChan instead of just recording metrics? (Side nit: make dataErrChan a write-only parameter.)

> Collaborator (Author) replied:
>
> I agree. I can create a follow-up PR to break this function into smaller parts. You're correct: we need to pass all errors to dataErrChan instead of just logging them and moving on, as we did before.

```diff
 	if err == nil {
 		h.metrics.observeHandleFileLatency(time.Since(start).Milliseconds())
 		return
 	}
+	dataOrErr := DataOrErr{}

 	h.metrics.incErrors()
 	if errors.Is(err, context.DeadlineExceeded) {
 		h.metrics.incFileProcessingTimeouts()
+		dataOrErr.Err = fmt.Errorf("%w: error processing chunk", err)
+		if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+			ctx.Logger().Error(err, "error writing to data channel")
+		}
 		return
 	}
+
+	dataOrErr.Err = err
+	if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
+		ctx.Logger().Error(err, "error writing to data channel")
+	}
 }
```
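The side nit in the review above refers to Go's directional channel types: declaring the parameter as send-only (`chan<- DataOrErr`) lets the compiler guarantee the function never reads from the channel. A minimal standalone sketch of the idea, not part of this PR:

```go
// emitErr illustrates the send-only variant: any receive from dataErrChan
// inside this function would be a compile error, while callers can still
// pass an ordinary bidirectional channel, which converts implicitly.
func emitErr(ctx logContext.Context, dataErrChan chan<- DataOrErr, err error) {
	select {
	case dataErrChan <- DataOrErr{Err: err}:
	case <-ctx.Done():
		ctx.Logger().Error(ctx.Err(), "context canceled while writing to data channel")
	}
	// _ = <-dataErrChan // would not compile: receive from send-only channel
}
```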

```diff
@@ -72,7 +86,11 @@ func (h *defaultHandler) measureLatencyAndHandleErrors(start time.Time, err error) {
 // on the type, particularly for binary files. It manages reading file chunks and writing them to the archive channel,
 // effectively collecting the final bytes for further processing. This function is a key component in ensuring that all
 // file content, regardless of being an archive or not, is handled appropriately.
-func (h *defaultHandler) handleNonArchiveContent(ctx logContext.Context, reader mimeTypeReader, archiveChan chan []byte) error {
+func (h *defaultHandler) handleNonArchiveContent(
+	ctx logContext.Context,
+	reader mimeTypeReader,
+	dataOrErrChan chan DataOrErr,
+) error {
 	mimeExt := reader.mimeExt

 	if common.SkipFile(mimeExt) || common.IsBinary(mimeExt) {
@@ -85,13 +103,18 @@
 	chunkReader := sources.NewChunkReader()
 	for data := range chunkReader(ctx, reader) {
+		dataOrErr := DataOrErr{}
 		if err := data.Error(); err != nil {
-			ctx.Logger().Error(err, "error reading chunk")
 			h.metrics.incErrors()
+			dataOrErr.Err = fmt.Errorf("%w: error reading chunk", err)
+			if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
+				return fmt.Errorf("%w: error writing to data channel", writeErr)
+			}
 			continue
 		}

-		if err := common.CancellableWrite(ctx, archiveChan, data.Bytes()); err != nil {
+		dataOrErr.Data = data.Bytes()
+		if err := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); err != nil {
 			return err
 		}
 		h.metrics.incBytesProcessed(len(data.Bytes()))
```
4 changes: 2 additions & 2 deletions pkg/handlers/default_test.go
```diff
@@ -23,12 +23,12 @@ func TestHandleNonArchiveFile(t *testing.T) {
 	defer rdr.Close()

 	handler := newDefaultHandler(defaultHandlerType)
-	archiveChan, err := handler.HandleFile(context.AddLogger(ctx), rdr)
+	dataOrErrChan := handler.HandleFile(context.AddLogger(ctx), rdr)
 	assert.NoError(t, err)

 	wantChunkCount := 6
 	count := 0
-	for range archiveChan {
+	for range dataOrErrChan {
 		count++
 	}
```
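Since errors now arrive in-band, tests that only count chunks could also assert per-chunk that nothing failed. A hypothetical helper along these lines (it assumes the same testify `assert` import the existing tests use; it is not part of this PR):

```go
// drainAndCount consumes a DataOrErr channel, failing the test on any
// in-band error and returning the number of values observed.
func drainAndCount(t *testing.T, ch <-chan DataOrErr) int {
	t.Helper()
	count := 0
	for chunk := range ch {
		assert.NoError(t, chunk.Err)
		count++
	}
	return count
}
```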