diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java index 509ab5ed5ab30..5eda5db4e9dcd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java @@ -50,6 +50,9 @@ import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_MAX_FILE_SIZE; import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.PATH_BASED_PARTITION_FIELDS; +import static org.apache.spark.sql.functions.input_file_name; +import static org.apache.spark.sql.functions.split; /** * Generic helper methods to fetch from Cloud Storage during incremental fetch from cloud storage buckets. @@ -176,6 +179,18 @@ public static Option> loadAsDataset(SparkSession spark, List dataset = reader.load(paths.toArray(new String[cloudObjectMetadata.size()])).coalesce(numPartitions); + + // add partition column from source path if configured + if (props.containsKey(PATH_BASED_PARTITION_FIELDS)) { + String[] partitionKeysToAdd = props.getString(PATH_BASED_PARTITION_FIELDS).split(","); + // Add partition column for all path-based partition keys. If key is not present in path, the value will be null.
+ for (String partitionKey : partitionKeysToAdd) { + String partitionPathPattern = String.format("%s=", partitionKey); + LOG.info(String.format("Adding column %s to dataset", partitionKey)); + dataset = dataset.withColumn(partitionKey, split(split(input_file_name(), partitionPathPattern).getItem(1), "/").getItem(0)); + } + } + return Option.of(dataset); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java index 7bc94e709ee28..98f36ff03c5d8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudStoreIngestionConfig.java @@ -103,4 +103,8 @@ public class CloudStoreIngestionConfig { @Deprecated public static final String DATAFILE_FORMAT = CloudSourceConfig.DATAFILE_FORMAT.key(); + /** + * A comma-delimited list of path-based partition fields in the source file structure. + */ + public static final String PATH_BASED_PARTITION_FIELDS = "hoodie.deltastreamer.source.cloud.data.partition.fields.from.path"; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java new file mode 100644 index 0000000000000..eae15763be71d --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.testutils.HoodieClientTestHarness; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import org.apache.spark.sql.RowFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; + +public class TestCloudObjectsSelectorCommon extends HoodieClientTestHarness { + + @BeforeEach + void setUp() { + initSparkContexts(); + } + + @AfterEach + public void teardown() throws Exception { + cleanupResources(); + } + + @Test + public void emptyMetadataReturnsEmptyOption() { + Option> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, Collections.emptyList(), new TypedProperties(), "json"); + Assertions.assertFalse(result.isPresent()); + } + + @Test + public void filesFromMetadataRead() { + List input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); + Option> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, new TypedProperties(), "json"); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(1, 
result.get().count()); + Row expected = RowFactory.create("some data"); + Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); + } + + @Test + public void partitionValueAddedToRow() { + List input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); + + TypedProperties properties = new TypedProperties(); + properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state"); + Option> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(1, result.get().count()); + Row expected = RowFactory.create("some data", "US", "CA"); + Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); + } + + @Test + public void partitionKeyNotPresentInPath() { + List input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1)); + TypedProperties properties = new TypedProperties(); + properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown"); + Option> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json"); + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(1, result.get().count()); + Row expected = RowFactory.create("some data", null); + Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList()); + } +} diff --git a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=CA/data.json b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=CA/data.json new file mode 100644 index 0000000000000..9fb29b4dcf47c --- /dev/null +++ b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=CA/data.json @@ -0,0 +1 @@ +{"data": "some data"} \ No newline at end of file