diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 4ee5a6005ebd1..b1e03f86ff807 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -26,6 +26,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.hash.ColumnIndexID import org.apache.hudi.data.HoodieJavaRDD import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType} @@ -113,11 +114,11 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { * * @param spark Spark session ref * @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table - * @param targetColumns target columns to be included into the final table + * @param queryColumns target columns to be included into the final table * @param tableSchema schema of the source data table * @return reshaped table according to the format outlined above */ - def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = { + def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = { val colStatsSchema = colStatsDF.schema val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({ case (field, ordinal) => (field.name, ordinal) @@ -125,10 +126,6 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap - // NOTE: We're sorting the columns to make sure final index schema matches layout - // of the transposed table - val sortedColumns = TreeSet(targetColumns: _*) - val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME) val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE) val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE) @@ -136,36 +133,69 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT) val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT) - val transposedRDD = colStatsDF.rdd - .filter(row => sortedColumns.contains(row.getString(colNameOrdinal))) - .map { row => - val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal)) - val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal)) - - val colName = row.getString(colNameOrdinal) - val colType = tableSchemaFieldMap(colName).dataType + // NOTE: We have to collect list of indexed columns to make sure we properly align the rows + // w/in the transposed dataset: since some files might not have all of the columns indexed + // either due to the Column Stats Index config changes, schema evolution, etc, we have + // to make sure that all of the rows w/in transposed data-frame are properly padded (with null + // values) for such file-column combinations + val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect() - val rowValsSeq = row.toSeq.toArray - - rowValsSeq(minValueOrdinal) = deserialize(minValue, colType) - rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType) + // NOTE: We're sorting the columns to make sure final index schema matches layout + // of the transposed table + val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*) - Row(rowValsSeq:_*) + val transposedRDD = colStatsDF.rdd + .filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal))) + .map { row => + if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) { + // Corresponding row could be null in either of the 2 cases + // - Column contains only null values (in that case both min/max have to be nulls) + // - This is a stubbed Column Stats record (used as a tombstone) + row + } else { + val minValueStruct = row.getAs[Row](minValueOrdinal) + val maxValueStruct = row.getAs[Row](maxValueOrdinal) + + checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null") + + val colName = row.getString(colNameOrdinal) + val colType = tableSchemaFieldMap(colName).dataType + + val (minValue, _) = tryUnpackNonNullVal(minValueStruct) + val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct) + val rowValsSeq = row.toSeq.toArray + // Update min-/max-value structs w/ unwrapped values in-place + rowValsSeq(minValueOrdinal) = deserialize(minValue, colType) + rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType) + + Row(rowValsSeq: _*) + } } .groupBy(r => r.getString(fileNameOrdinal)) .foldByKey(Seq[Row]()) { - case (_, columnRows) => + case (_, columnRowsSeq) => // Rows seq is always non-empty (otherwise it won't be grouped into) - val fileName = columnRows.head.get(fileNameOrdinal) - val valueCount = columnRows.head.get(valueCountOrdinal) - - val coalescedRowValuesSeq = columnRows.toSeq - // NOTE: It's crucial to maintain appropriate ordering of the columns - // matching table layout - .sortBy(_.getString(colNameOrdinal)) - .foldLeft(Seq[Any](fileName, valueCount)) { - case (acc, columnRow) => - acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord)) + val fileName = columnRowsSeq.head.get(fileNameOrdinal) + val valueCount = columnRowsSeq.head.get(valueCountOrdinal) + + // To properly align individual rows (corresponding to a file) w/in the transposed projection, we need + // to align existing column-stats for individual file with the list of expected ones for the + // whole transposed projection (a superset of all files) + val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap + val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get) + + val coalescedRowValuesSeq = + alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) { + case (acc, opt) => + opt match { + case Some(columnStatsRow) => + acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord)) + case None => + // NOTE: Since we're assuming missing column to essentially contain exclusively + // null values, we set null-count to be equal to value-count (this behavior is + // consistent with reading non-existent columns from Parquet) + acc ++ Seq(null, null, valueCount) + } } Seq(Row(coalescedRowValuesSeq:_*)) @@ -176,7 +206,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport { // NOTE: It's crucial to maintain appropriate ordering of the columns // matching table layout: hence, we cherry-pick individual columns // instead of simply filtering in the ones we're interested in the schema - val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema) + val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema) spark.createDataFrame(transposedRDD, indexSchema) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/column-stats-index-table.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/column-stats-index-table.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00000-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00001-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00002-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/input-table-json/part-00003-4468afca-8a37-4ae8-a150-0c2fd3361080-c000.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..9c0daac405731 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00000-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9} +{"c1":768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9} +{"c1":431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9} +{"c1":427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9} +{"c1":328,"c4":"2021-11-18T23:34:44.181-08:00","c5":34,"c6":"2020-10-21","c7":"SA==","c8":9} +{"c1":320,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-02-13","c7":"QA==","c8":9} +{"c1":317,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-10-10","c7":"PQ==","c8":9} +{"c1":308,"c4":"2021-11-18T23:34:44.180-08:00","c5":32,"c6":"2020-01-01","c7":"NA==","c8":9} +{"c1":304,"c4":"2021-11-18T23:34:44.179-08:00","c5":32,"c6":"2020-08-25","c7":"MA==","c8":9} +{"c1":300,"c4":"2021-11-18T23:34:44.179-08:00","c5":31,"c6":"2020-04-21","c7":"LA==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..d19386382bede --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00001-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":719,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-05-20","c7":"zw==","c8":9} +{"c1":715,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-01-16","c7":"yw==","c8":9} +{"c1":579,"c4":"2021-11-18T23:34:44.193-08:00","c5":59,"c6":"2020-08-20","c7":"Qw==","c8":9} +{"c1":568,"c4":"2021-11-18T23:34:44.193-08:00","c5":58,"c6":"2020-08-09","c7":"OA==","c8":9} +{"c1":367,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-05-04","c7":"bw==","c8":9} +{"c1":364,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-02-01","c7":"bA==","c8":9} +{"c1":250,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-09-27","c7":"+g==","c8":9} +{"c1":249,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-08-26","c7":"+Q==","c8":9} +{"c1":246,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-05-23","c7":"9g==","c8":9} +{"c1":125,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-05-14","c7":"fQ==","c8":9} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..602dbe87b1286 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00002-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":486,"c4":"2021-11-18T23:34:44.189-08:00","c5":50,"c6":"2020-03-11","c7":"5g==","c8":9} +{"c1":483,"c4":"2021-11-18T23:34:44.189-08:00","c5":49,"c6":"2020-11-08","c7":"4w==","c8":9} +{"c1":224,"c4":"2021-11-18T23:34:44.175-08:00","c5":24,"c6":"2020-05-01","c7":"4A==","c8":9} +{"c1":118,"c4":"2021-11-18T23:34:44.168-08:00","c5":13,"c6":"2020-09-07","c7":"dg==","c8":9} +{"c1":111,"c4":"2021-11-18T23:34:44.168-08:00","c5":12,"c6":"2020-02-28","c7":"bw==","c8":9} +{"c1":79,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-03-24","c7":"Tw==","c8":9} +{"c1":77,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-01-22","c7":"TQ==","c8":9} +{"c1":76,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-11-21","c7":"TA==","c8":9} +{"c1":60,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-06-05","c7":"PA==","c8":9} +{"c1":59,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-05-04","c7":"Ow==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json new file mode 100644 index 0000000000000..6232e862f92c4 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-another-input-table-json/part-00003-7e680484-e7e1-48b6-8289-1a7c483b530b-c000.json @@ -0,0 +1,10 @@ +{"c1":272,"c4":"2021-11-18T23:34:44.178-08:00","c5":28,"c6":"2020-09-21","c7":"EA==","c8":9} +{"c1":258,"c4":"2021-11-18T23:34:44.177-08:00","c5":27,"c6":"2020-06-07","c7":"Ag==","c8":9} +{"c1":240,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-10-17","c7":"8A==","c8":9} +{"c1":236,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-06-13","c7":"7A==","c8":9} +{"c1":137,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-06-26","c7":"iQ==","c8":9} +{"c1":134,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-03-23","c7":"hg==","c8":9} +{"c1":131,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-11-20","c7":"gw==","c8":9} +{"c1":129,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-09-18","c7":"gQ==","c8":9} +{"c1":24,"c4":"2021-11-18T23:34:44.161-08:00","c5":4,"c6":"2020-03-25","c7":"GA==","c8":9} +{"c1":8,"c4":"2021-11-18T23:34:44.159-08:00","c5":2,"c6":"2020-09-09","c7":"CA==","c8":9} diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json new file mode 100644 index 0000000000000..8405cdf91fc9b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/partial-column-stats-index-table.json @@ -0,0 +1,4 @@ +{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"valueCount":9} +{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"valueCount":8} +{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"valueCount":13} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table.json similarity index 100% rename from hudi-spark-datasource/hudi-spark/src/test/resources/index/zorder/updated-column-stats-index-table.json rename to hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-column-stats-index-table.json diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json new file mode 100644 index 0000000000000..8552fd3592cda --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/resources/index/colstats/updated-partial-column-stats-index-table.json @@ -0,0 +1,8 @@ +{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_nullCount":15,"c3_nullCount":15,"valueCount":15} +{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_nullCount":12,"c3_nullCount":12,"valueCount":12} +{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_nullCount":7,"c3_nullCount":7,"valueCount":7} +{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9} +{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_nullCount":6,"c3_nullCount":6,"valueCount":6} +{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8} +{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10} +{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 841041e40cc99..75d3ce0b71287 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -63,6 +63,10 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup initPath() initSparkContexts() initFileSystem() + + setTableName("hoodie_test") + initMetaClient() + spark = sqlContext.sparkSession } @@ -93,10 +97,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" ) ++ metadataOpts - setTableName("hoodie_test") - initMetaClient() - - val sourceJSONTablePath = getClass.getClassLoader.getResource("index/zorder/input-table-json").toString + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString // NOTE: Schema here is provided for validation that the input date is in the appropriate format val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) @@ -118,14 +119,14 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup .fromProperties(toProperties(metadataOpts)) .build() - val targetColumnsToRead: Seq[String] = { + val requestedColumns: Seq[String] = { // Providing empty seq of columns to [[readColumnStatsIndex]] will lead to the whole // MT to be read, and subsequently filtered if (testCase.readFullMetadataTable) Seq.empty else sourceTableSchema.fieldNames } - val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead) + val colStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) val transposedColStatsDF = transposeColumnStatsIndex(spark, colStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) val expectedColStatsSchema = composeIndexSchema(sourceTableSchema.fieldNames, sourceTableSchema) @@ -134,7 +135,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup val expectedColStatsIndexTableDf = spark.read .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/zorder/column-stats-index-table.json").toString) + .json(getClass.getClassLoader.getResource("index/colstats/column-stats-index-table.json").toString) assertEquals(expectedColStatsIndexTableDf.schema, transposedColStatsDF.schema) // NOTE: We have to drop the `fileName` column as it contains semi-random components @@ -149,7 +150,7 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(transposedColStatsDF))) // do an upsert and validate - val updateJSONTablePath = getClass.getClassLoader.getResource("index/zorder/another-input-table-json").toString + val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/another-input-table-json").toString val updateDF = spark.read .schema(sourceTableSchema) .json(updateJSONTablePath) @@ -165,13 +166,13 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup metaClient = HoodieTableMetaClient.reload(metaClient) - val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, targetColumnsToRead) + val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, sourceTableSchema.fieldNames, sourceTableSchema) val expectedColStatsIndexUpdatedDF = spark.read .schema(expectedColStatsSchema) - .json(getClass.getClassLoader.getResource("index/zorder/updated-column-stats-index-table.json").toString) + .json(getClass.getClassLoader.getResource("index/colstats/updated-column-stats-index-table.json").toString) assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) @@ -183,6 +184,153 @@ class TestColumnStatsIndex extends HoodieClientTestBase with ColumnStatsIndexSup assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) } + @Test + def testMetadataColumnStatsIndexPartialProjection(): Unit = { + val targetColumnsToIndex = Seq("c1", "c2", "c3") + + val metadataOpts = Map( + HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",") + ) + + val opts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + RECORDKEY_FIELD.key -> "c1", + PRECOMBINE_FIELD.key -> "c1", + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + ) ++ metadataOpts + + val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString + + // NOTE: Schema here is provided for validation that the input date is in the appropriate format + val inputDF = spark.read.schema(sourceTableSchema).json(sourceJSONTablePath) + + inputDF + .sort("c1") + .repartition(4, new Column("c1")) + .write + .format("hudi") + .options(opts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + + metaClient = HoodieTableMetaClient.reload(metaClient) + + val metadataConfig = HoodieMetadataConfig.newBuilder() + .fromProperties(toProperties(metadataOpts)) + .build() + + //////////////////////////////////////////////////////////////////////// + // Case #1: Empty CSI projection + // Projection is requested for columns which are NOT indexed + // by the CSI + //////////////////////////////////////////////////////////////////////// + + { + // These are NOT indexed + val requestedColumns = Seq("c4") + + val emptyColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val emptyTransposedColStatsDF = transposeColumnStatsIndex(spark, emptyColStatsDF, requestedColumns, sourceTableSchema) + + assertEquals(0, emptyColStatsDF.collect().length) + assertEquals(0, emptyTransposedColStatsDF.collect().length) + } + + //////////////////////////////////////////////////////////////////////// + // Case #2: Partial CSI projection + // Projection is requested for set of columns some of which are + // NOT indexed by the CSI + //////////////////////////////////////////////////////////////////////// + + { + // We have to include "c1", since we sort the expected outputs by this column + val requestedColumns = Seq("c1", "c4") + + val partialColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val partialTransposedColStatsDF = transposeColumnStatsIndex(spark, partialColStatsDF, requestedColumns, sourceTableSchema) + + val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) + val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) + + // Match against expected column stats table + val expectedColStatsIndexTableDf = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/partial-column-stats-index-table.json").toString) + + assertEquals(expectedColStatsIndexTableDf.schema, partialTransposedColStatsDF.schema) + // NOTE: We have to drop the `fileName` column as it contains semi-random components + // that we can't control in this test. Nevertheless, since we manually verify composition of the + // ColStats Index by reading Parquet footers from individual Parquet files, this is not an issue + assertEquals(asJson(sort(expectedColStatsIndexTableDf)), asJson(sort(partialTransposedColStatsDF.drop("fileName")))) + + // Collect Column Stats manually (reading individual Parquet files) + val manualColStatsTableDF = + buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + + assertEquals(asJson(sort(manualColStatsTableDF)), asJson(sort(partialTransposedColStatsDF))) + } + + //////////////////////////////////////////////////////////////////////// + // Case #3: Aligned CSI projection + // Projection is requested for set of columns some of which are + // indexed only for subset of files + //////////////////////////////////////////////////////////////////////// + + { + // NOTE: The update we're writing is intentionally omitting some of the columns + // present in an earlier source + val missingCols = Seq("c2", "c3") + val partialSourceTableSchema = StructType(sourceTableSchema.fields.filterNot(f => missingCols.contains(f.name))) + + val updateJSONTablePath = getClass.getClassLoader.getResource("index/colstats/partial-another-input-table-json").toString + val updateDF = spark.read + .schema(partialSourceTableSchema) + .json(updateJSONTablePath) + + updateDF.repartition(4) + .write + .format("hudi") + .options(opts) + .option(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key, 10 * 1024) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + metaClient = HoodieTableMetaClient.reload(metaClient) + + val requestedColumns = sourceTableSchema.fieldNames + + // Nevertheless, the last update was written with a new schema (that is a subset of the original table schema), + // we should be able to read CSI, which will be properly padded (with nulls) after transposition + val updatedColStatsDF = readColumnStatsIndex(spark, basePath, metadataConfig, requestedColumns) + val transposedUpdatedColStatsDF = transposeColumnStatsIndex(spark, updatedColStatsDF, requestedColumns, sourceTableSchema) + + val targetIndexedColumns = targetColumnsToIndex.intersect(requestedColumns) + val expectedColStatsSchema = composeIndexSchema(targetIndexedColumns, sourceTableSchema) + + val expectedColStatsIndexUpdatedDF = + spark.read + .schema(expectedColStatsSchema) + .json(getClass.getClassLoader.getResource("index/colstats/updated-partial-column-stats-index-table.json").toString) + + assertEquals(expectedColStatsIndexUpdatedDF.schema, transposedUpdatedColStatsDF.schema) + assertEquals(asJson(sort(expectedColStatsIndexUpdatedDF)), asJson(sort(transposedUpdatedColStatsDF.drop("fileName")))) + + // Collect Column Stats manually (reading individual Parquet files) + val manualUpdatedColStatsTableDF = + buildColumnStatsTableManually(basePath, targetIndexedColumns, expectedColStatsSchema) + + assertEquals(asJson(sort(manualUpdatedColStatsTableDF)), asJson(sort(transposedUpdatedColStatsDF))) + } + } + @Test def testParquetMetadataRangeExtraction(): Unit = { val df = generateRandomDataFrame(spark)