Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bigtable error handling #29885

Merged
merged 22 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c6ecbd4
first pass of wiring error handling into write files and adding tests
johnjcasey Dec 7, 2023
ab68b1e
fix error handling to solve constant filenaming policy returning a nu…
johnjcasey Dec 8, 2023
3d76ccd
fix tests, add a safety check to the error handler
johnjcasey Dec 11, 2023
abde5a3
spotless
johnjcasey Dec 11, 2023
ce965d4
add documentation
johnjcasey Dec 12, 2023
c61243d
add textio error handler pass-through
johnjcasey Dec 12, 2023
de5bd4c
add avroio error handler pass-through
johnjcasey Dec 18, 2023
92948aa
add documentation to avroio
johnjcasey Dec 18, 2023
3e0cae5
wire error handling into bigtable io
johnjcasey Dec 18, 2023
1f0b7f0
catch batching error when errors are detected
johnjcasey Dec 19, 2023
35c7124
continue work on bigtable error handling
johnjcasey Dec 20, 2023
cba5bbd
break apart and retry batches when they fail in bigquery
johnjcasey Dec 20, 2023
6d1cfeb
remove check for oversized mutation
johnjcasey Dec 28, 2023
41508bb
Merge remote-tracking branch 'upstream/master' into feature/bigtable-…
johnjcasey Dec 28, 2023
d606f89
spotless
johnjcasey Jan 2, 2024
ebc8577
fix merge error
johnjcasey Jan 2, 2024
a420d52
address comments
johnjcasey Jan 3, 2024
c630bc1
add BigtableWriteIT test case. it unexpectedly passes
johnjcasey Jan 4, 2024
5a97efc
support routing more malformed mutation types, clean ups based on PR …
johnjcasey Jan 8, 2024
8963686
make retrys non-async to remove race condition causing data loss
johnjcasey Jan 9, 2024
3086274
spotless
johnjcasey Jan 9, 2024
505319d
update comment on identifying data exception
johnjcasey Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -40,6 +42,15 @@ <RecordT> void route(
String description)
throws Exception;

<RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception;

class ThrowingBadRecordRouter implements BadRecordRouter {

@Override
Expand All @@ -65,6 +76,31 @@ public <RecordT> void route(
throw new RuntimeException("Encountered Bad Record: " + encodedRecord);
}
}

@Override
public <RecordT> void route(
johnjcasey marked this conversation as resolved.
Show resolved Hide resolved
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception {
if (exception != null) {
throw exception;
} else {
Preconditions.checkArgumentNotNull(record);
String encodedRecord =
BadRecord.Record.builder()
.addHumanReadableJson(record)
.build()
.getHumanReadableJsonRecord();
if (encodedRecord == null) {
encodedRecord = "Unable to serialize bad record";
}
throw new RuntimeException("Encountered Bad Record: " + encodedRecord);
}
}
}

class RecordingBadRecordRouter implements BadRecordRouter {
Expand All @@ -81,5 +117,21 @@ public <RecordT> void route(
.get(BAD_RECORD_TAG)
.output(BadRecord.fromExceptionInformation(record, coder, exception, description));
}

@Override
public <RecordT> void route(
DoFn<?, ?>.FinishBundleContext c,
RecordT record,
@Nullable Coder<RecordT> coder,
@Nullable Exception exception,
String description,
BoundedWindow window)
throws Exception {
c.output(
BAD_RECORD_TAG,
BadRecord.fromExceptionInformation(record, coder, exception, description),
window.maxTimestamp(),
window);
}
}
}
Loading
Loading