From 5a6029a6258b9b2c636c343043080c2f81e3190e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 4 Aug 2022 18:16:05 -0700 Subject: [PATCH 01/32] Scaffolded `PulsarSource` for DeltaStreamer --- .../apache/hudi/HoodieConversionUtils.scala | 9 +++ hudi-utilities/pom.xml | 7 +++ .../hudi/utilities/sources/PulsarSource.java | 56 +++++++++++++++++++ pom.xml | 3 +- 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala index 547c6aed628cc..8bc6796832c18 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala @@ -20,8 +20,17 @@ package org.apache.hudi import org.apache.hudi.common.config.TypedProperties +import java.{util => ju} +import scala.collection.JavaConverters + object HoodieConversionUtils { + /** + * TODO scala-doc + */ + def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = + JavaConverters.mapAsScalaMap(map).toMap + def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] = if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else org.apache.hudi.common.util.Option.empty() diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 322297dfdadf1..112f185366cc6 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -171,6 +171,13 @@ + + + io.streamnative.connectors + pulsar-spark-connector_${scala.binary.version} + ${pulsar.spark.version} + + org.apache.logging.log4j diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java new file mode 100644 index 0000000000000..7d75072fd79f8 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -0,0 +1,56 @@ +package org.apache.hudi.utilities.sources; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.HoodieConversionUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.pulsar.JsonUtils; +import scala.collection.JavaConverters; +import scala.collection.convert.AsScalaConverters; + +import java.util.Collections; + + +public class PulsarSource extends RowSource { + public PulsarSource(TypedProperties props, + JavaSparkContext sparkContext, + SparkSession sparkSession, + SchemaProvider schemaProvider) { + super(props, sparkContext, sparkSession, schemaProvider); + } + + @Override + protected Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + String topic = "test-topic"; + MessageId startingOffset = MessageId.earliest; + MessageId endingOffset = MessageId.latest; + + String startingOffsets = convertToOffsetString(topic, startingOffset); + String endingOffsets = convertToOffsetString(topic, endingOffset); + + Dataset rows = sparkSession.read() + .format("pulsar") + .option("service.url", "pulsar://localhost:6650") + .option("topics", "topic1,topic2") + .option("startingOffsets", startingOffsets) + .option("endingOffsets", endingOffsets) + .load(); + + return Pair.of(Option.of(rows), endingOffsets); + } + + private static String convertToOffsetString(String topic, MessageId startingOffset) { + return JsonUtils.topicOffsets(HoodieConversionUtils.mapAsScalaImmutableMap(Collections.singletonMap(topic, startingOffset))); + } +} diff --git a/pom.xml b/pom.xml index 54be08f6c03ba..4ca37fc6c32d8 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,8 @@ 2.10.0 2.0.0 2.4.1 - 2.8.1 + 2.10.1 + 3.1.1.4 5.3.4 2.17 3.0.1-b12 From e4e16494f9d2fa1f12e13b3d774813b228a73fed Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 10 Aug 2022 18:38:38 -0700 Subject: [PATCH 02/32] Make `PulsarSource` to be properly configured --- .../hudi/utilities/sources/PulsarSource.java | 127 +++++++++++++++--- 1 file changed, 112 insertions(+), 15 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 7d75072fd79f8..f9fc3fae0697c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -1,56 +1,153 @@ package org.apache.hudi.utilities.sources; -import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.DataSourceUtils; import org.apache.hudi.HoodieConversionUtils; +import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.pulsar.JsonUtils; -import scala.collection.JavaConverters; -import scala.collection.convert.AsScalaConverters; +import java.util.Arrays; import java.util.Collections; public class PulsarSource extends RowSource { + + private static final String[] PULSAR_META_FIELDS = new String[] { + "__key", + "__topic", + "__messageId", + "__publishTime", + "__eventTime", + "__messageProperties" + }; + + private final String topicName; + private final String endpointURL; + public PulsarSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider) { super(props, sparkContext, sparkSession, schemaProvider); + + DataSourceUtils.checkRequiredProperties(props, + Arrays.asList( + Config.PULSAR_SOURCE_TOPIC_NAME.key(), + Config.PULSAR_SOURCE_ENDPOINT_URL.key())); + + this.topicName = props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key()); + // TODO validate endpoint provided in the appropriate format + this.endpointURL = props.getString(Config.PULSAR_SOURCE_ENDPOINT_URL.key()); } @Override - protected Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { - String topic = "test-topic"; + protected Pair>, String> fetchNextBatch(Option lastCheckpointStr, long sourceLimit) { + Pair startingEndingOffsetsPair = computeOffsets(lastCheckpointStr, sourceLimit); MessageId startingOffset = MessageId.earliest; MessageId endingOffset = MessageId.latest; - String startingOffsets = convertToOffsetString(topic, startingOffset); - String endingOffsets = convertToOffsetString(topic, endingOffset); + String startingOffsets = convertToOffsetString(topicName, startingOffset); + String endingOffsets = convertToOffsetString(topicName, endingOffset); + + // + // TODO + // - Handle checkpoints/offsets + // - From --checkpoint param + // - From persistence? + // - Dataset rows = sparkSession.read() + Dataset sourceRows = sparkSession.read() .format("pulsar") - .option("service.url", "pulsar://localhost:6650") - .option("topics", "topic1,topic2") + .option("service.url", endpointURL) + .option("topics", topicName) .option("startingOffsets", startingOffsets) .option("endingOffsets", endingOffsets) .load(); - return Pair.of(Option.of(rows), endingOffsets); + return Pair.of(Option.of(transform(sourceRows)), endingOffsets); + } + + private Dataset transform(Dataset rows) { + return rows.drop(PULSAR_META_FIELDS); + } + + private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { + MessageId startingOffset = fetchStartingOffset(lastCheckpointStrOpt); + + MessageId endingOffset; + Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); + + return null; + } + + private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { + return lastCheckpointStrOpt.map(lastCheckpoint -> { + lastCheckpoint + }) + .orElseGet(() -> { + Config.OffsetAutoResetStrategy autoResetStrategy = Config.OffsetAutoResetStrategy.valueOf( + props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(), + Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name())); + + switch (autoResetStrategy) { + case LATEST: + return MessageId.latest; + case EARLIEST: + return MessageId.earliest; + case FAIL: + throw new IllegalArgumentException("No checkpoint has been provided!"); + default: + throw new UnsupportedOperationException("Unsupported offset auto-reset strategy"); + } + }); + } + + private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) { + if (sourceLimit < Long.MAX_VALUE) { + return sourceLimit; + } else { + return props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(), + Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue()); + } } private static String convertToOffsetString(String topic, MessageId startingOffset) { return JsonUtils.topicOffsets(HoodieConversionUtils.mapAsScalaImmutableMap(Collections.singletonMap(topic, startingOffset))); } + + // TODO unify w/ Kafka + public static class Config { + private static final ConfigProperty PULSAR_SOURCE_TOPIC_NAME = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.topic") + .noDefaultValue() + .withDocumentation("Name of the target Pulsar topic to source data from"); + + private static final ConfigProperty PULSAR_SOURCE_ENDPOINT_URL = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.endpointUrl") + .noDefaultValue() + .withDocumentation("URL of the target Pulsar endpoint (of the form 'pulsar://host:port'"); + + public enum OffsetAutoResetStrategy { + LATEST, EARLIEST, FAIL + } + + private static final ConfigProperty PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy") + .defaultValue(OffsetAutoResetStrategy.LATEST) + .withDocumentation("Policy determining how offsets shall be automatically reset in case there's " + + "no checkpoint information present"); + + public static final ConfigProperty PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.maxRecords") + .defaultValue(10_000_000L) + .withDocumentation("Max number of records obtained in a single each batch"); + } } From c4d6ccb0b745e1e993ff44ec28b24089148edbcf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 10 Aug 2022 18:43:04 -0700 Subject: [PATCH 03/32] Implemented proper offset-handling --- .../hudi/utilities/sources/PulsarSource.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index f9fc3fae0697c..6e72b5b947ff2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -51,11 +51,9 @@ public PulsarSource(TypedProperties props, @Override protected Pair>, String> fetchNextBatch(Option lastCheckpointStr, long sourceLimit) { Pair startingEndingOffsetsPair = computeOffsets(lastCheckpointStr, sourceLimit); - MessageId startingOffset = MessageId.earliest; - MessageId endingOffset = MessageId.latest; - String startingOffsets = convertToOffsetString(topicName, startingOffset); - String endingOffsets = convertToOffsetString(topicName, endingOffset); + MessageId startingOffset = startingEndingOffsetsPair.getLeft(); + MessageId endingOffset = startingEndingOffsetsPair.getRight(); // // TODO @@ -68,11 +66,11 @@ protected Pair>, String> fetchNextBatch(Option lastC .format("pulsar") .option("service.url", endpointURL) .option("topics", topicName) - .option("startingOffsets", startingOffsets) - .option("endingOffsets", endingOffsets) + .option("startingOffsets", convertToOffsetString(topicName, startingOffset)) + .option("endingOffsets", convertToOffsetString(topicName, endingOffset)) .load(); - return Pair.of(Option.of(transform(sourceRows)), endingOffsets); + return Pair.of(Option.of(transform(sourceRows)), convertToOffsetString(topicName, endingOffset)); } private Dataset transform(Dataset rows) { @@ -85,13 +83,12 @@ private Pair computeOffsets(Option lastCheckpointS MessageId endingOffset; Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); - return null; + return Pair.of(startingOffset, endingOffset); } private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { - return lastCheckpointStrOpt.map(lastCheckpoint -> { - lastCheckpoint - }) + return lastCheckpointStrOpt + .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint).apply(topicName)) .orElseGet(() -> { Config.OffsetAutoResetStrategy autoResetStrategy = Config.OffsetAutoResetStrategy.valueOf( props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(), From e2c9e3ac7359a7761de5326284cefc66765806e1 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 14:19:12 -0700 Subject: [PATCH 04/32] Fixing `DebeziumSource` to commit offsets to Kafka --- .../utilities/sources/debezium/DebeziumSource.java | 8 ++++++++ .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 10 +++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java index d9be692b5bc57..f05cd12d99d47 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java @@ -239,5 +239,13 @@ private static Dataset convertArrayColumnsToString(Dataset dataset) { return dataset; } + + @Override + public void onCommit(String lastCkptStr) { + if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), + KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { + offsetGen.commitOffsetToKafka(lastCkptStr); + } + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index cc577621fbf7c..49dd63f550a16 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -110,11 +110,13 @@ public static OffsetRange[] computeOffsetRanges(Map fromOf Comparator byPartition = Comparator.comparing(OffsetRange::partition); // Create initial offset ranges for each 'to' partition, with from = to offsets. - OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; - toOffsetMap.keySet().stream().map(tp -> { + OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> { long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); return OffsetRange.create(tp, fromOffset, fromOffset); - }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); + }) + .sorted(byPartition) + .collect(Collectors.toList()) + .toArray(new OffsetRange[toOffsetMap.size()]); long allocedEvents = 0; Set exhaustedPartitions = new HashSet<>(); @@ -290,6 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long numEvents = sourceLimit; } + // TODO remove if (numEvents < toOffsets.size()) { throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); } @@ -309,6 +312,7 @@ private List fetchPartitionInfos(KafkaConsumer consumer, String t List partitionInfos; do { + // TODO cleanup, introduce retrying client partitionInfos = consumer.partitionsFor(topicName); try { TimeUnit.SECONDS.sleep(10); From 9e590a279647808a96083e139494ef9931a65425 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 17:24:07 -0700 Subject: [PATCH 05/32] Fixing compilation --- .../apache/hudi/utilities/sources/PulsarSource.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 6e72b5b947ff2..4a2cea744d8e2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -17,7 +17,9 @@ import java.util.Arrays; import java.util.Collections; - +/** + * TODO java-doc + */ public class PulsarSource extends RowSource { private static final String[] PULSAR_META_FIELDS = new String[] { @@ -74,7 +76,7 @@ protected Pair>, String> fetchNextBatch(Option lastC } private Dataset transform(Dataset rows) { - return rows.drop(PULSAR_META_FIELDS); + return rows.drop(PULSAR_META_FIELDS); } private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { @@ -139,8 +141,8 @@ public enum OffsetAutoResetStrategy { private static final ConfigProperty PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy") .defaultValue(OffsetAutoResetStrategy.LATEST) - .withDocumentation("Policy determining how offsets shall be automatically reset in case there's " + - "no checkpoint information present"); + .withDocumentation("Policy determining how offsets shall be automatically reset in case there's " + + "no checkpoint information present"); public static final ConfigProperty PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty .key("hoodie.deltastreamer.source.pulsar.maxRecords") From f132f7197b221be097d18aa0660afbfe703a6dc8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 17:27:48 -0700 Subject: [PATCH 06/32] Make admin-endpoint URL configurable as well --- .../hudi/utilities/sources/PulsarSource.java | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 4a2cea744d8e2..4bc851531a562 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -32,7 +32,8 @@ public class PulsarSource extends RowSource { }; private final String topicName; - private final String endpointURL; + private final String serviceEndpointURL; + private final String adminEndpointURL; public PulsarSource(TypedProperties props, JavaSparkContext sparkContext, @@ -43,11 +44,12 @@ public PulsarSource(TypedProperties props, DataSourceUtils.checkRequiredProperties(props, Arrays.asList( Config.PULSAR_SOURCE_TOPIC_NAME.key(), - Config.PULSAR_SOURCE_ENDPOINT_URL.key())); + Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key())); this.topicName = props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key()); - // TODO validate endpoint provided in the appropriate format - this.endpointURL = props.getString(Config.PULSAR_SOURCE_ENDPOINT_URL.key()); + // TODO validate endpoints provided in the appropriate format + this.serviceEndpointURL = props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()); + this.adminEndpointURL = props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key()); } @Override @@ -66,7 +68,8 @@ protected Pair>, String> fetchNextBatch(Option lastC Dataset sourceRows = sparkSession.read() .format("pulsar") - .option("service.url", endpointURL) + .option("service.url", serviceEndpointURL) + .option("admin.url", adminEndpointURL) .option("topics", topicName) .option("startingOffsets", convertToOffsetString(topicName, startingOffset)) .option("endingOffsets", convertToOffsetString(topicName, endingOffset)) @@ -82,8 +85,8 @@ private Dataset transform(Dataset rows) { private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { MessageId startingOffset = fetchStartingOffset(lastCheckpointStrOpt); - MessageId endingOffset; Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); + MessageId endingOffset = MessageId.latest; return Pair.of(startingOffset, endingOffset); } @@ -119,7 +122,9 @@ private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties p } private static String convertToOffsetString(String topic, MessageId startingOffset) { - return JsonUtils.topicOffsets(HoodieConversionUtils.mapAsScalaImmutableMap(Collections.singletonMap(topic, startingOffset))); + return JsonUtils.topicOffsets( + HoodieConversionUtils.mapAsScalaImmutableMap( + Collections.singletonMap(topic, startingOffset))); } // TODO unify w/ Kafka @@ -129,9 +134,14 @@ public static class Config { .noDefaultValue() .withDocumentation("Name of the target Pulsar topic to source data from"); - private static final ConfigProperty PULSAR_SOURCE_ENDPOINT_URL = ConfigProperty - .key("hoodie.deltastreamer.source.pulsar.endpointUrl") - .noDefaultValue() + private static final ConfigProperty PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url") + .defaultValue("pulsar://localhost:6650") + .withDocumentation("URL of the target Pulsar endpoint (of the form 'pulsar://host:port'"); + + private static final ConfigProperty PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty + .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url") + .defaultValue("http://localhost:8080") .withDocumentation("URL of the target Pulsar endpoint (of the form 'pulsar://host:port'"); public enum OffsetAutoResetStrategy { From 7319ec308625df42c14fb0681bfe24c967caad7b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 17:36:35 -0700 Subject: [PATCH 07/32] Missing license --- .../hudi/utilities/sources/PulsarSource.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 4bc851531a562..2660888144c17 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hudi.utilities.sources; import org.apache.hudi.DataSourceUtils; From 3d9823d749b6ec704b85699de5678f3b5710f238 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 17:43:18 -0700 Subject: [PATCH 08/32] Make pulsar-spark-connector dep be provided --- hudi-utilities/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 112f185366cc6..da5a42749ba6d 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -176,6 +176,7 @@ io.streamnative.connectors pulsar-spark-connector_${scala.binary.version} ${pulsar.spark.version} + provided From ebf95c62810f7edb039cab3df7659bcd9b9b0b2e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 17:48:10 -0700 Subject: [PATCH 09/32] Make sure "pulsar-client-api" is packaged into "hudi-utilities-bundle" --- packaging/hudi-utilities-bundle/pom.xml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 75f8ecdefd988..45005ad719113 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -136,6 +136,9 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + org.apache.pulsar:pulsar-client + org.apache.pulsar:pulsar-client-api + org.apache.pulsar:pulsar-client-admin-api org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} From f9836a569f3f89ee6574353332016c916f141db8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 18:38:44 -0700 Subject: [PATCH 10/32] Fetch ending offset from Pulsar (batch API doesn't permit using "latest" shortcut) --- .../hudi/utilities/sources/PulsarSource.java | 62 +++++++++++++++++-- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 2660888144c17..a04e74ac45c0a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -24,8 +24,16 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.util.Lazy; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -40,6 +48,8 @@ */ public class PulsarSource extends RowSource { + private static final Logger LOG = LogManager.getLogger(PulsarSource.class); + private static final String[] PULSAR_META_FIELDS = new String[] { "__key", "__topic", @@ -50,9 +60,14 @@ public class PulsarSource extends RowSource { }; private final String topicName; + private final String serviceEndpointURL; private final String adminEndpointURL; + // TODO dedupe + private final Lazy pulsarClient; + private final Lazy> pulsarConsumer; + public PulsarSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, @@ -65,9 +80,13 @@ public PulsarSource(TypedProperties props, Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key())); this.topicName = props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key()); + // TODO validate endpoints provided in the appropriate format this.serviceEndpointURL = props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()); this.adminEndpointURL = props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key()); + + this.pulsarClient = Lazy.lazily(this::initPulsarClient); + this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic); } @Override @@ -79,9 +98,9 @@ protected Pair>, String> fetchNextBatch(Option lastC // // TODO - // - Handle checkpoints/offsets - // - From --checkpoint param - // - From persistence? + // - [P0] Commit offsets (to Pulsar) + // - [P0] Add support for schema-provider + // - [P1] Add support for auth // Dataset sourceRows = sparkSession.read() @@ -104,11 +123,21 @@ private Pair computeOffsets(Option lastCheckpointS MessageId startingOffset = fetchStartingOffset(lastCheckpointStrOpt); Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); - MessageId endingOffset = MessageId.latest; + + MessageId endingOffset = fetchEndingOffset(); return Pair.of(startingOffset, endingOffset); } + private MessageId fetchEndingOffset() { + try { + return pulsarConsumer.get().getLastMessageId(); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to fetch latest messageId for topic '%s'", topicName), e); + throw new HoodieIOException("Failed to fetch latest messageId for topic", e); + } + } + private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { return lastCheckpointStrOpt .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint).apply(topicName)) @@ -130,6 +159,31 @@ private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { }); } + private Consumer subscribeToTopic() { + try { + return pulsarClient.get() + .newConsumer() + .topic(topicName) + .subscriptionName("hudi-pulsar-consumer") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", topicName), e); + throw new HoodieIOException("Failed to subscribe to Pulsar topic", e); + } + } + + private PulsarClient initPulsarClient() { + try { + return PulsarClient.builder() + .serviceUrl(serviceEndpointURL) + .build(); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to init Pulsar client connecting to '%s'", serviceEndpointURL), e); + throw new HoodieIOException("Failed to init Pulsar client", e); + } + } + private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) { if (sourceLimit < Long.MAX_VALUE) { return sourceLimit; From 0ffb51a3c58cc3b8095d10b1cb924320c4884a34 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 18:48:15 -0700 Subject: [PATCH 11/32] Fetch latest offset instead of using `MessageId.latest` shortcut --- .../hudi/utilities/sources/PulsarSource.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index a04e74ac45c0a..1b93310b6c057 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -120,16 +120,25 @@ private Dataset transform(Dataset rows) { } private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { - MessageId startingOffset = fetchStartingOffset(lastCheckpointStrOpt); + MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt); Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); - MessageId endingOffset = fetchEndingOffset(); + MessageId endingOffset = fetchLatestOffset(); return Pair.of(startingOffset, endingOffset); } - private MessageId fetchEndingOffset() { + private void ackOffset(MessageId latestConsumedOffset) { + try { + pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", latestConsumedOffset, topicName), e); + throw new HoodieIOException("Failed to ack message for topic", e); + } + } + + private MessageId fetchLatestOffset() { try { return pulsarConsumer.get().getLastMessageId(); } catch (PulsarClientException e) { @@ -138,7 +147,7 @@ private MessageId fetchEndingOffset() { } } - private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { + private MessageId decodeStartingOffset(Option lastCheckpointStrOpt) { return lastCheckpointStrOpt .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint).apply(topicName)) .orElseGet(() -> { @@ -148,7 +157,7 @@ private MessageId fetchStartingOffset(Option lastCheckpointStrOpt) { switch (autoResetStrategy) { case LATEST: - return MessageId.latest; + return fetchLatestOffset(); case EARLIEST: return MessageId.earliest; case FAIL: From 0c0aa09df6c6ee8e8b3841b1a45a27ab46864b82 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 11 Aug 2022 18:48:29 -0700 Subject: [PATCH 12/32] Ack latest consumed offset --- .../org/apache/hudi/utilities/sources/PulsarSource.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 1b93310b6c057..5d90080aae300 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -98,7 +98,6 @@ protected Pair>, String> fetchNextBatch(Option lastC // // TODO - // - [P0] Commit offsets (to Pulsar) // - [P0] Add support for schema-provider // - [P1] Add support for auth // @@ -115,6 +114,12 @@ protected Pair>, String> fetchNextBatch(Option lastC return Pair.of(Option.of(transform(sourceRows)), convertToOffsetString(topicName, endingOffset)); } + @Override + public void onCommit(String lastCheckpointStr) { + MessageId latestConsumedOffset = JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName); + ackOffset(latestConsumedOffset); + } + private Dataset transform(Dataset rows) { return rows.drop(PULSAR_META_FIELDS); } From eed8b9bc7418e82b99fbb814ce4d0cf62e5659b7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 11:27:31 -0700 Subject: [PATCH 13/32] Removing "pulsar-client" jars from "hudi-utilities-bundle" since they are not compatible w/ "pulsar-spark-connector" (due to using of cyclic annotations) --- packaging/hudi-utilities-bundle/pom.xml | 3 --- 1 file changed, 3 deletions(-) diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 45005ad719113..75f8ecdefd988 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -136,9 +136,6 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common - org.apache.pulsar:pulsar-client - org.apache.pulsar:pulsar-client-api - org.apache.pulsar:pulsar-client-admin-api org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} From 35eccfa8808c7472c3e0bf8489d46b217d26545f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 12:04:37 -0700 Subject: [PATCH 14/32] Fixing subscription to be exclusive to be able to ack "cumulatively" --- .../java/org/apache/hudi/utilities/sources/PulsarSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 5d90080aae300..99ea924d8f225 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -127,6 +127,7 @@ private Dataset transform(Dataset rows) { private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt); + // TODO support capping the amount of records fetched Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); MessageId endingOffset = fetchLatestOffset(); @@ -179,7 +180,7 @@ private Consumer subscribeToTopic() { .newConsumer() .topic(topicName) .subscriptionName("hudi-pulsar-consumer") - .subscriptionType(SubscriptionType.Shared) + .subscriptionType(SubscriptionType.Exclusive) .subscribe(); } catch (PulsarClientException e) { LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", topicName), e); From ed2fd411f2847f38f6a73602e22980e79ba07e75 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 12:13:17 -0700 Subject: [PATCH 15/32] Canonicalize topic's name properly --- .../hudi/utilities/sources/PulsarSource.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 99ea924d8f225..baec21feac07b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -50,6 +51,7 @@ public class PulsarSource extends RowSource { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); + private static final String HUDI_PULSAR_CONSUMER_ID = "hudi-pulsar-consumer"; private static final String[] PULSAR_META_FIELDS = new String[] { "__key", "__topic", @@ -79,7 +81,8 @@ public PulsarSource(TypedProperties props, Config.PULSAR_SOURCE_TOPIC_NAME.key(), Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key())); - this.topicName = props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key()); + // Converting to a descriptor allows us to canonicalize the topic's name properly + this.topicName = TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString(); // TODO validate endpoints provided in the appropriate format this.serviceEndpointURL = props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()); @@ -102,16 +105,19 @@ protected Pair>, String> fetchNextBatch(Option lastC // - [P1] Add support for auth // + String startingOffsetStr = convertToOffsetString(topicName, startingOffset); + String endingOffsetStr = convertToOffsetString(topicName, endingOffset); + Dataset sourceRows = sparkSession.read() .format("pulsar") .option("service.url", serviceEndpointURL) .option("admin.url", adminEndpointURL) .option("topics", topicName) - .option("startingOffsets", convertToOffsetString(topicName, startingOffset)) - .option("endingOffsets", convertToOffsetString(topicName, endingOffset)) + .option("startingOffsets", startingOffsetStr) + .option("endingOffsets", endingOffsetStr) .load(); - return Pair.of(Option.of(transform(sourceRows)), convertToOffsetString(topicName, endingOffset)); + return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr); } @Override @@ -179,7 +185,7 @@ private Consumer subscribeToTopic() { return pulsarClient.get() .newConsumer() .topic(topicName) - .subscriptionName("hudi-pulsar-consumer") + .subscriptionName(HUDI_PULSAR_CONSUMER_ID) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); } catch (PulsarClientException e) { From 7b892b3125c228453767b22e0d218c116104995c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 12:18:10 -0700 Subject: [PATCH 16/32] Make `PulsarSource` closeable --- .../hudi/utilities/sources/PulsarSource.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index baec21feac07b..8384c058e8456 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -41,13 +41,15 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.pulsar.JsonUtils; +import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; /** * TODO java-doc */ -public class PulsarSource extends RowSource { +public class PulsarSource extends RowSource implements Closeable { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); @@ -66,7 +68,7 @@ public class PulsarSource extends RowSource { private final String serviceEndpointURL; private final String adminEndpointURL; - // TODO dedupe + // NOTE: We're keeping the client so that we can shut it down properly private final Lazy pulsarClient; private final Lazy> pulsarConsumer; @@ -141,24 +143,6 @@ private Pair computeOffsets(Option lastCheckpointS return Pair.of(startingOffset, endingOffset); } - private void ackOffset(MessageId latestConsumedOffset) { - try { - pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset); - } catch (PulsarClientException e) { - LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", latestConsumedOffset, topicName), e); - throw new HoodieIOException("Failed to ack message for topic", e); - } - } - - private MessageId fetchLatestOffset() { - try { - return pulsarConsumer.get().getLastMessageId(); - } catch (PulsarClientException e) { - LOG.error(String.format("Failed to fetch latest messageId for topic '%s'", topicName), e); - throw new HoodieIOException("Failed to fetch latest messageId for topic", e); - } - } - private MessageId decodeStartingOffset(Option lastCheckpointStrOpt) { return lastCheckpointStrOpt .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint).apply(topicName)) @@ -180,6 +164,24 @@ private MessageId decodeStartingOffset(Option lastCheckpointStrOpt) { }); } + private void ackOffset(MessageId latestConsumedOffset) { + try { + pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", latestConsumedOffset, topicName), e); + throw new HoodieIOException("Failed to ack message for topic", e); + } + } + + private MessageId fetchLatestOffset() { + try { + return pulsarConsumer.get().getLastMessageId(); + } catch (PulsarClientException e) { + LOG.error(String.format("Failed to fetch latest messageId for topic '%s'", topicName), e); + throw new HoodieIOException("Failed to fetch latest messageId for topic", e); + } + } + private Consumer subscribeToTopic() { try { return pulsarClient.get() @@ -205,6 +207,11 @@ private PulsarClient initPulsarClient() { } } + @Override + public void close() throws IOException { + pulsarClient.get().close(); + } + private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) { if (sourceLimit < Long.MAX_VALUE) { return sourceLimit; From 16cf07e6ec8065e4041bd1bcab3b9b4c3494068d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 14:09:46 -0700 Subject: [PATCH 17/32] Make `SourceFormatAdapter` closeable; Make sure `DeltaSync` properly shuts down source upon termination --- .../hudi/utilities/deltastreamer/DeltaSync.java | 10 ++++++++-- .../deltastreamer/HoodieDeltaStreamer.java | 2 +- .../deltastreamer/SourceFormatAdapter.java | 17 ++++++++++++++++- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index f3d9af3150706..15c27bee109dc 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -91,6 +91,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; @@ -124,7 +125,7 @@ /** * Sync's one batch of data to hoodie table. */ -public class DeltaSync implements Serializable { +public class DeltaSync implements Serializable, Closeable { private static final long serialVersionUID = 1L; private static final Logger LOG = LogManager.getLogger(DeltaSync.class); @@ -338,6 +339,7 @@ public Pair, JavaRDD> syncOnce() throws IOException metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis()); + // TODO revisit (too early to unpersist) // Clear persistent RDDs jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return result; @@ -895,11 +897,15 @@ private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) { * Close all resources. */ public void close() { - if (null != writeClient) { + if (writeClient != null) { writeClient.close(); writeClient = null; } + if (formatAdapter != null) { + formatAdapter.close(); + } + LOG.info("Shutting down embedded timeline server"); if (embeddedTimelineService.isPresent()) { embeddedTimelineService.get().stop(); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index a22a3581ae94a..c4146662d30c0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -827,7 +827,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { * Close all resources. */ public void close() { - if (null != deltaSync) { + if (deltaSync != null) { deltaSync.close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java index 1260acb1ce408..3514ace829ab5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java @@ -21,6 +21,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -38,13 +39,16 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; +import java.io.Closeable; +import java.io.IOException; + import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME; /** * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer). */ -public final class SourceFormatAdapter { +public final class SourceFormatAdapter implements Closeable { private final Source source; @@ -123,4 +127,15 @@ public InputBatch> fetchNewDataInRowFormat(Option lastCkptS public Source getSource() { return source; } + + @Override + public void close() { + if (source instanceof Closeable) { + try { + ((Closeable) source).close(); + } catch (IOException e) { + throw new HoodieIOException(String.format("Failed to shutdown the source (%s)", source.getClass().getName()), e); + } + } + } } From 55658694094c339316e2f751a1a51952ed781ab7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 14:51:12 -0700 Subject: [PATCH 18/32] Worked around Pulsar's client failure to shutdown properly --- .../hudi/utilities/sources/PulsarSource.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 8384c058e8456..55c4afe499f0c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; @@ -209,7 +210,7 @@ private PulsarClient initPulsarClient() { @Override public void close() throws IOException { - pulsarClient.get().close(); + shutdownPulsarClient(pulsarClient.get()); } private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) { @@ -227,6 +228,19 @@ private static String convertToOffsetString(String topic, MessageId startingOffs Collections.singletonMap(topic, startingOffset))); } + private static void shutdownPulsarClient(PulsarClient client) throws PulsarClientException { + client.close(); + // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 3.1.1.4) is not + // shutting down event-loop group properly, so we had to shut it down manually + try { + ((PulsarClientImpl) client).eventLoopGroup() + .shutdownGracefully() + .await(); + } catch (InterruptedException e) { + // No-op + } + } + // TODO unify w/ Kafka public static class Config { private static final ConfigProperty PULSAR_SOURCE_TOPIC_NAME = ConfigProperty From 871afb102f94db1fc8e7a769cb5632d0c131a7a8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 15:04:21 -0700 Subject: [PATCH 19/32] Generate unique subscription-id (for isolation); Tidying up --- .../hudi/utilities/sources/PulsarSource.java | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 55c4afe499f0c..c691bc5e4658b 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -48,13 +48,13 @@ import java.util.Collections; /** - * TODO java-doc + * Source fetching data from Pulsar topics */ public class PulsarSource extends RowSource implements Closeable { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); - private static final String HUDI_PULSAR_CONSUMER_ID = "hudi-pulsar-consumer"; + private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d"; private static final String[] PULSAR_META_FIELDS = new String[] { "__key", "__topic", @@ -102,12 +102,6 @@ protected Pair>, String> fetchNextBatch(Option lastC MessageId startingOffset = startingEndingOffsetsPair.getLeft(); MessageId endingOffset = startingEndingOffsetsPair.getRight(); - // - // TODO - // - [P0] Add support for schema-provider - // - [P1] Add support for auth - // - String startingOffsetStr = convertToOffsetString(topicName, startingOffset); String endingOffsetStr = convertToOffsetString(topicName, endingOffset); @@ -135,12 +129,11 @@ private Dataset transform(Dataset rows) { private Pair computeOffsets(Option lastCheckpointStrOpt, long sourceLimit) { MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt); + MessageId endingOffset = fetchLatestOffset(); // TODO support capping the amount of records fetched Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); - MessageId endingOffset = fetchLatestOffset(); - return Pair.of(startingOffset, endingOffset); } @@ -185,10 +178,13 @@ private MessageId fetchLatestOffset() { private Consumer subscribeToTopic() { try { + // NOTE: We're generating unique subscription-id to make sure that subsequent invocation + // of the DS, do not interfere w/ each other + String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis()); return pulsarClient.get() .newConsumer() .topic(topicName) - .subscriptionName(HUDI_PULSAR_CONSUMER_ID) + .subscriptionName(subscriptionId) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); } catch (PulsarClientException e) { @@ -241,7 +237,6 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien } } - // TODO unify w/ Kafka public static class Config { private static final ConfigProperty PULSAR_SOURCE_TOPIC_NAME = ConfigProperty .key("hoodie.deltastreamer.source.pulsar.topic") From a99ac5bd30282416ba2a964fa306504da11eff5c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 15:10:08 -0700 Subject: [PATCH 20/32] Tidying up --- .../src/main/scala/org/apache/hudi/HoodieConversionUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala index 8bc6796832c18..f278d44c94005 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala @@ -26,7 +26,8 @@ import scala.collection.JavaConverters object HoodieConversionUtils { /** - * TODO scala-doc + * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by defautl [[JavaConverters]] convert to + * a mutable one) */ def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = JavaConverters.mapAsScalaMap(map).toMap From db7f5a251a0af69cac26f01e0a754995ce184c9e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 16:30:53 -0700 Subject: [PATCH 21/32] Fixing compilation in Scala 2.11 --- .../scala/org/apache/hudi/HoodieConversionUtils.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala index f278d44c94005..82c65705fbb00 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala @@ -26,11 +26,14 @@ import scala.collection.JavaConverters object HoodieConversionUtils { /** - * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by defautl [[JavaConverters]] convert to + * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by default [[JavaConverters]] convert to * a mutable one) */ - def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = - JavaConverters.mapAsScalaMap(map).toMap + def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = { + // NOTE: We have to use deprecated [[JavaConversions]] to stay compatible w/ Scala 2.11 + import scala.collection.JavaConversions.mapAsScalaMap + map.toMap + } def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] = if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else org.apache.hudi.common.util.Option.empty() From 1bf346e84c0355ac2595cf517b629e30912c186f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 16:47:08 -0700 Subject: [PATCH 22/32] Fixing subscription to always start at the beginning of the topic --- .../hudi/utilities/sources/PulsarSource.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index c691bc5e4658b..71aa5dd41f8ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -185,6 +186,7 @@ private Consumer subscribeToTopic() { .newConsumer() .topic(topicName) .subscriptionName(subscriptionId) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) .subscribe(); } catch (PulsarClientException e) { @@ -232,6 +234,20 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien ((PulsarClientImpl) client).eventLoopGroup() .shutdownGracefully() .await(); + + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + while (threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + + Thread[] activeThreads = new Thread[threadGroup.activeCount()]; + threadGroup.enumerate(activeThreads); + + for (Thread activeThread : activeThreads) { + if (activeThread.getName().startsWith("pulsar-client-io")) { + activeThread.interrupt(); + } + } } catch (InterruptedException e) { // No-op } From 7ab92abd271868cd2c8860bb8fd9b5798796dd78 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 16:50:28 -0700 Subject: [PATCH 23/32] Fix pulsar-spark-connector version for Spark 2.x --- pom.xml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4ca37fc6c32d8..a83bd6d75b13d 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,8 @@ 2.0.0 2.4.1 2.10.1 - 3.1.1.4 + 2.4.5 + 3.1.1.4 5.3.4 2.17 3.0.1-b12 @@ -1651,6 +1652,7 @@ hudi-spark-datasource/hudi-spark2-common + ${pulsar.spark2.version} true @@ -1671,6 +1673,7 @@ 2.4 + ${pulsar.spark2.version} true @@ -1697,6 +1700,7 @@ 1.11.0 1.7.4 4.8 + ${pulsar.spark3.version} 2.13.3 ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1730,6 +1734,7 @@ ${scalatest.spark3.version} ${kafka.spark3.version} 4.8-1 + ${pulsar.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1765,6 +1770,7 @@ 1.10.2 1.6.12 4.8 + ${pulsar.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1800,6 +1806,7 @@ 1.11.0 1.7.4 4.8 + ${pulsar.spark3.version} 2.13.3 ${fasterxml.spark3.version} ${fasterxml.spark3.version} From 7f0658d53af55818631bb8cba77d22a1105a4d42 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 17:21:44 -0700 Subject: [PATCH 24/32] Subscribe in `Durable` mode; Throw an exception in case latest available offset is preceding previous checkpoint (data-loss) --- .../apache/hudi/utilities/sources/PulsarSource.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 71aa5dd41f8ab..77f2443a63a50 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.util.Lazy; import org.apache.hudi.utilities.schema.SchemaProvider; @@ -34,6 +35,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -42,6 +44,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.pulsar.JsonUtils; +import org.apache.thrift.protocol.TMessage; import java.io.Closeable; import java.io.IOException; @@ -132,6 +135,12 @@ private Pair computeOffsets(Option lastCheckpointS MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt); MessageId endingOffset = fetchLatestOffset(); + if (endingOffset.compareTo(startingOffset) < 0) { + String message = String.format("Ending offset (%s) is preceding starting offset (%s) for '%s'", + endingOffset, startingOffset, topicName); + throw new HoodieException(message); + } + // TODO support capping the amount of records fetched Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props); @@ -188,6 +197,9 @@ private Consumer subscribeToTopic() { .subscriptionName(subscriptionId) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) + // We're using [[SubscriptionMode.Durable]] subscription to make sure that messages + // are retained until they are ack'd as consumed + .subscriptionMode(SubscriptionMode.Durable) .subscribe(); } catch (PulsarClientException e) { LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", topicName), e); From 96b561fe949bc889d39995de0657894e060b1d1b Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 17:33:16 -0700 Subject: [PATCH 25/32] Added `ThreadUtils`; Handle NPE in `PulsarSource`; Rebased it to use `ThreadUtils` --- .../apache/hudi/common/util/ThreadUtils.java | 41 +++++++++++++++++++ .../hudi/utilities/sources/PulsarSource.java | 31 +++++++------- 2 files changed, 55 insertions(+), 17 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java new file mode 100644 index 0000000000000..aef791aa87cac --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import java.util.Arrays; +import java.util.List; + +public class ThreadUtils { + + /** + * Fetches all active threads currently running in the JVM + */ + public static List collectActiveThreads() { + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + while (threadGroup.getParent() != null) { + threadGroup = threadGroup.getParent(); + } + + Thread[] activeThreads = new Thread[threadGroup.activeCount()]; + threadGroup.enumerate(activeThreads); + + return Arrays.asList(activeThreads); + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 77f2443a63a50..45319bad4742a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -39,18 +39,20 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.pulsar.JsonUtils; -import org.apache.thrift.protocol.TMessage; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads; + /** * Source fetching data from Pulsar topics */ @@ -243,26 +245,21 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien // NOTE: Current version of Pulsar's client (in Pulsar Spark Connector 3.1.1.4) is not // shutting down event-loop group properly, so we had to shut it down manually try { - ((PulsarClientImpl) client).eventLoopGroup() - .shutdownGracefully() - .await(); - - ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); - while (threadGroup.getParent() != null) { - threadGroup = threadGroup.getParent(); - } - - Thread[] activeThreads = new Thread[threadGroup.activeCount()]; - threadGroup.enumerate(activeThreads); - - for (Thread activeThread : activeThreads) { - if (activeThread.getName().startsWith("pulsar-client-io")) { - activeThread.interrupt(); - } + EventLoopGroup eventLoopGroup = ((PulsarClientImpl) client).eventLoopGroup(); + if (eventLoopGroup != null) { + eventLoopGroup.shutdownGracefully().await(); } } catch (InterruptedException e) { // No-op } + + // NOTE: Pulsar clients initialized by the spark-connector, might be left not shutdown + // properly (see above). To work this around we employ "nuclear" option of + // fetching all Pulsar client threads and interrupting them forcibly (to make them + // shutdown) + collectActiveThreads().stream().sequential() + .filter(t -> t.getName().startsWith("pulsar-client-io")) + .forEach(Thread::interrupt); } public static class Config { From 76f1335eed2d89ea4c1e632c513859f4489b7674 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 12 Aug 2022 17:43:26 -0700 Subject: [PATCH 26/32] Fixing compilation --- .../org/apache/hudi/utilities/sources/PulsarSource.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 45319bad4742a..eadab6f442a20 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -35,7 +35,6 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -61,7 +60,7 @@ public class PulsarSource extends RowSource implements Closeable { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d"; - private static final String[] PULSAR_META_FIELDS = new String[] { + private static final String[] PULSAR_META_FIELDS = new String[]{ "__key", "__topic", "__messageId", @@ -199,9 +198,6 @@ private Consumer subscribeToTopic() { .subscriptionName(subscriptionId) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscriptionType(SubscriptionType.Exclusive) - // We're using [[SubscriptionMode.Durable]] subscription to make sure that messages - // are retained until they are ack'd as consumed - .subscriptionMode(SubscriptionMode.Durable) .subscribe(); } catch (PulsarClientException e) { LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", topicName), e); From a3ee9246d61fa507fdd93e045c3c5148375fa53e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 15 Aug 2022 13:15:00 -0700 Subject: [PATCH 27/32] Aligning configs w/ Kafka --- .../java/org/apache/hudi/utilities/sources/PulsarSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index eadab6f442a20..780b08ef7c80a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -286,7 +286,7 @@ public enum OffsetAutoResetStrategy { public static final ConfigProperty PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty .key("hoodie.deltastreamer.source.pulsar.maxRecords") - .defaultValue(10_000_000L) + .defaultValue(5_000_000L) .withDocumentation("Max number of records obtained in a single each batch"); } } From d2e681612054a071cd3d0b7410dd5fd296ee80ee Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 15 Aug 2022 13:35:24 -0700 Subject: [PATCH 28/32] Fixed dep configuration --- pom.xml | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index a83bd6d75b13d..9ac9568a10967 100644 --- a/pom.xml +++ b/pom.xml @@ -98,8 +98,9 @@ 2.0.0 2.4.1 2.10.1 - 2.4.5 - 3.1.1.4 + ${pulsar.spark.scala11.version} + 2.4.5 + 3.1.1.4 5.3.4 2.17 3.0.1-b12 @@ -1604,12 +1605,16 @@ scala-2.11 + + ${pulsar.spark.scala11.version} + scala-2.12 ${scala12.version} 2.12 + ${pulsar.spark.scala12.version} @@ -1652,7 +1657,6 @@ hudi-spark-datasource/hudi-spark2-common - ${pulsar.spark2.version} true @@ -1673,7 +1677,6 @@ 2.4 - ${pulsar.spark2.version} true @@ -1700,7 +1703,6 @@ 1.11.0 1.7.4 4.8 - ${pulsar.spark3.version} 2.13.3 ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1734,7 +1736,6 @@ ${scalatest.spark3.version} ${kafka.spark3.version} 4.8-1 - ${pulsar.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1770,7 +1771,6 @@ 1.10.2 1.6.12 4.8 - ${pulsar.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} ${fasterxml.spark3.version} @@ -1806,7 +1806,6 @@ 1.11.0 1.7.4 4.8 - ${pulsar.spark3.version} 2.13.3 ${fasterxml.spark3.version} ${fasterxml.spark3.version} From f3399b438356902237a619a2510d47f849cd81e3 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 15 Aug 2022 13:35:32 -0700 Subject: [PATCH 29/32] Tidying up --- .../apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 49dd63f550a16..1e78610ced0f2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -292,7 +292,7 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long numEvents = sourceLimit; } - // TODO remove + // TODO(HUDI-4625) remove if (numEvents < toOffsets.size()) { throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); } @@ -312,7 +312,7 @@ private List fetchPartitionInfos(KafkaConsumer consumer, String t List partitionInfos; do { - // TODO cleanup, introduce retrying client + // TODO(HUDI-4625) cleanup, introduce retrying client partitionInfos = consumer.partitionsFor(topicName); try { TimeUnit.SECONDS.sleep(10); From b71d1a0b547f7bd0c94d2c6c45f5c593bc7ffb85 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 16 Aug 2022 21:23:45 -0700 Subject: [PATCH 30/32] Reverting to previous Pulsar version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9ac9568a10967..ff1f1e26ea941 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ 2.10.0 2.0.0 2.4.1 - 2.10.1 + 2.8.1 ${pulsar.spark.scala11.version} 2.4.5 3.1.1.4 From 79ae2b395759dcc104cb5c95834f8494026ff04c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 16 Aug 2022 21:21:59 -0700 Subject: [PATCH 31/32] Reverting unrelated changes --- .../apache/hudi/utilities/deltastreamer/DeltaSync.java | 1 - .../utilities/deltastreamer/HoodieDeltaStreamer.java | 2 +- .../utilities/sources/debezium/DebeziumSource.java | 8 -------- .../hudi/utilities/sources/helpers/KafkaOffsetGen.java | 10 +++------- 4 files changed, 4 insertions(+), 17 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 15c27bee109dc..386e808b08ac3 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -339,7 +339,6 @@ public Pair, JavaRDD> syncOnce() throws IOException metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis()); - // TODO revisit (too early to unpersist) // Clear persistent RDDs jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return result; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index c4146662d30c0..a22a3581ae94a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -827,7 +827,7 @@ protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) { * Close all resources. */ public void close() { - if (deltaSync != null) { + if (null != deltaSync) { deltaSync.close(); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java index f05cd12d99d47..d9be692b5bc57 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java @@ -239,13 +239,5 @@ private static Dataset convertArrayColumnsToString(Dataset dataset) { return dataset; } - - @Override - public void onCommit(String lastCkptStr) { - if (this.props.getBoolean(KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.key(), - KafkaOffsetGen.Config.ENABLE_KAFKA_COMMIT_OFFSET.defaultValue())) { - offsetGen.commitOffsetToKafka(lastCkptStr); - } - } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 1e78610ced0f2..cc577621fbf7c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -110,13 +110,11 @@ public static OffsetRange[] computeOffsetRanges(Map fromOf Comparator byPartition = Comparator.comparing(OffsetRange::partition); // Create initial offset ranges for each 'to' partition, with from = to offsets. - OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> { + OffsetRange[] ranges = new OffsetRange[toOffsetMap.size()]; + toOffsetMap.keySet().stream().map(tp -> { long fromOffset = fromOffsetMap.getOrDefault(tp, 0L); return OffsetRange.create(tp, fromOffset, fromOffset); - }) - .sorted(byPartition) - .collect(Collectors.toList()) - .toArray(new OffsetRange[toOffsetMap.size()]); + }).sorted(byPartition).collect(Collectors.toList()).toArray(ranges); long allocedEvents = 0; Set exhaustedPartitions = new HashSet<>(); @@ -292,7 +290,6 @@ public OffsetRange[] getNextOffsetRanges(Option lastCheckpointStr, long numEvents = sourceLimit; } - // TODO(HUDI-4625) remove if (numEvents < toOffsets.size()) { throw new HoodieException("sourceLimit should not be less than the number of kafka partitions"); } @@ -312,7 +309,6 @@ private List fetchPartitionInfos(KafkaConsumer consumer, String t List partitionInfos; do { - // TODO(HUDI-4625) cleanup, introduce retrying client partitionInfos = consumer.partitionsFor(topicName); try { TimeUnit.SECONDS.sleep(10); From 513b7d8c4d6209f62bd0075de4cf1a07113b0fb9 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 17 Aug 2022 22:11:10 -0700 Subject: [PATCH 32/32] Added graceful shutdown timeout --- .../org/apache/hudi/utilities/sources/PulsarSource.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java index 780b08ef7c80a..dbfd28f8065db 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java @@ -47,8 +47,10 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; +import java.util.concurrent.TimeUnit; import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads; @@ -59,6 +61,8 @@ public class PulsarSource extends RowSource implements Closeable { private static final Logger LOG = LogManager.getLogger(PulsarSource.class); + private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20); + private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d"; private static final String[] PULSAR_META_FIELDS = new String[]{ "__key", @@ -243,7 +247,8 @@ private static void shutdownPulsarClient(PulsarClient client) throws PulsarClien try { EventLoopGroup eventLoopGroup = ((PulsarClientImpl) client).eventLoopGroup(); if (eventLoopGroup != null) { - eventLoopGroup.shutdownGracefully().await(); + eventLoopGroup.shutdownGracefully() + .await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS); } } catch (InterruptedException e) { // No-op