Wire error handling into PubSubIO and add initial tests (#30372)
* Wire error handling into PubSubIO and add initial tests

* update comments to reflect that schema mismatches are not routed to DLQs

* spotless

* address nits
johnjcasey authored Feb 28, 2024
1 parent 549faba commit 2b9d958
Showing 4 changed files with 312 additions and 50 deletions.
CHANGES.md (2 additions, 0 deletions)
@@ -66,6 +66,8 @@
   * Full Support for Storage Read and Write APIs
   * Partial Support for File Loads (Failures writing to files supported, failures loading files to BQ unsupported)
   * No Support for Extract or Streaming Inserts
+* Added support for handling bad records to PubSubIO ([#30372](https://github.com/apache/beam/pull/30372)).
+  * Schema mismatches are not handled, and enabling error handling when writing to Pub/Sub topics that have schemas is not recommended.
 * `--enableBundling` pipeline option for BigQueryIO DIRECT_READ is replaced by `--enableStorageReadApiV2`. Both were considered experimental and may be subject to change (Java) ([#26354](https://github.com/apache/beam/issues/26354)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

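The CHANGES.md entry above is the user-facing half of this change. As a rough usage sketch (not part of this commit): the snippet below registers a bad-record handler on the pipeline and attaches it to a Pub/Sub write. It assumes the hook wired in by this PR is exposed on `PubsubIO.Write` as `withBadRecordErrorHandler`, mirroring other Beam IOs; the topic name and the `LogBadRecords` sink transform are placeholders.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.values.PCollection;

public class PubsubBadRecordExample {

  // Placeholder dead-letter sink: a real pipeline would persist the BadRecords
  // (for example, write them to GCS or to another topic) rather than count them.
  static class LogBadRecords extends PTransform<PCollection<BadRecord>, PCollection<Long>> {
    @Override
    public PCollection<Long> expand(PCollection<BadRecord> input) {
      return input.apply("CountBadRecords", Count.globally());
    }
  }

  public static void main(String[] args) throws Exception {
    Pipeline pipeline = Pipeline.create();

    // The handler must be closed before run() so its sink is added to the pipeline graph.
    try (BadRecordErrorHandler<PCollection<Long>> errorHandler =
        pipeline.registerBadRecordErrorHandler(new LogBadRecords())) {
      pipeline
          .apply("CreateMessages", Create.of("hello", "world"))
          .apply(
              "WriteToPubsub",
              PubsubIO.writeStrings()
                  .to("projects/my-project/topics/my-topic")
                  .withBadRecordErrorHandler(errorHandler)); // hook assumed from this PR
    }

    pipeline.run().waitUntilFinish();
  }
}
```

Because schema mismatches are not routed (per the note above), a handler registered this way only covers failures in message formatting, topic resolution, and size validation.
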
PreparePubsubWriteDoFn.java
@@ -20,10 +20,13 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import javax.naming.SizeLimitExceededException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
@@ -41,6 +44,12 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
   private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
   @Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;

+  private final BadRecordRouter badRecordRouter;
+
+  private final Coder<InputT> inputCoder;
+
+  private final TupleTag<PubsubMessage> outputTag;
+
   static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
       throws SizeLimitExceededException {
     int payloadSize = message.getPayload().length;
@@ -113,10 +122,16 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
       SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
       @Nullable
           SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
-      int maxPublishBatchSize) {
+      int maxPublishBatchSize,
+      BadRecordRouter badRecordRouter,
+      Coder<InputT> inputCoder,
+      TupleTag<PubsubMessage> outputTag) {
     this.formatFunction = formatFunction;
     this.topicFunction = topicFunction;
     this.maxPublishBatchSize = maxPublishBatchSize;
+    this.badRecordRouter = badRecordRouter;
+    this.inputCoder = inputCoder;
+    this.outputTag = outputTag;
   }

   @ProcessElement
@@ -125,18 +140,42 @@ public void process(
       @Timestamp Instant ts,
       BoundedWindow window,
       PaneInfo paneInfo,
-      OutputReceiver<PubsubMessage> o) {
+      MultiOutputReceiver o)
+      throws Exception {
     ValueInSingleWindow<InputT> valueInSingleWindow =
         ValueInSingleWindow.of(element, ts, window, paneInfo);
-    PubsubMessage message = formatFunction.apply(valueInSingleWindow);
+    PubsubMessage message;
+    try {
+      message = formatFunction.apply(valueInSingleWindow);
+    } catch (Exception e) {
+      badRecordRouter.route(
+          o,
+          element,
+          inputCoder,
+          e,
+          "Failed to serialize PubSub message with provided format function");
+      return;
+    }
     if (topicFunction != null) {
-      message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+      try {
+        message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
+      } catch (Exception e) {
+        badRecordRouter.route(
+            o, element, inputCoder, e, "Failed to determine PubSub topic using topic function");
+        return;
+      }
     }
     try {
       validatePubsubMessageSize(message, maxPublishBatchSize);
     } catch (SizeLimitExceededException e) {
-      throw new IllegalArgumentException(e);
+      badRecordRouter.route(
+          o,
+          element,
+          inputCoder,
+          new IllegalArgumentException(e),
+          "PubSub message limit exceeded, see exception for details");
+      return;
     }
-    o.output(message);
+    o.get(outputTag).output(message);
   }
 }
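To make the tag plumbing in the DoFn above concrete, here is a minimal wiring sketch. It assumes the code lives in the same package as the (package-private) PreparePubsubWriteDoFn and uses the stock BadRecordRouter constants; the helper name, formatFn, and the batch-size parameter are illustrative, and PubsubIO performs an equivalent step internally rather than exposing this directly.

```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.ValueInSingleWindow;

class PreparePubsubWriteWiringSketch {

  // Hypothetical helper: run PreparePubsubWriteDoFn with a main output tag for good
  // messages and BadRecordRouter.BAD_RECORD_TAG for routed failures.
  static <InputT> PCollectionTuple prepare(
      PCollection<InputT> input,
      SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFn,
      int maxPublishBatchSize) {
    TupleTag<PubsubMessage> messagesTag = new TupleTag<PubsubMessage>() {};
    return input.apply(
        "PreparePubsubWrite",
        ParDo.of(
                new PreparePubsubWriteDoFn<>(
                    formatFn,
                    null, // no dynamic topic function in this sketch
                    maxPublishBatchSize,
                    BadRecordRouter.RECORDING_ROUTER, // route failures instead of throwing
                    input.getCoder(),
                    messagesTag))
            .withOutputTags(messagesTag, TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
  }
}
```

The returned PCollectionTuple carries successfully formatted messages under messagesTag and routed failures under BadRecordRouter.BAD_RECORD_TAG; PubsubIO's write transform forwards that bad-record output to whatever error handler the user registered, as in the sketch after the CHANGES.md diff.
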
