Skip to content

Conversation

@mxm
Copy link
Contributor

@mxm mxm commented Sep 1, 2025

Adds error stream functionality to the Flink dynamic sink system, allowing applications to handle processing errors gracefully instead of failing the entire job.

Benefits:

  • Fault tolerance: Applications can continue processing valid records while isolating problematic ones
  • Observability: Failed records and their exceptions are captured for debugging and monitoring
  • Flexibility: Users can implement custom error handling logic (retry, dead letter queue, logging, etc.)
  • Existing applications continue to work unchanged; error stream is opt-in

Usage:

  DynamicIcebergSink.forInput(dataStream)
      .generator(new MyGenerator())
      .catalogLoader(catalogLoader)
      .errorStreamConsumer(errorStream -> {
          // Handle errors
          errorStream.sinkTo(..)
      })
      .append();

@github-actions github-actions bot added the flink label Sep 1, 2025
Adds error stream functionality to the Flink dynamic sink system, allowing applications to handle processing errors gracefully instead of failing the entire job.

Benefits:
- Fault tolerance: Applications can continue processing valid records while isolating problematic ones
- Observability: Failed records and their exceptions are captured for debugging and monitoring
- Flexibility: Users can implement custom error handling logic (retry, dead letter queue, logging, etc.)
- Existing applications continue to work unchanged; error stream is opt-in

Usage:
```
  DynamicIcebergSink.forInput(dataStream)
      .generator(new MyGenerator())
      .catalogLoader(catalogLoader)
      .errorStreamConsumer(errorStream -> {
          // Handle errors
          errorStream.sinkTo(..)
      })
      .append();
```
@VisibleForTesting
static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream";

@VisibleForTesting static final String ERROR_STREAM = "error-stream";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not only visible for testing. This is reused in the DynamicTableUpdateOperator, so we should remove the annotation.
Also maybe move this to the DynamicIcebergSink class?

Comment on lines 39 to 40
@VisibleForTesting
static final String DYNAMIC_TABLE_UPDATE_STREAM = "dynamic-table-update-stream";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but this is also not just visible because of the testing. Also might be better in DynamicIcebergSink, as it is reused there

try {
out.collect(processRecord(record));
} catch (Exception e) {
// Send erroneous record to side output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: records?

@pvary
Copy link
Contributor

pvary commented Sep 12, 2025

How good is this solution in handling transient errors?

Let's say, we have a temporary network issue which prevents us accessing the catalogs for a sort time.
Will this route all of the records to the error stream?

Comment on lines +461 to +464
SideOutputDataStream<Tuple2<DynamicRecordInternal, Exception>> erroneousFromProcessor =
converted.getSideOutput(errorStreamOutputTag);
SideOutputDataStream<Tuple2<DynamicRecordInternal, Exception>> erroneousFromUpdater =
updater.getSideOutput(errorStreamOutputTag);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Shall we just inline these? (optional)

Copy link
Contributor Author

@mxm mxm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How good is this solution in handling transient errors?

Let's say, we have a temporary network issue which prevents us accessing the catalogs for a sort time.
Will this route all of the records to the error stream?

That's a fair point. I think we want to further narrow down the types of error. Catching all exceptions when processing is too broad. We already restrict the exception handling to the table metadata update logic, but we probably want to add a dedicated exception type to mark the non-transient parts of the update process.

@github-actions
Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 23, 2025
@github-actions
Copy link

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants