diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
index 547c6aed628cc..82c65705fbb00 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieConversionUtils.scala
@@ -20,8 +20,21 @@ package org.apache.hudi
 
 import org.apache.hudi.common.config.TypedProperties
 
+import java.{util => ju}
+import scala.collection.JavaConverters
+
 object HoodieConversionUtils {
 
+  /**
+   * Converts Java's [[ju.Map]] into Scala's (immutable) [[Map]] (by default, [[JavaConverters]] converts to
+   * a mutable one)
+   */
+  def mapAsScalaImmutableMap[K, V](map: ju.Map[K, V]): Map[K, V] = {
+    // NOTE: We have to use the deprecated [[JavaConversions]] to stay compatible w/ Scala 2.11
+    import scala.collection.JavaConversions.mapAsScalaMap
+    map.toMap
+  }
+
   def toJavaOption[T](opt: Option[T]): org.apache.hudi.common.util.Option[T] =
     if (opt.isDefined) org.apache.hudi.common.util.Option.of(opt.get) else org.apache.hudi.common.util.Option.empty()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
new file mode 100644
index 0000000000000..aef791aa87cac
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ThreadUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ThreadUtils {
+
+  /**
+   * Fetches all active threads currently running in the JVM
+   */
+  public static List<Thread> collectActiveThreads() {
+    // Walk up to the root thread-group so that all threads in the JVM get enumerated
+    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+    while (threadGroup.getParent() != null) {
+      threadGroup = threadGroup.getParent();
+    }
+
+    Thread[] activeThreads = new Thread[threadGroup.activeCount()];
+    threadGroup.enumerate(activeThreads);
+
+    return Arrays.asList(activeThreads);
+  }
+
+}
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 322297dfdadf1..da5a42749ba6d 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -171,6 +171,14 @@
     </dependency>
 
+    <!-- Pulsar -->
+    <dependency>
+      <groupId>io.streamnative.connectors</groupId>
+      <artifactId>pulsar-spark-connector_${scala.binary.version}</artifactId>
+      <version>${pulsar.spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index f3d9af3150706..386e808b08ac3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -91,6 +91,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -124,7 +125,7 @@
 /**
  * Syncs one batch of data to the hoodie table.
  */
-public class DeltaSync implements Serializable {
+public class DeltaSync implements Serializable, Closeable {
 
   private static final long serialVersionUID = 1L;
 
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -895,11 +896,15 @@ private void registerAvroSchemas(Schema sourceSchema, Schema targetSchema) {
    * Close all resources.
    */
   public void close() {
-    if (null != writeClient) {
+    if (writeClient != null) {
       writeClient.close();
       writeClient = null;
     }
 
+    if (formatAdapter != null) {
+      formatAdapter.close();
+    }
+
     LOG.info("Shutting down embedded timeline server");
     if (embeddedTimelineService.isPresent()) {
       embeddedTimelineService.get().stop();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 1260acb1ce408..3514ace829ab5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -21,6 +21,7 @@
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -38,13 +39,16 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
 /**
  * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
  */
-public final class SourceFormatAdapter {
+public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
@@ -123,4 +127,15 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr
   public Source getSource() {
     return source;
   }
+
+  @Override
+  public void close() {
+    if (source instanceof Closeable) {
+      try {
+        ((Closeable) source).close();
+      } catch (IOException e) {
+        throw new HoodieIOException(String.format("Failed to shutdown the source (%s)", source.getClass().getName()), e);
+      }
+    }
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
new file mode 100644
index 0000000000000..dbfd28f8065db
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/PulsarSource.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.pulsar.JsonUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.common.util.ThreadUtils.collectActiveThreads;
+
+/**
+ * Source fetching data from Pulsar topics
+ */
+public class PulsarSource extends RowSource implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(PulsarSource.class);
+
+  private static final Duration GRACEFUL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(20);
+
+  private static final String HUDI_PULSAR_CONSUMER_ID_FORMAT = "hudi-pulsar-consumer-%d";
+  private static final String[] PULSAR_META_FIELDS = new String[]{
+      "__key",
+      "__topic",
+      "__messageId",
+      "__publishTime",
+      "__eventTime",
+      "__messageProperties"
+  };
+
+  private final String topicName;
+
+  private final String serviceEndpointURL;
+  private final String adminEndpointURL;
+
+  // NOTE: We're keeping the client so that we can shut it down properly
+  private final Lazy<PulsarClient> pulsarClient;
+  private final Lazy<Consumer<byte[]>> pulsarConsumer;
+
+  public PulsarSource(TypedProperties props,
+                      JavaSparkContext sparkContext,
+                      SparkSession sparkSession,
+                      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+
+    DataSourceUtils.checkRequiredProperties(props,
+        Arrays.asList(
+            Config.PULSAR_SOURCE_TOPIC_NAME.key(),
+            Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key()));
+
+    // Converting to a descriptor allows us to canonicalize the topic's name properly
+    this.topicName = TopicName.get(props.getString(Config.PULSAR_SOURCE_TOPIC_NAME.key())).toString();
+
+    // TODO validate that endpoints are provided in the appropriate format
+    this.serviceEndpointURL = props.getString(Config.PULSAR_SOURCE_SERVICE_ENDPOINT_URL.key());
+    this.adminEndpointURL = props.getString(Config.PULSAR_SOURCE_ADMIN_ENDPOINT_URL.key());
+
+    this.pulsarClient = Lazy.lazily(this::initPulsarClient);
+    this.pulsarConsumer = Lazy.lazily(this::subscribeToTopic);
+  }
+
+  @Override
+  protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCheckpointStr, long sourceLimit) {
+    Pair<MessageId, MessageId> startingEndingOffsetsPair = computeOffsets(lastCheckpointStr, sourceLimit);
+
+    MessageId startingOffset = startingEndingOffsetsPair.getLeft();
+    MessageId endingOffset = startingEndingOffsetsPair.getRight();
+
+    String startingOffsetStr = convertToOffsetString(topicName, startingOffset);
+    String endingOffsetStr = convertToOffsetString(topicName, endingOffset);
+
+    Dataset<Row> sourceRows = sparkSession.read()
+        .format("pulsar")
+        .option("service.url", serviceEndpointURL)
+        .option("admin.url", adminEndpointURL)
+        .option("topics", topicName)
+        .option("startingOffsets", startingOffsetStr)
+        .option("endingOffsets", endingOffsetStr)
+        .load();
+
+    return Pair.of(Option.of(transform(sourceRows)), endingOffsetStr);
+  }
+
+  @Override
+  public void onCommit(String lastCheckpointStr) {
+    MessageId latestConsumedOffset = JsonUtils.topicOffsets(lastCheckpointStr).apply(topicName);
+    ackOffset(latestConsumedOffset);
+  }
+
+  private Dataset<Row> transform(Dataset<Row> rows) {
+    return rows.drop(PULSAR_META_FIELDS);
+  }
+
+  private Pair<MessageId, MessageId> computeOffsets(Option<String> lastCheckpointStrOpt, long sourceLimit) {
+    MessageId startingOffset = decodeStartingOffset(lastCheckpointStrOpt);
+    MessageId endingOffset = fetchLatestOffset();
+
+    if (endingOffset.compareTo(startingOffset) < 0) {
+      String message = String.format("Ending offset (%s) precedes starting offset (%s) for '%s'",
+          endingOffset, startingOffset, topicName);
+      throw new HoodieException(message);
+    }
+
+    // TODO support capping the number of records fetched
+    Long maxRecordsLimit = computeTargetRecordLimit(sourceLimit, props);
+
+    return Pair.of(startingOffset, endingOffset);
+  }
+
+  private MessageId decodeStartingOffset(Option<String> lastCheckpointStrOpt) {
+    return lastCheckpointStrOpt
+        .map(lastCheckpoint -> JsonUtils.topicOffsets(lastCheckpoint).apply(topicName))
+        .orElseGet(() -> {
+          Config.OffsetAutoResetStrategy autoResetStrategy = Config.OffsetAutoResetStrategy.valueOf(
+              props.getString(Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.key(),
+                  Config.PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY.defaultValue().name()));
+
+          switch (autoResetStrategy) {
+            case LATEST:
+              return fetchLatestOffset();
+            case EARLIEST:
+              return MessageId.earliest;
+            case FAIL:
+              throw new IllegalArgumentException("No checkpoint has been provided!");
+            default:
+              throw new UnsupportedOperationException("Unsupported offset auto-reset strategy");
+          }
+        });
+  }
+
+  private void ackOffset(MessageId latestConsumedOffset) {
+    try {
+      pulsarConsumer.get().acknowledgeCumulative(latestConsumedOffset);
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to ack messageId (%s) for topic '%s'", latestConsumedOffset, topicName), e);
+      throw new HoodieIOException("Failed to ack message for topic", e);
+    }
+  }
+
+  private MessageId fetchLatestOffset() {
+    try {
+      return pulsarConsumer.get().getLastMessageId();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to fetch latest messageId for topic '%s'", topicName), e);
+      throw new HoodieIOException("Failed to fetch latest messageId for topic", e);
+    }
+  }
+
+  private Consumer<byte[]> subscribeToTopic() {
+    try {
+      // NOTE: We're generating a unique subscription-id to make sure that subsequent invocations
+      //       of the DeltaStreamer do not interfere w/ each other
+      String subscriptionId = String.format(HUDI_PULSAR_CONSUMER_ID_FORMAT, System.currentTimeMillis());
+      return pulsarClient.get()
+          .newConsumer()
+          .topic(topicName)
+          .subscriptionName(subscriptionId)
+          .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+          .subscriptionType(SubscriptionType.Exclusive)
+          .subscribe();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to subscribe to Pulsar topic '%s'", topicName), e);
+      throw new HoodieIOException("Failed to subscribe to Pulsar topic", e);
+    }
+  }
+
+  private PulsarClient initPulsarClient() {
+    try {
+      return PulsarClient.builder()
+          .serviceUrl(serviceEndpointURL)
+          .build();
+    } catch (PulsarClientException e) {
+      LOG.error(String.format("Failed to init Pulsar client connecting to '%s'", serviceEndpointURL), e);
+      throw new HoodieIOException("Failed to init Pulsar client", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    shutdownPulsarClient(pulsarClient.get());
+  }
+
+  private static Long computeTargetRecordLimit(long sourceLimit, TypedProperties props) {
+    if (sourceLimit < Long.MAX_VALUE) {
+      return sourceLimit;
+    } else {
+      return props.getLong(Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.key(),
+          Config.PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP.defaultValue());
+    }
+  }
+
+  private static String convertToOffsetString(String topic, MessageId startingOffset) {
+    return JsonUtils.topicOffsets(
+        HoodieConversionUtils.mapAsScalaImmutableMap(
+            Collections.singletonMap(topic, startingOffset)));
+  }
+
+  private static void shutdownPulsarClient(PulsarClient client) throws PulsarClientException {
+    client.close();
+    // NOTE: The current version of Pulsar's client (in Pulsar Spark Connector 3.1.1.4) is not
+    //       shutting down its event-loop group properly, so we have to shut it down manually
+    try {
+      EventLoopGroup eventLoopGroup = ((PulsarClientImpl) client).eventLoopGroup();
+      if (eventLoopGroup != null) {
+        eventLoopGroup.shutdownGracefully()
+            .await(GRACEFUL_SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException e) {
+      // No-op
+    }
+
+    // NOTE: Pulsar clients initialized by the Spark connector might be left not shut down
+    //       properly (see above). To work around this, we employ the "nuclear" option of
+    //       fetching all Pulsar client threads and interrupting them forcibly (to make them
+    //       shut down)
+    collectActiveThreads().stream().sequential()
+        .filter(t -> t.getName().startsWith("pulsar-client-io"))
+        .forEach(Thread::interrupt);
+  }
+
+  public static class Config {
+    private static final ConfigProperty<String> PULSAR_SOURCE_TOPIC_NAME = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.topic")
+        .noDefaultValue()
+        .withDocumentation("Name of the target Pulsar topic to source data from");
+
+    private static final ConfigProperty<String> PULSAR_SOURCE_SERVICE_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.service.url")
+        .defaultValue("pulsar://localhost:6650")
+        .withDocumentation("URL of the target Pulsar service endpoint (of the form 'pulsar://host:port')");
+
+    private static final ConfigProperty<String> PULSAR_SOURCE_ADMIN_ENDPOINT_URL = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.endpoint.admin.url")
+        .defaultValue("http://localhost:8080")
+        .withDocumentation("URL of the target Pulsar admin endpoint (of the form 'http://host:port')");
+
+    public enum OffsetAutoResetStrategy {
+      LATEST, EARLIEST, FAIL
+    }
+
+    private static final ConfigProperty<OffsetAutoResetStrategy> PULSAR_SOURCE_OFFSET_AUTO_RESET_STRATEGY = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy")
+        .defaultValue(OffsetAutoResetStrategy.LATEST)
+        .withDocumentation("Policy determining how offsets shall be automatically reset in case there's "
+            + "no checkpoint information present");
+
+    public static final ConfigProperty<Long> PULSAR_SOURCE_MAX_RECORDS_PER_BATCH_THRESHOLD_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.pulsar.maxRecords")
+        .defaultValue(5_000_000L)
+        .withDocumentation("Max number of records fetched in a single batch");
+  }
+}
diff --git a/pom.xml b/pom.xml
index 54be08f6c03ba..ff1f1e26ea941 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,6 +98,9 @@
     <kafka.version>2.0.0</kafka.version>
     <pulsar.version>2.8.1</pulsar.version>
+    <pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+    <pulsar.spark.scala11.version>2.4.5</pulsar.spark.scala11.version>
+    <pulsar.spark.scala12.version>3.1.1.4</pulsar.spark.scala12.version>
     <confluent.version>5.3.4</confluent.version>
     <glassfish.version>2.17</glassfish.version>
     <glassfish.el.version>3.0.1-b12</glassfish.el.version>
@@ -1602,12 +1605,16 @@
     <profile>
      <id>scala-2.11</id>
+      <properties>
+        <pulsar.spark.version>${pulsar.spark.scala11.version}</pulsar.spark.version>
+      </properties>
     </profile>
 
     <profile>
       <id>scala-2.12</id>
       <properties>
         <scala.version>${scala12.version}</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
+        <pulsar.spark.version>${pulsar.spark.scala12.version}</pulsar.spark.version>
       </properties>
     </profile>
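
Usage note (not part of the change above): wiring this source into a DeltaStreamer run would look roughly like the sketch below. Only the `hoodie.deltastreamer.source.pulsar.*` config keys and the PulsarSource/HoodieDeltaStreamer class names come from the code itself; the topic, endpoint URLs, record-key/partition fields, paths, and table name are illustrative placeholders.

    # pulsar-source.properties -- hypothetical example values
    hoodie.deltastreamer.source.pulsar.topic=persistent://public/default/my-topic
    hoodie.deltastreamer.source.pulsar.endpoint.service.url=pulsar://localhost:6650
    hoodie.deltastreamer.source.pulsar.endpoint.admin.url=http://localhost:8080
    hoodie.deltastreamer.source.pulsar.offset.autoResetStrategy=EARLIEST
    hoodie.deltastreamer.source.pulsar.maxRecords=5000000
    hoodie.datasource.write.recordkey.field=key
    hoodie.datasource.write.partitionpath.field=date

    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      hudi-utilities-bundle.jar \
      --source-class org.apache.hudi.utilities.sources.PulsarSource \
      --props pulsar-source.properties \
      --target-base-path /tmp/hudi/pulsar_table \
      --target-table pulsar_table \
      --table-type COPY_ON_WRITE

Note that because the pulsar-spark-connector dependency is declared with 'provided' scope in hudi-utilities, the connector jar matching the build's Scala version has to be available on the Spark classpath at runtime.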