@@ -22,6 +22,7 @@ import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -96,10 +97,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields = partitionColumns.get().map(column =>
nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
s"$column' in the schema[${schema.fields.mkString(",")}]")))
StructType(partitionFields)
val partitionFields = partitionColumns.get().filter(column => nameFieldMap.contains(column))
.map(column => nameFieldMap.apply(column))

if (partitionFields.size != partitionColumns.get().size) {
Contributor:

This check is hacky. Could we remove it? At a minimum, for a bootstrapped table, we would disable the partition schema. Better still, we should find a way to get the schema from the bootstrap base path. How is the schema fetched when reading a bootstrapped table?

Contributor:

@yihua I don't think we should remove this check. It was deliberately added to cover the case where a bootstrapped table has had upserts. After the initial bootstrap, new upserts write all the columns to the Hudi table; at that point I believe it will also have the partition column, and then we should start treating it as a normal table.

Contributor:

I agree that in general this is just a temporary solution so as not to break bootstrapped tables. This is tricky to handle, because it's not just about obtaining the partition schema from the source, but also about extracting the partition column values from the source path and writing them with the correct data type in the target location. I remember having several discussions about this a year back.

As for your question: per https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java#L61, we simply read the source file footer right now to get the source schema. I think the EMR team can take it up in the next release, but for now we should at least prevent failures. @rahil-c can you create a JIRA to track this?
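
For reference, a minimal sketch of what "reading the source file footer" can look like with parquet-hadoop (an illustration of the idea under assumed APIs, not the exact code in ParquetBootstrapMetadataHandler; the method name is made up). Note that the footer only describes columns physically stored in the file, so Hive-style partition columns never appear in a schema obtained this way:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open one source data file and read its footer; the footer carries the Parquet
// schema that was physically written, which can then be converted to an Avro schema.
// Hive-style partition columns live only in directory names, so they never show up here.
def readSourceSchema(conf: Configuration, sourceFile: Path): org.apache.avro.Schema = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(sourceFile, conf))
  try {
    val parquetSchema = reader.getFooter.getFileMetaData.getSchema
    new AvroSchemaConverter(conf).convert(parquetSchema)
  } finally {
    reader.close()
  }
}
```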

Contributor:

Got it. For the time being, I'll land this fix. As a follow-up, could one of you add docs in the code to clarify why the check is needed? It's not clear from reading the code at first glance.

val isBootstrapTable = BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
if (isBootstrapTable) {
// For bootstrapped tables it is possible the schema does not contain the partition field when the source table
Contributor:

I haven't played much with bootstrapped tables; help me clarify something. In this case the Hudi table is actually non-partitioned, isn't it? I.e., the source table is Hive-style partitioned but does not contain the actual partition field in the dataframe?

Contributor:

If yes, I agree. But if it is feasible to generate a partitioned Hudi table, we can't proceed with this fix, right? Can you help me understand, please?

Contributor @zhedoubushishi Jul 21, 2022:

Hi @nsivabalan.
In this case, let's say the source table is a Hive-style partitioned Parquet table (the partition column is not included in the Parquet files) and, after bootstrapping, we generated a partitioned Hudi table. But when reading this Hudi table, within this FileIndex code path, we now treat it as a non-partitioned table because the partition column is not included in the data files.

Yes, in the long term we should be able to infer the partition column and its schema type for bootstrapped tables, but that is a more complex issue to resolve at this time.

We identified that the partition validation logic in FileIndex mainly serves to allow partition pruning in HoodieFileIndex.

Rather than entirely breaking the bootstrap feature, we decided that for bootstrapped tables we ignore this validation and treat queries as if the table were non-partitioned. The impact is that such queries will not benefit from partition pruning through Hudi.
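
To make the scenario concrete, a small sketch of the source layout being discussed (`spark`, `sourceDF`, `srcPath`, and `basePath` are placeholders assumed to be in scope, not names from this PR): Spark's partitionBy encodes the partition column only in directory names, so the Parquet data files, and hence the bootstrapped Hudi table's data schema, never contain it.

```scala
// Assumes `spark: SparkSession`, `sourceDF: DataFrame`, and string paths `srcPath`/`basePath`.
// Hive-style partitioned source: `datestr` lives only in directory names such as
// .../datestr=2022-07-21/, never inside the Parquet data files themselves.
sourceDF.write
  .partitionBy("datestr")
  .parquet(srcPath)

// After a metadata bootstrap of srcPath into a Hudi table at basePath, the table's
// data schema has no `datestr` field, so this FileIndex falls back to an empty
// partition schema and queries behave as non-partitioned (no partition pruning).
val bootstrappedDF = spark.read.format("hudi").load(basePath)
```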

// is hive style partitioned. In this case we would like to treat the table as non-partitioned
// as opposed to failing
new StructType()
} else {
throw new IllegalArgumentException(s"Cannot find columns: " +
s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " +
s"in the schema[${schema.fields.mkString(",")}]")
}
} else {
new StructType(partitionFields)
}
}
} else {
// If the partition columns have not stored in hoodie.properties(the table that was
@@ -109,9 +109,12 @@ class TestDataSourceForBootstrap {
// check marked directory clean up
assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))

// Read bootstrapped table and verify count
var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
// Read bootstrapped table and verify count using glob path
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Read bootstrapped table and verify count using Hudi file index
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewDF2.count())

// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
@@ -130,11 +133,11 @@
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

// Read table after upsert and verify count
hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
// Read without *
// Read table after upsert and verify count using glob path
val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF3.count())
assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
// Read with base path using Hudi file index
val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
@@ -169,6 +172,9 @@ class TestDataSourceForBootstrap {
// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Read bootstrapped table and verify count using Hudi file index
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewDF2.count())

// Perform upsert
val updateTimestamp = Instant.now.toEpochMilli
@@ -189,10 +195,14 @@
val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

// Read table after upsert and verify count
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
// Read table after upsert and verify count using glob path
val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF3.count())
assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
// Read table after upsert and verify count using Hudi file index
val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewDF4.count())
assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())

verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true)
}
@@ -219,10 +229,10 @@ class TestDataSourceForBootstrap {
val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))

// Read bootstrapped table and verify count
// Read bootstrapped table and verify count using glob path
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF1.count())
// Read without *
// Read with base path using Hudi file index
val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())

@@ -260,10 +270,14 @@ class TestDataSourceForBootstrap {
val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

// Read table after upsert and verify count
// Read table after upsert and verify count using glob paths
val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF3.count())
assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
// Read table after upsert and verify count using Hudi file index
val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
assertEquals(numRecords, hoodieROViewDF4.count())
assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())

verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false)
}