diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 16d0d47d4f098..de76930a422df 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -443,6 +443,23 @@ aws-java-sdk-sqs ${aws.sdk.version} + + + + com.google.cloud + google-cloud-pubsub + ${google.cloud.pubsub.version} + provided + + + + com.google.cloud.bigdataoss + gcs-connector + ${gcs.connector.version} + provided + + + diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java new file mode 100644 index 0000000000000..51ed43da2ffe3 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsHoodieIncrSource.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.hudi.DataSourceUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy; +import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher; +import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher; +import org.apache.hudi.utilities.sources.helpers.gcs.QueryInfo; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.DATAFILE_FORMAT; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.DEFAULT_ENABLE_EXISTS_CHECK; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.ENABLE_EXISTS_CHECK; +import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.calculateBeginAndEndInstants; +import static 
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.getMissingCheckpointStrategy; + +/** + * An incremental source that detects new data in a source table containing metadata about GCS files, + * downloads the actual content of these files from GCS and stores them as records into a destination table. + *
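+ * This source typically runs as the second stage of a two-step pipeline: {@link GcsEventsSource} first persists GCS
+ * Pubsub notification metadata into a Hudi table, and this source then incrementally reads that table (configured via
+ * hoodie.deltastreamer.source.hoodieincr.path) and downloads the referenced GCS objects.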

+ * You should set spark.driver.extraClassPath in spark-defaults.conf to + * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode): + * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore). + + absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar: + absolute_path_to/31.1-jre/guava-31.1-jre.jar: + absolute_path_to/mysql-connector-java-8.0.30.jar + + This class can be invoked via spark-submit as follows. There's a bunch of optional hive sync flags at the end. + $ bin/spark-submit \ + --packages com.google.cloud:google-cloud-pubsub:1.120.0 \ + --packages com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.7 \ + --driver-memory 4g \ + --executor-memory 4g \ + --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ + absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \ + --source-class org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource \ + --op INSERT \ + --hoodie-conf hoodie.deltastreamer.source.hoodieincr.file.format="parquet" \ + --hoodie-conf hoodie.deltastreamer.source.cloud.data.select.file.extension="jsonl" \ + --hoodie-conf hoodie.deltastreamer.source.cloud.data.datafile.format="json" \ + --hoodie-conf hoodie.deltastreamer.source.cloud.data.select.relpath.prefix="country" \ + --hoodie-conf hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix="blah" \ + --hoodie-conf hoodie.deltastreamer.source.cloud.data.ignore.relpath.substring="blah" \ + --hoodie-conf hoodie.datasource.write.recordkey.field=id \ + --hoodie-conf hoodie.datasource.write.partitionpath.field= \ + --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \ + --filter-dupes \ + --hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \ + --hoodie-conf hoodie.combine.before.insert=true \ + --source-ordering-field id \ + --table-type COPY_ON_WRITE \ + --target-base-path file:\/\/\/absolute_path_to/data-gcs \ + --target-table gcs_data \ + --continuous \ + --source-limit 100 \ + --min-sync-interval-seconds 60 \ + --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=file:\/\/\/absolute_path_to/meta-gcs \ + --hoodie-conf hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT \ + --enable-hive-sync \ + --hoodie-conf hoodie.datasource.hive_sync.database=default \ + --hoodie-conf hoodie.datasource.hive_sync.table=gcs_data \ + */ +public class GcsEventsHoodieIncrSource extends HoodieIncrSource { + + private final String srcPath; + private final boolean checkIfFileExists; + private final int numInstantsPerFetch; + + private final MissingCheckpointStrategy missingCheckpointStrategy; + private final FilePathsFetcher filePathsFetcher; + private final FileDataFetcher fileDataFetcher; + + private static final Logger LOG = LogManager.getLogger(GcsEventsHoodieIncrSource.class); + + public GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, + SchemaProvider schemaProvider) { + + this(props, jsc, spark, schemaProvider, + new FilePathsFetcher(props, getSourceFileFormat(props)), + new FileDataFetcher(props, props.getString(DATAFILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT)) + ); + } + + GcsEventsHoodieIncrSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, + SchemaProvider schemaProvider, FilePathsFetcher filePathsFetcher, FileDataFetcher fileDataFetcher) { + super(props, jsc, spark, schemaProvider); + + 
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); + srcPath = props.getString(HOODIE_SRC_BASE_PATH); + missingCheckpointStrategy = getMissingCheckpointStrategy(props); + numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH); + checkIfFileExists = props.getBoolean(ENABLE_EXISTS_CHECK, DEFAULT_ENABLE_EXISTS_CHECK); + + this.filePathsFetcher = filePathsFetcher; + this.fileDataFetcher = fileDataFetcher; + + LOG.info("srcPath: " + srcPath); + LOG.info("missingCheckpointStrategy: " + missingCheckpointStrategy); + LOG.info("numInstantsPerFetch: " + numInstantsPerFetch); + LOG.info("checkIfFileExists: " + checkIfFileExists); + } + + @Override + public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + QueryInfo queryInfo = getQueryInfo(lastCkptStr); + + if (queryInfo.areStartAndEndInstantsEqual()) { + LOG.info("Already caught up. Begin Checkpoint was: " + queryInfo.getStartInstant()); + return Pair.of(Option.empty(), queryInfo.getStartInstant()); + } + + Dataset sourceForFilenames = queryInfo.initializeSourceForFilenames(srcPath, sparkSession); + + if (sourceForFilenames.isEmpty()) { + LOG.info("Source of file names is empty. Returning empty result and endInstant: " + + queryInfo.getEndInstant()); + return Pair.of(Option.empty(), queryInfo.getEndInstant()); + } + + return extractData(queryInfo, sourceForFilenames); + } + + private Pair>, String> extractData(QueryInfo queryInfo, Dataset sourceForFilenames) { + List filepaths = filePathsFetcher.getGcsFilePaths(sparkContext, sourceForFilenames, checkIfFileExists); + + LOG.debug("Extracted " + filepaths.size() + " distinct files." + + " Some samples " + filepaths.stream().limit(10).collect(Collectors.toList())); + + Option> fileDataRows = fileDataFetcher.fetchFileData(sparkSession, filepaths, props); + return Pair.of(fileDataRows, queryInfo.getEndInstant()); + } + + private QueryInfo getQueryInfo(Option lastCkptStr) { + Option beginInstant = getBeginInstant(lastCkptStr); + + Pair> queryInfoPair = calculateBeginAndEndInstants( + sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy + ); + + QueryInfo queryInfo = new QueryInfo(queryInfoPair.getLeft(), queryInfoPair.getRight().getLeft(), + queryInfoPair.getRight().getRight()); + + if (LOG.isDebugEnabled()) { + queryInfo.logDetails(); + } + + return queryInfo; + } + + private Option getBeginInstant(Option lastCheckpoint) { + if (lastCheckpoint.isPresent() && !isNullOrEmpty(lastCheckpoint.get())) { + return lastCheckpoint; + } + + return Option.empty(); + } + + private static String getSourceFileFormat(TypedProperties props) { + return props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java new file mode 100644 index 0000000000000..70b7f149a7987 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/GcsEventsSource.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher; +import org.apache.hudi.utilities.sources.helpers.gcs.MessageBatch; +import org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity; +import org.apache.hudi.utilities.sources.helpers.gcs.MetadataMessage; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.ACK_MESSAGES; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.ACK_MESSAGES_DEFAULT_VALUE; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.BATCH_SIZE_CONF; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.DEFAULT_BATCH_SIZE; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID; +import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP; + +/* + * An incremental source to fetch from a Google Cloud Pubsub topic (a subscription, to be precise), + * and download them into a Hudi table. The messages are assumed to be of type Cloud Storage Pubsub Notification. + * + * You should set spark.driver.extraClassPath in spark-defaults.conf to + * look like below WITHOUT THE NEWLINES (or give the equivalent as CLI options if in cluster mode): + * (mysql-connector at the end is only needed if Hive Sync is enabled and Mysql is used for Hive Metastore). + + absolute_path_to/protobuf-java-3.21.1.jar:absolute_path_to/failureaccess-1.0.1.jar: + absolute_path_to/31.1-jre/guava-31.1-jre.jar: + absolute_path_to/mysql-connector-java-8.0.30.jar + +This class can be invoked via spark-submit as follows. 
There's a bunch of optional hive sync flags at the end: +$ bin/spark-submit \ +--driver-memory 4g \ +--executor-memory 4g \ +--packages com.google.cloud:google-cloud-pubsub:1.120.0 \ +--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ +absolute_path_to/hudi-utilities-bundle_2.12-0.13.0-SNAPSHOT.jar \ +--source-class org.apache.hudi.utilities.sources.GcsEventsSource \ +--op INSERT \ +--hoodie-conf hoodie.datasource.write.recordkey.field="id" \ +--source-ordering-field timeCreated \ +--hoodie-conf hoodie.index.type=GLOBAL_BLOOM \ +--filter-dupes \ +--allow-commit-on-no-checkpoint-change \ +--hoodie-conf hoodie.datasource.write.insert.drop.duplicates=true \ +--hoodie-conf hoodie.combine.before.insert=true \ +--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \ +--hoodie-conf hoodie.datasource.write.partitionpath.field=bucket \ +--hoodie-conf hoodie.deltastreamer.source.gcs.project.id=infra-dev-358110 \ +--hoodie-conf hoodie.deltastreamer.source.gcs.subscription.id=gcs-obj-8-sub-1 \ +--hoodie-conf hoodie.deltastreamer.source.cloud.meta.ack=true \ +--table-type COPY_ON_WRITE \ +--target-base-path file:\/\/\/absolute_path_to/meta-gcs \ +--target-table gcs_meta \ +--continuous \ +--source-limit 100 \ +--min-sync-interval-seconds 100 \ +--enable-hive-sync \ +--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \ +--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \ +--hoodie-conf hoodie.datasource.hive_sync.database=default \ +--hoodie-conf hoodie.datasource.hive_sync.table=gcs_meta \ +--hoodie-conf hoodie.datasource.hive_sync.partition_fields=bucket \ +*/ +public class GcsEventsSource extends RowSource { + + private final PubsubMessagesFetcher pubsubMessagesFetcher; + private final boolean ackMessages; + + private final List messagesToAck = new ArrayList<>(); + + private static final String CHECKPOINT_VALUE_ZERO = "0"; + + private static final Logger LOG = LogManager.getLogger(GcsEventsSource.class); + + public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, + SchemaProvider schemaProvider) { + this( + props, jsc, spark, schemaProvider, + new PubsubMessagesFetcher( + props.getString(GOOGLE_PROJECT_ID), props.getString(PUBSUB_SUBSCRIPTION_ID), + props.getInteger(BATCH_SIZE_CONF, DEFAULT_BATCH_SIZE) + ) + ); + } + + public GcsEventsSource(TypedProperties props, JavaSparkContext jsc, SparkSession spark, + SchemaProvider schemaProvider, PubsubMessagesFetcher pubsubMessagesFetcher) { + super(props, jsc, spark, schemaProvider); + + this.pubsubMessagesFetcher = pubsubMessagesFetcher; + this.ackMessages = props.getBoolean(ACK_MESSAGES, ACK_MESSAGES_DEFAULT_VALUE); + + LOG.info("Created GcsEventsSource"); + } + + @Override + protected Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + LOG.info("fetchNextBatch(): Input checkpoint: " + lastCkptStr); + + MessageBatch messageBatch = fetchFileMetadata(); + + if (messageBatch.isEmpty()) { + LOG.info("No new data. 
Returning empty batch with checkpoint value: " + CHECKPOINT_VALUE_ZERO); + return Pair.of(Option.empty(), CHECKPOINT_VALUE_ZERO); + } + + Dataset eventRecords = sparkSession.createDataset(messageBatch.getMessages(), Encoders.STRING()); + + LOG.info("Returning checkpoint value: " + CHECKPOINT_VALUE_ZERO); + + return Pair.of(Option.of(sparkSession.read().json(eventRecords)), CHECKPOINT_VALUE_ZERO); + } + + @Override + public void onCommit(String lastCkptStr) { + LOG.info("onCommit(): Checkpoint: " + lastCkptStr); + + if (ackMessages) { + ackOutstandingMessages(); + } else { + LOG.warn("Not acknowledging messages. Can result in repeated redeliveries."); + } + } + + MessageBatch fetchFileMetadata() { + List receivedMessages = pubsubMessagesFetcher.fetchMessages(); + return processMessages(receivedMessages); + } + + /** + * Convert Pubsub messages into a batch of GCS file MetadataMsg objects, skipping those that + * don't need to be processed. + * + * @param receivedMessages Pubsub messages + * @return A batch of GCS file metadata messages + */ + private MessageBatch processMessages(List receivedMessages) { + List messages = new ArrayList<>(); + + for (ReceivedMessage received : receivedMessages) { + MetadataMessage message = new MetadataMessage(received.getMessage()); + String msgStr = message.toStringUtf8(); + + logDetails(message, msgStr); + + messagesToAck.add(received.getAckId()); + + MessageValidity messageValidity = message.shouldBeProcessed(); + if (messageValidity.getDecision() == DO_SKIP) { + LOG.info("Skipping message: " + messageValidity.getDescription()); + continue; + } + + messages.add(msgStr); + } + + return new MessageBatch(messages); + } + + private void ackOutstandingMessages() { + if (messagesToAck.isEmpty()) { + return; + } + + try { + pubsubMessagesFetcher.sendAcks(messagesToAck); + messagesToAck.clear(); + } catch (IOException e) { + throw new HoodieException("Error when acknowledging messages from Pubsub", e); + } + } + + private void logDetails(MetadataMessage message, String msgStr) { + LOG.info("eventType: " + message.getEventType() + ", objectId: " + message.getObjectId()); + + if (LOG.isDebugEnabled()) { + LOG.debug("msg: " + msgStr); + } + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index aa1e261c250b5..a6b979fcbf1a7 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -41,7 +41,7 @@ public class HoodieIncrSource extends RowSource { private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class); - static class Config { + public static class Config { /** * {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table. @@ -74,15 +74,15 @@ static class Config { * instant when checkpoint is not provided. This config is deprecated. Please refer to {@link #MISSING_CHECKPOINT_STRATEGY}. 
*/
 @Deprecated
- static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
+ public static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
 "hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
- static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
+ public static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;

 /**
 * {@value #MISSING_CHECKPOINT_STRATEGY} allows delta-streamer to decide the checkpoint to consume from when checkpoint is not set.
 * instant when checkpoint is not provided.
 */
- static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";
+ public static final String MISSING_CHECKPOINT_STRATEGY = "hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy";

 /**
 * {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
new file mode 100644
index 0000000000000..32300b7481e14
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Row;
+
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Generic helper methods for incrementally fetching from cloud storage buckets.
+ * NOTE: DO NOT use any implementation specific classes here. This class is supposed to be shared across S3EventsSource,
+ * GcsEventsSource etc., so you can't assume the classes for your specific implementation will be available here.
+ */
+public class CloudObjectsSelectorCommon {
+
+ private static final Logger LOG = LogManager.getLogger(CloudObjectsSelectorCommon.class);
+
+ /**
+ * Return a function that extracts filepaths from a list of Rows. 
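+ * Each Row is mapped to a fully qualified URL; for example, a row ("my-bucket", "path/to/file.parquet") combined
+ * with the prefix "gs://" yields "gs://my-bucket/path/to/file.parquet".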
+ * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket] + * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. + * @param serializableConfiguration + * @param checkIfExists check if each file exists, before adding it to the returned list + * @return + */ + public static FlatMapFunction, String> getCloudFilesPerPartition( + String storageUrlSchemePrefix, SerializableConfiguration serializableConfiguration, boolean checkIfExists) { + return rows -> { + List cloudFilesPerPartition = new ArrayList<>(); + rows.forEachRemaining(row -> { + Option filePathUrl = getUrlForFile(row, storageUrlSchemePrefix, serializableConfiguration, + checkIfExists); + filePathUrl.ifPresent(url -> { + LOG.info("Adding file: " + url); + cloudFilesPerPartition.add(url); + }); + }); + + return cloudFilesPerPartition.iterator(); + }; + } + + /** + * Construct a full qualified URL string to a cloud file from a given Row. Optionally check if the file exists. + * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket]. + * The checkIfExists logic assumes that the relevant impl classes for the storageUrlSchemePrefix are already present + * on the classpath! + * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. + */ + private static Option getUrlForFile(Row row, String storageUrlSchemePrefix, + SerializableConfiguration serializableConfiguration, + boolean checkIfExists) { + final Configuration configuration = serializableConfiguration.newCopy(); + + String bucket = row.getString(0); + String filePath = storageUrlSchemePrefix + bucket + "/" + row.getString(1); + + try { + String filePathUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name()); + if (!checkIfExists) { + return Option.of(filePathUrl); + } + boolean exists = checkIfFileExists(storageUrlSchemePrefix, bucket, filePathUrl, configuration); + return exists ? Option.of(filePathUrl) : Option.empty(); + } catch (Exception exception) { + LOG.warn(String.format("Failed to generate path to cloud file %s", filePath), exception); + throw new HoodieException(String.format("Failed to generate path to cloud file %s", filePath), exception); + } + } + + /** + * Check if file with given path URL exists + * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. + */ + private static boolean checkIfFileExists(String storageUrlSchemePrefix, String bucket, String filePathUrl, + Configuration configuration) { + try { + FileSystem fs = FSUtils.getFs(storageUrlSchemePrefix + bucket, configuration); + return fs.exists(new Path(filePathUrl)); + } catch (IOException ioe) { + String errMsg = String.format("Error while checking path exists for %s ", filePathUrl); + LOG.error(errMsg, ioe); + throw new HoodieIOException(errMsg, ioe); + } + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java new file mode 100644 index 0000000000000..aa764bf598143 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +/** + * Configs that are common during ingestion across different cloud stores + */ +public class CloudStoreIngestionConfig { + + /** + * How many metadata messages to pull at a time. + * Also see {@link #DEFAULT_BATCH_SIZE}. + */ + public static final String BATCH_SIZE_CONF = "hoodie.deltastreamer.source.cloud.meta.batch.size"; + + /** + * Provide a reasonable setting to use for default batch size when fetching File Metadata as part of Cloud Ingestion. + * If batch size is too big, two possible issues can happen: + * i) Acknowledgement takes too long (given that Hudi needs to commit first). + * ii) In the case of Google Cloud Pubsub: + * a) it will keep delivering the same message since it wasn't acked in time. + * b) The size of the request that acks outstanding messages may exceed the limit, + * which is 512KB as per Google's docs. See: https://cloud.google.com/pubsub/quotas#resource_limits + */ + public static final int DEFAULT_BATCH_SIZE = 10; + + /** + * Whether to acknowledge Metadata messages during Cloud Ingestion or not. This is useful during dev and testing. + * In Prod this should always be true. + * In case of Cloud Pubsub, not acknowledging means Pubsub will keep redelivering the same messages. + */ + public static final String ACK_MESSAGES = "hoodie.deltastreamer.source.cloud.meta.ack"; + + /** + * Default value for {@link #ACK_MESSAGES} + */ + public static final boolean ACK_MESSAGES_DEFAULT_VALUE = true; + + /** + * Check whether file exists before attempting to pull it + */ + public static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.cloud.data.check.file.exists"; + + /** + * Default value for {@link #ENABLE_EXISTS_CHECK} + */ + public static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false; + + // Only select objects in the bucket whose relative path matches this prefix + public static final String SELECT_RELATIVE_PATH_PREFIX = + "hoodie.deltastreamer.source.cloud.data.select.relpath.prefix"; + + // Ignore objects in the bucket whose relative path matches this prefix + public static final String IGNORE_RELATIVE_PATH_PREFIX = + "hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix"; + + // Ignore objects in the bucket whose relative path contains this substring + public static final String IGNORE_RELATIVE_PATH_SUBSTR = + "hoodie.deltastreamer.source.cloud.data.ignore.relpath.substring"; + + /** + * A JSON string passed to the Spark DataFrameReader while loading the dataset. + * Example: hoodie.deltastreamer.gcp.spark.datasource.options={"header":"true","encoding":"UTF-8"} + */ + public static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.cloud.data.datasource.options"; + + /** + * Only match files with this extension. 
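For example, setting this to "jsonl" selects only objects whose names end in "jsonl".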
By default, this is the same as + * {@link HoodieIncrSource.Config#SOURCE_FILE_FORMAT}. + */ + public static final String CLOUD_DATAFILE_EXTENSION = + "hoodie.deltastreamer.source.cloud.data.select.file.extension"; + + /** + * Format of the data file. By default, this will be the same as + * {@link HoodieIncrSource.Config#SOURCE_FILE_FORMAT}. + */ + public static final String DATAFILE_FORMAT = "hoodie.deltastreamer.source.cloud.data.datafile.format"; + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceCloudStorageHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceCloudStorageHelper.java new file mode 100644 index 0000000000000..1bad73793a3e1 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceCloudStorageHelper.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SPARK_DATASOURCE_OPTIONS; + +/** + * Helper methods for when the incremental source is fetching from Cloud Storage, like AWS S3 buckets or GCS. 
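+ * Extra options can be passed to the underlying Spark DataFrameReader as a JSON string, e.g.:
+ * hoodie.deltastreamer.source.cloud.data.datasource.options={"header":"true","encoding":"UTF-8"}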
+ */ +public class IncrSourceCloudStorageHelper { + + private static final Logger LOG = LogManager.getLogger(IncrSourceCloudStorageHelper.class); + + /** + * @param filepaths Files from which to fetch data + * @return Data in the given list of files, as a Spark DataSet + */ + public static Option> fetchFileData(SparkSession spark, List filepaths, + TypedProperties props, String fileFormat) { + if (filepaths.isEmpty()) { + return Option.empty(); + } + + DataFrameReader dfReader = getDataFrameReader(spark, props, fileFormat); + Dataset fileDataDs = dfReader.load(filepaths.toArray(new String[0])); + return Option.of(fileDataDs); + } + + private static DataFrameReader getDataFrameReader(SparkSession spark, TypedProperties props, String fileFormat) { + DataFrameReader dataFrameReader = spark.read().format(fileFormat); + + if (isNullOrEmpty(props.getString(SPARK_DATASOURCE_OPTIONS, null))) { + return dataFrameReader; + } + + final ObjectMapper mapper = new ObjectMapper(); + Map sparkOptionsMap = null; + + try { + sparkOptionsMap = mapper.readValue(props.getString(SPARK_DATASOURCE_OPTIONS), Map.class); + } catch (IOException e) { + throw new HoodieException(String.format("Failed to parse sparkOptions: %s", + props.getString(SPARK_DATASOURCE_OPTIONS)), e); + } + + LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap)); + + return dataFrameReader.options(sparkOptionsMap); + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java index d9415d036c312..b6e17799e61d2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources.helpers; import org.apache.hudi.DataSourceReadOptions; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -26,11 +27,16 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.sources.HoodieIncrSource; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Row; import java.util.Objects; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.MISSING_CHECKPOINT_STRATEGY; +import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT; + public class IncrSourceHelper { private static final String DEFAULT_BEGIN_TIMESTAMP = "000"; @@ -118,4 +124,25 @@ public static void validateInstantTime(Row row, String instantTime, String since "Instant time(_hoodie_commit_time) in row (" + row + ") was : " + instantTime + "but expected to be between " + sinceInstant + "(excl) - " + endInstant + "(incl)"); } + + /** + * Determine the policy to choose if a checkpoint is missing (detected by the absence of a beginInstant), + * during a run of a {@link HoodieIncrSource}. 
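+ * The deprecated READ_LATEST_INSTANT_ON_MISSING_CKPT flag, when set to true, maps to READ_LATEST. Otherwise the
+ * strategy named by MISSING_CHECKPOINT_STRATEGY (e.g. READ_UPTO_LATEST_COMMIT) is used, and null is returned when
+ * neither is configured.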
+ * @param props the usual Hudi props object + * @return + */ + public static MissingCheckpointStrategy getMissingCheckpointStrategy(TypedProperties props) { + boolean readLatestOnMissingCkpt = props.getBoolean( + READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT); + + if (readLatestOnMissingCkpt) { + return MissingCheckpointStrategy.READ_LATEST; + } + + if (props.containsKey(MISSING_CHECKPOINT_STRATEGY)) { + return MissingCheckpointStrategy.valueOf(props.getString(MISSING_CHECKPOINT_STRATEGY)); + } + + return null; + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FileDataFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FileDataFetcher.java new file mode 100644 index 0000000000000..aa80ad8b000c3 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FileDataFetcher.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.helpers.IncrSourceCloudStorageHelper; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import java.io.Serializable; +import java.util.List; + +/** + * Connects to GCS from Spark and downloads data from a given list of files. + * Assumes SparkContext is already configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs(). + */ +public class FileDataFetcher implements Serializable { + + private final String fileFormat; + private TypedProperties props; + + private static final Logger LOG = LogManager.getLogger(FileDataFetcher.class); + + private static final long serialVersionUID = 1L; + + public FileDataFetcher(TypedProperties props, String fileFormat) { + this.fileFormat = fileFormat; + this.props = props; + } + + public Option> fetchFileData(SparkSession spark, List filepaths, TypedProperties props) { + return IncrSourceCloudStorageHelper.fetchFileData(spark, filepaths, props, fileFormat); + } + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java new file mode 100644 index 0000000000000..c14935f26867f --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.io.Serializable; +import java.util.List; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.CLOUD_DATAFILE_EXTENSION; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SELECT_RELATIVE_PATH_PREFIX; + +/** + * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input. + * Optionally: + * i) Match the filename and path against provided input filter strings + * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already + * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs(). + */ +public class FilePathsFetcher implements Serializable { + + /** + * The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given. + */ + private final String fileFormat; + private final TypedProperties props; + + private static final String GCS_PREFIX = "gs://"; + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LogManager.getLogger(FilePathsFetcher.class); + + /** + * @param fileFormat The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} + * is not given. + */ + public FilePathsFetcher(TypedProperties props, String fileFormat) { + this.props = props; + this.fileFormat = fileFormat; + } + + /** + * @param sourceForFilenames a Dataset that contains metadata about files on GCS. Assumed to be a persisted form + * of a Cloud Storage Pubsub Notification event. + * @param checkIfExists Check if each file exists, before returning its full path + * @return A list of fully qualified GCS file paths. 
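+ * e.g. gs://bucket-name/path/to/data-file.jsonl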
+ */ + public List getGcsFilePaths(JavaSparkContext jsc, Dataset sourceForFilenames, boolean checkIfExists) { + String filter = createFilter(); + LOG.info("Adding filter string to Dataset: " + filter); + + SerializableConfiguration serializableConfiguration = new SerializableConfiguration( + jsc.hadoopConfiguration()); + + return sourceForFilenames + .filter(filter) + .select("bucket", "name") + .distinct() + .rdd().toJavaRDD().mapPartitions( + CloudObjectsSelectorCommon.getCloudFilesPerPartition(GCS_PREFIX, serializableConfiguration, checkIfExists) + ).collect(); + } + + /** + * Add optional filters that narrow down the list of filenames to fetch. + */ + private String createFilter() { + StringBuilder filter = new StringBuilder("size > 0"); + + getPropVal(SELECT_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name like '" + val + "%'")); + getPropVal(IGNORE_RELATIVE_PATH_PREFIX).ifPresent(val -> filter.append(" and name not like '" + val + "%'")); + getPropVal(IGNORE_RELATIVE_PATH_SUBSTR).ifPresent(val -> filter.append(" and name not like '%" + val + "%'")); + + // Match files with a given extension, or use the fileFormat as the default. + getPropVal(CLOUD_DATAFILE_EXTENSION).or(() -> Option.of(fileFormat)) + .map(val -> filter.append(" and name like '%" + val + "'")); + + return filter.toString(); + } + + private Option getPropVal(String propName) { + if (!isNullOrEmpty(props.getString(propName, null))) { + return Option.of(props.getString(propName)); + } + + return Option.empty(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java new file mode 100644 index 0000000000000..71baba300c307 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsIngestionConfig.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +/** + * Config keys and defaults for GCS Ingestion + */ +public class GcsIngestionConfig { + + /** + * The GCP Project Id where the Pubsub Subscription to ingest from resides. Needed to connect + * to the Pubsub subscription + */ + public static final String GOOGLE_PROJECT_ID = "hoodie.deltastreamer.source.gcs.project.id"; + + /** + * The GCP Pubsub subscription id for the GCS Notifications. Needed to connect to the Pubsub + * subscription. 
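+ * This is the short subscription name, e.g. "gcs-obj-8-sub-1" for the full resource
+ * projects/infra-dev-358110/subscriptions/gcs-obj-8-sub-1.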
+ */ + public static final String PUBSUB_SUBSCRIPTION_ID = "hoodie.deltastreamer.source.gcs.subscription.id"; + + // Size of inbound messages when pulling data, in bytes + public static final int DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // bytes + +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageBatch.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageBatch.java new file mode 100644 index 0000000000000..c6608ebf9c9f2 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageBatch.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import java.util.List; + +/** + * A batch of messages fetched from Google Cloud Pubsub within the metadata fetcher of + * Incremental GCS ingestion module. + */ +public class MessageBatch { + private final List messages; + + public MessageBatch(List messages) { + this.messages = messages; + } + + public List getMessages() { + return messages; + } + + public boolean isEmpty() { + return messages.isEmpty(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageValidity.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageValidity.java new file mode 100644 index 0000000000000..27aa906619817 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MessageValidity.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; + +/** + * Whether a message should be processed or not, and an optional description about the message. 
+ */ +public class MessageValidity { + + private final ProcessingDecision processingDecision; + private final Option description; + + public static final MessageValidity DEFAULT_VALID_MESSAGE = new MessageValidity(ProcessingDecision.DO_PROCESS, + "Valid message"); + + MessageValidity(ProcessingDecision processingDecision, String description) { + this.processingDecision = processingDecision; + this.description = StringUtils.isNullOrEmpty(description) ? Option.empty() : Option.of(description); + } + + public ProcessingDecision getDecision() { + return processingDecision; + } + + public Option getDescription() { + return description; + } + + /** + * A decision whether to process the message or not + * */ + public enum ProcessingDecision { + DO_PROCESS, + DO_SKIP; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java new file mode 100644 index 0000000000000..e42ed7fe6d2e1 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/MetadataMessage.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import com.google.pubsub.v1.PubsubMessage; +import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.helpers.gcs.MessageValidity.ProcessingDecision.DO_SKIP; + +/** + * Wraps a PubsubMessage assuming it's from Cloud Storage Pubsub Notifications (CSPN), and + * adds relevant helper methods. + * For details of CSPN messages see: https://cloud.google.com/storage/docs/pubsub-notifications + */ +public class MetadataMessage { + + // The CSPN message to wrap + private final PubsubMessage message; + + private static final String EVENT_NAME_OBJECT_FINALIZE = "OBJECT_FINALIZE"; + + private static final String ATTR_EVENT_TYPE = "eventType"; + private static final String ATTR_OBJECT_ID = "objectId"; + private static final String ATTR_OVERWROTE_GENERATION = "overwroteGeneration"; + + public MetadataMessage(PubsubMessage message) { + this.message = message; + } + + public String toStringUtf8() { + return message.getData().toStringUtf8(); + } + + /** + * Whether a message is valid to be ingested and stored by this Metadata puller. + * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events + */ + public MessageValidity shouldBeProcessed() { + if (!isNewFileCreation()) { + return new MessageValidity(DO_SKIP, "eventType: " + getEventType() + ". Not a file creation message."); + } + + if (isOverwriteOfExistingFile()) { + return new MessageValidity(DO_SKIP, + "eventType: " + getEventType() + + ". 
Overwrite of existing objectId: " + getObjectId()
+ + " with generation number: " + getOverwroteGeneration()
+ );
+ }
+
+ return MessageValidity.DEFAULT_VALID_MESSAGE;
+ }
+
+ /**
+ * Whether message represents an overwrite of an existing file.
+ * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#replacing_objects
+ */
+ private boolean isOverwriteOfExistingFile() {
+ return !isNullOrEmpty(getOverwroteGeneration());
+ }
+
+ /**
+ * Returns true if message corresponds to new file creation, false if not.
+ * Ref: https://cloud.google.com/storage/docs/pubsub-notifications#events
+ */
+ private boolean isNewFileCreation() {
+ return EVENT_NAME_OBJECT_FINALIZE.equals(getEventType());
+ }
+
+ public String getEventType() {
+ return getAttr(ATTR_EVENT_TYPE);
+ }
+
+ public String getObjectId() {
+ return getAttr(ATTR_OBJECT_ID);
+ }
+
+ public String getOverwroteGeneration() {
+ return getAttr(ATTR_OVERWROTE_GENERATION);
+ }
+
+ private String getAttr(String attrName) {
+ return message.getAttributesMap().get(attrName);
+ }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java
new file mode 100644
index 0000000000000..fdbb85dfd2e96
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/PubsubMessagesFetcher.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStub;
+import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
+import com.google.pubsub.v1.AcknowledgeRequest;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PullRequest;
+import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import static com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create;
+import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.DEFAULT_MAX_INBOUND_MESSAGE_SIZE;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Fetch messages from a specified Google Cloud Pubsub subscription. 
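+ * Uses the synchronous pull API via a SubscriberStub and acknowledges messages explicitly through
+ * {@link #sendAcks(java.util.List)}, which GcsEventsSource invokes from onCommit() so that messages are only
+ * acked after Hudi has committed the corresponding batch.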
+ */ +public class PubsubMessagesFetcher { + + private final String googleProjectId; + private final String pubsubSubscriptionId; + + private final int batchSize; + private final SubscriberStubSettings subscriberStubSettings; + + private static final Logger LOG = LogManager.getLogger(PubsubMessagesFetcher.class); + + public PubsubMessagesFetcher(String googleProjectId, String pubsubSubscriptionId, int batchSize) { + this.googleProjectId = googleProjectId; + this.pubsubSubscriptionId = pubsubSubscriptionId; + this.batchSize = batchSize; + + try { + /** For details of timeout and retry configs, + * see {@link com.google.cloud.pubsub.v1.stub.SubscriberStubSettings#initDefaults()}, + * and the static code block in SubscriberStubSettings */ + subscriberStubSettings = + SubscriberStubSettings.newBuilder() + .setTransportChannelProvider( + SubscriberStubSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(DEFAULT_MAX_INBOUND_MESSAGE_SIZE) + .build()) + .build(); + } catch (IOException e) { + throw new HoodieException("Error creating subscriber stub settings", e); + } + } + + public List fetchMessages() { + try { + try (SubscriberStub subscriber = createSubscriber()) { + String subscriptionName = getSubscriptionName(); + PullResponse pullResponse = makePullRequest(subscriber, subscriptionName); + return pullResponse.getReceivedMessagesList(); + } + } catch (IOException e) { + throw new HoodieException("Error when fetching metadata", e); + } + } + + public void sendAcks(List messagesToAck) throws IOException { + String subscriptionName = getSubscriptionName(); + try (SubscriberStub subscriber = createSubscriber()) { + + AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() + .setSubscription(subscriptionName) + .addAllAckIds(messagesToAck) + .build(); + + subscriber.acknowledgeCallable().call(acknowledgeRequest); + + LOG.info("Acknowledged messages: " + messagesToAck); + } + } + + private PullResponse makePullRequest(SubscriberStub subscriber, String subscriptionName) { + PullRequest pullRequest = PullRequest.newBuilder() + .setMaxMessages(batchSize) + .setSubscription(subscriptionName) + .build(); + + return subscriber.pullCallable().call(pullRequest); + } + + private GrpcSubscriberStub createSubscriber() throws IOException { + return create(subscriberStubSettings); + } + + private String getSubscriptionName() { + return ProjectSubscriptionName.format(googleProjectId, pubsubSubscriptionId); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java new file mode 100644 index 0000000000000..52003f671740f --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
new file mode 100644
index 0000000000000..52003f671740f
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/QueryInfo.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+/**
+ * Uses the start and end instants of a DeltaStreamer source to construct the right kind of query
+ * (incremental or snapshot) for subsequent requests against the source table.
+ */
+public class QueryInfo {
+
+  private final String queryType;
+  private final String startInstant;
+  private final String endInstant;
+
+  private static final Logger LOG = LogManager.getLogger(QueryInfo.class);
+
+  public QueryInfo(String queryType, String startInstant, String endInstant) {
+    this.queryType = queryType;
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+  }
+
+  public Dataset<Row> initializeSourceForFilenames(String srcPath, SparkSession sparkSession) {
+    if (isIncremental()) {
+      return incrementalQuery(sparkSession).load(srcPath);
+    }
+
+    // Issue a snapshot query.
+    return snapshotQuery(sparkSession).load(srcPath)
+        .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, getStartInstant()));
+  }
+
+  public boolean areStartAndEndInstantsEqual() {
+    return getStartInstant().equals(getEndInstant());
+  }
+
+  private DataFrameReader snapshotQuery(SparkSession sparkSession) {
+    return sparkSession.read().format("org.apache.hudi")
+        .option(QUERY_TYPE().key(), QUERY_TYPE_SNAPSHOT_OPT_VAL());
+  }
+
+  private DataFrameReader incrementalQuery(SparkSession sparkSession) {
+    return sparkSession.read().format("org.apache.hudi")
+        .option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
+        .option(BEGIN_INSTANTTIME().key(), getStartInstant())
+        .option(END_INSTANTTIME().key(), getEndInstant());
+  }
+
+  public boolean isIncremental() {
+    return QUERY_TYPE_INCREMENTAL_OPT_VAL().equals(queryType);
+  }
+
+  public String getStartInstant() {
+    return startInstant;
+  }
+
+  public String getEndInstant() {
+    return endInstant;
+  }
+
+  public void logDetails() {
+    LOG.debug("queryType: " + queryType + ", startInstant: " + startInstant + ", endInstant: " + endInstant);
+  }
+
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
new file mode 100644
index 0000000000000..f8d52159710f6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import org.apache.hudi.utilities.sources.helpers.gcs.FileDataFetcher; +import org.apache.hudi.utilities.sources.helpers.gcs.FilePathsFetcher; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarness { + + @TempDir + protected java.nio.file.Path tempDir; + + @Mock + FilePathsFetcher filePathsFetcher; + + @Mock + FileDataFetcher fileDataFetcher; + + protected FilebasedSchemaProvider schemaProvider; + private HoodieTableMetaClient metaClient; + + private static final Logger LOG = LogManager.getLogger(TestGcsEventsHoodieIncrSource.class); + + @BeforeEach + public void setUp() throws IOException { + metaClient = getHoodieMetaClient(hadoopConf(), basePath()); + 
MockitoAnnotations.initMocks(this); + } + + @Override + public String basePath() { + return tempDir.toAbsolutePath().toUri().toString(); + } + + @Test + public void shouldNotFindNewDataIfCommitTimeOfWriteAndReadAreEqual() throws IOException { + String commitTimeForWrites = "1"; + String commitTimeForReads = commitTimeForWrites; + + Pair> inserts = writeGcsMetadataRecords(commitTimeForWrites); + + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 0, inserts.getKey()); + + verify(filePathsFetcher, times(0)).getGcsFilePaths(Mockito.any(), Mockito.any(), + anyBoolean()); + verify(fileDataFetcher, times(0)).fetchFileData( + Mockito.any(), Mockito.any(), Mockito.any()); + } + + @Test + public void shouldFetchDataIfCommitTimeForReadsLessThanForWrites() throws IOException { + String commitTimeForWrites = "2"; + String commitTimeForReads = "1"; + + Pair> inserts = writeGcsMetadataRecords(commitTimeForWrites); + List dataFiles = Arrays.asList("data-file-1.json", "data-file-2.json"); + when(filePathsFetcher.getGcsFilePaths(Mockito.any(), Mockito.any(), anyBoolean())).thenReturn(dataFiles); + + List recs = Arrays.asList( + new GcsDataRecord("1", "Hello 1"), + new GcsDataRecord("2", "Hello 2"), + new GcsDataRecord("3", "Hello 3"), + new GcsDataRecord("4", "Hello 4") + ); + + Dataset rows = spark().createDataFrame(recs, GcsDataRecord.class); + + when(fileDataFetcher.fetchFileData(Mockito.any(), eq(dataFiles), Mockito.any())).thenReturn(Option.of(rows)); + + readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(commitTimeForReads), 4, inserts.getKey()); + + verify(filePathsFetcher, times(1)).getGcsFilePaths(Mockito.any(), Mockito.any(), + anyBoolean()); + verify(fileDataFetcher, times(1)).fetchFileData(Mockito.any(), + eq(dataFiles), Mockito.any()); + } + + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, + Option checkpointToPull, int expectedCount, String expectedCheckpoint) { + TypedProperties typedProperties = setProps(missingCheckpointStrategy); + + GcsEventsHoodieIncrSource incrSource = new GcsEventsHoodieIncrSource(typedProperties, jsc(), + spark(), schemaProvider, filePathsFetcher, fileDataFetcher); + + Pair>, String> dataAndCheckpoint = incrSource.fetchNextBatch(checkpointToPull, 100); + + Option> datasetOpt = dataAndCheckpoint.getLeft(); + String nextCheckPoint = dataAndCheckpoint.getRight(); + + Assertions.assertNotNull(nextCheckPoint); + + if (expectedCount == 0) { + assertFalse(datasetOpt.isPresent()); + } else { + assertEquals(datasetOpt.get().count(), expectedCount); + } + + Assertions.assertEquals(nextCheckPoint, expectedCheckpoint); + } + + private HoodieRecord getGcsMetadataRecord(String commitTime, String filename, String bucketName, String generation) { + Schema sourceSchema = new MetadataSchemaProvider().getSourceSchema(); + LOG.info("schema: " + sourceSchema); + + String partitionPath = bucketName; + + String id = "id:" + bucketName + "/" + filename + "/" + generation; + String mediaLink = String.format("https://storage.googleapis.com/download/storage/v1/b/%s/o/%s" + + "?generation=%s&alt=media", bucketName, filename, generation); + String selfLink = String.format("https://www.googleapis.com/storage/v1/b/%s/o/%s", bucketName, filename); + + GenericRecord rec = new GenericData.Record(sourceSchema); + rec.put("_row_key", id); + rec.put("partition_path", bucketName); + rec.put("timestamp", Long.parseLong(commitTime)); + + rec.put("bucket", bucketName); + rec.put("contentLanguage", "en"); + rec.put("contentType", 
"application/octet-stream"); + rec.put("crc32c", "oRB3Aw=="); + rec.put("etag", "CP7EwYCu6/kCEAE="); + rec.put("generation", generation); + rec.put("id", id); + rec.put("kind", "storage#object"); + rec.put("md5Hash", "McsS8FkcDSrB3cGfb18ysA=="); + rec.put("mediaLink", mediaLink); + rec.put("metageneration", "1"); + rec.put("name", filename); + rec.put("selfLink", selfLink); + rec.put("size", "370"); + rec.put("storageClass", "STANDARD"); + rec.put("timeCreated", "2022-08-29T05:52:55.869Z"); + rec.put("timeStorageClassUpdated", "2022-08-29T05:52:55.869Z"); + rec.put("updated", "2022-08-29T05:52:55.869Z"); + + HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec)); + return new HoodieAvroRecord(new HoodieKey(id, partitionPath), payload); + } + + private HoodieWriteConfig getWriteConfig() { + return getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .build(); + } + + private Pair> writeGcsMetadataRecords(String commitTime) throws IOException { + HoodieWriteConfig writeConfig = getWriteConfig(); + SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); + + writeClient.startCommitWithTime(commitTime); + List gcsMetadataRecords = Arrays.asList( + getGcsMetadataRecord(commitTime, "data-file-1.json", "bucket-1", "1"), + getGcsMetadataRecord(commitTime, "data-file-2.json", "bucket-1", "1"), + getGcsMetadataRecord(commitTime, "data-file-3.json", "bucket-1", "1"), + getGcsMetadataRecord(commitTime, "data-file-4.json", "bucket-1", "1") + ); + JavaRDD result = writeClient.upsert(jsc().parallelize(gcsMetadataRecords, 1), commitTime); + + List statuses = result.collect(); + assertNoWriteErrors(statuses); + + return Pair.of(commitTime, gcsMetadataRecords); + } + + private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) { + Properties properties = new Properties(); + properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath()); + properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", + missingCheckpointStrategy.name()); + properties.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json"); + return new TypedProperties(properties); + } + + private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) { + return HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withSchema(new MetadataSchemaProvider().getSourceSchema().toString()) + .withParallelism(2, 2) + .withBulkInsertParallelism(2) + .withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .forTable(metaClient.getTableConfig().getTableName()); + } + + private static class MetadataSchemaProvider extends SchemaProvider { + + private final Schema schema; + + public MetadataSchemaProvider() { + super(new TypedProperties()); + this.schema = SchemaTestUtil.getSchemaFromResource( + TestGcsEventsHoodieIncrSource.class, + "/delta-streamer-config/gcs-metadata.avsc", true); + } + + @Override + public Schema getSourceSchema() { + return schema; + } + } + + public static class GcsDataRecord { + public String id; + public String text; + + public GcsDataRecord(String id, String text) { + this.id = id; + this.text = text; + } + + public String 
getId() { + return id; + } + + public String getText() { + return text; + } + } + +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java new file mode 100644 index 0000000000000..2106afe11f755 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.ReceivedMessage; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.sources.helpers.gcs.PubsubMessagesFetcher; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GOOGLE_PROJECT_ID; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.PUBSUB_SUBSCRIPTION_ID; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestGcsEventsSource extends UtilitiesTestBase { + + @Mock + PubsubMessagesFetcher pubsubMessagesFetcher; + + protected FilebasedSchemaProvider schemaProvider; + private TypedProperties props; + + private static final String CHECKPOINT_VALUE_ZERO = "0"; + + @BeforeAll + public static void beforeAll() throws Exception { + UtilitiesTestBase.initTestServices(false, false); + } + + @AfterAll + public static void afterAll() { + UtilitiesTestBase.cleanupClass(); + } + + @BeforeEach + public void beforeEach() throws Exception { + super.setup(); + schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc); + MockitoAnnotations.initMocks(this); + + props = new TypedProperties(); + props.put(GOOGLE_PROJECT_ID, "dummy-project"); + props.put(PUBSUB_SUBSCRIPTION_ID, "dummy-subscription"); + } + + @AfterEach + public void afterEach() throws Exception { 
+    super.teardown();
+  }
+
+  @Test
+  public void shouldReturnEmptyOnNoMessages() {
+    when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList());
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+        pubsubMessagesFetcher);
+
+    Pair<Option<Dataset<Row>>, String> expected = Pair.of(Option.empty(), "0");
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
+
+    assertEquals(expected, dataAndCheckpoint);
+  }
+
+  @Test
+  public void shouldReturnDataOnValidMessages() {
+    ReceivedMessage msg1 = fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
+    ReceivedMessage msg2 = fileCreateMessage("objectId-2", "{'data':{'bucket':'bucket-2'}}");
+
+    when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(msg1, msg2));
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+        pubsubMessagesFetcher);
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
+    source.onCommit(dataAndCheckpoint.getRight());
+
+    assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
+
+    Dataset<Row> resultDs = dataAndCheckpoint.getLeft().get();
+    List<Row> result = resultDs.collectAsList();
+
+    assertBucket(result.get(0), "bucket-1");
+    assertBucket(result.get(1), "bucket-2");
+
+    verify(pubsubMessagesFetcher).fetchMessages();
+  }
+
+  @Test
+  public void shouldFetchMessagesInBatches() {
+    ReceivedMessage msg1 = fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
+    ReceivedMessage msg2 = fileCreateMessage("objectId-2", "{'data':{'bucket':'bucket-2'}}");
+    ReceivedMessage msg3 = fileCreateMessage("objectId-3", "{'data':{'bucket':'bucket-3'}}");
+    ReceivedMessage msg4 = fileCreateMessage("objectId-4", "{'data':{'bucket':'bucket-4'}}");
+
+    // pubsubMessagesFetcher should return only two messages each time it's called
+    when(pubsubMessagesFetcher.fetchMessages())
+        .thenReturn(Arrays.asList(msg1, msg2))
+        .thenReturn(Arrays.asList(msg3, msg4));
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+        pubsubMessagesFetcher);
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint1 = source.fetchNextBatch(Option.of("0"), 100);
+    source.onCommit(dataAndCheckpoint1.getRight());
+
+    assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint1.getRight());
+    List<Row> result1 = dataAndCheckpoint1.getLeft().get().collectAsList();
+    assertBucket(result1.get(0), "bucket-1");
+    assertBucket(result1.get(1), "bucket-2");
+
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint2 = source.fetchNextBatch(Option.of("0"), 100);
+    source.onCommit(dataAndCheckpoint2.getRight());
+
+    List<Row> result2 = dataAndCheckpoint2.getLeft().get().collectAsList();
+    assertBucket(result2.get(0), "bucket-3");
+    assertBucket(result2.get(1), "bucket-4");
+
+    verify(pubsubMessagesFetcher, times(2)).fetchMessages();
+  }
+
+  @Test
+  public void shouldSkipInvalidMessages() {
+    ReceivedMessage invalid1 = fileDeleteMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
+    ReceivedMessage invalid2 = fileCreateMessageWithOverwroteGen("objectId-2", "{'data':{'bucket':'bucket-2'}}");
+    ReceivedMessage valid1 = fileCreateMessage("objectId-3", "{'data':{'bucket':'bucket-3'}}");
+
+    when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(invalid1, valid1, invalid2));
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+        pubsubMessagesFetcher);
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
+    source.onCommit(dataAndCheckpoint.getRight());
+    assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
+
+    Dataset<Row> resultDs = dataAndCheckpoint.getLeft().get();
+    List<Row> result = resultDs.collectAsList();
+
+    assertEquals(1, result.size());
+    assertBucket(result.get(0), "bucket-3");
+
+    verify(pubsubMessagesFetcher).fetchMessages();
+  }
+
+  @Test
+  public void shouldNotDedupeMessagesInternally() {
+    ReceivedMessage dupe1 = fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
+    ReceivedMessage dupe2 = fileCreateMessage("objectId-1", "{'data':{'bucket':'bucket-1'}}");
+
+    when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Arrays.asList(dupe1, dupe2));
+
+    GcsEventsSource source = new GcsEventsSource(props, jsc, sparkSession, null,
+        pubsubMessagesFetcher);
+    Pair<Option<Dataset<Row>>, String> dataAndCheckpoint = source.fetchNextBatch(Option.of("0"), 100);
+    source.onCommit(dataAndCheckpoint.getRight());
+
+    assertEquals(CHECKPOINT_VALUE_ZERO, dataAndCheckpoint.getRight());
+
+    Dataset<Row> resultDs = dataAndCheckpoint.getLeft().get();
+    List<Row> result = resultDs.collectAsList();
+    assertEquals(2, result.size());
+    assertBucket(result.get(0), "bucket-1");
+    assertBucket(result.get(1), "bucket-1");
+
+    verify(pubsubMessagesFetcher).fetchMessages();
+  }
+
+  private ReceivedMessage fileCreateMessageWithOverwroteGen(String objectId, String payload) {
+    Map<String, String> attrs = new HashMap<>();
+    attrs.put("overwroteGeneration", "objectId-N");
+
+    return ReceivedMessage.newBuilder().setMessage(
+        objectWithEventTypeAndAttrs(objectId, "OBJECT_FINALIZE", attrs, payload)
+    ).setAckId(objectId).build();
+  }
+
+  private ReceivedMessage fileCreateMessage(String objectId, String payload) {
+    return ReceivedMessage.newBuilder().setMessage(
+        objectFinalizeMessage(objectId, payload)
+    ).setAckId(objectId).build();
+  }
+
+  private ReceivedMessage fileDeleteMessage(String objectId, String payload) {
+    return ReceivedMessage.newBuilder().setMessage(
+        objectDeleteMessage(objectId, payload)
+    ).setAckId(objectId).build();
+  }
+
+  private PubsubMessage.Builder objectFinalizeMessage(String objectId, String dataMessage) {
+    return objectWithEventType(objectId, "OBJECT_FINALIZE", dataMessage);
+  }
+
+  private PubsubMessage.Builder objectDeleteMessage(String objectId, String dataMessage) {
+    return objectWithEventType(objectId, "OBJECT_DELETE", dataMessage);
+  }
+
+  private PubsubMessage.Builder objectWithEventType(String objectId, String eventType, String dataMessage) {
+    return messageWithAttrs(createBasicAttrs(objectId, eventType), dataMessage);
+  }
+
+  private PubsubMessage.Builder objectWithEventTypeAndAttrs(String objectId, String eventType,
+                                                            Map<String, String> attrs, String dataMessage) {
+    Map<String, String> allAttrs = createBasicAttrs(objectId, eventType);
+    allAttrs.putAll(attrs);
+
+    return messageWithAttrs(allAttrs, dataMessage);
+  }
+
+  private Map<String, String> createBasicAttrs(String objectId, String eventType) {
+    Map<String, String> map = new HashMap<>();
+    map.put("objectId", objectId);
+    map.put("eventType", eventType);
+
+    return map;
+  }
+
+  private PubsubMessage.Builder messageWithAttrs(Map<String, String> attrs, String dataMessage) {
+    return PubsubMessage.newBuilder()
+        .putAllAttributes(new HashMap<>(attrs))
+        .setData(ByteString.copyFrom(dataMessage.getBytes()));
+  }
+
+  private void assertBucket(Row row, String expectedBucketName) {
+    Row record = row.getAs("data");
+    String bucket = record.getAs("bucket");
+    assertEquals(expectedBucketName, bucket);
+  }
+}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/gcs-metadata.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/gcs-metadata.avsc new file mode 100644 index
0000000000000..79baf5eb80d93 --- /dev/null +++ b/hudi-utilities/src/test/resources/delta-streamer-config/gcs-metadata.avsc @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace": "gcs.schema", + "type": "record", + "name": "gcs_metadata", + "fields": [ + { + "name": "_row_key", + "type": "string" + }, + { + "name": "partition_path", + "type": "string" + }, + { + "name": "timestamp", + "type": "long" + }, + { + "name": "bucket", + "type": "string" + }, + { + "name": "contentLanguage", + "type": "string" + }, + { + "name": "contentType", + "type": "string" + }, + { + "name": "crc32c", + "type": "string" + }, + { + "name": "etag", + "type": "string" + }, + { + "name": "generation", + "type": "string" + }, + { + "name": "id", + "type": "string" + }, + { + "name": "kind", + "type": "string" + }, + { + "name": "md5Hash", + "type": "string" + }, + { + "name": "mediaLink", + "type": "string" + }, + { + "name": "metageneration", + "type": "string" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "selfLink", + "type": "string" + }, + { + "name": "size", + "type": "string" + }, + { + "name": "storageClass", + "type": "string" + }, + { + "name": "timeCreated", + "type": "string" + }, + { + "name": "timeStorageClassUpdated", + "type": "string" + }, + { + "name": "updated", + "type": "string" + } + ] +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 62817b127edfc..7cfe643fc9d3a 100644 --- a/pom.xml +++ b/pom.xml @@ -201,6 +201,8 @@ 1.1.0 3.5.7 0.16 + 1.120.0 + hadoop2-2.2.7 8000 http://localhost:${dynamodb-local.port} 2.7.3
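To tie the pieces together, here is a rough, non-authoritative sketch of the configuration exercised by the tests above for the GCS incremental source. The three property keys and the checkpoint strategy value are taken verbatim from the test setup; the metadata table path and the file format are placeholder values, and in a real deployment these would typically be passed to HoodieDeltaStreamer as configuration rather than built in code.

import org.apache.hudi.common.config.TypedProperties;

public class GcsIncrSourceConfigSketch {
  public static TypedProperties gcsIncrSourceProps() {
    TypedProperties props = new TypedProperties();
    // Hudi table holding the GCS object metadata (populated via GcsEventsSource).
    // The path below is a placeholder.
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
        "gs://my-bucket/hudi/gcs_metadata_table");
    // Behaviour when no checkpoint exists yet for this source.
    props.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
        "READ_UPTO_LATEST_COMMIT");
    // Format of the data files referenced by the metadata records.
    props.setProperty("hoodie.deltastreamer.source.gcsincr.datafile.format", "json");
    return props;
  }
}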