diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index cd1c1fb4affc4..4e70ebad75ee4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_INCREMENTAL_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, generateFieldMap, toJavaOption}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
@@ -96,10 +97,24 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
         val partitionFields = partitionColumns.get().map(column => StructField(column, StringType))
         StructType(partitionFields)
       } else {
-        val partitionFields = partitionColumns.get().map(column =>
-          nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
-            s"$column' in the schema[${schema.fields.mkString(",")}]")))
-        StructType(partitionFields)
+        val partitionFields = partitionColumns.get().filter(column => nameFieldMap.contains(column))
+          .map(column => nameFieldMap.apply(column))
+
+        if (partitionFields.size != partitionColumns.get().size) {
+          val isBootstrapTable = BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
+          if (isBootstrapTable) {
+            // For bootstrapped tables it's possible the schema does not contain the partition field when the source
+            // table is hive-style partitioned. In this case we would like to treat the table as non-partitioned
+            // as opposed to failing.
+            new StructType()
+          } else {
+            throw new IllegalArgumentException(s"Cannot find columns: " +
+              s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " +
+              s"in the schema[${schema.fields.mkString(",")}]")
+          }
+        } else {
+          new StructType(partitionFields)
+        }
       }
     } else {
       // If the partition columns have not stored in hoodie.properties(the table that was
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index eeed5fe75b84a..f0dd89df1c6db 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -109,9 +109,12 @@ class TestDataSourceForBootstrap {
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    // Read bootstrapped table and verify count
-    var hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
+    // Read bootstrapped table and verify count using glob path
+    val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
@@ -130,11 +133,11 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
 
-    // Read table after upsert and verify count
-    hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF1.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count())
-    // Read without *
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read with base path using Hudi file index
     val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
@@ -169,6 +172,9 @@ class TestDataSourceForBootstrap {
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table and verify count using Hudi file index
+    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF2.count())
 
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
@@ -189,10 +195,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
-    // Read table after upsert and verify count
-    val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
-    assertEquals(numRecords, hoodieROViewDF2.count())
-    assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using glob path
+    val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(numRecords, hoodieROViewDF3.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true,
       isHiveStylePartitioned = true)
   }
@@ -219,10 +229,10 @@ class TestDataSourceForBootstrap {
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
       Some("datestr"))
-    // Read bootstrapped table and verify count
+    // Read bootstrapped table and verify count using glob path
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
-    // Read without *
+    // Read with base path using Hudi file index
     val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
     assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())
 
@@ -260,10 +270,14 @@ class TestDataSourceForBootstrap {
     val commitInstantTime3: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())
-    // Read table after upsert and verify count
+    // Read table after upsert and verify count using glob path
     val hoodieROViewDF3 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF3.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count())
+    // Read table after upsert and verify count using Hudi file index
+    val hoodieROViewDF4 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF4.count())
+    assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true,
       isHiveStylePartitioned = false)
   }