Feature/bigtable error handling #29885

Merged on Jan 11, 2024 (22 commits)

Changes from all commits:
c6ecbd4
first pass of wiring error handling into write files and adding tests
johnjcasey Dec 7, 2023
ab68b1e
fix error handling to solve constant filenaming policy returning a nu…
johnjcasey Dec 8, 2023
3d76ccd
fix tests, add a safety check to the error handler
johnjcasey Dec 11, 2023
abde5a3
spotless
johnjcasey Dec 11, 2023
ce965d4
add documentation
johnjcasey Dec 12, 2023
c61243d
add textio error handler pass-through
johnjcasey Dec 12, 2023
de5bd4c
add avroio error handler pass-through
johnjcasey Dec 18, 2023
92948aa
add documentation to avroio
johnjcasey Dec 18, 2023
3e0cae5
wire error handling into bigtable io
johnjcasey Dec 18, 2023
1f0b7f0
catch batching error when errors are detected
johnjcasey Dec 19, 2023
35c7124
continue work on bigtable error handling
johnjcasey Dec 20, 2023
cba5bbd
break apart and retry batches when they fail in bigquery
johnjcasey Dec 20, 2023
6d1cfeb
remove check for oversized mutation
johnjcasey Dec 28, 2023
41508bb
Merge remote-tracking branch 'upstream/master' into feature/bigtable-…
johnjcasey Dec 28, 2023
d606f89
spotless
johnjcasey Jan 2, 2024
ebc8577
fix merge error
johnjcasey Jan 2, 2024
a420d52
address comments
johnjcasey Jan 3, 2024
c630bc1
add BigtableWriteIT test case. it unexpectedly passes
johnjcasey Jan 4, 2024
5a97efc
support routing more malformed mutation types, clean ups based on PR …
johnjcasey Jan 8, 2024
8963686
make retrys non-async to remove race condition causing data loss
johnjcasey Jan 9, 2024
3086274
spotless
johnjcasey Jan 9, 2024
505319d
update comment on identifying data exception
johnjcasey Jan 9, 2024
BadRecordRouter.java (org.apache.beam.sdk.transforms.errorhandling)
@@ -19,7 +19,9 @@

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -40,6 +42,15 @@ <RecordT> void route(
String description)
throws Exception;

<RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception;

class ThrowingBadRecordRouter implements BadRecordRouter {

@Override
@@ -50,6 +61,22 @@ public <RecordT> void route(
@Nullable Exception exception,
String description)
throws Exception {
route(record, exception);
}

@Override
public <RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception {
route(record, exception);
}

private <RecordT> void route(RecordT record, @Nullable Exception exception) throws Exception {
if (exception != null) {
throw exception;
} else {
@@ -81,5 +108,21 @@ public <RecordT> void route(
.get(BAD_RECORD_TAG)
.output(BadRecord.fromExceptionInformation(record, coder, exception, description));
}

@Override
public <RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception {
c.output(
BAD_RECORD_TAG,
BadRecord.fromExceptionInformation(record, coder, exception, description),
window.maxTimestamp(),
window);
}
}
}
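To illustrate the new FinishBundleContext overload, here is a minimal sketch (not part of this PR) of a sink DoFn that buffers failures during processElement and routes them from finishBundle. The element type, the sink call, and the failure bookkeeping are hypothetical; a ParDo applying such a DoFn would also need to declare BAD_RECORD_TAG via withOutputTags so the RECORDING_ROUTER has somewhere to emit.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

/** Illustrative sink DoFn that defers bad-record routing to finishBundle. */
class BufferingSinkFn extends DoFn<String, Void> {

  private final BadRecordRouter badRecordRouter;
  private final Coder<String> elementCoder = StringUtf8Coder.of();

  // Failures seen in this bundle: (element, exception) paired with the window it arrived in.
  private transient List<KV<KV<String, Exception>, BoundedWindow>> failures;

  BufferingSinkFn(BadRecordRouter badRecordRouter) {
    this.badRecordRouter = badRecordRouter;
  }

  @StartBundle
  public void startBundle() {
    failures = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    try {
      writeToSink(c.element()); // hypothetical external write that may throw
    } catch (Exception e) {
      failures.add(KV.of(KV.of(c.element(), e), window));
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) throws Exception {
    for (KV<KV<String, Exception>, BoundedWindow> failure : failures) {
      // THROWING_ROUTER rethrows the exception; RECORDING_ROUTER emits a BadRecord to
      // BAD_RECORD_TAG at window.maxTimestamp() in the element's original window.
      badRecordRouter.route(
          c,
          failure.getKey().getKey(),
          elementCoder,
          failure.getKey().getValue(),
          "Failed to write element to sink",
          failure.getValue());
    }
  }

  private void writeToSink(String element) {
    // placeholder for an external write
  }
}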
BigtableIO.java (org.apache.beam.sdk.io.gcp.bigtable)
@@ -19,11 +19,17 @@

import static org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.BigtableServiceEntry;
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
@@ -35,11 +41,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
@@ -69,11 +78,17 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.ToStringHelper;
@@ -773,6 +788,10 @@ public abstract static class Write
@VisibleForTesting
abstract BigtableServiceFactory getServiceFactory();

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract BadRecordRouter getBadRecordRouter();

/**
* Returns the Google Cloud Bigtable instance being written to, and other parameters.
*
@@ -796,6 +815,8 @@ static Write create() {
.setBigtableConfig(config)
.setBigtableWriteOptions(writeOptions)
.setServiceFactory(new BigtableServiceFactory())
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.build();
}

@@ -808,6 +829,10 @@ abstract static class Builder {

abstract Builder setServiceFactory(BigtableServiceFactory factory);

abstract Builder setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Write build();
}

@@ -1093,6 +1118,13 @@ public Write withFlowControl(boolean enableFlowControl) {
.build();
}

public Write withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
return toBuilder()
.setBadRecordErrorHandler(badRecordErrorHandler)
.setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER)
.build();
}
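For context on how a caller wires this in (a minimal sketch, not part of this diff): the handler is obtained from the pipeline via Pipeline#registerBadRecordErrorHandler and passed to withErrorHandler. DeadLetterSink (a PTransform<PCollection<BadRecord>, PCollection<Long>> that persists failures) and readMutations() are hypothetical placeholders, as are the project, instance, and table IDs.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.PCollection;

static void runWithDeadLetterQueue(PipelineOptions options) throws Exception {
  Pipeline p = Pipeline.create(options);
  try (BadRecordErrorHandler<PCollection<Long>> badRecords =
      p.registerBadRecordErrorHandler(new DeadLetterSink())) {
    p.apply(readMutations()) // hypothetical source of KV<ByteString, Iterable<Mutation>>
        .apply(
            BigtableIO.write()
                .withProjectId("my-project")
                .withInstanceId("my-instance")
                .withTableId("my-table")
                // Malformed mutations are routed to badRecords instead of failing the bundle.
                .withErrorHandler(badRecords));
  }
  // The handler is closed when the try block exits, finalizing its error collections;
  // this must happen before run().
  p.run().waitUntilFinish();
}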

@VisibleForTesting
Write withServiceFactory(BigtableServiceFactory factory) {
return toBuilder().setServiceFactory(factory).build();
@@ -1104,7 +1136,11 @@ Write withServiceFactory(BigtableServiceFactory factory) {
*/
public WriteWithResults withWriteResults() {
return new WriteWithResults(
getBigtableConfig(), getBigtableWriteOptions(), getServiceFactory());
getBigtableConfig(),
getBigtableWriteOptions(),
getServiceFactory(),
getBadRecordErrorHandler(),
getBadRecordRouter());
}

@Override
@@ -1142,18 +1178,29 @@ public static class WriteWithResults

private static final String BIGTABLE_WRITER_WAIT_TIMEOUT_MS = "bigtable_writer_wait_timeout_ms";

private static final TupleTag<BigtableWriteResult> WRITE_RESULTS =
new TupleTag<>("writeResults");

private final BigtableConfig bigtableConfig;
private final BigtableWriteOptions bigtableWriteOptions;

private final BigtableServiceFactory factory;

private final ErrorHandler<BadRecord, ?> badRecordErrorHandler;

private final BadRecordRouter badRecordRouter;

WriteWithResults(
BigtableConfig bigtableConfig,
BigtableWriteOptions bigtableWriteOptions,
BigtableServiceFactory factory) {
BigtableServiceFactory factory,
ErrorHandler<BadRecord, ?> badRecordErrorHandler,
BadRecordRouter badRecordRouter) {
this.bigtableConfig = bigtableConfig;
this.bigtableWriteOptions = bigtableWriteOptions;
this.factory = factory;
this.badRecordErrorHandler = badRecordErrorHandler;
this.badRecordRouter = badRecordRouter;
}

@Override
@@ -1173,12 +1220,24 @@ public PCollection<BigtableWriteResult> expand(
closeWaitTimeout = Duration.millis(closeWaitTimeoutMs);
}

return input.apply(
ParDo.of(
new BigtableWriterFn(
factory,
bigtableConfig,
bigtableWriteOptions.toBuilder().setCloseWaitTimeout(closeWaitTimeout).build())));
PCollectionTuple results =
input.apply(
ParDo.of(
new BigtableWriterFn(
factory,
bigtableConfig,
bigtableWriteOptions
.toBuilder()
.setCloseWaitTimeout(closeWaitTimeout)
.build(),
input.getCoder(),
badRecordRouter))
.withOutputTags(WRITE_RESULTS, TupleTagList.of(BAD_RECORD_TAG)));

badRecordErrorHandler.addErrorCollection(
results.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));

return results.get(WRITE_RESULTS);
}

@Override
@@ -1221,17 +1280,25 @@ private static class BigtableWriterFn

private final BigtableServiceFactory factory;
private final BigtableServiceFactory.ConfigId id;
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;

private transient Set<KV<BigtableWriteException, BoundedWindow>> badRecords = null;

// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;

BigtableWriterFn(
BigtableServiceFactory factory,
BigtableConfig bigtableConfig,
BigtableWriteOptions writeOptions) {
BigtableWriteOptions writeOptions,
Coder<KV<ByteString, Iterable<Mutation>>> inputCoder,
BadRecordRouter badRecordRouter) {
this.factory = factory;
this.config = bigtableConfig;
this.writeOptions = writeOptions;
this.inputCoder = inputCoder;
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
@@ -1247,33 +1314,95 @@ public void startBundle(StartBundleContext c) throws IOException {
factory.getServiceForWriting(id, config, writeOptions, c.getPipelineOptions());
bigtableWriter = serviceEntry.getService().openForWriting(writeOptions);
}

badRecords = new HashSet<>();
}

@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
bigtableWriter
.writeRecord(record)
.whenComplete(
(mutationResult, exception) -> {
if (exception != null) {
failures.add(new BigtableWriteException(record, exception));
}
});
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, window));
++recordsWritten;
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}

private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
if (isDataException(exception)) {
retryIndividualRecord(record, window);
} else {
failures.add(new BigtableWriteException(record, exception));
}
}
};
}

private void retryIndividualRecord(
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
try {
bigtableWriter.writeSingleRecord(record);
} catch (ApiException e) {
if (isDataException(e)) {
// if we get another data exception (NotFound or InvalidArgument), we know this is the bad record.
badRecords.add(KV.of(new BigtableWriteException(record, e), window));
} else {
failures.add(new BigtableWriteException(record, e));
}
}
}

// This method checks if an exception is the result of an error in the data.
// We first check if the exception is retryable, because if it is, we want to retry it via the
// runner. If it is not retryable, we check whether it is a NotFoundException or an
// InvalidArgumentException. A NotFoundException likely means that the mutation is trying to
// write to a column family that doesn't exist. An InvalidArgumentException means that the
// mutation itself is invalid, with either an empty row key, invalid timestamp (ts <= -2),
// an empty mutation, or a column qualifier that is too large.
private static boolean isDataException(Throwable e) {
if (e instanceof ApiException && !((ApiException) e).isRetryable()) {
return e instanceof NotFoundException || e instanceof InvalidArgumentException;
}
return false;
}
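For a concrete sense of what takes this path (illustrative, not from this PR): a SetCell that is structurally valid protobuf but targets a column family the table does not define is rejected by the Bigtable client with a non-retryable NotFoundException, so the element is retried individually and then routed as a bad record.

import java.util.Collections;
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.values.KV;

class MalformedMutationExample {
  // "no-such-family" is a made-up family name assumed to be absent from the target table.
  static KV<ByteString, Iterable<Mutation>> badElement() {
    Mutation badMutation =
        Mutation.newBuilder()
            .setSetCell(
                Mutation.SetCell.newBuilder()
                    .setFamilyName("no-such-family")
                    .setColumnQualifier(ByteString.copyFromUtf8("qualifier"))
                    .setTimestampMicros(System.currentTimeMillis() * 1000L)
                    .setValue(ByteString.copyFromUtf8("value")))
            .build();
    // writeRecord fails with NotFoundException, the single-record retry fails the same
    // way, and the element lands in badRecords for routing in finishBundle.
    return KV.<ByteString, Iterable<Mutation>>of(
        ByteString.copyFromUtf8("row-key"), Collections.singletonList(badMutation));
  }
}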

@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
try {

if (bigtableWriter != null) {
bigtableWriter.close();
try {
bigtableWriter.close();
} catch (IOException e) {
// If the writer fails due to a batching exception, but no failures were detected
// it means that error handling was enabled, and that errors were detected and routed
// to the error queue. Bigtable will successfully write other failures in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
throw e;
}
}
bigtableWriter = null;
}

for (KV<BigtableWriteException, BoundedWindow> badRecord : badRecords) {
try {
badRecordRouter.route(
c,
badRecord.getKey().getRecord(),
inputCoder,
(Exception) badRecord.getKey().getCause(),
"Failed to write malformed mutation to Bigtable",
badRecord.getValue());
} catch (Exception e) {
failures.add(badRecord.getKey());
}
}

checkForFailures();

LOG.debug("Wrote {} records", recordsWritten);

for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
Expand Down Expand Up @@ -1861,12 +1990,20 @@ public final long getSplitPointsConsumed() {

/** An exception that puts information about the failed record being written in its message. */
static class BigtableWriteException extends IOException {

private final KV<ByteString, Iterable<Mutation>> record;

public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
super(
String.format(
"Error mutating row %s with mutations %s",
record.getKey().toStringUtf8(), record.getValue()),
cause);
this.record = record;
}

public KV<ByteString, Iterable<Mutation>> getRecord() {
return record;
}
}
/**