@@ -142,6 +142,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

/**
* Using space-filling curves to optimize the table layout and boost query performance.
* Table data sorted by a space-filling curve has better aggregation; combined with
* min-max filtering, this can achieve a good query performance improvement.
*
* Notice:
Member: wondering how much of this should go into the config docs itself?

* When we use this feature, we need to specify the sort columns.
* The more columns involved in sorting, the worse the aggregation and the smaller the query performance improvement.
* Choose filter columns that are commonly used in query SQL as the sort columns.
* It is recommended that 2 ~ 4 columns participate in sorting.
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
.defaultValue(false)
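A minimal usage sketch to make the Javadoc guidance above concrete (not part of the PR): enabling layout optimization on a Spark datasource write and choosing a small set of commonly filtered columns as sort columns. The table name, column names, and the sort-columns option key are hypothetical; only LAYOUT_OPTIMIZE_ENABLE comes from this diff, so check the exact keys under LAYOUT_OPTIMIZE_PARAM_PREFIX in your Hudi version.

import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: write a Hudi table with space-filling-curve layout optimization enabled.
// Per the Javadoc above, pick 2 ~ 4 columns that appear most often in query filters.
def writeOptimized(df: DataFrame, basePath: String): Unit = {
  df.write.format("hudi")
    .option("hoodie.table.name", "trips")                                // hypothetical table name
    .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key(), "true") // defined in this diff
    // Hypothetical key for the sort columns; the real ConfigProperty lives under
    // the same LAYOUT_OPTIMIZE_PARAM_PREFIX in HoodieClusteringConfig.
    .option("hoodie.layout.optimize.sort.columns", "begin_lat,begin_lon")
    .mode(SaveMode.Overwrite)
    .save(basePath)
}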
@@ -230,18 +230,14 @@ public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
} else if (colType instanceof StringType) {
String minString = new String(((Binary)currentColRangeMetaData.getMinValue()).getBytes());
String maxString = new String(((Binary)currentColRangeMetaData.getMaxValue()).getBytes());
rows.add(minString);
rows.add(maxString);
rows.add(currentColRangeMetaData.getMinValueAsString());
rows.add(currentColRangeMetaData.getMaxValueAsString());
} else if (colType instanceof DecimalType) {
Double minDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMinValue().toString())));
Double maxDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMaxValue().toString())));
rows.add(BigDecimal.valueOf(minDecimal));
rows.add(BigDecimal.valueOf(maxDecimal));
rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString()));
rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof DateType) {
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMinValue())));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int)currentColRangeMetaData.getMaxValue())));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString()));
rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString()));
} else if (colType instanceof LongType) {
rows.add(currentColRangeMetaData.getMinValue());
rows.add(currentColRangeMetaData.getMaxValue());
@@ -344,6 +340,8 @@ public static void saveStatisticsInfo(Dataset<Row> df, String cols, String index
List columns = Arrays.asList(statisticsDF.schema().fieldNames());
spark.sql(HoodieSparkUtils$
.MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString());
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
}
} else {
statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString());
@@ -18,8 +18,6 @@

package org.apache.hudi.common.model;

import org.apache.parquet.schema.PrimitiveStringifier;

import java.util.Objects;

/**
@@ -30,16 +28,28 @@ public class HoodieColumnRangeMetadata<T> {
private final String columnName;
private final T minValue;
private final T maxValue;
private final long numNulls;
private final PrimitiveStringifier stringifier;
private long numNulls;
// For Decimal/Date types, minValue/maxValue cannot represent the original value.
// e.g. when parquet collects column statistics, a decimal column is collected as an int/binary type,
// so we cannot use minValue and maxValue directly; use minValueAsString/maxValueAsString instead.
private final String minValueAsString;
Contributor: Please take a look at my comment below -- I don't think this is necessary; we can handle all type conversions at the time of reading the footer and don't need to propagate it further.

Contributor Author: Yes, I agree with you. But let's put this into #4060 as an optimization. We can merge this PR first and then merge #4060.

Member: Sounds good to me as well.

private final String maxValueAsString;

public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
public HoodieColumnRangeMetadata(
final String filePath,
final String columnName,
final T minValue,
final T maxValue,
long numNulls,
final String minValueAsString,
final String maxValueAsString) {
this.filePath = filePath;
this.columnName = columnName;
this.minValue = minValue;
this.maxValue = maxValue;
this.numNulls = numNulls;
this.stringifier = stringifier;
this.numNulls = numNulls == -1 ? 0 : numNulls;
Contributor: Here, in which case would numNulls == -1?

Contributor Author: When parquet cannot collect statistics for a certain field. For example, parquet does not collect statistics for the timestamp type. How about throwing an exception directly?

Contributor: So in this case numNulls will be -1? And what's the impact if we set numNulls to 0; will the query performance be affected?

Contributor Author: If this value is -1, it must be set to 0; this will result in incorrect query results. In fact, when a timestamp type is encountered in subsequent logic, an exception is thrown directly to tell the user that indexing for timestamp is not supported.

Contributor (@leesf, Nov 21, 2021): If we cannot support indexing timestamp yet, we should throw an exception explicitly.

Contributor Author: Yes, we already throw an exception at L264 of ZCurveOptimizeHelper.getMinMaxValue.

this.minValueAsString = minValueAsString;
this.maxValueAsString = maxValueAsString;
}

public String getFilePath() {
@@ -58,8 +68,12 @@ public T getMaxValue() {
return this.maxValue;
}

public PrimitiveStringifier getStringifier() {
return stringifier;
public String getMaxValueAsString() {
Contributor: I believe we should handle all type conversions at the stage of extracting these statistics from Parquet -- to make sure that users of HoodieColumnRangeMetadata are not exposed to the need for conversion and get Java native types out of the box.

return maxValueAsString;
}

public String getMinValueAsString() {
return minValueAsString;
}

public long getNumNulls() {
@@ -79,12 +93,14 @@ public boolean equals(final Object o) {
&& Objects.equals(getColumnName(), that.getColumnName())
&& Objects.equals(getMinValue(), that.getMinValue())
&& Objects.equals(getMaxValue(), that.getMaxValue())
&& Objects.equals(getNumNulls(), that.getNumNulls());
&& Objects.equals(getNumNulls(), that.getNumNulls())
&& Objects.equals(getMinValueAsString(), that.getMinValueAsString())
&& Objects.equals(getMaxValueAsString(), that.getMaxValueAsString());
}

@Override
public int hashCode() {
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls(), getMinValueAsString(), getMaxValueAsString());
}

@Override
@@ -94,6 +110,8 @@ public String toString() {
+ "columnName='" + columnName + '\''
+ ", minValue=" + minValue
+ ", maxValue=" + maxValue
+ ", numNulls=" + numNulls + '}';
+ ", numNulls=" + numNulls
+ ", minValueAsString=" + minValueAsString
+ ", minValueAsString=" + maxValueAsString + '}';
}
}
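As a usage note for the new string-typed getters, here is a minimal sketch (not from the PR; the file path and column names are hypothetical) that reads file-level column ranges through ParquetUtils and consumes getMinValueAsString/getMaxValueAsString, mirroring what ZCurveOptimizeHelper.getMinMaxValue does for Decimal and Date columns in this diff.

import java.math.BigDecimal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
import scala.collection.JavaConverters._

object ColumnRangeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val file = new Path("/tmp/hudi/part-00000.parquet")        // hypothetical parquet file
    val cols = java.util.Arrays.asList("price", "event_date")  // hypothetical columns

    val utils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
    utils.readRangeFromParquetMetadata(conf, file, cols).asScala.foreach { range =>
      // getMinValue/getMaxValue hold parquet's physical representation (e.g. int/binary for
      // decimals); the *AsString getters carry the logical value, so use them for display
      // or for rebuilding BigDecimal / java.sql.Date values as the PR does.
      println(s"${range.getColumnName}: [${range.getMinValueAsString}, ${range.getMaxValueAsString}]" +
        s" nulls=${range.getNumNulls}")
      if (range.getColumnName == "price") {                    // e.g. a DECIMAL column
        val min = new BigDecimal(range.getMinValueAsString)
        val max = new BigDecimal(range.getMaxValueAsString)
        assert(min.compareTo(max) <= 0)
      }
    }
  }
}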
@@ -39,6 +39,7 @@
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;

import java.io.IOException;
import java.util.ArrayList;
@@ -56,6 +57,8 @@
*/
public class ParquetUtils extends BaseFileUtils {

private static final Object lock = new Object();

/**
* Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will
* return all the rowkeys.
@@ -283,17 +286,38 @@ public Boolean apply(String recordKey) {

/**
* Parse min/max statistics stored in parquet footers for all columns.
* Note: ParquetFileReader.readFooter is not a thread-safe method.
*
* @param conf hadoop conf.
* @param parquetFilePath file to be read.
* @param cols columns for which statistics should be collected.
* @return a collection of HoodieColumnRangeMetadata, one per requested column.
*/
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, List<String> cols) {
public Collection<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
Configuration conf,
Path parquetFilePath,
List<String> cols) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);
// collect stats from all parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> {
return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData ->
new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
columnChunkMetaData.getStatistics().genericGetMin(),
columnChunkMetaData.getStatistics().genericGetMax(),
columnChunkMetaData.getStatistics().getNumNulls(),
columnChunkMetaData.getPrimitiveType().stringifier()));
return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> {
String minAsString;
String maxAsString;
if (columnChunkMetaData.getPrimitiveType().getOriginalType() == OriginalType.DATE) {
synchronized (lock) {
Contributor: This has been addressed by #4060

minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
} else {
minAsString = columnChunkMetaData.getStatistics().minAsString();
maxAsString = columnChunkMetaData.getStatistics().maxAsString();
}
return new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(),
columnChunkMetaData.getStatistics().genericGetMin(),
columnChunkMetaData.getStatistics().genericGetMax(),
columnChunkMetaData.getStatistics().getNumNulls(),
minAsString, maxAsString);
});
}).collect(Collectors.groupingBy(e -> e.getColumnName()));

// we only intend to keep file level statistics.
@@ -316,23 +340,41 @@ private HoodieColumnRangeMetadata<Comparable> combineRanges(HoodieColumnRangeMet
HoodieColumnRangeMetadata<Comparable> range2) {
final Comparable minValue;
final Comparable maxValue;
final String minValueAsString;
final String maxValueAsString;
if (range1.getMinValue() != null && range2.getMinValue() != null) {
minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue();
if (range1.getMinValue().compareTo(range2.getMinValue()) < 0) {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
} else {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
}
} else if (range1.getMinValue() == null) {
minValue = range2.getMinValue();
minValueAsString = range2.getMinValueAsString();
} else {
minValue = range1.getMinValue();
minValueAsString = range1.getMinValueAsString();
}

if (range1.getMaxValue() != null && range2.getMaxValue() != null) {
maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue();
if (range1.getMaxValue().compareTo(range2.getMaxValue()) < 0) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
}
} else if (range1.getMaxValue() == null) {
maxValue = range2.getMaxValue();
maxValueAsString = range2.getMaxValueAsString();
} else {
maxValue = range1.getMaxValue();
maxValueAsString = range1.getMaxValueAsString();
}

return new HoodieColumnRangeMetadata<>(range1.getFilePath(),
range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), range1.getStringifier());
range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), minValueAsString, maxValueAsString);
}
}
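For readers skimming the merge logic above: per-block parquet statistics are folded into a single file-level range by taking the min of mins and the max of maxes (carrying the matching string form along) and summing the null counts. A simplified standalone sketch of that fold, with illustrative field names rather than the real HoodieColumnRangeMetadata API:

// Simplified model of the fold done by combineRanges when reducing per-block stats
// to file-level stats; the null handling of the real method is omitted.
final case class Range[T <: Comparable[T]](min: T, max: T, minStr: String, maxStr: String, numNulls: Long)

def combine[T <: Comparable[T]](a: Range[T], b: Range[T]): Range[T] = {
  val (min, minStr) = if (a.min.compareTo(b.min) <= 0) (a.min, a.minStr) else (b.min, b.minStr)
  val (max, maxStr) = if (a.max.compareTo(b.max) >= 0) (a.max, a.maxStr) else (b.max, b.maxStr)
  Range(min, max, minStr, maxStr, a.numNulls + b.numNulls)
}

// e.g. two row groups of the same file:
val fileLevel = combine(
  Range[Integer](1, 10, "1", "10", numNulls = 2L),
  Range[Integer](5, 42, "5", "42", numNulls = 0L))
// fileLevel == Range(1, 42, "1", "42", 2)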
@@ -100,7 +100,7 @@ object DataSkippingUtils {
// query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
val colName = getTargetColNameParts(attribute)
GreaterThanOrEqual(maxValue(colName), right)
reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
// query filter "b >= colA" convert it to "colA_minValue <= b" for index table
case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
val colName = getTargetColNameParts(attribute)
@@ -179,7 +179,7 @@ def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = {
val basePath = new Path(indexPath)
basePath.getFileSystem(conf)
.listStatus(basePath).filterNot(f => f.getPath.getName.endsWith(".parquet"))
.listStatus(basePath).filter(f => f.getPath.getName.endsWith(".parquet"))
}
Contributor Author: Very sorry for that. My local code uses filter; it was mistakenly written as filterNot when submitting.

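To illustrate the predicate rewrite these hunks refer to (a query filter such as colA >= 'b' becomes colA_maxValue >= 'b' against the statistics table), here is a hedged sketch of how such an index table can prune files. The index DataFrame layout (file, colA_minValue, colA_maxValue) is an assumption for illustration; this is not the DataSkippingUtils API itself.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// A file can contain rows matching the original predicate only if the rewritten
// predicate over its column range holds, e.g.:
//   colA >= 'b'   ->  colA_maxValue >= 'b'
//   'b' >= colA   ->  colA_minValue <= 'b'
def candidateFiles(index: DataFrame): DataFrame =
  index.where(col("colA_maxValue") >= "b").select("file")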

/**
@@ -21,9 +21,11 @@ package org.apache.hudi.functional
import java.sql.{Date, Timestamp}

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.model.HoodieFileFormat
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils}
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.ZCurveOptimizeHelper
import org.apache.spark.sql._
@@ -88,19 +90,37 @@ class TestOptimizeTable extends HoodieClientTestBase {
.save(basePath)

assertEquals(1000, spark.read.format("hudi").load(basePath).count())
assertEquals(1000,
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(basePath).count())
// use unsorted col as filter.
assertEquals(spark.read
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath).where("end_lat >= 0 and rider != '1' and weight > 0.0").count())
// use sorted col as filter.
assertEquals(spark.read.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and begin_lon >= 0.49 and begin_lon < 0.51").count())
// use sorted cols and unsorted cols as filter
assertEquals(spark.read.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count(),
spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true")
.format("hudi").load(basePath)
.where("begin_lat >= 0.49 and begin_lat < 0.51 and end_lat > 0.56").count())
}

@Test
def testCollectMinMaxStatistics(): Unit = {
val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
try {
val complexDataFrame = createComplexDataFrame(spark)
complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
val df = spark.read.load(testPath.toString)
// test z-order sort for all primitive types; should not throw an exception.
ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
ZCurveOptimizeHelper.createZIndexedDataFrameBySample(df, "c1,c2,c3,c4,c5,c6,c7,c8", 20).show(1)
// TimestampType is not supported, so collecting statistics for c4 should throw an exception
val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
colDf.cache()
@@ -115,12 +135,59 @@ class TestOptimizeTable extends HoodieClientTestBase {
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
assertEquals(!fs.exists(new Path(statisticPath, "2")), true)
assertEquals(fs.exists(new Path(statisticPath, "3")), true)
// test saving a different index; a new index on ("c1,c6,c7,c8") should be saved successfully.
ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4"))
assertEquals(fs.exists(new Path(statisticPath, "5")), true)
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
if (fs.exists(statisticPath)) fs.delete(statisticPath)
}
}

// Test collecting min-max statistics for DateType under multithreading.
// Parquet can give wrong DateType statistics when footers are read concurrently.
@Test
def testMultiThreadParquetFooterReadForDateType(): Unit = {
// create parquet file with DateType
val rdd = spark.sparkContext.parallelize(0 to 100, 1)
.map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")))
val df = spark.createDataFrame(rdd, new StructType().add("id", DateType))
val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType")
val conf = spark.sparkContext.hadoopConfiguration
val cols = new java.util.ArrayList[String]
cols.add("id")
try {
df.repartition(3).write.mode("overwrite").save(testPath.toString)
val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x)

val realResult = new Array[(String, String)](3)
inputFiles.zipWithIndex.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}

// multi-threaded read; with the lock in ParquetUtils the result should match the single-threaded result
val resUseLock = new Array[(String, String)](3)
inputFiles.zipWithIndex.par.foreach { case (f, index) =>
val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils]
val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next()
resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString)
}

// We can't guarantee that problems always show up under multithreading without the lock,
// so to keep the UT stable we do not check an unlocked result (resUseNoLock) here.
// Check resUseLock: the assertions below should pass.
realResult.zip(resUseLock).foreach { case (realValue, testValue) =>
assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}")
}
} finally {
if (fs.exists(testPath)) fs.delete(testPath)
}
}

def createComplexDataFrame(spark: SparkSession): DataFrame = {
val schema = new StructType()
.add("c1", IntegerType)