Skip to content

Commit

Permalink
Feature/bigtable error handling (apache#29885)
Browse files Browse the repository at this point in the history
* first pass of wiring error handling into write files and adding tests

* fix error handling to solve constant filenaming policy returning a null destination

* fix tests, add a safety check to the error handler

* spotless

* add documentation

* add textio error handler pass-through

* add avroio error handler pass-through

* add documentation to avroio

* wire error handling into bigtable io

* catch batching error when errors are detected

* continue work on bigtable error handling

* break apart and retry batches when they fail in bigquery

* remove check for oversized mutation

* spotless

* fix merge error

* address comments

* add BigtableWriteIT test case. it unexpectedly passes

* support routing more malformed mutation types, clean ups based on PR comments

* make retries non-async to remove race condition causing data loss

* spotless

* update comment on identifying data exception
  • Loading branch information
johnjcasey authored and JayajP committed Jan 22, 2024
1 parent 35b67c0 commit 70a7bb9
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -40,6 +42,15 @@ <RecordT> void route(
String description)
throws Exception;

/**
 * Routes a record that could not be processed, for callers that only have a
 * {@code DoFn.FinishBundleContext} available. The window the record belongs to is passed
 * explicitly through {@code window}.
 *
 * @param c the finish-bundle context made available to the router
 * @param record the record that failed
 * @param coder coder for {@code record}; may be null
 * @param exception the exception associated with the failure; may be null
 * @param description human-readable description of the failure
 * @param window the window supplied by the caller for this record
 * @throws Exception implementation-dependent; the throwing implementation in this file rethrows
 *     {@code exception} when it is non-null
 */
<RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception;

class ThrowingBadRecordRouter implements BadRecordRouter {

@Override
Expand All @@ -50,6 +61,22 @@ public <RecordT> void route(
@Nullable Exception exception,
String description)
throws Exception {
route(record, exception);
}

/**
 * Finish-bundle overload. The context, coder, description and window are not used; the call is
 * forwarded to the private two-argument {@code route}, exactly as the other overload of this
 * class does.
 */
@Override
public <RecordT> void route(
    DoFn<?, ?>.FinishBundleContext c,
    RecordT record,
    @Nullable Coder<RecordT> coder,
    @Nullable Exception exception,
    String description,
    BoundedWindow window)
    throws Exception {
  // Only the record and the exception matter to the throwing router.
  this.route(record, exception);
}

private <RecordT> void route(RecordT record, @Nullable Exception exception) throws Exception {
if (exception != null) {
throw exception;
} else {
Expand Down Expand Up @@ -81,5 +108,21 @@ public <RecordT> void route(
.get(BAD_RECORD_TAG)
.output(BadRecord.fromExceptionInformation(record, coder, exception, description));
}

/**
 * Finish-bundle overload. Builds a {@code BadRecord} from the failed record and emits it on
 * {@code BAD_RECORD_TAG} through the finish-bundle context, using the supplied window and that
 * window's maximum timestamp.
 */
@Override
public <RecordT> void route(
    DoFn<?, ?>.FinishBundleContext c,
    RecordT record,
    @Nullable Coder<RecordT> coder,
    @Nullable Exception exception,
    String description,
    BoundedWindow window)
    throws Exception {
  BadRecord badRecord =
      BadRecord.fromExceptionInformation(record, coder, exception, description);
  // Output from finish-bundle must name its timestamp and window explicitly.
  c.output(BAD_RECORD_TAG, badRecord, window.maxTimestamp(), window);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@

import static org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.BigtableServiceEntry;
import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.gax.batching.BatchingException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
Expand All @@ -35,11 +41,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
Expand Down Expand Up @@ -69,11 +78,17 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.ToStringHelper;
Expand Down Expand Up @@ -773,6 +788,10 @@ public abstract static class Write
@VisibleForTesting
abstract BigtableServiceFactory getServiceFactory();

/**
 * Returns the {@link ErrorHandler} configured for bad records. {@code create()} installs an
 * {@code ErrorHandler.DefaultErrorHandler}; {@code withErrorHandler} replaces it.
 */
abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

/**
 * Returns the {@link BadRecordRouter} configured for this write. {@code create()} installs
 * {@code BadRecordRouter.THROWING_ROUTER}; {@code withErrorHandler} switches it to
 * {@code BadRecordRouter.RECORDING_ROUTER}.
 */
abstract BadRecordRouter getBadRecordRouter();

/**
* Returns the Google Cloud Bigtable instance being written to, and other parameters.
*
Expand All @@ -796,6 +815,8 @@ static Write create() {
.setBigtableConfig(config)
.setBigtableWriteOptions(writeOptions)
.setServiceFactory(new BigtableServiceFactory())
.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
.build();
}

Expand All @@ -808,6 +829,10 @@ abstract static class Builder {

abstract Builder setServiceFactory(BigtableServiceFactory factory);

/** Sets the {@link ErrorHandler} that {@code getBadRecordErrorHandler()} will return. */
abstract Builder setBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler);

/** Sets the {@link BadRecordRouter} that {@code getBadRecordRouter()} will return. */
abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Write build();
}

Expand Down Expand Up @@ -1093,6 +1118,13 @@ public Write withFlowControl(boolean enableFlowControl) {
.build();
}

/**
 * Returns a copy of this {@link Write} configured with the given bad-record error handler.
 *
 * <p>Besides storing {@code badRecordErrorHandler}, this also switches the bad-record router to
 * {@code BadRecordRouter.RECORDING_ROUTER}.
 *
 * @param badRecordErrorHandler the handler to store on the returned {@link Write}
 * @return a new {@link Write} carrying the handler and the recording router
 */
public Write withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
  Write.Builder withHandler = toBuilder().setBadRecordErrorHandler(badRecordErrorHandler);
  Write.Builder withRouter = withHandler.setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER);
  return withRouter.build();
}

@VisibleForTesting
Write withServiceFactory(BigtableServiceFactory factory) {
return toBuilder().setServiceFactory(factory).build();
Expand All @@ -1104,7 +1136,11 @@ Write withServiceFactory(BigtableServiceFactory factory) {
*/
public WriteWithResults withWriteResults() {
// NOTE(review): the three-argument line directly below and the five-argument list after it
// appear to be the pre-change and post-change forms of the same constructor call (diff
// residue). Only one of them can remain for this method to compile; the five-argument form
// matches the WriteWithResults constructor that accepts an error handler and a router.
return new WriteWithResults(
getBigtableConfig(), getBigtableWriteOptions(), getServiceFactory());
getBigtableConfig(),
getBigtableWriteOptions(),
getServiceFactory(),
getBadRecordErrorHandler(),
getBadRecordRouter());
}

@Override
Expand Down Expand Up @@ -1142,18 +1178,29 @@ public static class WriteWithResults

private static final String BIGTABLE_WRITER_WAIT_TIMEOUT_MS = "bigtable_writer_wait_timeout_ms";

private static final TupleTag<BigtableWriteResult> WRITE_RESULTS =
new TupleTag<>("writeResults");

private final BigtableConfig bigtableConfig;
private final BigtableWriteOptions bigtableWriteOptions;

private final BigtableServiceFactory factory;

private final ErrorHandler<BadRecord, ?> badRecordErrorHandler;

private final BadRecordRouter badRecordRouter;

// Stores the Bigtable configuration, write options, service factory, bad-record error handler
// and bad-record router on the new instance.
// NOTE(review): "BigtableServiceFactory factory) {" and the three parameter lines after it
// appear to be the pre-change and post-change endings of the same parameter list (diff
// residue); only one can remain. The body assigns all five fields, which matches the
// five-parameter form.
WriteWithResults(
BigtableConfig bigtableConfig,
BigtableWriteOptions bigtableWriteOptions,
BigtableServiceFactory factory) {
BigtableServiceFactory factory,
ErrorHandler<BadRecord, ?> badRecordErrorHandler,
BadRecordRouter badRecordRouter) {
this.bigtableConfig = bigtableConfig;
this.bigtableWriteOptions = bigtableWriteOptions;
this.factory = factory;
this.badRecordErrorHandler = badRecordErrorHandler;
this.badRecordRouter = badRecordRouter;
}

@Override
Expand All @@ -1173,12 +1220,24 @@ public PCollection<BigtableWriteResult> expand(
closeWaitTimeout = Duration.millis(closeWaitTimeoutMs);
}

return input.apply(
ParDo.of(
new BigtableWriterFn(
factory,
bigtableConfig,
bigtableWriteOptions.toBuilder().setCloseWaitTimeout(closeWaitTimeout).build())));
PCollectionTuple results =
input.apply(
ParDo.of(
new BigtableWriterFn(
factory,
bigtableConfig,
bigtableWriteOptions
.toBuilder()
.setCloseWaitTimeout(closeWaitTimeout)
.build(),
input.getCoder(),
badRecordRouter))
.withOutputTags(WRITE_RESULTS, TupleTagList.of(BAD_RECORD_TAG)));

badRecordErrorHandler.addErrorCollection(
results.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));

return results.get(WRITE_RESULTS);
}

@Override
Expand Down Expand Up @@ -1221,17 +1280,25 @@ private static class BigtableWriterFn

private final BigtableServiceFactory factory;
private final BigtableServiceFactory.ConfigId id;
private final Coder<KV<ByteString, Iterable<Mutation>>> inputCoder;
private final BadRecordRouter badRecordRouter;

private transient Set<KV<BigtableWriteException, BoundedWindow>> badRecords = null;

// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;

BigtableWriterFn(
BigtableServiceFactory factory,
BigtableConfig bigtableConfig,
BigtableWriteOptions writeOptions) {
BigtableWriteOptions writeOptions,
Coder<KV<ByteString, Iterable<Mutation>>> inputCoder,
BadRecordRouter badRecordRouter) {
this.factory = factory;
this.config = bigtableConfig;
this.writeOptions = writeOptions;
this.inputCoder = inputCoder;
this.badRecordRouter = badRecordRouter;
this.failures = new ConcurrentLinkedQueue<>();
this.id = factory.newId();
LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions);
Expand All @@ -1247,33 +1314,95 @@ public void startBundle(StartBundleContext c) throws IOException {
factory.getServiceForWriting(id, config, writeOptions, c.getPipelineOptions());
bigtableWriter = serviceEntry.getService().openForWriting(writeOptions);
}

badRecords = new HashSet<>();
}

// Submits the current element to the Bigtable writer, then updates the written-record count
// and the per-window element count.
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
// Surface failures collected so far before accepting more work.
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
// NOTE(review): the inline whenComplete(...) call below and the single-line
// writeRecord(...).whenComplete(handleMutationException(...)) call after it appear to be the
// pre-change and post-change forms of the same write (diff residue). As written the record is
// submitted twice; confirm that only the handleMutationException form should remain.
bigtableWriter
.writeRecord(record)
.whenComplete(
(mutationResult, exception) -> {
if (exception != null) {
failures.add(new BigtableWriteException(record, exception));
}
});
bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, window));
++recordsWritten;
// Count elements per window; a missing entry starts from zero.
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}

/**
 * Builds the completion callback attached to a record's write.
 *
 * <p>The callback does nothing when the write completed without an exception. When the
 * exception is classified as a data error by {@code isDataException}, the record is retried on
 * its own; any other exception is wrapped in a {@link BigtableWriteException} and added to
 * {@code failures}.
 *
 * @param record the record whose write result the callback observes
 * @param window the window the record was processed in, forwarded to the individual retry
 * @return the callback to pass to {@code whenComplete}
 */
private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
    KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
  return (MutateRowResponse response, Throwable error) -> {
    if (error == null) {
      // Successful write: nothing to record.
      return;
    }
    if (!isDataException(error)) {
      failures.add(new BigtableWriteException(record, error));
      return;
    }
    retryIndividualRecord(record, window);
  };
}

/**
 * Retries a single record on its own after its original write failed with a data error.
 *
 * <p>If the individual write fails again with a data error (as classified by
 * {@code isDataException}, i.e. a non-retryable {@code NotFoundException} or
 * {@code InvalidArgumentException}), the record is remembered in {@code badRecords} together
 * with its window. Any other {@link ApiException} is added to {@code failures}.
 *
 * @param record the record to write individually
 * @param window the window the record was processed in
 */
private void retryIndividualRecord(
    KV<ByteString, Iterable<Mutation>> record, BoundedWindow window) {
  try {
    bigtableWriter.writeSingleRecord(record);
  } catch (ApiException e) {
    BigtableWriteException writeException = new BigtableWriteException(record, e);
    if (isDataException(e)) {
      // A second data error on an isolated write means this record itself is the bad one.
      // This method is invoked from the write-completion callback, and badRecords is a plain
      // HashSet (unlike the concurrent failures queue), so guard the mutation against
      // concurrent callbacks.
      synchronized (badRecords) {
        badRecords.add(KV.of(writeException, window));
      }
    } else {
      failures.add(writeException);
    }
  }
}

// Decides whether an exception was caused by the data being written.
// Anything that is not an ApiException, and any retryable ApiException, is not a data error:
// retryable errors are left to be retried via the runner. A non-retryable ApiException counts
// as a data error only when it is a NotFoundException or an InvalidArgumentException.
// A NotFoundException likely means that the mutation is trying to write to a column family
// that doesn't exist. An InvalidArgumentException means that the mutation itself is invalid,
// with either an empty row key, invalid timestamp (ts <= -2), an empty mutation, or a column
// qualifier that is too large.
private static boolean isDataException(Throwable e) {
  if (!(e instanceof ApiException)) {
    return false;
  }
  ApiException apiException = (ApiException) e;
  if (apiException.isRetryable()) {
    return false;
  }
  return apiException instanceof NotFoundException
      || apiException instanceof InvalidArgumentException;
}

@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
try {

if (bigtableWriter != null) {
bigtableWriter.close();
try {
bigtableWriter.close();
} catch (IOException e) {
// If the writer fails due to a batching exception, but no failures were detected
// it means that error handling was enabled, and that errors were detected and routed
// to the error queue. Bigtable will successfully write the other records in the batch,
// so this exception should be ignored
if (!(e.getCause() instanceof BatchingException)) {
throw e;
}
}
bigtableWriter = null;
}

for (KV<BigtableWriteException, BoundedWindow> badRecord : badRecords) {
try {
badRecordRouter.route(
c,
badRecord.getKey().getRecord(),
inputCoder,
(Exception) badRecord.getKey().getCause(),
"Failed to write malformed mutation to Bigtable",
badRecord.getValue());
} catch (Exception e) {
failures.add(badRecord.getKey());
}
}

checkForFailures();

LOG.debug("Wrote {} records", recordsWritten);

for (Map.Entry<BoundedWindow, Long> entry : seenWindows.entrySet()) {
Expand Down Expand Up @@ -1861,12 +1990,20 @@ public final long getSplitPointsConsumed() {

/**
 * An exception that puts information about the failed record being written in its message, and
 * keeps a reference to that record so it can be retrieved through {@link #getRecord()}.
 */
static class BigtableWriteException extends IOException {

  /** The record whose write failed. */
  private final KV<ByteString, Iterable<Mutation>> record;

  /**
   * @param record the record whose write failed; its key and mutations appear in the message
   * @param cause the underlying failure
   */
  public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
    super(describe(record), cause);
    this.record = record;
  }

  /** Formats the exception message from the record's row key and mutations. */
  private static String describe(KV<ByteString, Iterable<Mutation>> record) {
    String rowKey = record.getKey().toStringUtf8();
    return String.format("Error mutating row %s with mutations %s", rowKey, record.getValue());
  }

  /** Returns the record whose write failed. */
  public KV<ByteString, Iterable<Mutation>> getRecord() {
    return record;
  }
}
/**
Expand Down
Loading

0 comments on commit 70a7bb9

Please sign in to comment.