From ba9dd7a5dbe13aabd3a0ad7c7b376488c120cd77 Mon Sep 17 00:00:00 2001 From: teeyog Date: Fri, 22 Jan 2021 23:16:23 +0800 Subject: [PATCH 1/5] [HUDI-1527] automatically infer the data directory, users only need to specify the table directory --- .../java/org/apache/hudi/DataSourceUtils.java | 13 +++++ .../scala/org/apache/hudi/DefaultSource.scala | 26 +++++++++- .../hudi/functional/TestCOWDataSource.scala | 52 +++++++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 18c51e3fd3eb2..f821a9c9dfd77 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -84,6 +84,19 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw throw new TableNotFoundException("Unable to find a hudi table for the user provided paths."); } + public static String getDataPath(String tablePath, String partitionPath) { + // When the table is not partitioned + if (tablePath.equals(partitionPath)) { + return tablePath + "/*"; + } + assert partitionPath.length() > tablePath.length(); + assert partitionPath.startsWith(tablePath); + int n = partitionPath.substring(tablePath.length()).split("/").length; + String dataPathSuffix = String.join("/*", Collections.nCopies(n + 1, "")); + return tablePath + dataPathSuffix; + } + + /** * Create a key generator class via reflection, passing in any configs needed. *

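Note on the hunk above: the glob suffix that getDataPath builds grows with the partition depth. A partition path N directory levels below the table path yields N + 1 "/*" segments, so the resulting glob also reaches the data files inside the deepest partition directory. The sketch below restates that arithmetic outside the diff; it is only an illustration (the object and method names are not part of this patch), and it assumes, as the asserts above do, that the partition path is a strict extension of the table path.

    // Illustrative restatement of the suffix arithmetic in DataSourceUtils.getDataPath.
    object DataPathGlobSketch {
      def dataPathGlob(tablePath: String, partitionPath: String): String = {
        if (tablePath == partitionPath) {
          // Non-partitioned table: data files sit directly under the table path.
          tablePath + "/*"
        } else {
          // e.g. tablePath = "/tmp/tbl", partitionPath = "/tmp/tbl/2021/01/22"
          // relative = "/2021/01/22" -> three non-empty segments -> three partition levels
          val relative = partitionPath.substring(tablePath.length)
          val levels = relative.split("/").count(_.nonEmpty)
          // One extra "/*" so the glob also matches the files inside the leaf partition.
          tablePath + "/*" * (levels + 1)
        }
      }
    }

For the three-field CustomKeyGenerator case exercised in the test further down, dataPathGlob("/tmp/tbl", "/tmp/tbl/2021/01/22") returns "/tmp/tbl/*/*/*/*", i.e. the "basePath/*/*/*/*" glob the reader no longer has to spell out by hand.
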
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 3299b8f2597bf..389cfdd24dfdf 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -17,14 +17,18 @@ package org.apache.hudi +import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter +import org.apache.hudi.metadata.FileSystemBackedTableMetadata import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{Sink, Source} @@ -67,7 +71,7 @@ class DefaultSource extends RelationProvider optParams: Map[String, String], schema: StructType): BaseRelation = { // Add default options for unspecified read options keys. - val parameters = translateViewTypesToQueryTypes(optParams) + var parameters = translateViewTypesToQueryTypes(optParams) val path = parameters.get("path") val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY) @@ -84,6 +88,26 @@ class DefaultSource extends RelationProvider val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) log.info("Obtained hudi table path: " + tablePath) + if (path.nonEmpty) { + val _path = path.get.stripSuffix("/") + val pathTmp = new Path(_path).makeQualified(fs.getUri, fs.getWorkingDirectory) + // If the user specifies the table path, the data path is automatically inferred + if (pathTmp.toString.equals(tablePath)) { + val sparkEngineContext = new HoodieSparkEngineContext(sqlContext.sparkContext) + val fsBackedTableMetadata = + new FileSystemBackedTableMetadata(sparkEngineContext, new SerializableConfiguration(fs.getConf), tablePath, false) + val partitionPaths = fsBackedTableMetadata.getAllPartitionPaths + val onePartitionPath = if(!partitionPaths.isEmpty && !StringUtils.isEmpty(partitionPaths.get(0))) { + tablePath + "/" + partitionPaths.get(0) + } else { + tablePath + } + val dataPath = DataSourceUtils.getDataPath(tablePath, onePartitionPath) + log.info("Obtained hudi data path: " + dataPath) + parameters += "path" -> dataPath + } + } + val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent log.info("Is bootstrapped table => " + isBootstrappedTable) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 856cc008dd9e9..445db48158069 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -583,4 +583,56 @@ 
class TestCOWDataSource extends HoodieClientTestBase { .load(basePath + "/*") assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) } + + @Test def testAutoInferDataPath(): Unit = { + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + // default partition + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Overwrite) + .save(basePath) + + // no need to specify basePath/*/* + spark.read.format("org.apache.hudi") + .load(basePath).show() + + // partition with org.apache.hudi.keygen.CustomKeyGenerator + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider:SIMPLE,begin_lon:SIMPLE,end_lon:SIMPLE") + .mode(SaveMode.Overwrite) + .save(basePath) + + // no need to specify basePath/*/*/*/* + spark.read.format("org.apache.hudi") + .load(basePath).show() + + // no partition + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "") + .mode(SaveMode.Overwrite) + .save(basePath) + + // no need to specify basePath/* + spark.read.format("org.apache.hudi") + .load(basePath).show() + + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_ts:SIMPLE") + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") + .mode(SaveMode.Overwrite) + .save(basePath) + + // specify basePath/yyyyMMdd/* + val date = new DateTime().toString(DateTimeFormat.forPattern("yyyyMMdd")) + spark.read.format("org.apache.hudi") + .load(basePath + s"/$date/*").show() + } } From 6758f1515e2591e36c1269e1a9d98225e98ea646 Mon Sep 17 00:00:00 2001 From: teeyog Date: Sat, 23 Jan 2021 20:23:51 +0800 Subject: [PATCH 2/5] [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] --- .../java/org/apache/hudi/DataSourceUtils.java | 24 +++++++++++++++++++ .../hudi/functional/TestCOWDataSource.scala | 14 +++++------ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index f821a9c9dfd77..83498a3032c28 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -84,6 +84,30 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw throw new TableNotFoundException("Unable to find a hudi table for the user provided paths."); } +<<<<<<< HEAD +======= + public static Option getOnePartitionPath(FileSystem fs, Path tablePath) throws IOException { + // When the table is not partitioned + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, tablePath)) { + return Option.of(tablePath.toString()); + } + FileStatus[] statuses = fs.listStatus(tablePath); + 
for (FileStatus status : statuses) { + if (status.isDirectory()) { + if (HoodiePartitionMetadata.hasPartitionMetadata(fs, status.getPath())) { + return Option.of(status.getPath().toString()); + } else { + Option partitionPath = getOnePartitionPath(fs, status.getPath()); + if (partitionPath.isPresent()) { + return partitionPath; + } + } + } + } + return Option.empty(); + } + +>>>>>>> [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] public static String getDataPath(String tablePath, String partitionPath) { // When the table is not partitioned if (tablePath.equals(partitionPath)) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 445db48158069..6bca5d0d14fd1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -587,17 +587,17 @@ class TestCOWDataSource extends HoodieClientTestBase { @Test def testAutoInferDataPath(): Unit = { val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - // default partition + // Default partition inputDF1.write.format("org.apache.hudi") .options(commonOpts) .mode(SaveMode.Overwrite) .save(basePath) - // no need to specify basePath/*/* + // No need to specify basePath/*/* spark.read.format("org.apache.hudi") .load(basePath).show() - // partition with org.apache.hudi.keygen.CustomKeyGenerator + // Partition with org.apache.hudi.keygen.CustomKeyGenerator inputDF1.write.format("org.apache.hudi") .options(commonOpts) .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") @@ -605,11 +605,11 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) - // no need to specify basePath/*/*/*/* + // No need to specify basePath/*/*/*/* spark.read.format("org.apache.hudi") .load(basePath).show() - // no partition + // No partition inputDF1.write.format("org.apache.hudi") .options(commonOpts) .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator") @@ -617,7 +617,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) - // no need to specify basePath/* + // No need to specify basePath/* spark.read.format("org.apache.hudi") .load(basePath).show() @@ -630,7 +630,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .mode(SaveMode.Overwrite) .save(basePath) - // specify basePath/yyyyMMdd/* + // Specify basePath/yyyyMMdd/* val date = new DateTime().toString(DateTimeFormat.forPattern("yyyyMMdd")) spark.read.format("org.apache.hudi") .load(basePath + s"/$date/*").show() From 9689c7b443c16a0063a9fe5e10fa2a0a2de294de Mon Sep 17 00:00:00 2001 From: teeyog Date: Sun, 7 Feb 2021 14:02:40 +0800 Subject: [PATCH 3/5] [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] --- .../java/org/apache/hudi/DataSourceUtils.java | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 83498a3032c28..f821a9c9dfd77 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -84,30 +84,6 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw throw new TableNotFoundException("Unable to find a hudi table for the user provided paths."); } -<<<<<<< HEAD -======= - public static Option getOnePartitionPath(FileSystem fs, Path tablePath) throws IOException { - // When the table is not partitioned - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, tablePath)) { - return Option.of(tablePath.toString()); - } - FileStatus[] statuses = fs.listStatus(tablePath); - for (FileStatus status : statuses) { - if (status.isDirectory()) { - if (HoodiePartitionMetadata.hasPartitionMetadata(fs, status.getPath())) { - return Option.of(status.getPath().toString()); - } else { - Option partitionPath = getOnePartitionPath(fs, status.getPath()); - if (partitionPath.isPresent()) { - return partitionPath; - } - } - } - } - return Option.empty(); - } - ->>>>>>> [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] public static String getDataPath(String tablePath, String partitionPath) { // When the table is not partitioned if (tablePath.equals(partitionPath)) { From 0e4f1ee154bbb1807b62a39d9366469c9b8c4204 Mon Sep 17 00:00:00 2001 From: teeyog Date: Mon, 22 Feb 2021 17:33:31 +0800 Subject: [PATCH 4/5] [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] --- .../scala/org/apache/hudi/functional/TestCOWDataSource.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 6bca5d0d14fd1..82269b0fbaa47 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -32,9 +32,11 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, concat, lit, udf} import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.fail +import org.apache.spark.sql.types.{DataTypes => _, DateType => _, IntegerType => _, StringType => _, TimestampType => _, _} import org.joda.time.DateTime import org.joda.time.format.DateTimeFormat -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource From 62cff1e1b984c800d44e8f33df23f9ccb9fa4c97 Mon Sep 17 00:00:00 2001 From: teeyog Date: Tue, 23 Feb 2021 09:13:28 +0800 Subject: [PATCH 5/5] [HUDI-1527] automatically infer the data directory, users only need to specify the table directory [adapt no partition] --- .../scala/org/apache/hudi/functional/TestCOWDataSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 82269b0fbaa47..a55ec77e00c4e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -626,7 +626,7 @@ class TestCOWDataSource extends HoodieClientTestBase { inputDF1.write.format("org.apache.hudi") .options(commonOpts) .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator") - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_ts:SIMPLE") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_ts:TIMESTAMP") .option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS") .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd") .mode(SaveMode.Overwrite)
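The net effect of the series is user-facing: once the provided path is recognised as a Hudi base path, DefaultSource resolves one partition path, derives the glob suffix via getDataPath, and rewrites the "path" option, so a plain load of the base path works for the partitioned and non-partitioned copy-on-write cases covered by the test. A minimal usage sketch follows; the session settings and the table path are placeholders for illustration, not values taken from the patch.

    import org.apache.spark.sql.SparkSession

    object InferredPathReadSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical local session and table path, for illustration only.
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("hudi-inferred-path")
          .getOrCreate()
        val basePath = "file:///tmp/hudi_trips_cow"

        // Before this change the glob had to match the partition depth by hand, e.g.
        // spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")

        // After this change the table path alone is enough.
        spark.read.format("org.apache.hudi").load(basePath).show()

        spark.stop()
      }
    }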