From 176ca585718ec15394db4eca3896accbbe392a9e Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Wed, 15 Nov 2023 17:14:50 +0800 Subject: [PATCH 1/6] [HUDI-7110] Add call procedure for show column stats information --- .../apache/hudi/ColumnStatsIndexSupport.scala | 2 +- .../command/procedures/HoodieProcedures.scala | 1 + ...howMetadataTableColumnStatsProcedure.scala | 105 ++++++++++++++++++ .../procedure/TestMetadataProcedure.scala | 50 +++++++++ 4 files changed, 157 insertions(+), 1 deletion(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index dd76aee2f187b..9cdb15092b08d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -309,7 +309,7 @@ class ColumnStatsIndexSupport(spark: SparkSession, colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*) } - private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = { + def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = { // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by // - Fetching the records from CSI by key-prefixes (encoded column names) // - Extracting [[HoodieMetadataColumnStats]] records diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala index ad63ddbb29eeb..1a960ecb8fd6b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala @@ -66,6 +66,7 @@ object HoodieProcedures { ,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder) ,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder) ,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder) + ,(ShowMetadataTableColumnStatsProcedure.NAME, ShowMetadataTableColumnStatsProcedure.builder) ,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder) ,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder) ,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala new file mode 100644 index 0000000000000..aa1303918d07c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.avro.generic.IndexedRecord +import org.apache.hudi.avro.model.{BooleanWrapper, BytesWrapper, DateWrapper, DecimalWrapper, DoubleWrapper, FloatWrapper, HoodieMetadataColumnStats, IntWrapper, LongWrapper, StringWrapper, TimeMicrosWrapper, TimestampMicrosWrapper} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.data.HoodieData +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util +import java.util.function.Supplier + + +class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with ProcedureBuilder with Logging { + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "targetColumns", DataTypes.StringType) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("min_value", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("max_value", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("null_num", DataTypes.LongType, nullable = true, Metadata.empty) + )) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = OUTPUT_TYPE + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val table = getArgValueOrDefault(args, PARAMETERS(0)) + val targetColumns = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString + val targetColumnsSeq = targetColumns.split(",").toSeq + val basePath = getBasePath(table) + val metadataConfig = HoodieMetadataConfig.newBuilder + .enable(true) + .build + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val schemaUtil = new TableSchemaResolver(metaClient) + var schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) + val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) + val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, false) + + val rows = new util.ArrayList[Row] + colStatsRecords.collectAsList() + .stream() + .forEach(c => { + rows.add(Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue), getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue())) 
+ }) + rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + } + + def getColumnStatsValue(stats_value: Any): String = { + stats_value match { + case _: IntWrapper | + _: BooleanWrapper | + _: BytesWrapper | + _: DateWrapper | + _: DecimalWrapper | + _: DoubleWrapper | + _: FloatWrapper | + _: LongWrapper | + _: StringWrapper | + _: TimeMicrosWrapper | + _: TimestampMicrosWrapper => + String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0)) + case _ => throw new Exception("Unsupported type.") + } + } + + override def build: Procedure = new ShowMetadataTableColumnStatsProcedure() +} + +object ShowMetadataTableColumnStatsProcedure { + val NAME = "show_metadata_table_column_stats" + + def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] { + override def get() = new ShowMetadataTableColumnStatsProcedure() + } +} + diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala index c618c227ce1d1..a1cff2e03a0de 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala @@ -91,6 +91,56 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase { } } + test("Test Call show_metadata_table_column_stats Procedure") { + withTempDir { tmp => + val tableName = generateTableName + // create table + spark.sql( + s""" + |create table $tableName ( + | c1 int, + | c2 boolean, + | c3 binary, + | c4 date, + | c5 decimal, + | c6 double, + | c7 float, + | c8 long, + | c9 string, + | c10 timestamp + |) using hudi + | location '${tmp.getCanonicalPath}/$tableName' + | tblproperties ( + | primaryKey = 'c1', + | preCombineField = 'c8', + | hoodie.metadata.enable="true", + | hoodie.metadata.index.column.stats.enable="true" + | ) + """.stripMargin) + // insert data to table + + spark.sql( + s""" + |insert into table $tableName + |values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), 10.5, 3.14, 2.5, 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP)) + |""".stripMargin) + spark.sql( + s""" + |insert into table $tableName + |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), 20.5, 6.28, 3.14, 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP)) + |""".stripMargin) + + // collect column stats for table + for (i <- 3 to 3) { + val columnName = s"c$i" + val metadataStats = spark.sql(s"""call show_metadata_table_column_stats(table => '$tableName', targetColumns => '$columnName')""").collect() + assertResult(2) { + metadataStats.length + } + } + } + } + test("Test Call show_metadata_table_stats Procedure") { withTempDir { tmp => val tableName = generateTableName From 64444a28e3e403a80915a14859011522a2b44fd0 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Thu, 16 Nov 2023 20:11:56 +0800 Subject: [PATCH 2/6] [HUDI-7110] Add call procedure for show column stats information --- .../apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala index 
a1cff2e03a0de..a2a5252aafaba 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala @@ -131,7 +131,7 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase { |""".stripMargin) // collect column stats for table - for (i <- 3 to 3) { + for (i <- 1 to 10) { val columnName = s"c$i" val metadataStats = spark.sql(s"""call show_metadata_table_column_stats(table => '$tableName', targetColumns => '$columnName')""").collect() assertResult(2) { From d1ef73322fc84fe62a9d1afebbcc5e3c1b7ae694 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Fri, 17 Nov 2023 15:16:35 +0800 Subject: [PATCH 3/6] [HUDI-7110] Add call procedure for show column stats information --- .../ShowMetadataTableColumnStatsProcedure.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala index aa1303918d07c..efb5034d4e29c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala @@ -78,9 +78,7 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure stats_value match { case _: IntWrapper | _: BooleanWrapper | - _: BytesWrapper | _: DateWrapper | - _: DecimalWrapper | _: DoubleWrapper | _: FloatWrapper | _: LongWrapper | @@ -88,6 +86,12 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure _: TimeMicrosWrapper | _: TimestampMicrosWrapper => String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0)) + case _: BytesWrapper => + var bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue + util.Arrays.toString(bytes_value.array()) + case _: DecimalWrapper => + var decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue + util.Arrays.toString(decimal_value.array()) case _ => throw new Exception("Unsupported type.") } } From 435fd8c81acf0d06b3100ec38437ee8c4d9a20ce Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Tue, 21 Nov 2023 16:09:10 +0800 Subject: [PATCH 4/6] [HUDI-7110] Add call procedure for show column stats information --- ...howMetadataTableColumnStatsProcedure.scala | 88 ++++++++++++++++--- .../procedure/TestMetadataProcedure.scala | 22 ++++- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala index efb5034d4e29c..d47134c8dcb82 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala @@ -18,23 +18,35 @@ package org.apache.spark.sql.hudi.command.procedures import 
org.apache.avro.generic.IndexedRecord -import org.apache.hudi.avro.model.{BooleanWrapper, BytesWrapper, DateWrapper, DecimalWrapper, DoubleWrapper, FloatWrapper, HoodieMetadataColumnStats, IntWrapper, LongWrapper, StringWrapper, TimeMicrosWrapper, TimestampMicrosWrapper} +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.avro.model._ +import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.data.HoodieData +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.FileSlice +import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.metadata.HoodieTableMetadata import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport} import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import java.util -import java.util.function.Supplier +import java.util.function.{Function, Supplier} +import scala.collection.{JavaConversions, mutable} +import scala.jdk.CollectionConverters.{asScalaBufferConverter, asScalaIteratorConverter} class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with ProcedureBuilder with Logging { private val PARAMETERS = Array[ProcedureParameter]( ProcedureParameter.required(0, "table", DataTypes.StringType), - ProcedureParameter.optional(1, "targetColumns", DataTypes.StringType) + ProcedureParameter.optional(1, "partition", DataTypes.StringType), + ProcedureParameter.optional(2, "targetColumns", DataTypes.StringType) ) private val OUTPUT_TYPE = new StructType(Array[StructField]( @@ -53,7 +65,10 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure super.checkArgs(PARAMETERS, args) val table = getArgValueOrDefault(args, PARAMETERS(0)) - val targetColumns = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString + val partitions = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString + val partitionsSeq = partitions.split(",").filter(_.nonEmpty).toSeq + + val targetColumns = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse("").toString val targetColumnsSeq = targetColumns.split(",").toSeq val basePath = getBasePath(table) val metadataConfig = HoodieMetadataConfig.newBuilder @@ -64,14 +79,33 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure var schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, false) + val fsView = buildFileSystemView(table) + val allFileSlices: Set[FileSlice] = { + if (partitionsSeq.isEmpty) { + val engineCtx = new HoodieSparkEngineContext(jsc) + val metaTable = HoodieTableMetadata.create(engineCtx, metadataConfig, basePath) + metaTable.getAllPartitionPaths + .asScala + .flatMap(path => fsView.getLatestFileSlices(path).iterator().asScala) + .toSet + } else { + partitionsSeq + .flatMap(partition => fsView.getLatestFileSlices(partition).iterator().asScala) + .toSet + } + } + + val 
allFileNames: Set[String] = allFileSlices.map(_.getBaseFile.get().getFileName) + + val rows = mutable.ListBuffer[Row]() + colStatsRecords.collectAsList().asScala + .filter(c => allFileNames.contains(c.getFileName)) + .foreach { c => + rows += Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue), + getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue()) + } - val rows = new util.ArrayList[Row] - colStatsRecords.collectAsList() - .stream() - .forEach(c => { - rows.add(Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue), getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue())) - }) - rows.stream().toArray().map(r => r.asInstanceOf[Row]).toList + rows.toList } def getColumnStatsValue(stats_value: Any): String = { @@ -87,15 +121,41 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure _: TimestampMicrosWrapper => String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0)) case _: BytesWrapper => - var bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue + val bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue util.Arrays.toString(bytes_value.array()) case _: DecimalWrapper => - var decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue + val decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue util.Arrays.toString(decimal_value.array()) - case _ => throw new Exception("Unsupported type.") + case _ => + throw new HoodieException(s"Unsupported type: ${stats_value.getClass.getSimpleName}") } } + def buildFileSystemView(table: Option[Any]): HoodieTableFileSystemView = { + val basePath = getBasePath(table) + val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build + val fs = metaClient.getFs + val globPath = s"$basePath/*/*/*" + val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath)) + + val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants() + + val maxInstant = metaClient.createNewInstantTime() + val instants = timeline.getInstants.iterator().asScala.filter(_.getTimestamp < maxInstant) + + val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]] + with java.io.Serializable { + override def apply(instant: HoodieInstant): HOption[Array[Byte]] = { + metaClient.getActiveTimeline.getInstantDetails(instant) + } + } + + val filteredTimeline = new HoodieDefaultTimeline( + new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details) + + new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](statuses.size))) + } + override def build: Procedure = new ShowMetadataTableColumnStatsProcedure() } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala index a2a5252aafaba..0b53bb5e14d2a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala @@ -130,12 +130,28 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase { |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), 20.5, 6.28, 3.14, 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP)) 
|""".stripMargin) - // collect column stats for table + // Only numerical and string types are compared for clarity on min/max values. + val expectedValues = Map( + 1 -> ("1", "10"), + 2 -> ("false", "true"), + 6 -> ("3.14", "6.28"), + 7 -> ("2.5", "3.14"), + 8 -> ("1000", "2000"), + 9 -> ("another string", "example string") + ) + for (i <- 1 to 10) { val columnName = s"c$i" val metadataStats = spark.sql(s"""call show_metadata_table_column_stats(table => '$tableName', targetColumns => '$columnName')""").collect() - assertResult(2) { - metadataStats.length + assertResult(1)(metadataStats.length) + val minVal: String = metadataStats(0).getAs[String]("min_value") + val maxVal: String = metadataStats(0).getAs[String]("max_value") + + expectedValues.get(i) match { + case Some((expectedMin, expectedMax)) => + assertResult(expectedMin)(minVal) + assertResult(expectedMax)(maxVal) + case None => // Do nothing if no expected values found } } } From 946ec17878c5110741fa3b1e3bbff4fc804d77ed Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Wed, 22 Nov 2023 13:49:49 +0800 Subject: [PATCH 5/6] [HUDI-7110] Add call procedure for show column stats information --- .../procedures/ShowMetadataTableColumnStatsProcedure.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala index d47134c8dcb82..60aa0f054b9c2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala @@ -76,9 +76,9 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure .build val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build val schemaUtil = new TableSchemaResolver(metaClient) - var schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) + val schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema) val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient) - val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, false) + val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, shouldReadInMemory = false) val fsView = buildFileSystemView(table) val allFileSlices: Set[FileSlice] = { if (partitionsSeq.isEmpty) { @@ -108,7 +108,7 @@ class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with Procedure rows.toList } - def getColumnStatsValue(stats_value: Any): String = { + private def getColumnStatsValue(stats_value: Any): String = { stats_value match { case _: IntWrapper | _: BooleanWrapper | From a7f986bd546e2c38c241ee743734dbec491b0351 Mon Sep 17 00:00:00 2001 From: majian1998 <1253791041@qq.com> Date: Wed, 22 Nov 2023 20:15:25 +0800 Subject: [PATCH 6/6] [HUDI-7110] Add call procedure for show column stats information --- .../spark/sql/hudi/procedure/TestMetadataProcedure.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala index 0b53bb5e14d2a..b3ce71c70eb96 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala @@ -102,7 +102,7 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase { | c2 boolean, | c3 binary, | c4 date, - | c5 decimal, + | c5 decimal(10,1), | c6 double, | c7 float, | c8 long, @@ -122,12 +122,12 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase { spark.sql( s""" |insert into table $tableName - |values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), 10.5, 3.14, 2.5, 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP)) + |values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), CAST(10.5 AS DECIMAL(10,1)), CAST(3.14 AS DOUBLE), CAST(2.5 AS FLOAT), 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP)) |""".stripMargin) spark.sql( s""" |insert into table $tableName - |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), 20.5, 6.28, 3.14, 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP)) + |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), CAST(20.5 AS DECIMAL(10,1)), CAST(6.28 AS DOUBLE), CAST(3.14 AS FLOAT), 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP)) |""".stripMargin) // Only numerical and string types are compared for clarity on min/max values.
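
A minimal usage sketch of the procedure this patch series adds, assuming a Spark session with the Hudi SQL extension enabled and an existing Hudi table that was written with hoodie.metadata.enable and hoodie.metadata.index.column.stats.enable set to "true" (as in the test above). The table name, master URL, and column list below are illustrative, not part of the patch; the procedure name, named parameters, and output columns come from ShowMetadataTableColumnStatsProcedure itself.

import org.apache.spark.sql.SparkSession

object ShowColumnStatsExample {
  def main(args: Array[String]): Unit = {
    // Spark session wired for Hudi SQL procedures; Kryo serialization is the
    // usual recommendation for Hudi Spark jobs.
    val spark = SparkSession.builder()
      .appName("show-column-stats-example")
      .master("local[2]") // illustrative; use your cluster master in practice
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()

    // Returns one row per (file, column) with file_name, column_name,
    // min_value, max_value and null_num, per the procedure's OUTPUT_TYPE.
    // The optional 'partition' parameter (added in PATCH 4/6) can additionally
    // restrict results to a comma-separated list of partitions on a
    // partitioned table; it is omitted here.
    spark.sql(
      """call show_metadata_table_column_stats(
        |  table => 'hudi_table',
        |  targetColumns => 'c1,c9')""".stripMargin)
      .show(false)

    spark.stop()
  }
}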