Skip to content

fix(scanner): guard against nil AckFunc in ackOnce to prevent panic#683

Merged
jem-davies merged 1 commit intowarpstreamlabs:mainfrom
ksinica:fix/scanner-nil-ackfunc-panic
Feb 3, 2026
Merged

fix(scanner): guard against nil AckFunc in ackOnce to prevent panic#683
jem-davies merged 1 commit intowarpstreamlabs:mainfrom
ksinica:fix/scanner-nil-ackfunc-panic

Conversation

@ksinica
Copy link
Copy Markdown
Contributor

@ksinica ksinica commented Feb 2, 2026

This PR fixes panic in ackOnce when AckFunc is nil.

The S3 processor passes nil as the AckFunc to Scanner.Create(). When the CSV scanner encounters a non-EOF error (such as a malformed CSV row), it calls ackOnce(nil) which attempts to invoke nil(ctx, err), resulting in a panic.

Root Cause

The issue is in the interaction between:

  1. S3 Processor (internal/impl/aws/processor_s3.go:240): Creates a scanner without providing an AckFunc, passing nil instead.
  2. CSV Scanner (internal/impl/pure/scanner_csv.go): When it encounters a parse error (not EOF), it returns the error.
  3. Scanner wrapper (public/service/scanner.go:133): The managedAckBatchScanner.NextBatch calls s.sourceAck(ctx, err) on non-EOF errors.
  4. ackOnce (public/service/scanner.go:89): The wrapper doesn't guard against a nil underlying function, leading to the nil pointer dereference.

Fix

Add a nil check in ackOnce before calling the underlying AckFunc:

func ackOnce(fn AckFunc) AckFunc {
    var once sync.Once
    return func(ctx context.Context, err error) error {
        var ackErr error
        once.Do(func() {
            if fn != nil {  // <-- Added nil guard
                ackErr = fn(ctx, err)
            }
        })
        return ackErr
    }
}

Testing

Added unit tests in public/service/scanner_test.go:

  • TestAutoAggregateBatchScannerAcksWithNilAckFunc - Reproduces the panic scenario
  • TestAutoAggregateBatchScannerAcksWithNilAckFuncOnClose - Tests Close with nil AckFunc
  • TestAutoAggregateBatchScannerAcksWithValidAckFunc - Ensures normal operation still works
  • TestAutoAggregateBatchScannerAcksErrorPropagatesAck - Tests error propagation

Reproduction

Reproduction steps

Prerequisites

  • Docker and Docker Compose
  • Bento binary

Files

docker-compose.yaml

services:
  minio:
    image: minio/minio:latest
    container_name: minio-repro
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 5s
      timeout: 5s
      retries: 5

  minio-init:
    image: minio/mc:latest
    container_name: minio-init
    depends_on:
      minio:
        condition: service_healthy
    entrypoint:
      - /bin/sh
      - -c
      - |
        mc alias set local http://minio:9000 minioadmin minioadmin
        mc mb local/test-bucket --ignore-existing
        printf 'header1\001header2\001header3\nvalue1\001"unclosed quote' > /tmp/malformed.csv
        mc cp /tmp/malformed.csv local/test-bucket/malformed.csv
        echo 'Malformed CSV uploaded to test-bucket/malformed.csv'

nil_ack_panic.yaml

input:
  generate:
    count: 1
    interval: ""
    mapping: |
      root = {
        "bucket": env("TEST_BUCKET").or("test-bucket"),
        "key": env("TEST_KEY").or("malformed.csv")
      }

pipeline:
  processors:
    - aws_s3:
        endpoint: ${AWS_ENDPOINT_URL_S3:http://localhost:9000}
        bucket: ${! this.bucket }
        region: ${AWS_REGION:us-east-1}
        force_path_style_urls: true
        key: ${! this.key }
        credentials:
          id: ${AWS_ACCESS_KEY_ID:minioadmin}
          secret: ${AWS_SECRET_ACCESS_KEY:minioadmin}
        scanner:
          csv:
            custom_delimiter: "\x01"
            parse_header_row: true

output:
  stdout:
    codec: lines

Steps

  1. Start MinIO:

    docker-compose up -d
  2. Wait for minio-init to complete:

    while docker ps --format '{{.Names}}' | grep -q minio-init; do sleep 1; done
  3. Run the pipeline (before fix - will panic):

    bento run nil_ack_panic.yaml
  4. Observe the panic:

    panic: runtime error: invalid memory address or nil pointer dereference
    [signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x108080a38]
    
    goroutine 83 [running]:
    github.com/warpstreamlabs/bento/internal/impl/pure.(*csvScannerCreator).Create.AutoAggregateBatchScannerAcks.ackOnce.func1.1()
        .../public/service/scanner.go:89 +0x28
    ...
    

The S3 processor passes nil as AckFunc to Scanner.Create(). When the
CSV scanner encounters a non-EOF error, it calls ackOnce(nil) which
attempts to invoke nil(ctx, err), resulting in a panic.

Add a nil check in ackOnce to safely handle nil AckFunc.
Copy link
Copy Markdown

@informalict informalict left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@gregfurman gregfurman added the bug Something isn't working label Feb 3, 2026
@jem-davies jem-davies merged commit c14d148 into warpstreamlabs:main Feb 3, 2026
3 checks passed
@ksinica ksinica deleted the fix/scanner-nil-ackfunc-panic branch February 4, 2026 17:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants