Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topicPattern property to KafkaIO.Read to match topics using a regex #26948

Merged
merged 3 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
Expand Down Expand Up @@ -350,10 +351,11 @@
* href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
* href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
* KafkaIO.Read} is, {@link ReadSourceDescriptors} doesn't require source descriptions(e.g., {@link
* KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
* KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the
* pipeline can populate these source descriptions during runtime. For example, the pipeline can
* query Kafka topics from a BigQuery table and read these topics via {@link ReadSourceDescriptors}.
* KafkaIO.Read#getTopicPattern()}, {@link KafkaIO.Read#getTopicPartitions()}, {@link
* KafkaIO.Read#getTopics()}, {@link KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline
* construction time. Instead, the pipeline can populate these source descriptions during runtime.
* For example, the pipeline can query Kafka topics from a BigQuery table and read these topics via
* {@link ReadSourceDescriptors}.
*
* <h3>Common Kafka Consumer Configurations</h3>
*
Expand Down Expand Up @@ -633,6 +635,9 @@ public abstract static class Read<K, V>
@Pure
abstract @Nullable List<TopicPartition> getTopicPartitions();

@Pure
abstract @Nullable Pattern getTopicPattern();

@Pure
abstract @Nullable Coder<K> getKeyCoder();

Expand Down Expand Up @@ -692,6 +697,8 @@ abstract static class Builder<K, V> {

abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);

abstract Builder<K, V> setTopicPattern(Pattern topicPattern);

abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);

abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
Expand Down Expand Up @@ -922,8 +929,9 @@ public Read<K, V> withTopic(String topic) {
*/
public Read<K, V> withTopics(List<String> topics) {
checkState(
getTopicPartitions() == null || getTopicPartitions().isEmpty(),
"Only topics or topicPartitions can be set, not both");
(getTopicPartitions() == null || getTopicPartitions().isEmpty())
&& getTopicPattern() == null,
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopics(ImmutableList.copyOf(topics)).build();
}

Expand All @@ -936,11 +944,26 @@ public Read<K, V> withTopics(List<String> topics) {
*/
public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
checkState(
getTopics() == null || getTopics().isEmpty(),
"Only topics or topicPartitions can be set, not both");
(getTopics() == null || getTopics().isEmpty()) && getTopicPattern() == null,
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
*
* <p>See {@link KafkaUnboundedSource#split(int, PipelineOptions)} for description of how the
* partitions are distributed among the splits.
*/
public Read<K, V> withTopicPattern(String topicPattern) {
checkState(
(getTopics() == null || getTopics().isEmpty())
&& (getTopicPartitions() == null || getTopicPartitions().isEmpty()),
"Only one of topics, topicPartitions or topicPattern can be set");
return toBuilder().setTopicPattern(Pattern.compile(topicPattern)).build();
}

/**
* Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
*
Expand Down Expand Up @@ -1274,8 +1297,9 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
if (!isDynamicRead()) {
checkArgument(
(getTopics() != null && getTopics().size() > 0)
|| (getTopicPartitions() != null && getTopicPartitions().size() > 0),
"Either withTopic(), withTopics() or withTopicPartitions() is required");
|| (getTopicPartitions() != null && getTopicPartitions().size() > 0)
|| getTopicPattern() != null,
"Either withTopic(), withTopics(), withTopicPartitions() or withTopicPattern() is required");
} else {
checkArgument(
ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"),
Expand Down Expand Up @@ -1537,6 +1561,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
kafkaRead.getConsumerConfig(),
kafkaRead.getCheckStopReadingFn(),
topics,
kafkaRead.getTopicPattern(),
kafkaRead.getStartReadTime(),
kafkaRead.getStopReadTime()));
} else {
Expand All @@ -1561,6 +1586,7 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr
this.consumerFactoryFn = read.getConsumerFactoryFn();
this.topics = read.getTopics();
this.topicPartitions = read.getTopicPartitions();
this.topicPattern = read.getTopicPattern();
this.startReadTime = read.getStartReadTime();
this.stopReadTime = read.getStopReadTime();
}
Expand All @@ -1578,15 +1604,30 @@ static class GenerateKafkaSourceDescriptor extends DoFn<byte[], KafkaSourceDescr

@VisibleForTesting final @Nullable List<String> topics;

private final @Nullable Pattern topicPattern;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not annotated with @VisibleForTesting since the property is not accessed directly in tests, but neither are the properties which do specify @VisibleForTesting.


@ProcessElement
public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
List<TopicPartition> partitions =
new ArrayList<>(Preconditions.checkStateNotNull(topicPartitions));
if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
for (String topic : Preconditions.checkStateNotNull(topics)) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
List<String> topics = Preconditions.checkStateNotNull(this.topics);
if (topics.isEmpty()) {
Pattern pattern = Preconditions.checkStateNotNull(topicPattern);
for (Map.Entry<String, List<PartitionInfo>> entry :
consumer.listTopics().entrySet()) {
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo p : entry.getValue()) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
} else {
for (String topic : topics) {
for (PartitionInfo p : consumer.partitionsFor(topic)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this is null at this point? In the split below you have null checks, but not here.

Copy link
Contributor Author

@sjvanrossum sjvanrossum Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this referring to topics? Both topicPartitions and topics are initialized as empty lists in the builder by default and replaced using .withTopics() and .withTopicPartitions(). The previous Preconditions.checkStateNotNull(topics) expression in the for loop should still not be null under any circumstance. Special care should be taken to carry that property forward when we add support for this property in KafkaIO's ExternalTransformRegistrar though, since it doesn't guarantee the same object state the builder guarantees.
In regards to topicPattern, if both topicPartitions and topics are empty, then topicPattern must be non-null, since the PTransform's expansion checks that at least one of those properties is set and the .withX() builder methods check that none are previously set.

As far as Kafka's topic metadata goes, .partitionsFor() will throw an exception if an unauthorized topic is requested and .listTopics() will only list all authorized topics. Both methods return initialized objects and ensure that potential null responses from the server are translated to empty collections (as far back as org.apache.kafka:kafka-clients:0.11.0.3) or throw an exception in the case of an authorization failure. I'd say that the existing check on partitionInfoList seems superfluous and could potentially be considered for deletion:

List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
    partitionInfoList != null,
    "Could not find any partitions info. Please check Kafka configuration and make sure "
        + "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
  partitions.add(new TopicPartition(p.topic(), p.partition()));
}

partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
Expand Down Expand Up @@ -1634,12 +1675,16 @@ public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
List<String> topics = Preconditions.checkStateNotNull(getTopics());
List<TopicPartition> topicPartitions = Preconditions.checkStateNotNull(getTopicPartitions());
Pattern topicPattern = getTopicPattern();
if (topics.size() > 0) {
builder.add(DisplayData.item("topics", Joiner.on(",").join(topics)).withLabel("Topic/s"));
} else if (topicPartitions.size() > 0) {
builder.add(
DisplayData.item("topicPartitions", Joiner.on(",").join(topicPartitions))
.withLabel("Topic Partition/s"));
} else if (topicPattern != null) {
builder.add(
DisplayData.item("topicPattern", topicPattern.pattern()).withLabel("Topic Pattern"));
}
Set<String> disallowedConsumerPropertiesKeys =
KafkaIOUtils.DISALLOWED_CONSUMER_PROPERTIES.keySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ enum KafkaIOReadProperties {
CONSUMER_CONFIG,
TOPICS,
TOPIC_PARTITIONS,
TOPIC_PATTERN,
KEY_CODER,
VALUE_CODER,
CONSUMER_FACTORY_FN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.UnboundedSource;
Expand Down Expand Up @@ -65,14 +67,26 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti

if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig())) {
for (String topic : Preconditions.checkStateNotNull(spec.getTopics())) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
List<String> topics = Preconditions.checkStateNotNull(spec.getTopics());
if (topics.isEmpty()) {
Pattern pattern = Preconditions.checkStateNotNull(spec.getTopicPattern());
for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
if (pattern.matcher(entry.getKey()).matches()) {
for (PartitionInfo p : entry.getValue()) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
} else {
for (String topic : topics) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
checkState(
partitionInfoList != null,
"Could not find any partitions info. Please check Kafka configuration and make sure "
+ "that provided topics exist.");
for (PartitionInfo p : partitionInfoList) {
partitions.add(new TopicPartition(p.topic(), p.partition()));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
Expand Down Expand Up @@ -64,6 +65,7 @@ class WatchForKafkaTopicPartitions extends PTransform<PBegin, PCollection<KafkaS
private final Map<String, Object> kafkaConsumerConfig;
private final @Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn;
private final Set<String> topics;
private final @Nullable Pattern topicPattern;
private final @Nullable Instant startReadTime;
private final @Nullable Instant stopReadTime;

Expand All @@ -73,13 +75,15 @@ public WatchForKafkaTopicPartitions(
Map<String, Object> kafkaConsumerConfig,
@Nullable SerializableFunction<TopicPartition, Boolean> checkStopReadingFn,
Set<String> topics,
@Nullable Pattern topicPattern,
@Nullable Instant startReadTime,
@Nullable Instant stopReadTime) {
this.checkDuration = firstNonNull(checkDuration, DEFAULT_CHECK_DURATION);
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.checkStopReadingFn = checkStopReadingFn;
this.topics = topics;
this.topicPattern = topicPattern;
this.startReadTime = startReadTime;
this.stopReadTime = stopReadTime;
}
Expand All @@ -91,7 +95,8 @@ public PCollection<KafkaSourceDescriptor> expand(PBegin input) {
.apply(
"Match new TopicPartitions",
Watch.growthOf(
new WatchPartitionFn(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
new WatchPartitionFn(
kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern))
.withPollInterval(checkDuration))
.apply(ParDo.of(new ConvertToDescriptor(checkStopReadingFn, startReadTime, stopReadTime)));
}
Expand Down Expand Up @@ -134,22 +139,27 @@ private static class WatchPartitionFn extends PollFn<byte[], TopicPartition> {
kafkaConsumerFactoryFn;
private final Map<String, Object> kafkaConsumerConfig;
private final Set<String> topics;
private final @Nullable Pattern topicPattern;

private WatchPartitionFn(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
Map<String, Object> kafkaConsumerConfig,
Set<String> topics) {
Set<String> topics,
@Nullable Pattern topicPattern) {
this.kafkaConsumerFactoryFn = kafkaConsumerFactoryFn;
this.kafkaConsumerConfig = kafkaConsumerConfig;
this.topics = topics;
this.topicPattern = topicPattern;
}

@Override
public Watch.Growth.PollResult<TopicPartition> apply(byte[] element, Context c)
throws Exception {
Instant now = Instant.now();
return Watch.Growth.PollResult.incomplete(
now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
now,
getAllTopicPartitions(
kafkaConsumerFactoryFn, kafkaConsumerConfig, topics, topicPattern))
.withWatermark(now);
}
}
Expand All @@ -158,7 +168,8 @@ now, getAllTopicPartitions(kafkaConsumerFactoryFn, kafkaConsumerConfig, topics))
static List<TopicPartition> getAllTopicPartitions(
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> kafkaConsumerFactoryFn,
Map<String, Object> kafkaConsumerConfig,
Set<String> topics) {
Set<String> topics,
@Nullable Pattern topicPattern) {
List<TopicPartition> current = new ArrayList<>();
try (Consumer<byte[], byte[]> kafkaConsumer =
kafkaConsumerFactoryFn.apply(kafkaConsumerConfig)) {
Expand All @@ -168,12 +179,13 @@ static List<TopicPartition> getAllTopicPartitions(
current.add(new TopicPartition(topic, partition.partition()));
}
}

} else {
for (Map.Entry<String, List<PartitionInfo>> topicInfo :
kafkaConsumer.listTopics().entrySet()) {
for (PartitionInfo partition : topicInfo.getValue()) {
current.add(new TopicPartition(topicInfo.getKey(), partition.partition()));
if (topicPattern == null || topicPattern.matcher(topicInfo.getKey()).matches()) {
for (PartitionInfo partition : topicInfo.getValue()) {
current.add(new TopicPartition(partition.topic(), partition.partition()));
}
}
}
}
Expand Down
Loading