diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 73c66d8038e5d..e0e31ee587de4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -142,6 +142,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
 
+  /**
+   * Use space-filling curves to optimize the table layout and boost query performance.
+   * Table data sorted along a space-filling curve clusters related records together;
+   * combined with min-max filtering, this can deliver a significant query speedup.
+   *
+   * Note:
+   * When this feature is enabled, the sort columns must be specified.
+   * The more columns participate in sorting, the weaker the clustering and the smaller the query improvement.
+   * Choose filter columns that are commonly used in query predicates as the sort columns.
+   * It is recommended that 2 ~ 4 columns participate in sorting.
+   */
   public static final ConfigProperty<Boolean> LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
       .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
       .defaultValue(false)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java
index 7ba1c9465bfd0..d2bd257f1ba4c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java
@@ -230,18 +230,14 @@ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
         rows.add(currentColRangeMetaData.getMinValue());
         rows.add(currentColRangeMetaData.getMaxValue());
       } else if (colType instanceof StringType) {
-        String minString = new String(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
-        String maxString = new String(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
-        rows.add(minString);
-        rows.add(maxString);
+        rows.add(currentColRangeMetaData.getMinValueAsString());
+        rows.add(currentColRangeMetaData.getMaxValueAsString());
       } else if (colType instanceof DecimalType) {
-        Double minDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMinValue().toString())));
-        Double maxDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMaxValue().toString())));
-        rows.add(BigDecimal.valueOf(minDecimal));
-        rows.add(BigDecimal.valueOf(maxDecimal));
+        rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString()));
+        rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString()));
       } else if (colType instanceof DateType) {
-        rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMinValue())));
-        rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMaxValue())));
+        rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString()));
+        rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString()));
       } else if (colType instanceof LongType) {
         rows.add(currentColRangeMetaData.getMinValue());
         rows.add(currentColRangeMetaData.getMaxValue());
@@ -344,6 +340,8 @@ public static void saveStatisticsInfo(Dataset<Row> df, String cols, String index
       List<String> columns = Arrays.asList(statisticsDF.schema().fieldNames());
       spark.sql(HoodieSparkUtils$
           .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
+      } else {
+        statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
      }
    } else {
      statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index ca977ae53b5f9..33491ff13ff63 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,8 +18,6 @@
 package org.apache.hudi.common.model;
 
-import org.apache.parquet.schema.PrimitiveStringifier;
-
 import java.util.Objects;
 
 /**
@@ -30,16 +28,28 @@ public class HoodieColumnRangeMetadata {
   private final String columnName;
   private final T minValue;
   private final T maxValue;
-  private final long numNulls;
-  private final PrimitiveStringifier stringifier;
+  private long numNulls;
+  // For DecimalType/DateType, minValue/maxValue cannot represent the original value.
+  // E.g. when parquet collects column statistics, a decimal column is collected as an int/binary value,
+  // so minValue/maxValue cannot be used directly; use minValueAsString/maxValueAsString instead.
+  private final String minValueAsString;
+  private final String maxValueAsString;
 
-  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
+  public HoodieColumnRangeMetadata(
+      final String filePath,
+      final String columnName,
+      final T minValue,
+      final T maxValue,
+      long numNulls,
+      final String minValueAsString,
+      final String maxValueAsString) {
     this.filePath = filePath;
     this.columnName = columnName;
     this.minValue = minValue;
     this.maxValue = maxValue;
-    this.numNulls = numNulls;
-    this.stringifier = stringifier;
+    this.numNulls = numNulls == -1 ? 0 : numNulls;
+    this.minValueAsString = minValueAsString;
+    this.maxValueAsString = maxValueAsString;
   }
 
   public String getFilePath() {
@@ -58,8 +68,12 @@ public T getMaxValue() {
     return this.maxValue;
   }
 
-  public PrimitiveStringifier getStringifier() {
-    return stringifier;
+  public String getMaxValueAsString() {
+    return maxValueAsString;
+  }
+
+  public String getMinValueAsString() {
+    return minValueAsString;
   }
 
   public long getNumNulls() {
@@ -79,12 +93,14 @@ public boolean equals(final Object o) {
         && Objects.equals(getColumnName(), that.getColumnName())
         && Objects.equals(getMinValue(), that.getMinValue())
         && Objects.equals(getMaxValue(), that.getMaxValue())
-        && Objects.equals(getNumNulls(), that.getNumNulls());
+        && Objects.equals(getNumNulls(), that.getNumNulls())
+        && Objects.equals(getMinValueAsString(), that.getMinValueAsString())
+        && Objects.equals(getMaxValueAsString(), that.getMaxValueAsString());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
+    return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls(), getMinValueAsString(), getMaxValueAsString());
   }
 
   @Override
@@ -94,6 +110,8 @@ public String toString() {
         + "columnName='" + columnName + '\''
         + ", minValue=" + minValue
         + ", maxValue=" + maxValue
-        + ", numNulls=" + numNulls + '}';
+        + ", numNulls=" + numNulls
+        + ", minValueAsString=" + minValueAsString
+        + ", maxValueAsString=" + maxValueAsString + '}';
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index c142e8a9608be..985c9788ea8fd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -39,6 +39,7 @@
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -56,6 +57,8 @@
  */
 public class ParquetUtils extends BaseFileUtils {
 
+  private static final Object lock = new Object();
+
   /**
    * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
    * return all the rowkeys.
@@ -283,17 +286,38 @@ public Boolean apply(String recordKey) {
 
   /**
    * Parse min/max statistics stored in parquet footers for all columns.
+   * Note: ParquetFileReader.readFooter is not a thread-safe method.
+   *
+   * @param conf hadoop configuration.
+   * @param parquetFilePath file to be read.
+   * @param cols columns for which statistics should be collected.
+   * @return a collection of HoodieColumnRangeMetadata, one entry per requested column.
   */
-  public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, List<String> cols) {
+  public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
+      Configuration conf,
+      Path parquetFilePath,
+      List<String> cols) {
     ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
     // collect stats from all parquet blocks
     Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
-      return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
-          new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
-              columnChunkMetaData.getStatistics().genericGetMin(),
-              columnChunkMetaData.getStatistics().genericGetMax(),
-              columnChunkMetaData.getStatistics().getNumNulls(),
-              columnChunkMetaData.getPrimitiveType().stringifier()));
+      return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
+        String minAsString;
+        String maxAsString;
+        if (columnChunkMetaData.getPrimitiveType().getOriginalType() == OriginalType.DATE) {
+          synchronized (lock) {
+            minAsString = columnChunkMetaData.getStatistics().minAsString();
+            maxAsString = columnChunkMetaData.getStatistics().maxAsString();
+          }
+        } else {
+          minAsString = columnChunkMetaData.getStatistics().minAsString();
+          maxAsString = columnChunkMetaData.getStatistics().maxAsString();
+        }
+        return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
+            columnChunkMetaData.getStatistics().genericGetMin(),
+            columnChunkMetaData.getStatistics().genericGetMax(),
+            columnChunkMetaData.getStatistics().getNumNulls(),
+            minAsString, maxAsString);
+      });
     }).collect(Collectors.groupingBy(e -> e.getColumnName()));
 
     // we only intend to keep file level statistics.
@@ -316,23 +340,41 @@ private HoodieColumnRangeMetadata combineRanges(HoodieColumnRangeMet
                                                               HoodieColumnRangeMetadata<Comparable> range2) {
     final Comparable minValue;
     final Comparable maxValue;
+    final String minValueAsString;
+    final String maxValueAsString;
     if (range1.getMinValue() != null && range2.getMinValue() != null) {
-      minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue();
+      if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
+        minValue = range1.getMinValue();
+        minValueAsString = range1.getMinValueAsString();
+      } else {
+        minValue = range2.getMinValue();
+        minValueAsString = range2.getMinValueAsString();
+      }
    } else if (range1.getMinValue() == null) {
      minValue = range2.getMinValue();
+      minValueAsString = range2.getMinValueAsString();
    } else {
      minValue = range1.getMinValue();
+      minValueAsString = range1.getMinValueAsString();
    }
 
    if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
-      maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue();
+      if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
+        maxValue = range2.getMaxValue();
+        maxValueAsString = range2.getMaxValueAsString();
+      } else {
+        maxValue = range1.getMaxValue();
+        maxValueAsString = range1.getMaxValueAsString();
+      }
    } else if (range1.getMaxValue() == null) {
      maxValue = range2.getMaxValue();
+      maxValueAsString = range2.getMaxValueAsString();
    } else {
      maxValue = range1.getMaxValue();
+      maxValueAsString = range1.getMaxValueAsString();
    }
 
    return new HoodieColumnRangeMetadata<>(range1.getFilePath(),
-        range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), range1.getStringifier());
+        range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), minValueAsString, maxValueAsString);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 45a7aec142d5a..d26b669dd1511 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -100,7 +100,7 @@ object DataSkippingUtils {
       // query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
       case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
         val colName = getTargetColNameParts(attribute)
-        GreaterThanOrEqual(maxValue(colName), right)
+        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
       // query filter "b >= colA" convert it to "colA_minValue <= b" for index table
       case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
         val colName = getTargetColNameParts(attribute)
@@ -179,7 +179,7 @@ object DataSkippingUtils {
   def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
     val basePath = new Path(indexPath)
     basePath.getFileSystem(conf)
-      .listStatus(basePath).filterNot(f => f.getPath.getName.endsWith(".parquet"))
+      .listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet"))
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala
index 06ac600b0346e..b0fcbec27d07e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala
@@ -21,9 +21,11 @@ package org.apache.hudi.functional
 import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.model.HoodieFileFormat
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.ZCurveOptimizeHelper
 import org.apache.spark.sql._
@@ -88,8 +90,23 @@ class TestOptimizeTable extends HoodieClientTestBase {
       .save(basePath)
 
     assertEquals(1000, spark.read.format("hudi").load(basePath).count())
-    assertEquals(1000,
-      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(basePath).count())
+    // use unsorted columns as filters.
+    assertEquals(spark.read
+      .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count(),
+      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+        .format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count())
+    // use sorted columns as filters.
+    assertEquals(spark.read.format("hudi").load(basePath)
+      .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count(),
+      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+        .format("hudi").load(basePath)
+        .where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count())
+    // use both sorted and unsorted columns as filters
+    assertEquals(spark.read.format("hudi").load(basePath)
+      .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count(),
+      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
+        .format("hudi").load(basePath)
+        .where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count())
   }
 
   @Test
@@ -97,10 +114,13 @@
     val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
     val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
     val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+    val complexDataFrame = createComplexDataFrame(spark)
+    complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
+    val df = spark.read.load(testPath.toString)
     try {
-      val complexDataFrame = createComplexDataFrame(spark)
-      complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
-      val df = spark.read.load(testPath.toString)
+      // test z-order sorting for all primitive types; should not throw an exception.
+      ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
+      ZCurveOptimizeHelper.createZIndexedDataFrameBySample(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
       // do not support TimeStampType, so if we collect statistics for c4, should throw exception
       val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
       colDf.cache()
@@ -115,12 +135,59 @@
       ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
       assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
       assertEquals(fs.exists(new Path(statisticPath, "3")), true)
+      // test saving a different index; the new index on ("c1,c6,c7,c8") should be saved successfully.
+      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4"))
+      assertEquals(fs.exists(new Path(statisticPath, "5")), true)
     } finally {
       if (fs.exists(testPath)) fs.delete(testPath)
       if (fs.exists(statisticPath)) fs.delete(statisticPath)
     }
   }
 
+  // test collecting min-max statistics for DateType under multi-threaded access.
+  // without synchronization, parquet may return wrong DateType statistics when footers are read from multiple threads.
+  @Test
+  def testMultiThreadParquetFooterReadForDateType(): Unit = {
+    // create a parquet file with a DateType column
+    val rdd = spark.sparkContext.parallelize(0 to 100, 1)
+      .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
+    val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
+    val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
+    val conf = spark.sparkContext.hadoopConfiguration
+    val cols = new java.util.ArrayList[String]
+    cols.add("id")
+    try {
+      df.repartition(3).write.mode("overwrite").save(testPath.toString)
+      val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)
+
+      val realResult = new Array[(String, String)](3)
+      inputFiles.zipWithIndex.foreach { case (f, index) =>
+        val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+        val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
+        realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString)
+      }
+
+      // multi-threaded read; DateType stringification in ParquetUtils is now guarded by a lock
+      val resUseLock = new Array[(String, String)](3)
+      inputFiles.zipWithIndex.par.foreach { case (f, index) =>
+        val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
+        val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
+        resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString)
+      }
+
+      // We cannot guarantee that the race always shows up without the lock,
+      // so this test does not assert on an unlocked read path;
+      // it only verifies the locked path:
+      // the multi-threaded results should match the single-threaded ones,
+      // so the assertion below should pass.
+      realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
+        assert(realValue == testValue, s"expected ${realValue} but found ${testValue}")
+      }
+    } finally {
+      if (fs.exists(testPath)) fs.delete(testPath)
+    }
+  }
+
   def createComplexDataFrame(spark: SparkSession): DataFrame = {
     val schema = new StructType()
       .add("c1", IntegerType)