Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.data.HoodieJavaRDD
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata, HoodieTableMetadataUtil, MetadataPartitionType}
Expand Down Expand Up @@ -113,59 +114,88 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
*
* @param spark Spark session ref
* @param colStatsDF [[DataFrame]] bearing raw Column Stats Index table
* @param targetColumns target columns to be included into the final table
* @param queryColumns target columns to be included into the final table
* @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, targetColumns: Seq[String], tableSchema: StructType): DataFrame = {
def transposeColumnStatsIndex(spark: SparkSession, colStatsDF: DataFrame, queryColumns: Seq[String], tableSchema: StructType): DataFrame = {
val colStatsSchema = colStatsDF.schema
val colStatsSchemaOrdinalsMap = colStatsSchema.fields.zipWithIndex.map({
case (field, ordinal) => (field.name, ordinal)
}).toMap

val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap

// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedColumns = TreeSet(targetColumns: _*)

val colNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
val minValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE)
val maxValueOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE)
val fileNameOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
val nullCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT)
val valueCountOrdinal = colStatsSchemaOrdinalsMap(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT)

val transposedRDD = colStatsDF.rdd
.filter(row => sortedColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
val (minValue, _) = tryUnpackNonNullVal(row.getAs[Row](minValueOrdinal))
val (maxValue, _) = tryUnpackNonNullVal(row.getAs[Row](maxValueOrdinal))

val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType
// NOTE: We have to collect list of indexed columns to make sure we properly align the rows
// w/in the transposed dataset: since some files might not have all of the columns indexed
// either due to the Column Stats Index config changes, schema evolution, etc, we have
// to make sure that all of the rows w/in transposed data-frame are properly padded (with null
// values) for such file-column combinations
val indexedColumns: Seq[String] = colStatsDF.rdd.map(row => row.getString(colNameOrdinal)).distinct().collect()
Comment on lines +136 to +141
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do this one level above in readColumnStatsIndex so that colStatsDF itself is correctly populated and transposeColumnStatsIndex simply transposes as today?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

colStatsDF is populated correctly -- it bears 1 row / column (let's call it "row-based"), therefore for all columns in a file we will have N rows corresponding to it (eq to the # of columns in that file).

Transposed table is "column-based", ie there's 1 row / file and each column's stat is mapped to a column in such view. Therefore only in that view we have a need to align the rows (to pad them).


val rowValsSeq = row.toSeq.toArray

rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)
// NOTE: We're sorting the columns to make sure final index schema matches layout
// of the transposed table
val sortedTargetColumns = TreeSet(queryColumns.intersect(indexedColumns): _*)

Row(rowValsSeq:_*)
val transposedRDD = colStatsDF.rdd
.filter(row => sortedTargetColumns.contains(row.getString(colNameOrdinal)))
.map { row =>
if (row.isNullAt(minValueOrdinal) && row.isNullAt(maxValueOrdinal)) {
// Corresponding row could be null in either of the 2 cases
// - Column contains only null values (in that case both min/max have to be nulls)
// - This is a stubbed Column Stats record (used as a tombstone)
row
} else {
val minValueStruct = row.getAs[Row](minValueOrdinal)
val maxValueStruct = row.getAs[Row](maxValueOrdinal)

checkState(minValueStruct != null && maxValueStruct != null, "Invalid Column Stats record: either both min/max have to be null, or both have to be non-null")

val colName = row.getString(colNameOrdinal)
val colType = tableSchemaFieldMap(colName).dataType

val (minValue, _) = tryUnpackNonNullVal(minValueStruct)
val (maxValue, _) = tryUnpackNonNullVal(maxValueStruct)
val rowValsSeq = row.toSeq.toArray
// Update min-/max-value structs w/ unwrapped values in-place
rowValsSeq(minValueOrdinal) = deserialize(minValue, colType)
rowValsSeq(maxValueOrdinal) = deserialize(maxValue, colType)

Row(rowValsSeq: _*)
}
}
.groupBy(r => r.getString(fileNameOrdinal))
.foldByKey(Seq[Row]()) {
case (_, columnRows) =>
case (_, columnRowsSeq) =>
// Rows seq is always non-empty (otherwise it won't be grouped into)
val fileName = columnRows.head.get(fileNameOrdinal)
val valueCount = columnRows.head.get(valueCountOrdinal)

val coalescedRowValuesSeq = columnRows.toSeq
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout
.sortBy(_.getString(colNameOrdinal))
.foldLeft(Seq[Any](fileName, valueCount)) {
case (acc, columnRow) =>
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnRow.get(ord))
val fileName = columnRowsSeq.head.get(fileNameOrdinal)
val valueCount = columnRowsSeq.head.get(valueCountOrdinal)

// To properly align individual rows (corresponding to a file) w/in the transposed projection, we need
// to align existing column-stats for individual file with the list of expected ones for the
// whole transposed projection (a superset of all files)
val columnRowsMap = columnRowsSeq.map(row => (row.getString(colNameOrdinal), row)).toMap
val alignedColumnRowsSeq = sortedTargetColumns.toSeq.map(columnRowsMap.get)

val coalescedRowValuesSeq =
alignedColumnRowsSeq.foldLeft(Seq[Any](fileName, valueCount)) {
case (acc, opt) =>
opt match {
case Some(columnStatsRow) =>
acc ++ Seq(minValueOrdinal, maxValueOrdinal, nullCountOrdinal).map(ord => columnStatsRow.get(ord))
case None =>
// NOTE: Since we're assuming missing column to essentially contain exclusively
// null values, we set null-count to be equal to value-count (this behavior is
// consistent with reading non-existent columns from Parquet)
acc ++ Seq(null, null, valueCount)
}
}

Seq(Row(coalescedRowValuesSeq:_*))
Expand All @@ -176,7 +206,7 @@ trait ColumnStatsIndexSupport extends SparkAdapterSupport {
// NOTE: It's crucial to maintain appropriate ordering of the columns
// matching table layout: hence, we cherry-pick individual columns
// instead of simply filtering in the ones we're interested in the schema
val indexSchema = composeIndexSchema(sortedColumns.toSeq, tableSchema)
val indexSchema = composeIndexSchema(sortedTargetColumns.toSeq, tableSchema)

spark.createDataFrame(transposedRDD, indexSchema)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"c1":770,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-01-15","c7":"Ag==","c8":9}
{"c1":768,"c4":"2021-11-18T23:34:44.201-08:00","c5":78,"c6":"2020-10-13","c7":"AA==","c8":9}
{"c1":431,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-03-12","c7":"rw==","c8":9}
{"c1":427,"c4":"2021-11-18T23:34:44.186-08:00","c5":44,"c6":"2020-10-08","c7":"qw==","c8":9}
{"c1":328,"c4":"2021-11-18T23:34:44.181-08:00","c5":34,"c6":"2020-10-21","c7":"SA==","c8":9}
{"c1":320,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-02-13","c7":"QA==","c8":9}
{"c1":317,"c4":"2021-11-18T23:34:44.180-08:00","c5":33,"c6":"2020-10-10","c7":"PQ==","c8":9}
{"c1":308,"c4":"2021-11-18T23:34:44.180-08:00","c5":32,"c6":"2020-01-01","c7":"NA==","c8":9}
{"c1":304,"c4":"2021-11-18T23:34:44.179-08:00","c5":32,"c6":"2020-08-25","c7":"MA==","c8":9}
{"c1":300,"c4":"2021-11-18T23:34:44.179-08:00","c5":31,"c6":"2020-04-21","c7":"LA==","c8":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"c1":719,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-05-20","c7":"zw==","c8":9}
{"c1":715,"c4":"2021-11-18T23:34:44.199-08:00","c5":73,"c6":"2020-01-16","c7":"yw==","c8":9}
{"c1":579,"c4":"2021-11-18T23:34:44.193-08:00","c5":59,"c6":"2020-08-20","c7":"Qw==","c8":9}
{"c1":568,"c4":"2021-11-18T23:34:44.193-08:00","c5":58,"c6":"2020-08-09","c7":"OA==","c8":9}
{"c1":367,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-05-04","c7":"bw==","c8":9}
{"c1":364,"c4":"2021-11-18T23:34:44.183-08:00","c5":38,"c6":"2020-02-01","c7":"bA==","c8":9}
{"c1":250,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-09-27","c7":"+g==","c8":9}
{"c1":249,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-08-26","c7":"+Q==","c8":9}
{"c1":246,"c4":"2021-11-18T23:34:44.176-08:00","c5":26,"c6":"2020-05-23","c7":"9g==","c8":9}
{"c1":125,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-05-14","c7":"fQ==","c8":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"c1":486,"c4":"2021-11-18T23:34:44.189-08:00","c5":50,"c6":"2020-03-11","c7":"5g==","c8":9}
{"c1":483,"c4":"2021-11-18T23:34:44.189-08:00","c5":49,"c6":"2020-11-08","c7":"4w==","c8":9}
{"c1":224,"c4":"2021-11-18T23:34:44.175-08:00","c5":24,"c6":"2020-05-01","c7":"4A==","c8":9}
{"c1":118,"c4":"2021-11-18T23:34:44.168-08:00","c5":13,"c6":"2020-09-07","c7":"dg==","c8":9}
{"c1":111,"c4":"2021-11-18T23:34:44.168-08:00","c5":12,"c6":"2020-02-28","c7":"bw==","c8":9}
{"c1":79,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-03-24","c7":"Tw==","c8":9}
{"c1":77,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-01-22","c7":"TQ==","c8":9}
{"c1":76,"c4":"2021-11-18T23:34:44.166-08:00","c5":9,"c6":"2020-11-21","c7":"TA==","c8":9}
{"c1":60,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-06-05","c7":"PA==","c8":9}
{"c1":59,"c4":"2021-11-18T23:34:44.164-08:00","c5":7,"c6":"2020-05-04","c7":"Ow==","c8":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"c1":272,"c4":"2021-11-18T23:34:44.178-08:00","c5":28,"c6":"2020-09-21","c7":"EA==","c8":9}
{"c1":258,"c4":"2021-11-18T23:34:44.177-08:00","c5":27,"c6":"2020-06-07","c7":"Ag==","c8":9}
{"c1":240,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-10-17","c7":"8A==","c8":9}
{"c1":236,"c4":"2021-11-18T23:34:44.176-08:00","c5":25,"c6":"2020-06-13","c7":"7A==","c8":9}
{"c1":137,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-06-26","c7":"iQ==","c8":9}
{"c1":134,"c4":"2021-11-18T23:34:44.170-08:00","c5":15,"c6":"2020-03-23","c7":"hg==","c8":9}
{"c1":131,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-11-20","c7":"gw==","c8":9}
{"c1":129,"c4":"2021-11-18T23:34:44.169-08:00","c5":14,"c6":"2020-09-18","c7":"gQ==","c8":9}
{"c1":24,"c4":"2021-11-18T23:34:44.161-08:00","c5":4,"c6":"2020-03-25","c7":"GA==","c8":9}
{"c1":8,"c4":"2021-11-18T23:34:44.159-08:00","c5":2,"c6":"2020-09-09","c7":"CA==","c8":9}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"valueCount":9}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"valueCount":13}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"c1_maxValue":568,"c1_minValue":8,"c1_nullCount":0,"c2_nullCount":15,"c3_nullCount":15,"valueCount":15}
{"c1_maxValue":715,"c1_minValue":76,"c1_nullCount":0,"c2_nullCount":12,"c3_nullCount":12,"valueCount":12}
{"c1_maxValue":768,"c1_minValue":59,"c1_nullCount":0,"c2_nullCount":7,"c3_nullCount":7,"valueCount":7}
{"c1_maxValue":769,"c1_minValue":309,"c1_nullCount":0,"c2_maxValue":" 769sdc","c2_minValue":" 309sdc","c2_nullCount":0,"c3_maxValue":919.769,"c3_minValue":76.430,"c3_nullCount":0,"valueCount":9}
{"c1_maxValue":770,"c1_minValue":129,"c1_nullCount":0,"c2_nullCount":6,"c3_nullCount":6,"valueCount":6}
{"c1_maxValue":932,"c1_minValue":0,"c1_nullCount":0,"c2_maxValue":" 932sdc","c2_minValue":" 0sdc","c2_nullCount":0,"c3_maxValue":994.355,"c3_minValue":19.000,"c3_nullCount":0,"valueCount":8}
{"c1_maxValue":943,"c1_minValue":89,"c1_nullCount":0,"c2_maxValue":" 943sdc","c2_minValue":" 200sdc","c2_nullCount":0,"c3_maxValue":854.690,"c3_minValue":100.556,"c3_nullCount":0,"valueCount":10}
{"c1_maxValue":959,"c1_minValue":74,"c1_nullCount":0,"c2_maxValue":" 959sdc","c2_minValue":" 181sdc","c2_nullCount":0,"c3_maxValue":980.213,"c3_minValue":38.740,"c3_nullCount":0,"valueCount":13}
Loading