Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Error Handlers to File IO and related IOs (TextIO, AvroIO) #29670

Merged
merged 16 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down Expand Up @@ -236,6 +238,27 @@
* destination-dependent: every window/pane for every destination will use the same number of shards
* specified via {@link Write#withNumShards} or {@link Write#withSharding}.
*
* <h3>Handling Errors</h3>
*
* <p>When using dynamic destinations, or when using a formatting function to format a record for
* writing, it's possible for an individual record to be malformed, causing an exception. By
* default, these exceptions are propagated to the runner causing the bundle to fail. These are
* usually retried, though this depends on the runner. Alternately, these errors can be routed to
* another {@link PTransform} by using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The
* ErrorHandler is registered with the pipeline (see below). See {@link ErrorHandler} for more
* documentation. Of note, this error handling only handles errors related to specific records. It
* does not handle errors related to connectivity, authorization, etc. as those should be retried by
* the runner.
*
* <pre>{@code
* PCollection<> records = ...;
* PTransform<PCollection<BadRecord>,?> alternateSink = ...;
* try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
* records.apply("Write", FileIO.writeDynamic().otherConfigs()
* .withBadRecordErrorHandler(handler));
* }
* }</pre>
*
* <h3>Writing custom types to sinks</h3>
*
* <p>Normally, when writing a collection of a custom type using a {@link Sink} that takes a
Expand Down Expand Up @@ -1016,6 +1039,8 @@ public static FileNaming relativeFileNaming(

abstract boolean getNoSpilling();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<DestinationT, UserT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -1062,6 +1087,9 @@ abstract Builder<DestinationT, UserT> setSharding(

abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);

abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Write<DestinationT, UserT> build();
}

Expand Down Expand Up @@ -1288,6 +1316,18 @@ public Write<DestinationT, UserT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/**
* Configures a new {@link Write} with an ErrorHandler. For configuring an ErrorHandler, see
* {@link ErrorHandler}. Whenever a record is formatted, or a lookup for a dynamic destination
* is performed, and that operation fails, the exception is passed to the error handler. This is
* intended to handle any errors related to the data of a record, but not any connectivity or IO
* errors related to the literal writing of a record.
*/
public Write<DestinationT, UserT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
if (getDynamic()) {
Expand Down Expand Up @@ -1391,6 +1431,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
return input.apply(writeFiles);
}

Expand Down
20 changes: 20 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
Expand Down Expand Up @@ -176,6 +178,10 @@
*
* <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
*
* <p>Error handling for records that are malformed can be handled by using {@link
* TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in {@link FileIO} for
* details on usage
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -708,6 +714,8 @@ public abstract static class TypedWrite<UserT, DestinationT>
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<UserT, DestinationT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -754,6 +762,9 @@ abstract Builder<UserT, DestinationT> setNumShards(
abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);

abstract Builder<UserT, DestinationT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract TypedWrite<UserT, DestinationT> build();
}

Expand Down Expand Up @@ -993,6 +1004,12 @@ public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link FileIO.Write#withBadRecordErrorHandler(ErrorHandler)} for details on usage. */
public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

/** Don't write any output files if the PCollection is empty. */
public TypedWrite<UserT, DestinationT> skipIfEmpty() {
return toBuilder().setSkipIfEmpty(true).build();
Expand Down Expand Up @@ -1083,6 +1100,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
if (getSkipIfEmpty()) {
write = write.withSkipIfEmpty();
}
Expand Down
Loading
Loading