DataSourceUtils.java
@@ -84,6 +84,19 @@ public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throw
throw new TableNotFoundException("Unable to find a hudi table for the user provided paths.");
}

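/**
 * Returns the glob pattern used to read the data files of a hudi table.
 * Illustrative example (hypothetical paths): for tablePath "s3://bucket/trips" and
 * partitionPath "s3://bucket/trips/2021/03/01", the relative path "/2021/03/01" splits
 * into 4 tokens (the leading '/' yields an empty first token), so the result is
 * "s3://bucket/trips/*/*/*/*" -- one wildcard per partition level plus one for the
 * data files. For a non-partitioned table the result is tablePath + "/*".
 */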
public static String getDataPath(String tablePath, String partitionPath) {
// When the table is not partitioned
if (tablePath.equals(partitionPath)) {
return tablePath + "/*";
}
assert partitionPath.length() > tablePath.length();
assert partitionPath.startsWith(tablePath);
int n = partitionPath.substring(tablePath.length()).split("/").length;
String dataPathSuffix = String.join("/*", Collections.nCopies(n + 1, ""));
return tablePath + dataPathSuffix;
}


/**
* Create a key generator class via reflection, passing in any configs needed.
* <p>
DefaultSource.scala
@@ -17,14 +17,18 @@

package org.apache.hudi

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.SerializableConfiguration
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.hudi.metadata.FileSystemBackedTableMetadata
import org.apache.log4j.LogManager
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -67,7 +71,7 @@ class DefaultSource extends RelationProvider
optParams: Map[String, String],
schema: StructType): BaseRelation = {
// Add default options for unspecified read options keys.
val parameters = translateViewTypesToQueryTypes(optParams)
var parameters = translateViewTypesToQueryTypes(optParams)

val path = parameters.get("path")
val readPathsStr = parameters.get(DataSourceReadOptions.READ_PATHS_OPT_KEY)
@@ -84,6 +88,26 @@ class DefaultSource extends RelationProvider
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
log.info("Obtained hudi table path: " + tablePath)

if (path.nonEmpty) {
val _path = path.get.stripSuffix("/")
val pathTmp = new Path(_path).makeQualified(fs.getUri, fs.getWorkingDirectory)
// If the user specifies the table path, the data path is automatically inferred
if (pathTmp.toString.equals(tablePath)) {
val sparkEngineContext = new HoodieSparkEngineContext(sqlContext.sparkContext)
val fsBackedTableMetadata =
new FileSystemBackedTableMetadata(sparkEngineContext, new SerializableConfiguration(fs.getConf), tablePath, false)
val partitionPaths = fsBackedTableMetadata.getAllPartitionPaths
Review comment from lw309637554 (Contributor):
@teeyog hello, right now the partitions are inferred by calling getAllPartitionPaths on the table metadata. The partition scheme is set via hoodie.datasource.write.partitionpath.field when the hudi table is written. Could we persist hoodie.datasource.write.partitionpath.field to the meta table, so a read just fetches that property instead of listing all the partition paths? cc @vinothchandar

Reply from teeyog (Contributor, Author):
@lw309637554 Thank you for your review. The path used above to locate the hudi table could also be obtained through configuration instead of inference.
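For illustration only, a rough sketch of the reviewer's suggestion rather than part of this patch: if the writer persisted hoodie.datasource.write.partitionpath.field into the table's hoodie.properties, the reader could derive the glob depth from the number of configured partition fields instead of listing all partitions. The property key read below is an assumption (this patch does not persist it); basePath/.hoodie/hoodie.properties is the standard table config location.

import java.util.Properties
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical alternative: derive the data path glob from a persisted
// partition-path field, avoiding a full partition listing. Assumes the writer
// stored "hoodie.datasource.write.partitionpath.field" in hoodie.properties.
def getDataPathFromProps(fs: FileSystem, tablePath: String): Option[String] = {
  val props = new Properties()
  val in = fs.open(new Path(tablePath, ".hoodie/hoodie.properties"))
  try props.load(in) finally in.close()
  Option(props.getProperty("hoodie.datasource.write.partitionpath.field")).map { fields =>
    // One "/*" per partition field, plus one more for the data files themselves.
    val depth = if (fields.trim.isEmpty) 0 else fields.split(",").length
    tablePath + "/*" * (depth + 1)
  }
}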

val onePartitionPath = if (!partitionPaths.isEmpty && !StringUtils.isEmpty(partitionPaths.get(0))) {
tablePath + "/" + partitionPaths.get(0)
} else {
tablePath
}
val dataPath = DataSourceUtils.getDataPath(tablePath, onePartitionPath)
log.info("Obtained hudi data path: " + dataPath)
parameters += "path" -> dataPath
}
}

val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
log.info("Is bootstrapped table => " + isBootstrappedTable)
TestCOWDataSource.scala
@@ -32,9 +32,11 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDat
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.fail
import org.apache.spark.sql.types.{DataTypes => _, DateType => _, IntegerType => _, StringType => _, TimestampType => _, _}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -583,4 +585,56 @@ class TestCOWDataSource extends HoodieClientTestBase
.load(basePath + "/*")
assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0)
}

@Test def testAutoInferDataPath(): Unit = {
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
// Default partition
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.mode(SaveMode.Overwrite)
.save(basePath)

// No need to specify basePath/*/*
spark.read.format("org.apache.hudi")
.load(basePath).show()

// Partition with org.apache.hudi.keygen.CustomKeyGenerator
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider:SIMPLE,begin_lon:SIMPLE,end_lon:SIMPLE")
.mode(SaveMode.Overwrite)
.save(basePath)

// No need to specify basePath/*/*/*/*
spark.read.format("org.apache.hudi")
.load(basePath).show()

// No partition
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
.mode(SaveMode.Overwrite)
.save(basePath)

// No need to specify basePath/*
spark.read.format("org.apache.hudi")
.load(basePath).show()

inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.CustomKeyGenerator")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "current_ts:TIMESTAMP")
.option(Config.TIMESTAMP_TYPE_FIELD_PROP, "EPOCHMILLISECONDS")
.option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMdd")
.mode(SaveMode.Overwrite)
.save(basePath)

// Specify basePath/yyyyMMdd/*
val date = new DateTime().toString(DateTimeFormat.forPattern("yyyyMMdd"))
spark.read.format("org.apache.hudi")
.load(basePath + s"/$date/*").show()
}
}