From 6f77921db7732c8e610977fb958a26e925bac349 Mon Sep 17 00:00:00 2001 From: xiarixiaoyao Date: Tue, 9 Nov 2021 11:16:52 +0800 Subject: [PATCH 01/12] [HUDI-2102][WIP] support hilbert curve for hudi. --- .../hudi/config/HoodieClusteringConfig.java | 23 + .../apache/hudi/optimize/HilbertCurve.java | 321 +++++++++++++ .../apache/hudi/optimize/ZOrderingUtil.java | 10 +- ...atialCurveOptimizationSortPartitioner.java | 8 +- .../table/HoodieSparkCopyOnWriteTable.java | 4 +- .../spark/SpaceCurveOptimizeHelper.java | 428 ++++++++++++++++++ .../sql/hudi/execution/RangeSample.scala | 30 +- .../TestTableLayoutOptimization.scala | 239 ++++++++++ 8 files changed, 1047 insertions(+), 16 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 9a10965427e07..21d4fa800a4a8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -542,4 +542,27 @@ public static BuildCurveStrategyType fromValue(String value) { } } } + + /** + * strategy types for optimize layout for hudi data. + */ + public enum BuildLayoutOptimizationStrategy { + ZORDER("z-order"), + HILBERT("hilbert"); + private final String value; + BuildLayoutOptimizationStrategy(String value) { + this.value = value; + } + + public static BuildLayoutOptimizationStrategy fromValue(String value) { + switch (value.toLowerCase(Locale.ROOT)) { + case "z-order": + return ZORDER; + case "hilbert": + return HILBERT; + default: + throw new HoodieException("Invalid value of Type."); + } + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java new file mode 100644 index 0000000000000..2289ef254d371 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.optimize; + +import java.math.BigInteger; +import java.util.Arrays; + +/** + * Converts between Hilbert index ({@code BigInteger}) and N-dimensional points. + * + *

+ * Note: + * GitHub). + * the Licensed of above link is also http://www.apache.org/licenses/LICENSE-2.0 + */ +public final class HilbertCurve { + + private final int bits; + private final int dimensions; + // cached calculations + private final int length; + + private HilbertCurve(int bits, int dimensions) { + this.bits = bits; + this.dimensions = dimensions; + // cache a calculated values for small perf improvements + this.length = bits * dimensions; + } + + /** + * Returns a builder for and object that performs transformations for a Hilbert + * curve with the given number of bits. + * + * @param bits + * depth of the Hilbert curve. If bits is one, this is the top-level + * Hilbert curve + * @return builder for object to do transformations with the Hilbert Curve + */ + public static Builder bits(int bits) { + return new Builder(bits); + } + + /** + * Builds a {@link HilbertCurve} instance. + */ + public static final class Builder { + final int bits; + + private Builder(int bits) { + if (bits <= 0 || bits >= 64) { + throw new IllegalArgumentException(String.format("bits must be greater than zero and less than 64, now found bits value: %s", bits)); + } + this.bits = bits; + } + + public HilbertCurve dimensions(int dimensions) { + if (dimensions < 2) { + throw new IllegalArgumentException(String.format("dimensions must be at least 2, now found dimensions value: %s", dimensions)); + } + return new HilbertCurve(bits, dimensions); + } + } + + /** + * Converts a point to its Hilbert curve index. + * + * @param point + * an array of {@code long}. Each ordinate can be between 0 and + * 2bits-1. + * @return index (nonnegative {@link BigInteger}) + * @throws IllegalArgumentException + * if length of point array is not equal to the number of + * dimensions. + */ + public BigInteger index(long... point) { + if (point.length != dimensions) { + throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); + } + return toIndex(transposedIndex(bits, point)); + } + + public byte[] indexBytes(long... point) { + if (point.length != dimensions) { + throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); + } + return toIndexBytes(transposedIndex(bits, point)); + } + + /** + * Converts a {@link BigInteger} index (distance along the Hilbert Curve from 0) + * to a point of dimensions defined in the constructor of {@code this}. + * + * @param index + * index along the Hilbert Curve from 0. Maximum value 2 bits * + * dimensions-1. 
+ * @return array of longs being the point + * @throws NullPointerException + * if index is null + * @throws IllegalArgumentException + * if index is negative + */ + public long[] point(BigInteger index) { + if (index == null) { + throw new NullPointerException("index must not be null"); + } + if (index.signum() == -1) { + throw new IllegalArgumentException("index cannot be negative"); + } + return transposedIndexToPoint(bits, transpose(index)); + } + + public void point(BigInteger index, long[] x) { + if (index == null) { + throw new NullPointerException("index must not be null"); + } + if (index.signum() == -1) { + throw new IllegalArgumentException("index cannot be negative"); + } + Arrays.fill(x, 0); + transpose(index, x); + transposedIndexToPoint(bits, x); + } + + public void point(long i, long[] x) { + point(BigInteger.valueOf(i), x); + } + + /** + * Converts a {@code long} index (distance along the Hilbert Curve from 0) to a + * point of dimensions defined in the constructor of {@code this}. + * + * @param index + * index along the Hilbert Curve from 0. Maximum value 2 + * bits+1-1. + * @return array of longs being the point + * @throws IllegalArgumentException + * if index is negative + */ + public long[] point(long index) { + return point(BigInteger.valueOf(index)); + } + + /** + * Returns the transposed representation of the Hilbert curve index. + * + *

+ * The Hilbert index is expressed internally as an array of transposed bits. + * + *

+   Example: 5 bits for each of n=3 coordinates.
+   15-bit Hilbert integer = A B C D E F G H I J K L M N O is stored
+   as its Transpose                        ^
+   X[0] = A D G J M                    X[2]|  7
+   X[1] = B E H K N        <------->       | /X[1]
+   X[2] = C F I L O                   axes |/
+   high low                         0------> X[0]
+   * 
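+   * A brief usage sketch (editor's illustration, not part of the original patch; it
+   * assumes each coordinate fits within the configured number of bits):
+   *
+   *   HilbertCurve curve = HilbertCurve.bits(5).dimensions(3);
+   *   byte[] idx = curve.indexBytes(new long[] {10, 4, 27}); // big-endian index bytes
+   *   long[] point = curve.point(new BigInteger(1, idx));    // expected to give back {10, 4, 27}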
+   *
+   * @param index
+   *            index to be transposed
+   * @return transposed index
+   */
+  long[] transpose(BigInteger index) {
+    long[] x = new long[dimensions];
+    transpose(index, x);
+    return x;
+  }
+
+  private void transpose(BigInteger index, long[] x) {
+    byte[] b = index.toByteArray();
+    for (int idx = 0; idx < 8 * b.length; idx++) {
+      if ((b[b.length - 1 - idx / 8] & (1L << (idx % 8))) != 0) {
+        int dim = (length - idx - 1) % dimensions;
+        int shift = (idx / dimensions) % bits;
+        x[dim] |= 1L << shift;
+      }
+    }
+  }
+
+  /**
+   *

+ * Given the axes (coordinates) of a point in N-Dimensional space, find the + * distance to that point along the Hilbert curve. That distance will be + * transposed; broken into pieces and distributed into an array. + * + *

+ * The number of dimensions is the length of the hilbertAxes array. + * + *

+ * Note: In Skilling's paper, this function is called AxestoTranspose. + * + * @param bits + * @param point + * Point in N-space + * @return The Hilbert distance (or index) as a transposed Hilbert index + */ + static long[] transposedIndex(int bits, long... point) { + final long M = 1L << (bits - 1); + final int n = point.length; // n: Number of dimensions + final long[] x = Arrays.copyOf(point, n); + long p; + long q; + long t; + int i; + // Inverse undo + for (q = M; q > 1; q >>= 1) { + p = q - 1; + for (i = 0; i < n; i++) { + if ((x[i] & q) != 0) { + x[0] ^= p; // invert + } else { + t = (x[0] ^ x[i]) & p; + x[0] ^= t; + x[i] ^= t; + } + } + } // exchange + // Gray encode + for (i = 1; i < n; i++) { + x[i] ^= x[i - 1]; + } + t = 0; + for (q = M; q > 1; q >>= 1) { + if ((x[n - 1] & q) != 0) { + t ^= q - 1; + } + } + for (i = 0; i < n; i++) { + x[i] ^= t; + } + return x; + } + + /** + * Converts the Hilbert transposed index into an N-dimensional point expressed + * as a vector of {@code long}. + * + * In Skilling's paper this function is named {@code TransposeToAxes} + * + * @param bits + * @param x + * @return the coordinates of the point represented by the transposed index on + * the Hilbert curve + */ + static long[] transposedIndexToPoint(int bits, long... x) { + final long N = 2L << (bits - 1); + // Note that x is mutated by this method (as a performance improvement + // to avoid allocation) + int n = x.length; // number of dimensions + long p; + long q; + long t; + int i; + // Gray decode by H ^ (H/2) + t = x[n - 1] >> 1; + // Corrected error in Skilling's paper on the following line. The + // appendix had i >= 0 leading to negative array index. + for (i = n - 1; i > 0; i--) { + x[i] ^= x[i - 1]; + } + x[0] ^= t; + // Undo excess work + for (q = 2; q != N; q <<= 1) { + p = q - 1; + for (i = n - 1; i >= 0; i--) { + if ((x[i] & q) != 0L) { + x[0] ^= p; // invert + } else { + t = (x[0] ^ x[i]) & p; + x[0] ^= t; + x[i] ^= t; + } + } + } // exchange + return x; + } + + // Quote from Paul Chernoch + // Interleaving means take one bit from the first matrix element, one bit + // from the next, etc, then take the second bit from the first matrix + // element, second bit from the second, all the way to the last bit of the + // last element. Combine those bits in that order into a single BigInteger, + // which can have as many bits as necessary. This converts the array into a + // single number. + BigInteger toIndex(long... transposedIndex) { + return new BigInteger(1, toIndexBytes(transposedIndex)); + } + + byte[] toIndexBytes(long... 
transposedIndex) { + byte[] b = new byte[length]; + int bIndex = length - 1; + long mask = 1L << (bits - 1); + for (int i = 0; i < bits; i++) { + for (int j = 0; j < transposedIndex.length; j++) { + if ((transposedIndex[j] & mask) != 0) { + b[length - 1 - bIndex / 8] |= 1 << (bIndex % 8); + } + bIndex--; + } + mask >>= 1; + } + // b is expected to be BigEndian + return b; + } +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java index 3aa808075d330..0b641c2125c6c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java @@ -176,9 +176,17 @@ public static byte[] utf8To8Byte(String a) { public static Long convertStringToLong(String a) { byte[] bytes = utf8To8Byte(a); + return convertBytesToLong(bytes); + } + + public static long convertBytesToLong(byte[] bytes) { + byte[] padBytes = bytes; + if (bytes.length != 8) { + padBytes = paddingTo8Byte(bytes); + } long temp = 0L; for (int i = 7; i >= 0; i--) { - temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8); + temp = temp | (((long)padBytes[i] & 0xff) << (7 - i) * 8); } return temp; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index 03fdf5a5b8813..58adb42b8d637 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -33,7 +33,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; +import org.apache.spark.SpaceCurveOptimizeHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -79,10 +79,12 @@ private JavaRDD prepareGenericRecord(JavaRDD> inp switch (config.getLayoutOptimizationCurveBuildMethod()) { case DIRECT: - zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups); + zDataFrame = SpaceCurveOptimizeHelper + .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; case SAMPLE: - zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups); + zDataFrame = SpaceCurveOptimizeHelper + .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; default: throw new HoodieException("Not a valid build curve method for doWriteOperation: "); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 280d24f718e65..fe44a08a3b28e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -78,7 +78,7 @@ import org.apache.hudi.table.action.savepoint.SavepointActionExecutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.hudi.index.zorder.ZOrderingIndexHelper; +import org.apache.spark.SpaceCurveOptimizeHelper; import org.apache.spark.api.java.JavaRDD; import javax.annotation.Nonnull; @@ -213,7 +213,7 @@ private void updateZIndex( new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields() ); - ZOrderingIndexHelper.updateZIndexFor( + SpaceCurveOptimizeHelper.updateZIndexFor( sparkEngineContext.getSqlContext().sparkSession(), AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema), touchedFiles, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java new file mode 100644 index 0000000000000..44beff81c353f --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.optimize.HilbertCurve; +import scala.collection.JavaConversions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.HoodieSparkUtils$; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.optimize.ZOrderingUtil; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.Row$; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.hudi.execution.RangeSampleSort$; +import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.util.SerializableConfiguration; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SpaceCurveOptimizeHelper { + + private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; + + /** + * Create optimized DataFrame directly + * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte + * this method is more effective than createOptimizeDataFrameBySample + * + * @param df a spark DataFrame holds parquet files to be read. + * @param sortCols z-sort/hilbert-sort cols + * @param fileNum spark partition num + * @param sortMode layout optimization strategy + * @return a dataFrame sorted by z-order/hilbert. 
+ */ + public static Dataset createOptimizedDataFrameByMapValue(Dataset df, List sortCols, int fileNum, String sortMode) { + Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); + int fieldNum = df.schema().fields().length; + List checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); + if (sortCols.size() != checkCols.size()) { + return df; + } + // only one col to sort, no need to use z-order + if (sortCols.size() == 1) { + return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0))); + } + Map fieldMap = sortCols + .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); + // do optimize + JavaRDD sortedRDD = null; + switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) { + case ZORDER: + sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); + break; + case HILBERT: + sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum); + break; + default: + throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", sortMode)); + } + // create new StructType + List newFields = new ArrayList<>(); + newFields.addAll(Arrays.asList(df.schema().fields())); + newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty())); + + // create new DataFrame + return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index"); + } + + private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { + return originRDD.map(row -> { + List zBytesList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return ZOrderingUtil.intTo8Byte(value ? 
1 : 0); + } else if (dataType instanceof BinaryType) { + return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + byte[][] zBytes = new byte[zBytesList.size()][]; + for (int i = 0; i < zBytesList.size(); i++) { + zBytes[i] = zBytesList.get(i); + } + List zVaules = new ArrayList<>(); + zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + } + + private static JavaRDD createHilbertSortedRDD(JavaRDD originRDD, Map fieldMap, int fieldNum, int fileNum) { + return originRDD.mapPartitions(rows -> { + HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size()); + return new Iterator() { + + @Override + public boolean hasNext() { + return rows.hasNext(); + } + + @Override + public Row next() { + Row row = rows.next(); + List longList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index); + } else if (dataType instanceof DoubleType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index); + } else if (dataType instanceof FloatType) { + return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); + } else if (dataType instanceof StringType) { + return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertStringToLong(row.getString(index)); + } else if (dataType instanceof DateType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); + } else if (dataType instanceof TimestampType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); + } else if (dataType instanceof ByteType) { + return row.isNullAt(index) ? Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + } else if (dataType instanceof ShortType) { + return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index); + } else if (dataType instanceof DecimalType) { + return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue(); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return value ? Long.MAX_VALUE : 0; + } else if (dataType instanceof BinaryType) { + return row.isNullAt(index) ? 
Long.MAX_VALUE : ZOrderingUtil.convertBytesToLong((byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + + byte[] hilbertValue = hilbertCurve.indexBytes(longList.stream().mapToLong(l -> l).toArray()); + List values = new ArrayList<>(); + values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + values.add(hilbertValue); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values)); + } + }; + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + } + + public static Dataset createOptimizedDataFrameByMapValue(Dataset df, String sortCols, int fileNum, String sortMode) { + if (sortCols == null || sortCols.isEmpty() || fileNum <= 0) { + return df; + } + return createOptimizedDataFrameByMapValue(df, + Arrays.stream(sortCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); + } + + public static Dataset createOptimizeDataFrameBySample(Dataset df, List zCols, int fileNum, String sortMode) { + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, sortMode); + } + + public static Dataset createOptimizeDataFrameBySample(Dataset df, String zCols, int fileNum, String sortMode) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createOptimizeDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum, sortMode); + } + + /** + * Parse min/max statistics stored in parquet footers for z-sort cols. + * no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType. + * to do adapt for rfc-27 + * + * @param df a spark DataFrame holds parquet files to be read. + * @param cols z-sort cols + * @return a dataFrame holds all statistics info. 
+ */ + public static Dataset getMinMaxValue(Dataset df, List cols) { + Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType())); + + List scanFiles = Arrays.asList(df.inputFiles()); + SparkContext sc = df.sparkSession().sparkContext(); + JavaSparkContext jsc = new JavaSparkContext(sc); + + SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration()); + int numParallelism = (scanFiles.size() / 3 + 1); + List> colMinMaxInfos; + String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION); + try { + jsc.setJobDescription("Listing parquet column statistics"); + colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> { + Configuration conf = serializableConfiguration.value(); + ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET); + List>> results = new ArrayList<>(); + while (paths.hasNext()) { + String path = paths.next(); + results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols)); + } + return results.stream().flatMap(f -> f.stream()).iterator(); + }).collect(); + } finally { + jsc.setJobDescription(previousJobDescription); + } + + Map>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath())); + JavaRDD allMetaDataRDD = jsc.parallelize(new ArrayList<>(fileToStatsListMap.values()), 1).map(f -> { + int colSize = f.size(); + if (colSize == 0) { + return null; + } else { + List rows = new ArrayList<>(); + rows.add(f.get(0).getFilePath()); + cols.stream().forEach(col -> { + HoodieColumnRangeMetadata currentColRangeMetaData = + f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null); + DataType colType = columnsMap.get(col); + if (currentColRangeMetaData == null || colType == null) { + throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col)); + } + if (colType instanceof IntegerType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof DoubleType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof StringType) { + rows.add(currentColRangeMetaData.getMinValueAsString()); + rows.add(currentColRangeMetaData.getMaxValueAsString()); + } else if (colType instanceof DecimalType) { + rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString())); + rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString())); + } else if (colType instanceof DateType) { + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString())); + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString())); + } else if (colType instanceof LongType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof ShortType) { + rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString())); + rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString())); + } else if (colType instanceof FloatType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof BinaryType) { + rows.add(((Binary)currentColRangeMetaData.getMinValue()).getBytes()); + rows.add(((Binary)currentColRangeMetaData.getMaxValue()).getBytes()); + } 
else if (colType instanceof BooleanType) { + rows.add(currentColRangeMetaData.getMinValue()); + rows.add(currentColRangeMetaData.getMaxValue()); + } else if (colType instanceof ByteType) { + rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString())); + rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString())); + } else { + throw new HoodieException(String.format("Not support type: %s", colType)); + } + rows.add(currentColRangeMetaData.getNumNulls()); + }); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows)); + } + }).filter(f -> f != null); + List allMetaDataSchema = new ArrayList<>(); + allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty())); + cols.forEach(col -> { + allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty())); + allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty())); + allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty())); + }); + return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema)); + } + + public static Dataset getMinMaxValue(Dataset df, String cols) { + List rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList()); + return getMinMaxValue(df, rawCols); + } + + /** + * Update statistics info. + * this method will update old index table by full out join, + * and save the updated table into a new index table based on commitTime. + * old index table will be cleaned also. + * + * @param df a spark DataFrame holds parquet files to be read. + * @param cols z-sort cols. + * @param indexPath index store path. + * @param commitTime current operation commitTime. + * @param validateCommits all validate commits for current table. + * @return + */ + public static void saveStatisticsInfo(Dataset df, String cols, String indexPath, String commitTime, List validateCommits) { + Path savePath = new Path(indexPath, commitTime); + SparkSession spark = df.sparkSession(); + FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); + Dataset statisticsDF = SpaceCurveOptimizeHelper.getMinMaxValue(df, cols); + // try to find last validate index table from index path + try { + // If there's currently no index, create one + if (!fs.exists(new Path(indexPath))) { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + return; + } + + // Otherwise, clean up all indexes but the most recent one + + List allIndexTables = Arrays + .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); + List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); + List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); + Option latestIndexData = Option.empty(); + if (!candidateIndexTables.isEmpty()) { + latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); + // clean old index table, keep at most 1 index table. 
+ candidateIndexTables.remove(candidateIndexTables.size() - 1); + candidateIndexTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + } + + // clean residualTables + // retried cluster operations at the same instant time is also considered, + // the residual files produced by retried are cleaned up before save statistics + // save statistics info to index table which named commitTime + residualTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + + if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { + // update the statistics info + String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + latestIndexData.get().registerTempTable(originalTable); + statisticsDF.registerTempTable(updateTable); + // update table by full out join + List columns = Arrays.asList(statisticsDF.schema().fieldNames()); + spark.sql(HoodieSparkUtils$ + .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); + } else { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + } + } catch (IOException e) { + throw new HoodieException(e); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index c392f127d51b5..1d9cd431de652 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.hudi.execution import org.apache.hudi.config.HoodieClusteringConfig import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} -import org.apache.hudi.optimize.ZOrderingUtil +import org.apache.hudi.optimize.{HilbertCurve, ZOrderingUtil} import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering @@ -335,16 +335,21 @@ object RangeSampleSort { } /** - * create z-order DataFrame by sample - * first, sample origin data to get z-cols bounds, then create z-order DataFrame + * create optimize DataFrame by sample + * first, sample origin data to get order-cols bounds, then apply sort to produce DataFrame * support all type data. 
- * this method need more resource and cost more time than createZIndexedDataFrameByMapValue + * this method need more resource and cost more time than createOptimizedDataFrameByMapValue */ - def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = { + def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int, sortMode: String): DataFrame = { val spark = df.sparkSession val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap val fieldNum = df.schema.fields.length val checkCols = zCols.filter(col => columnsMap(col) != null) + val useHilbert = sortMode match { + case "hilbert" => true + case "z-order" => false + case other => throw new IllegalArgumentException(s"new only support z-order/hilbert optimize but find: ${other}") + } if (zCols.isEmpty || checkCols.isEmpty) { df @@ -441,6 +446,7 @@ object RangeSampleSort { val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor) val indexRdd = rawRdd.mapPartitions { iter => + val hilbertCurve = if (useHilbert) Some(HilbertCurve.bits(32).dimensions(zFields.length)) else None val expandBoundsWithFactor = boundBroadCast.value val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max val longDecisionBound = new RawDecisionBound(Ordering[Long]) @@ -507,17 +513,21 @@ object RangeSampleSort { case _ => -1 } - }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray - val zValues = ZOrderingUtil.interleaving(values, 8) - Row.fromSeq(row.toSeq ++ Seq(zValues)) + }.filter(v => v != -1) + val mapValues = if (hilbertCurve.isDefined) { + hilbertCurve.get.indexBytes(values.map(_.toLong): _*) + } else { + ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) + } + Row.fromSeq(row.toSeq ++ Seq(mapValues)) } }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum) val newDF = df.sparkSession.createDataFrame(indexRdd, StructType( df.schema.fields ++ Seq( - StructField(s"zindex", + StructField(s"index", BinaryType, false)) )) - newDF.drop("zindex") + newDF.drop("index") } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala new file mode 100644 index 0000000000000..7ce4ba63c2ead --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional + +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} +import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} +import org.apache.spark.SpaceCurveOptimizeHelper +import org.apache.spark.sql._ +import org.apache.spark.sql.types._ +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import java.sql.{Date, Timestamp} +import scala.collection.JavaConversions._ +import scala.util.Random + +@Tag("functional") +class TestTableLayoutOptimization extends HoodieClientTestBase { + var spark: SparkSession = _ + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @ParameterizedTest + @ValueSource(strings = Array("COPY_ON_WRITE, hilbert", "COPY_ON_WRITE, z-order", "MERGE_ON_READ, hilbert", "MERGE_ON_READ, z-order")) + def testOptimizewithClustering(parameter: String): Unit = { + val splits = parameter.split(",").map(_.trim) + val tableType = splits(0) + val optimizeMode = splits(1) + val targetRecordsCount = 10000 + // Bulk Insert Operation + val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList + val writeDf: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + writeDf.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") + .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType) + // option for clustering + .option("hoodie.parquet.small.file.limit", "0") + .option("hoodie.clustering.inline", "true") + .option("hoodie.clustering.inline.max.commits", "1") + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824") + .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") + .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) + .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) + .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true") + .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), optimizeMode) + .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon") + .mode(SaveMode.Overwrite) + .save(basePath) + + val readDf = + spark.read + .format("hudi") + .load(basePath) + + val readDfSkip = + spark.read + .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), 
"true") + .format("hudi") + .load(basePath) + + assertEquals(targetRecordsCount, readDf.count()) + assertEquals(targetRecordsCount, readDfSkip.count()) + + readDf.createOrReplaceTempView("hudi_snapshot_raw") + readDfSkip.createOrReplaceTempView("hudi_snapshot_skipping") + + def select(tableName: String) = + spark.sql(s"SELECT * FROM $tableName WHERE begin_lat >= 0.49 AND begin_lat < 0.51 AND begin_lon >= 0.49 AND begin_lon < 0.51") + + assertRowsMatch( + select("hudi_snapshot_raw"), + select("hudi_snapshot_skipping") + ) + } + + def assertRowsMatch(one: DataFrame, other: DataFrame) = { + val rows = one.count() + assert(rows == other.count() && one.intersect(other).count() == rows) + } + + @Test + def testCollectMinMaxStatistics(): Unit = { + val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax") + val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat") + val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration) + val complexDataFrame = createComplexDataFrame(spark) + complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString) + val df = spark.read.load(testPath.toString) + try { + // test z-order/hilbert sort for all primitive type + // shoud not throw exception. + SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + try { + // do not support TimeStampType, so if we collect statistics for c4, should throw exception + val colDf = SpaceCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") + colDf.cache() + assertEquals(colDf.count(), 3) + assertEquals(colDf.take(1)(0).length, 22) + colDf.unpersist() + // try to save statistics + SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) + // save again + SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) + // test old index table clean + SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) + assertEquals(!fs.exists(new Path(statisticPath, "2")), true) + assertEquals(fs.exists(new Path(statisticPath, "3")), true) + // test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved. + SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) + assertEquals(fs.exists(new Path(statisticPath, "5")), true) + } finally { + if (fs.exists(testPath)) fs.delete(testPath) + if (fs.exists(statisticPath)) fs.delete(statisticPath) + } + } + } + + // test collect min-max statistic info for DateType in the case of multithreading. + // parquet will give a wrong statistic result for DateType in the case of multithreading. 
+ @Test + def testMultiThreadParquetFooterReadForDateType(): Unit = { + // create parquet file with DateType + val rdd = spark.sparkContext.parallelize(0 to 100, 1) + .map(item => RowFactory.create(Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}"))) + val df = spark.createDataFrame(rdd, new StructType().add("id", DateType)) + val testPath = new Path(System.getProperty("java.io.tmpdir"), "testCollectDateType") + val conf = spark.sparkContext.hadoopConfiguration + val cols = new java.util.ArrayList[String] + cols.add("id") + try { + df.repartition(3).write.mode("overwrite").save(testPath.toString) + val inputFiles = spark.read.load(testPath.toString).inputFiles.sortBy(x => x) + + val realResult = new Array[(String, String)](3) + inputFiles.zipWithIndex.foreach { case (f, index) => + val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] + val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() + realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString) + } + + // multi thread read with no lock + val resUseLock = new Array[(String, String)](3) + inputFiles.zipWithIndex.par.foreach { case (f, index) => + val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] + val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() + resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString) + } + + // check resUseNoLock, + // We can't guarantee that there must be problems in the case of multithreading. + // In order to make ut pass smoothly, we will not check resUseNoLock. + // check resUseLock + // should pass assert + realResult.zip(resUseLock).foreach { case (realValue, testValue) => + assert(realValue == testValue, s" expect realValue: ${realValue} but find ${testValue}") + } + } finally { + if (fs.exists(testPath)) fs.delete(testPath) + } + } + + def createComplexDataFrame(spark: SparkSession): DataFrame = { + val schema = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DecimalType(9,3)) + .add("c4", TimestampType) + .add("c5", ShortType) + .add("c6", DateType) + .add("c7", BinaryType) + .add("c8", ByteType) + + val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item => + val c1 = Integer.valueOf(item) + val c2 = s" ${item}sdc" + val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}") + val c4 = new Timestamp(System.currentTimeMillis()) + val c5 = java.lang.Short.valueOf(s"${(item + 16) /10}") + val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}") + val c7 = Array(item).map(_.toByte) + val c8 = java.lang.Byte.valueOf("9") + + RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8) + } + spark.createDataFrame(rdd, schema) + } +} + From 06686633aad06b068585441ebe9e48e6bf27e1e5 Mon Sep 17 00:00:00 2001 From: xiarixiaoyao Date: Thu, 25 Nov 2021 12:51:01 +0800 Subject: [PATCH 02/12] add benchmark exmaple for space-curve optimize --- .../SpaceCurveOptimizeBenchMark.scala | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala new file mode 100644 index 
0000000000000..45731f1f85740 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.hadoop.fs.Path +import org.apache.spark.SpaceCurveOptimizeHelper +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.hudi.TestHoodieSqlBase + +import scala.util.Random + +object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { + + def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { + val minMax = SpaceCurveOptimizeHelper + .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}") + .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) + var c = 0 + for (elem <- minMax) { + if ((elem._1 <= value1 && elem._2 >= value1) || (elem._3 <= value2 && elem._4 >= value2)) { + c = c + 1 + } + } + + val p = c / minMax.size.toDouble + println(s"for table ${tableName} with query filter: ${co1} = ${value1} or ${co2} = ${value2} we can achieve skipping percent ${1.0 - p}") + } + + /* + for table table_z_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.8 + for table table_z_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.77 + for table table_hilbert_sort_byMap with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.855 + for table table_hilbert_sort_bySample with query filter: c1_int = 500000 or c2_int = 500000 we can achieve skipping percent 0.83 + */ + def runNormalTableSkippingBenchMark(): Unit = { + withTempDir { f => + withTempTable("table_z_sort_byMap", "table_z_sort_bySample", "table_hilbert_sort_byMap", "table_hilbert_sort_bySample") { + prepareNormalTable(new Path(f.getAbsolutePath), 1000000) + // choose median value as filter condition. 
+ // the median value of c1_int is 500000 + // the median value of c2_int is 500000 + getSkippingPercent("table_z_sort_byMap", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_z_sort_bySample", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_hilbert_sort_byMap", "c1_int", "c2_int", 500000, 500000) + getSkippingPercent("table_hilbert_sort_bySample", "c1_int", "c2_int", 500000, 500000) + } + } + } + + /* + for table table_z_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.0 + for table table_z_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.78 + for table table_hilbert_sort_byMap_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.05500000000000005 + for table table_hilbert_sort_bySample_skew with query filter: c1_int = 5000 or c2_int = 500000 we can achieve skipping percent 0.84 + */ + def runSkewTableSkippingBenchMark(): Unit = { + withTempDir { f => + withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", "table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") { + prepareSkewTable(new Path(f.getAbsolutePath), 1000000) + // choose median value as filter condition. + // the median value of c1_int is 5000 + // the median value of c2_int is 500000 + getSkippingPercent("table_z_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_z_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_hilbert_sort_byMap_skew", "c1_int", "c2_int", 5000, 500000) + getSkippingPercent("table_hilbert_sort_bySample_skew", "c1_int", "c2_int", 5000, 500000) + } + } + } + + def main(args: Array[String]): Unit = { + runNormalTableSkippingBenchMark() + runSkewTableSkippingBenchMark() + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { + try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def prepareNormalTable(tablePath: Path, numRows: Int): Unit = { + import spark.implicits._ + val df = spark.range(numRows).map(_ => (Random.nextInt(1000000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") + val dfOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") + + val dfHilbertOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") + val dfHilbertOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") + + saveAsTable(dfOptimizeByMap, tablePath, "z_sort_byMap") + saveAsTable(dfOptimizeBySample, tablePath, "z_sort_bySample") + saveAsTable(dfHilbertOptimizeByMap, tablePath, "hilbert_sort_byMap") + saveAsTable(dfHilbertOptimizeBySample, tablePath, "hilbert_sort_bySample") + } + + def prepareSkewTable(tablePath: Path, numRows: Int): Unit = { + import spark.implicits._ + val df = spark.range(numRows).map(_ => (Random.nextInt(10000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") + val dfOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") + + val dfHilbertOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, 
"hilbert") + val dfHilbertOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") + + saveAsTable(dfOptimizeByMap, tablePath, "z_sort_byMap_skew") + saveAsTable(dfOptimizeBySample, tablePath, "z_sort_bySample_skew") + saveAsTable(dfHilbertOptimizeByMap, tablePath, "hilbert_sort_byMap_skew") + saveAsTable(dfHilbertOptimizeBySample, tablePath, "hilbert_sort_bySample_skew") + } + + def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = { + + df.write.mode("overwrite").save(new Path(savePath, suffix).toString) + spark.read.parquet(new Path(savePath, suffix).toString).createOrReplaceTempView("table_" + suffix) + } +} + From 36f3dc609f59a8aa73e1fefb11f1d0500d0fad1b Mon Sep 17 00:00:00 2001 From: xiarixiaoyao Date: Thu, 25 Nov 2021 15:11:28 +0800 Subject: [PATCH 03/12] fix comments --- .../apache/hudi/optimize/HilbertCurve.java | 83 ++++++------------- .../hudi/optimize/TestHilbertCurve.java | 65 +++++++++++++++ .../hudi/optimize/TestZOrderingUtil.java | 17 ++++ .../sql/hudi/execution/RangeSample.scala | 4 +- .../TestTableLayoutOptimization.scala | 11 +-- .../SpaceCurveOptimizeBenchMark.scala | 30 ++----- 6 files changed, 122 insertions(+), 88 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java index 2289ef254d371..5e60dc8907309 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java @@ -24,7 +24,6 @@ /** * Converts between Hilbert index ({@code BigInteger}) and N-dimensional points. * - *

* Note: * GitHub). * the Licensed of above link is also http://www.apache.org/licenses/LICENSE-2.0 @@ -47,9 +46,7 @@ private HilbertCurve(int bits, int dimensions) { * Returns a builder for and object that performs transformations for a Hilbert * curve with the given number of bits. * - * @param bits - * depth of the Hilbert curve. If bits is one, this is the top-level - * Hilbert curve + * @param bits depth of the Hilbert curve. If bits is one, this is the top-level Hilbert curve * @return builder for object to do transformations with the Hilbert Curve */ public static Builder bits(int bits) { @@ -80,22 +77,18 @@ public HilbertCurve dimensions(int dimensions) { /** * Converts a point to its Hilbert curve index. * - * @param point - * an array of {@code long}. Each ordinate can be between 0 and - * 2bits-1. - * @return index (nonnegative {@link BigInteger}) - * @throws IllegalArgumentException - * if length of point array is not equal to the number of - * dimensions. + * @param point an array of dimensions + * @return hilbert index + * @throws IllegalArgumentException if length of point array is not equal to the number of dimensions. */ - public BigInteger index(long... point) { + public BigInteger index(long[] point) { if (point.length != dimensions) { throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); } return toIndex(transposedIndex(bits, point)); } - public byte[] indexBytes(long... point) { + public byte[] indexBytes(long[] point) { if (point.length != dimensions) { throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); } @@ -106,14 +99,10 @@ public byte[] indexBytes(long... point) { * Converts a {@link BigInteger} index (distance along the Hilbert Curve from 0) * to a point of dimensions defined in the constructor of {@code this}. * - * @param index - * index along the Hilbert Curve from 0. Maximum value 2 bits * - * dimensions-1. + * @param index hilbert index * @return array of longs being the point - * @throws NullPointerException - * if index is null - * @throws IllegalArgumentException - * if index is negative + * @throws NullPointerException if index is null + * @throws IllegalArgumentException if index is negative */ public long[] point(BigInteger index) { if (index == null) { @@ -145,12 +134,9 @@ public void point(long i, long[] x) { * Converts a {@code long} index (distance along the Hilbert Curve from 0) to a * point of dimensions defined in the constructor of {@code this}. * - * @param index - * index along the Hilbert Curve from 0. Maximum value 2 - * bits+1-1. + * @param index hilbert index * @return array of longs being the point - * @throws IllegalArgumentException - * if index is negative + * @throws IllegalArgumentException if index is negative */ public long[] point(long index) { return point(BigInteger.valueOf(index)); @@ -158,22 +144,14 @@ public long[] point(long index) { /** * Returns the transposed representation of the Hilbert curve index. - * - *

* The Hilbert index is expressed internally as an array of transposed bits. + * Example: 5 bits for each of n=3 coordinates. + * 15-bit Hilbert integer = A B C D E F G H I J K L M N O is stored as its Transpose: + * X[0] = A D G J M + * X[1] = B E H K N + * X[2] = C F I L O * - *
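+ * e.g. with 5 bits and 2 dimensions, transpose(256) gives X[0] = 0 and X[1] = 16
+ * (asserted in TestHilbertCurve#testTranspose).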

-   Example: 5 bits for each of n=3 coordinates.
-   15-bit Hilbert integer = A B C D E F G H I J K L M N O is stored
-   as its Transpose                        ^
-   X[0] = A D G J M                    X[2]|  7
-   X[1] = B E H K N        <------->       | /X[1]
-   X[2] = C F I L O                   axes |/
-   high low                         0------> X[0]
-   * 
- * - * @param index - * index to be tranposed + * @param index index to be transposed * @return transposed index */ long[] transpose(BigInteger index) { long[] x = new long[dimensions]; transpose(index, x); return x; } private void transpose(BigInteger index, long[] x) { byte[] b = index.toByteArray(); for (int idx = 0; idx < 8 * b.length; idx++) { if ((b[b.length - 1 - idx / 8] & (1L << (idx % 8))) != 0) { int dim = (length - idx - 1) % dimensions; int shift = (idx / dimensions) % bits; x[dim] |= 1L << shift; } } } /** -

* Given the axes (coordinates) of a point in N-Dimensional space, find the * distance to that point along the Hilbert curve. That distance will be * transposed; broken into pieces and distributed into an array. - * - *

* The number of dimensions is the length of the hilbertAxes array. - * - *

* Note: In Skilling's paper, this function is called AxestoTranspose. * * @param bits - * @param point - * Point in N-space - * @return The Hilbert distance (or index) as a transposed Hilbert index + * @param point Point in N-space. + * @return The Hilbert distance (or index) as a transposed Hilbert index. */ - static long[] transposedIndex(int bits, long... point) { + static long[] transposedIndex(int bits, long[] point) { final long M = 1L << (bits - 1); final int n = point.length; // n: Number of dimensions final long[] x = Arrays.copyOf(point, n); @@ -248,17 +220,14 @@ static long[] transposedIndex(int bits, long... point) { } /** - * Converts the Hilbert transposed index into an N-dimensional point expressed - * as a vector of {@code long}. - * - * In Skilling's paper this function is named {@code TransposeToAxes} + * Converts the Hilbert transposed index into an N-dimensional point expressed as a vector of {@code long}. + * In Skilling's paper this function is named {@code TransposeToAxes}. * * @param bits * @param x - * @return the coordinates of the point represented by the transposed index on - * the Hilbert curve + * @return the coordinates of the point represented by the transposed index on the Hilbert curve. */ - static long[] transposedIndexToPoint(int bits, long... x) { + static long[] transposedIndexToPoint(int bits, long[] x) { final long N = 2L << (bits - 1); // Note that x is mutated by this method (as a performance improvement // to avoid allocation) @@ -298,11 +267,11 @@ static long[] transposedIndexToPoint(int bits, long... x) { // last element. Combine those bits in that order into a single BigInteger, // which can have as many bits as necessary. This converts the array into a // single number. - BigInteger toIndex(long... transposedIndex) { + BigInteger toIndex(long[] transposedIndex) { return new BigInteger(1, toIndexBytes(transposedIndex)); } - byte[] toIndexBytes(long... transposedIndex) { + byte[] toIndexBytes(long[] transposedIndex) { byte[] b = new byte[length]; int bIndex = length - 1; long mask = 1L << (bits - 1); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java new file mode 100644 index 0000000000000..f803f6962628d --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.optimize; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import java.math.BigInteger; +import java.util.Arrays; + +public class TestHilbertCurve { + + private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2); + + @Test + public void testIndex() { + long[] t = {1, 2}; + assertEquals(13, INSTANCE.index(t).intValue()); + long[] t1 = {0, 16}; + assertEquals(256, INSTANCE.index(t1).intValue()); + } + + @Test + public void testTranspose() { + long[] ti = INSTANCE.transpose(BigInteger.valueOf(256)); + assertEquals(2, ti.length); + assertEquals(0, ti[0]); + assertEquals(16, ti[1]); + } + + @Test + public void toIndexByte() { + HilbertCurve c = HilbertCurve.bits(4).dimensions(2); + long[][] points = new long[][]{{1,2}, {3,2}, {5,6}, {7,8} }; + // bits interleave + byte[][] resCheck = new byte[][] {{0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b0110}, + {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b1110}, + {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00110110}, + {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b01101010}}; + for (int i = 0; i < points.length; i++) { + byte[] res = c.toIndexBytes(Arrays.stream(points[i]).toArray()); + for (int j = 0; j < 8; j++) { + assertEquals(res[j], resCheck[i][j]); + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java index 7dab6c2057c77..1e1ff3088533f 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java @@ -126,4 +126,21 @@ public OrginValueWrapper(T index, T originValue) { this.originValue = originValue; } } + + @Test + public void testConvertBytesToLong() { + long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; + for (int i = 0; i < tests.length; i++) { + assertEquals(ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + } + } + + private byte[] convertLongToBytes(long num) { + byte[] byteNum = new byte[8]; + for (int i = 0; i < 8; i++) { + int offset = 64 - (i + 1) * 8; + byteNum[i] = (byte) ((num >> offset) & 0xff); + } + return byteNum; + } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index 1d9cd431de652..35177272bb19e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -371,7 +371,7 @@ object RangeSampleSort { }.filter(_._1 != -1) // Complex type found, use createZIndexedDataFrameByRange if (zFields.length != zCols.length) { - return sortDataFrameBySampleSupportAllTypes(df, zCols, fieldNum) + return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum) } val rawRdd = df.rdd @@ -515,7 +515,7 @@ object RangeSampleSort { } }.filter(v => v != -1) val mapValues = if (hilbertCurve.isDefined) { - hilbertCurve.get.indexBytes(values.map(_.toLong): _*) + hilbertCurve.get.indexBytes(values.map(_.toLong).toArray) } else { ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) } diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala index 7ce4ba63c2ead..f14f96b57f5a8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -31,9 +31,9 @@ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource - +import org.junit.jupiter.params.provider.{CsvSource, ValueSource} import java.sql.{Date, Timestamp} + import scala.collection.JavaConversions._ import scala.util.Random @@ -66,11 +66,8 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { } @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE, hilbert", "COPY_ON_WRITE, z-order", "MERGE_ON_READ, hilbert", "MERGE_ON_READ, z-order")) - def testOptimizewithClustering(parameter: String): Unit = { - val splits = parameter.split(",").map(_.trim) - val tableType = splits(0) - val optimizeMode = splits(1) + @CsvSource(Array("COPY_ON_WRITE, hilbert", "COPY_ON_WRITE, z-order", "MERGE_ON_READ, hilbert", "MERGE_ON_READ, z-order")) + def testOptimizewithClustering(tableType: String, optimizeMode: String): Unit = { val targetRecordsCount = 10000 // Bulk Insert Operation val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala index 45731f1f85740..9a85b2895bec9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -51,7 +51,7 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def runNormalTableSkippingBenchMark(): Unit = { withTempDir { f => withTempTable("table_z_sort_byMap", "table_z_sort_bySample", "table_hilbert_sort_byMap", "table_hilbert_sort_bySample") { - prepareNormalTable(new Path(f.getAbsolutePath), 1000000) + prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000) // choose median value as filter condition. // the median value of c1_int is 500000 // the median value of c2_int is 500000 @@ -72,7 +72,8 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def runSkewTableSkippingBenchMark(): Unit = { withTempDir { f => withTempTable("table_z_sort_byMap_skew", "table_z_sort_bySample_skew", "table_hilbert_sort_byMap_skew", "table_hilbert_sort_bySample_skew") { - prepareSkewTable(new Path(f.getAbsolutePath), 1000000) + // prepare skewed table. + prepareInterTypeTable(new Path(f.getAbsolutePath), 1000000, 10000, 1000000, true) // choose median value as filter condition. 
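+        // c1_int is drawn from a far narrower range (10000) than c2_int (1000000); the numbers
+        // above show the map-based layouts losing almost all pruning power on this skew (0.0 and
+        // ~0.06) while the sample-based layouts stay close to the uniform case (0.78 and 0.84).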
// the median value of c1_int is 5000 // the median value of c2_int is 500000 @@ -93,7 +94,7 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { try f finally tableNames.foreach(spark.catalog.dropTempView) } - def prepareNormalTable(tablePath: Path, numRows: Int): Unit = { + def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = { import spark.implicits._ val df = spark.range(numRows).map(_ => (Random.nextInt(1000000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") val dfOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") @@ -102,25 +103,10 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { val dfHilbertOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") val dfHilbertOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") - saveAsTable(dfOptimizeByMap, tablePath, "z_sort_byMap") - saveAsTable(dfOptimizeBySample, tablePath, "z_sort_bySample") - saveAsTable(dfHilbertOptimizeByMap, tablePath, "hilbert_sort_byMap") - saveAsTable(dfHilbertOptimizeBySample, tablePath, "hilbert_sort_bySample") - } - - def prepareSkewTable(tablePath: Path, numRows: Int): Unit = { - import spark.implicits._ - val df = spark.range(numRows).map(_ => (Random.nextInt(10000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") - val dfOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") - val dfOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") - - val dfHilbertOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") - val dfHilbertOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") - - saveAsTable(dfOptimizeByMap, tablePath, "z_sort_byMap_skew") - saveAsTable(dfOptimizeBySample, tablePath, "z_sort_bySample_skew") - saveAsTable(dfHilbertOptimizeByMap, tablePath, "hilbert_sort_byMap_skew") - saveAsTable(dfHilbertOptimizeBySample, tablePath, "hilbert_sort_bySample_skew") + saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap") + saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample") + saveAsTable(dfHilbertOptimizeByMap, tablePath, if (skewed) "hilbert_sort_byMap_skew" else "hilbert_sort_byMap") + saveAsTable(dfHilbertOptimizeBySample, tablePath, if (skewed) "hilbert_sort_bySample_skew" else "hilbert_sort_bySample") } def saveAsTable(df: DataFrame, savePath: Path, suffix: String): Unit = { From 295f26fd2b6af426125233caeb31130fffaa6c93 Mon Sep 17 00:00:00 2001 From: xiarixiaoyao Date: Thu, 25 Nov 2021 17:35:05 +0800 Subject: [PATCH 04/12] fix test bug --- .../TestTableLayoutOptimization.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala index f14f96b57f5a8..904a2174c2f61 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -31,7 +31,9 @@ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, ValueSource} +import org.junit.jupiter.params.provider.{Arguments, MethodSource} +import org.junit.jupiter.params.provider.Arguments.arguments + import java.sql.{Date, Timestamp} import scala.collection.JavaConversions._ @@ -66,7 +68,7 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { } @ParameterizedTest - @CsvSource(Array("COPY_ON_WRITE, hilbert", "COPY_ON_WRITE, z-order", "MERGE_ON_READ, hilbert", "MERGE_ON_READ, z-order")) + @MethodSource(Array("testLayOutParameter")) def testOptimizewithClustering(tableType: String, optimizeMode: String): Unit = { val targetRecordsCount = 10000 // Bulk Insert Operation @@ -234,3 +236,14 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { } } +object TestTableLayoutOptimization { + def testLayOutParameter(): java.util.stream.Stream[Arguments] = { + java.util.stream.Stream.of( + arguments("COPY_ON_WRITE", "hilbert"), + arguments("COPY_ON_WRITE", "z-order"), + arguments("MERGE_ON_READ", "hilbert"), + arguments("MERGE_ON_READ", "z-order") + ) + } +} + From cb2fc3ecc6082e82b2bf4b2abd636ad8cdfc04b0 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 13:03:20 -0800 Subject: [PATCH 05/12] Address review comments --- NOTICE | 6 + hudi-client/hudi-client-common/pom.xml | 7 + .../apache/hudi/optimize/HilbertCurve.java | 290 ------------------ .../hudi/optimize/HilbertCurveUtils.java | 34 ++ .../apache/hudi/optimize/ZOrderingUtil.java | 7 +- ...tCurve.java => TestHilbertCurveUtils.java} | 33 +- .../spark/SpaceCurveOptimizeHelper.java | 27 +- .../sql/hudi/execution/RangeSample.scala | 18 +- 8 files changed, 77 insertions(+), 345 deletions(-) delete mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java rename hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/{TestHilbertCurve.java => TestHilbertCurveUtils.java} (55%) diff --git a/NOTICE b/NOTICE index 9b249331f3435..437b974ac217b 100644 --- a/NOTICE +++ b/NOTICE @@ -159,3 +159,9 @@ its NOTICE file: This product includes software developed at StreamSets (http://www.streamsets.com/). 
+-------------------------------------------------------------------------------- + +This product includes code from hilbert-curve project + * Copyright https://github.com/davidmoten/hilbert-curve + * Licensed under the Apache-2.0 License + diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index d30ee298b1827..22ad8ec0b9d30 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -64,6 +64,13 @@ parquet-avro + + + com.github.davidmoten + hilbert-curve + 0.2.2 + + io.dropwizard.metrics diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java deleted file mode 100644 index 5e60dc8907309..0000000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurve.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.optimize; - -import java.math.BigInteger; -import java.util.Arrays; - -/** - * Converts between Hilbert index ({@code BigInteger}) and N-dimensional points. - * - * Note: - * GitHub). - * the Licensed of above link is also http://www.apache.org/licenses/LICENSE-2.0 - */ -public final class HilbertCurve { - - private final int bits; - private final int dimensions; - // cached calculations - private final int length; - - private HilbertCurve(int bits, int dimensions) { - this.bits = bits; - this.dimensions = dimensions; - // cache a calculated values for small perf improvements - this.length = bits * dimensions; - } - - /** - * Returns a builder for and object that performs transformations for a Hilbert - * curve with the given number of bits. - * - * @param bits depth of the Hilbert curve. If bits is one, this is the top-level Hilbert curve - * @return builder for object to do transformations with the Hilbert Curve - */ - public static Builder bits(int bits) { - return new Builder(bits); - } - - /** - * Builds a {@link HilbertCurve} instance. - */ - public static final class Builder { - final int bits; - - private Builder(int bits) { - if (bits <= 0 || bits >= 64) { - throw new IllegalArgumentException(String.format("bits must be greater than zero and less than 64, now found bits value: %s", bits)); - } - this.bits = bits; - } - - public HilbertCurve dimensions(int dimensions) { - if (dimensions < 2) { - throw new IllegalArgumentException(String.format("dimensions must be at least 2, now found dimensions value: %s", dimensions)); - } - return new HilbertCurve(bits, dimensions); - } - } - - /** - * Converts a point to its Hilbert curve index. 
- * - * @param point an array of dimensions - * @return hilbert index - * @throws IllegalArgumentException if length of point array is not equal to the number of dimensions. - */ - public BigInteger index(long[] point) { - if (point.length != dimensions) { - throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); - } - return toIndex(transposedIndex(bits, point)); - } - - public byte[] indexBytes(long[] point) { - if (point.length != dimensions) { - throw new IllegalArgumentException(String.format("length of point array must equal to the number of dimensions")); - } - return toIndexBytes(transposedIndex(bits, point)); - } - - /** - * Converts a {@link BigInteger} index (distance along the Hilbert Curve from 0) - * to a point of dimensions defined in the constructor of {@code this}. - * - * @param index hilbert index - * @return array of longs being the point - * @throws NullPointerException if index is null - * @throws IllegalArgumentException if index is negative - */ - public long[] point(BigInteger index) { - if (index == null) { - throw new NullPointerException("index must not be null"); - } - if (index.signum() == -1) { - throw new IllegalArgumentException("index cannot be negative"); - } - return transposedIndexToPoint(bits, transpose(index)); - } - - public void point(BigInteger index, long[] x) { - if (index == null) { - throw new NullPointerException("index must not be null"); - } - if (index.signum() == -1) { - throw new IllegalArgumentException("index cannot be negative"); - } - Arrays.fill(x, 0); - transpose(index, x); - transposedIndexToPoint(bits, x); - } - - public void point(long i, long[] x) { - point(BigInteger.valueOf(i), x); - } - - /** - * Converts a {@code long} index (distance along the Hilbert Curve from 0) to a - * point of dimensions defined in the constructor of {@code this}. - * - * @param index hilbert index - * @return array of longs being the point - * @throws IllegalArgumentException if index is negative - */ - public long[] point(long index) { - return point(BigInteger.valueOf(index)); - } - - /** - * Returns the transposed representation of the Hilbert curve index. - * The Hilbert index is expressed internally as an array of transposed bits. - * Example: 5 bits for each of n=3 coordinates. - * 15-bit Hilbert integer = A B C D E F G H I J K L M N O is stored as its Transpose: - * X[0] = A D G J M - * X[1] = B E H K N - * X[2] = C F I L O - * - * @param index index to be tranposed - * @return transposed index - */ - long[] transpose(BigInteger index) { - long[] x = new long[dimensions]; - transpose(index, x); - return x; - } - - private void transpose(BigInteger index, long[] x) { - byte[] b = index.toByteArray(); - for (int idx = 0; idx < 8 * b.length; idx++) { - if ((b[b.length - 1 - idx / 8] & (1L << (idx % 8))) != 0) { - int dim = (length - idx - 1) % dimensions; - int shift = (idx / dimensions) % bits; - x[dim] |= 1L << shift; - } - } - } - - /** - * Given the axes (coordinates) of a point in N-Dimensional space, find the - * distance to that point along the Hilbert curve. That distance will be - * transposed; broken into pieces and distributed into an array. - * The number of dimensions is the length of the hilbertAxes array. - * Note: In Skilling's paper, this function is called AxestoTranspose. - * - * @param bits - * @param point Point in N-space. - * @return The Hilbert distance (or index) as a transposed Hilbert index. 
- */ - static long[] transposedIndex(int bits, long[] point) { - final long M = 1L << (bits - 1); - final int n = point.length; // n: Number of dimensions - final long[] x = Arrays.copyOf(point, n); - long p; - long q; - long t; - int i; - // Inverse undo - for (q = M; q > 1; q >>= 1) { - p = q - 1; - for (i = 0; i < n; i++) { - if ((x[i] & q) != 0) { - x[0] ^= p; // invert - } else { - t = (x[0] ^ x[i]) & p; - x[0] ^= t; - x[i] ^= t; - } - } - } // exchange - // Gray encode - for (i = 1; i < n; i++) { - x[i] ^= x[i - 1]; - } - t = 0; - for (q = M; q > 1; q >>= 1) { - if ((x[n - 1] & q) != 0) { - t ^= q - 1; - } - } - for (i = 0; i < n; i++) { - x[i] ^= t; - } - return x; - } - - /** - * Converts the Hilbert transposed index into an N-dimensional point expressed as a vector of {@code long}. - * In Skilling's paper this function is named {@code TransposeToAxes}. - * - * @param bits - * @param x - * @return the coordinates of the point represented by the transposed index on the Hilbert curve. - */ - static long[] transposedIndexToPoint(int bits, long[] x) { - final long N = 2L << (bits - 1); - // Note that x is mutated by this method (as a performance improvement - // to avoid allocation) - int n = x.length; // number of dimensions - long p; - long q; - long t; - int i; - // Gray decode by H ^ (H/2) - t = x[n - 1] >> 1; - // Corrected error in Skilling's paper on the following line. The - // appendix had i >= 0 leading to negative array index. - for (i = n - 1; i > 0; i--) { - x[i] ^= x[i - 1]; - } - x[0] ^= t; - // Undo excess work - for (q = 2; q != N; q <<= 1) { - p = q - 1; - for (i = n - 1; i >= 0; i--) { - if ((x[i] & q) != 0L) { - x[0] ^= p; // invert - } else { - t = (x[0] ^ x[i]) & p; - x[0] ^= t; - x[i] ^= t; - } - } - } // exchange - return x; - } - - // Quote from Paul Chernoch - // Interleaving means take one bit from the first matrix element, one bit - // from the next, etc, then take the second bit from the first matrix - // element, second bit from the second, all the way to the last bit of the - // last element. Combine those bits in that order into a single BigInteger, - // which can have as many bits as necessary. This converts the array into a - // single number. - BigInteger toIndex(long[] transposedIndex) { - return new BigInteger(1, toIndexBytes(transposedIndex)); - } - - byte[] toIndexBytes(long[] transposedIndex) { - byte[] b = new byte[length]; - int bIndex = length - 1; - long mask = 1L << (bits - 1); - for (int i = 0; i < bits; i++) { - for (int j = 0; j < transposedIndex.length; j++) { - if ((transposedIndex[j] & mask) != 0) { - b[length - 1 - bIndex / 8] |= 1 << (bIndex % 8); - } - bIndex--; - } - mask >>= 1; - } - // b is expected to be BigEndian - return b; - } -} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java new file mode 100644 index 0000000000000..f3acc521b2c53 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.optimize; + +import org.davidmoten.hilbert.HilbertCurve; + +import java.math.BigInteger; + +/** + * Utils for Hilbert Curve. + */ +public class HilbertCurveUtils { + public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points) { + BigInteger index = hilbertCurve.index(points); + return index.toByteArray(); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java index 0b641c2125c6c..50827cc2efa69 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java @@ -180,13 +180,10 @@ public static Long convertStringToLong(String a) { } public static long convertBytesToLong(byte[] bytes) { - byte[] padBytes = bytes; - if (bytes.length != 8) { - padBytes = paddingTo8Byte(bytes); - } + byte[] paddedBytes = paddingTo8Byte(bytes); long temp = 0L; for (int i = 7; i >= 0; i--) { - temp = temp | (((long)padBytes[i] & 0xff) << (7 - i) * 8); + temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8); } return temp; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java similarity index 55% rename from hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java rename to hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java index f803f6962628d..5bb482e6d67fe 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurve.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java @@ -19,14 +19,12 @@ package org.apache.hudi.optimize; -import static org.junit.jupiter.api.Assertions.assertEquals; - +import org.davidmoten.hilbert.HilbertCurve; import org.junit.jupiter.api.Test; -import java.math.BigInteger; -import java.util.Arrays; +import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestHilbertCurve { +public class TestHilbertCurveUtils { private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2); @@ -37,29 +35,4 @@ public void testIndex() { long[] t1 = {0, 16}; assertEquals(256, INSTANCE.index(t1).intValue()); } - - @Test - public void testTranspose() { - long[] ti = INSTANCE.transpose(BigInteger.valueOf(256)); - assertEquals(2, ti.length); - assertEquals(0, ti[0]); - assertEquals(16, ti[1]); - } - - @Test - public void toIndexByte() { - HilbertCurve c = HilbertCurve.bits(4).dimensions(2); - long[][] points = new long[][]{{1,2}, {3,2}, {5,6}, {7,8} }; - // bits interleave - byte[][] resCheck = new byte[][] {{0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b0110}, - {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b1110}, - {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00110110}, - {0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 0b00, 
0b01101010}}; - for (int i = 0; i < points.length; i++) { - byte[] res = c.toIndexBytes(Arrays.stream(points[i]).toArray()); - for (int j = 0; j < 8; j++) { - assertEquals(res[j], resCheck[i][j]); - } - } - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java index 44beff81c353f..ba992f3e1444c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java @@ -18,13 +18,6 @@ package org.apache.spark; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.optimize.HilbertCurve; -import scala.collection.JavaConversions; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.HoodieSparkUtils$; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; @@ -32,8 +25,14 @@ import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.optimize.HilbertCurveUtils; import org.apache.hudi.optimize.ZOrderingUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -63,6 +62,7 @@ import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; import org.apache.spark.util.SerializableConfiguration; +import org.davidmoten.hilbert.HilbertCurve; import java.io.IOException; import java.math.BigDecimal; @@ -74,6 +74,8 @@ import java.util.Map; import java.util.stream.Collectors; +import scala.collection.JavaConversions; + public class SpaceCurveOptimizeHelper { private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; @@ -83,11 +85,11 @@ public class SpaceCurveOptimizeHelper { * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte * this method is more effective than createOptimizeDataFrameBySample * - * @param df a spark DataFrame holds parquet files to be read. - * @param sortCols z-sort/hilbert-sort cols - * @param fileNum spark partition num + * @param df a spark DataFrame holds parquet files to be read. + * @param sortCols sorting columns for the curve + * @param fileNum spark partition num * @param sortMode layout optimization strategy - * @return a dataFrame sorted by z-order/hilbert. + * @return a dataFrame sorted by the curve. 
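+ * e.g. sorting on (c1_int, c2_int) into 200 output files with sortMode "hilbert", as
+ * SpaceCurveOptimizeBenchMark does, lays the rows out along a hilbert curve over those columns.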
*/ public static Dataset createOptimizedDataFrameByMapValue(Dataset df, List sortCols, int fileNum, String sortMode) { Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); @@ -214,7 +216,8 @@ public Row next() { return null; }).filter(f -> f != null).collect(Collectors.toList()); - byte[] hilbertValue = hilbertCurve.indexBytes(longList.stream().mapToLong(l -> l).toArray()); + byte[] hilbertValue = HilbertCurveUtils.indexBytes( + hilbertCurve, longList.stream().mapToLong(l -> l).toArray()); List values = new ArrayList<>(); values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); values.add(hilbertValue); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index 35177272bb19e..b5898a6ffb97a 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -19,26 +19,28 @@ package org.apache.spark.sql.hudi.execution import org.apache.hudi.config.HoodieClusteringConfig +import org.apache.hudi.optimize.{HilbertCurveUtils, ZOrderingUtil} import org.apache.spark.rdd.{PartitionPruningRDD, RDD} -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} -import org.apache.hudi.optimize.{HilbertCurve, ZOrderingUtil} -import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.util.MutablePair import org.apache.spark.util.random.SamplingUtils +import org.davidmoten.hilbert.HilbertCurve +import java.util import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.{ClassTag, classTag} import scala.util.hashing.byteswap32 class RangeSample[K: ClassTag, V]( - zEncodeNum: Int, - rdd: RDD[_ <: Product2[K, V]], - private var ascend: Boolean = true, - val samplePointsPerPartitionHint: Int = 20) extends Serializable { + zEncodeNum: Int, + rdd: RDD[_ <: Product2[K, V]], + private var ascend: Boolean = true, + val samplePointsPerPartitionHint: Int = 20) extends Serializable { // We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings. 
require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.") @@ -515,7 +517,7 @@ object RangeSampleSort { } }.filter(v => v != -1) val mapValues = if (hilbertCurve.isDefined) { - hilbertCurve.get.indexBytes(values.map(_.toLong).toArray) + HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray) } else { ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) } From 0d034fb15666134c98cfff743715ec0aeee29602 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 14:05:54 -0800 Subject: [PATCH 06/12] Fix rebase issues --- ...atialCurveOptimizationSortPartitioner.java | 6 ++--- .../table/HoodieSparkCopyOnWriteTable.java | 10 ++++---- ...zeHelper.java => OrderingIndexHelper.java} | 20 ++++++++-------- .../TestTableLayoutOptimization.scala | 23 +++++++++---------- .../SpaceCurveOptimizeBenchMark.scala | 12 +++++----- 5 files changed, 36 insertions(+), 35 deletions(-) rename hudi-client/hudi-spark-client/src/main/java/org/apache/spark/{SpaceCurveOptimizeHelper.java => OrderingIndexHelper.java} (97%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java index 58adb42b8d637..51526fc4dfeb5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -33,7 +33,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.spark.SpaceCurveOptimizeHelper; +import org.apache.spark.OrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -79,11 +79,11 @@ private JavaRDD prepareGenericRecord(JavaRDD> inp switch (config.getLayoutOptimizationCurveBuildMethod()) { case DIRECT: - zDataFrame = SpaceCurveOptimizeHelper + zDataFrame = OrderingIndexHelper .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; case SAMPLE: - zDataFrame = SpaceCurveOptimizeHelper + zDataFrame = OrderingIndexHelper .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy()); break; default: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index fe44a08a3b28e..f41116d77b294 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -18,8 +18,6 @@ package org.apache.hudi.table; -import org.apache.avro.Schema; -import org.apache.hadoop.fs.Path; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; @@ -76,12 +74,16 @@ import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor; import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor; import 
org.apache.hudi.table.action.savepoint.SavepointActionExecutor; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.SpaceCurveOptimizeHelper; +import org.apache.spark.OrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import javax.annotation.Nonnull; + import java.io.IOException; import java.util.Arrays; import java.util.Collections; @@ -213,7 +215,7 @@ private void updateZIndex( new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields() ); - SpaceCurveOptimizeHelper.updateZIndexFor( + OrderingIndexHelper.updateZIndexFor( sparkEngineContext.getSqlContext().sparkSession(), AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema), touchedFiles, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java similarity index 97% rename from hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java index ba992f3e1444c..f0d9a290e931f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/SpaceCurveOptimizeHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java @@ -76,7 +76,7 @@ import scala.collection.JavaConversions; -public class SpaceCurveOptimizeHelper { +public class OrderingIndexHelper { private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; @@ -86,10 +86,10 @@ public class SpaceCurveOptimizeHelper { * this method is more effective than createOptimizeDataFrameBySample * * @param df a spark DataFrame holds parquet files to be read. - * @param sortCols sorting columns for the curve + * @param sortCols ordering columns for the curve * @param fileNum spark partition num * @param sortMode layout optimization strategy - * @return a dataFrame sorted by the curve. + * @return a dataFrame ordered by the curve. 
*/ public static Dataset createOptimizedDataFrameByMapValue(Dataset df, List sortCols, int fileNum, String sortMode) { Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); @@ -304,14 +304,14 @@ public static Dataset getMinMaxValue(Dataset df, List cols) { rows.add(currentColRangeMetaData.getMinValue()); rows.add(currentColRangeMetaData.getMaxValue()); } else if (colType instanceof StringType) { - rows.add(currentColRangeMetaData.getMinValueAsString()); - rows.add(currentColRangeMetaData.getMaxValueAsString()); + rows.add(currentColRangeMetaData.getMinValue().toString()); + rows.add(currentColRangeMetaData.getMaxValue().toString()); } else if (colType instanceof DecimalType) { - rows.add(new BigDecimal(currentColRangeMetaData.getMinValueAsString())); - rows.add(new BigDecimal(currentColRangeMetaData.getMaxValueAsString())); + rows.add(new BigDecimal(currentColRangeMetaData.getMinValue().toString())); + rows.add(new BigDecimal(currentColRangeMetaData.getMaxValue().toString())); } else if (colType instanceof DateType) { - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValueAsString())); - rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValueAsString())); + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMinValue().toString())); + rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getMaxValue().toString())); } else if (colType instanceof LongType) { rows.add(currentColRangeMetaData.getMinValue()); rows.add(currentColRangeMetaData.getMaxValue()); @@ -370,7 +370,7 @@ public static void saveStatisticsInfo(Dataset df, String cols, String index Path savePath = new Path(indexPath, commitTime); SparkSession spark = df.sparkSession(); FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); - Dataset statisticsDF = SpaceCurveOptimizeHelper.getMinMaxValue(df, cols); + Dataset statisticsDF = OrderingIndexHelper.getMinMaxValue(df, cols); // try to find last validate index table from index path try { // If there's currently no index, create one diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala index 904a2174c2f61..84f521d42c186 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -25,17 +25,16 @@ import org.apache.hudi.common.util.{BaseFileUtils, ParquetUtils} import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} -import org.apache.spark.SpaceCurveOptimizeHelper +import org.apache.spark.OrderingIndexHelper import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.{AfterEach, BeforeEach, Tag, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{Arguments, MethodSource} import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.{Arguments, MethodSource} import java.sql.{Date, Timestamp} - import scala.collection.JavaConversions._ import scala.util.Random @@ -136,27 +135,27 @@ class 
TestTableLayoutOptimization extends HoodieClientTestBase { try { // test z-order/hilbert sort for all primitive type // shoud not throw exception. - SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) - SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) - SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) - SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) + OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "hilbert").show(1) + OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1,c2,c3,c5,c6,c7,c8", 20, "z-order").show(1) try { // do not support TimeStampType, so if we collect statistics for c4, should throw exception - val colDf = SpaceCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") + val colDf = OrderingIndexHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8") colDf.cache() assertEquals(colDf.count(), 3) assertEquals(colDf.take(1)(0).length, 22) colDf.unpersist() // try to save statistics - SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1")) // save again - SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2")) // test old index table clean - SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3")) assertEquals(!fs.exists(new Path(statisticPath, "2")), true) assertEquals(fs.exists(new Path(statisticPath, "3")), true) // test to save different index, new index on ("c1,c6,c7,c8") should be successfully saved. 
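+      // the trailing Seq passed to saveStatisticsInfo is the set of commits whose index tables
+      // are still valid; index directories for commits outside it are cleaned up, which is what
+      // the fs.exists assertions above verify.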
- SpaceCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) + OrderingIndexHelper.saveStatisticsInfo(df, "c1,c6,c7,c8", statisticPath.toString, "5", Seq("0", "1", "3", "4")) assertEquals(fs.exists(new Path(statisticPath, "5")), true) } finally { if (fs.exists(testPath)) fs.delete(testPath) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala index 9a85b2895bec9..0568037b4ae2f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark import org.apache.hadoop.fs.Path -import org.apache.spark.SpaceCurveOptimizeHelper +import org.apache.spark.OrderingIndexHelper import org.apache.spark.sql.DataFrame import org.apache.spark.sql.hudi.TestHoodieSqlBase @@ -28,7 +28,7 @@ import scala.util.Random object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def getSkippingPercent(tableName: String, co1: String, co2: String, value1: Int, value2: Int): Unit= { - val minMax = SpaceCurveOptimizeHelper + val minMax = OrderingIndexHelper .getMinMaxValue(spark.sql(s"select * from ${tableName}"), s"${co1}, ${co2}") .collect().map(f => (f.getInt(1), f.getInt(2), f.getInt(4), f.getInt(5))) var c = 0 @@ -97,11 +97,11 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = { import spark.implicits._ val df = spark.range(numRows).map(_ => (Random.nextInt(1000000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") - val dfOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") - val dfOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") + val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") - val dfHilbertOptimizeByMap = SpaceCurveOptimizeHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") - val dfHilbertOptimizeBySample = SpaceCurveOptimizeHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") + val dfHilbertOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "hilbert") + val dfHilbertOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "hilbert") saveAsTable(dfOptimizeByMap, tablePath, if (skewed) "z_sort_byMap_skew" else "z_sort_byMap") saveAsTable(dfOptimizeBySample, tablePath, if (skewed) "z_sort_bySample_skew" else "z_sort_bySample") From 411029ac026d4a7fa528a486240837d775fe3a75 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 16:12:38 -0800 Subject: [PATCH 07/12] Fix build regarding z-ordering changes --- .../hudi/config/HoodieClusteringConfig.java | 5 +++++ .../hudi/index/zorder/ZOrderingIndexHelper.java | 17 +++++++++++------ .../hudi/table/HoodieSparkCopyOnWriteTable.java | 
4 ++-- .../org/apache/spark/OrderingIndexHelper.java | 5 ++--- .../TestTableLayoutOptimization.scala | 4 ++-- 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index 21d4fa800a4a8..676f2fff44e89 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -550,10 +550,15 @@ public enum BuildLayoutOptimizationStrategy { ZORDER("z-order"), HILBERT("hilbert"); private final String value; + BuildLayoutOptimizationStrategy(String value) { this.value = value; } + public String toCustomString() { + return value; + } + public static BuildLayoutOptimizationStrategy fromValue(String value) { switch (value.toLowerCase(Locale.ROOT)) { case "z-order": diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java index 248c15c40a9fd..934d1b9c89f69 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java @@ -18,17 +18,19 @@ package org.apache.hudi.index.zorder; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.optimize.ZOrderingUtil; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.io.api.Binary; @@ -62,10 +64,10 @@ import org.apache.spark.sql.types.StructType$; import org.apache.spark.sql.types.TimestampType; import org.apache.spark.util.SerializableConfiguration; -import scala.collection.JavaConversions; import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; @@ -77,6 +79,8 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import scala.collection.JavaConversions; + import static org.apache.hudi.util.DataTypeUtils.areCompatible; public class ZOrderingIndexHelper { @@ -189,7 +193,8 @@ public static Dataset createZIndexedDataFrameByMapValue(Dataset df, St } public static Dataset createZIndexedDataFrameBySample(Dataset df, List zCols, int fileNum) { - return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum); + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum, + HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString()); } public static Dataset createZIndexedDataFrameBySample(Dataset df, String zCols, int fileNum) { @@ -584,7 +589,7 @@ private 
static String composeZIndexColName(String col, String statName) { * @VisibleForTesting */ @Nonnull - static String createIndexMergeSql( + public static String createIndexMergeSql( @Nonnull String originalIndexTable, @Nonnull String newIndexTable, @Nonnull List columns diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index f41116d77b294..7d2fbd32c4d49 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -47,6 +47,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.hudi.io.HoodieCreateHandle; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.HoodieSortedMergeHandle; @@ -79,7 +80,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.OrderingIndexHelper; import org.apache.spark.api.java.JavaRDD; import javax.annotation.Nonnull; @@ -215,7 +215,7 @@ private void updateZIndex( new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields() ); - OrderingIndexHelper.updateZIndexFor( + ZOrderingIndexHelper.updateZIndexFor( sparkEngineContext.getSqlContext().sparkSession(), AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema), touchedFiles, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java index f0d9a290e931f..8e83f81d9f938 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java @@ -18,7 +18,6 @@ package org.apache.spark; -import org.apache.hudi.HoodieSparkUtils$; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -27,6 +26,7 @@ import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.zorder.ZOrderingIndexHelper; import org.apache.hudi.optimize.HilbertCurveUtils; import org.apache.hudi.optimize.ZOrderingUtil; @@ -419,8 +419,7 @@ public static void saveStatisticsInfo(Dataset df, String cols, String index statisticsDF.registerTempTable(updateTable); // update table by full out join List columns = Arrays.asList(statisticsDF.schema().fieldNames()); - spark.sql(HoodieSparkUtils$ - .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); + spark.sql(ZOrderingIndexHelper.createIndexMergeSql(originalTable, updateTable, columns)).repartition(1).write().save(savePath.toString()); } else { statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala index 84f521d42c186..4b7864f25e571 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTableLayoutOptimization.scala @@ -184,7 +184,7 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { inputFiles.zipWithIndex.foreach { case (f, index) => val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - realResult(index) = (res.getMinValueAsString, res.getMaxValueAsString) + realResult(index) = (res.getMinValue.toString, res.getMaxValue.toString) } // multi thread read with no lock @@ -192,7 +192,7 @@ class TestTableLayoutOptimization extends HoodieClientTestBase { inputFiles.zipWithIndex.par.foreach { case (f, index) => val fileUtils = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).asInstanceOf[ParquetUtils] val res = fileUtils.readRangeFromParquetMetadata(conf, new Path(f), cols).iterator().next() - resUseLock(index) = (res.getMinValueAsString, res.getMaxValueAsString) + resUseLock(index) = (res.getMinValue.toString, res.getMaxValue.toString) } // check resUseNoLock, From 688052e6a4e22f30bd6728609edea968d9cc1bfd Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 16:30:37 -0800 Subject: [PATCH 08/12] Add one more test for padding --- .../java/org/apache/hudi/optimize/TestZOrderingUtil.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java index 1e1ff3088533f..a22485ff9d415 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java @@ -135,6 +135,14 @@ public void testConvertBytesToLong() { } } + @Test + public void testConvertBytesToLongWithPadding() { + byte[] bytes = new byte[2]; + bytes[0] = 2; + bytes[1] = 127; + assertEquals(ZOrderingUtil.convertBytesToLong(bytes), 2 * 256 + 127); + } + private byte[] convertLongToBytes(long num) { byte[] byteNum = new byte[8]; for (int i = 0; i < 8; i++) { From da9b8fc0a3230a3b4233f4c1d5a4bec8399cfc4f Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 16:40:41 -0800 Subject: [PATCH 09/12] Whitelist hilbert curve dependency in the bundles --- packaging/hudi-flink-bundle/pom.xml | 1 + packaging/hudi-kafka-connect-bundle/pom.xml | 1 + packaging/hudi-spark-bundle/pom.xml | 1 + packaging/hudi-utilities-bundle/pom.xml | 1 + 4 files changed, 4 insertions(+) diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 5f45fdd3245d0..f232597ce620a 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -107,6 +107,7 @@ com.fasterxml.jackson.core:jackson-databind com.fasterxml.jackson.core:jackson-core + com.github.davidmoten:hilbert-curve com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.dropwizard.metrics:metrics-core diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index a828132d8b4c9..46eb8c993fea1 100644 --- 
a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -88,6 +88,7 @@ org.apache.flink:flink-core org.apache.flink:flink-hadoop-compatibility_${scala.binary.version} + com.github.davidmoten:hilbert-curve com.yammer.metrics:metrics-core com.beust:jcommander io.javalin:javalin diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 32a9abf8f7c90..934c5ebd9d913 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -88,6 +88,7 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro + com.github.davidmoten:hilbert-curve com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.dropwizard.metrics:metrics-core diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 568e9d571f42a..0f47cca38d14d 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -117,6 +117,7 @@ com.amazonaws:aws-java-sdk-dynamodb com.amazonaws:aws-java-sdk-core + com.github.davidmoten:hilbert-curve com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} io.confluent:kafka-avro-serializer From 4d1777324ea0588cf68ee56c439610278e222192 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 17:21:54 -0800 Subject: [PATCH 10/12] Fix NoClassDefFoundError for com/github/davidmoten/guavamini/Preconditions --- packaging/hudi-flink-bundle/pom.xml | 1 + packaging/hudi-kafka-connect-bundle/pom.xml | 1 + packaging/hudi-spark-bundle/pom.xml | 1 + packaging/hudi-utilities-bundle/pom.xml | 1 + 4 files changed, 4 insertions(+) diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index f232597ce620a..1766a989bb4dc 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -107,6 +107,7 @@ com.fasterxml.jackson.core:jackson-databind com.fasterxml.jackson.core:jackson-core + com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 46eb8c993fea1..7f730ec21e69a 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -88,6 +88,7 @@ org.apache.flink:flink-core org.apache.flink:flink-hadoop-compatibility_${scala.binary.version} + com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.yammer.metrics:metrics-core com.beust:jcommander diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 934c5ebd9d913..02d11be9bb476 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -88,6 +88,7 @@ org.antlr:stringtemplate org.apache.parquet:parquet-avro + com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 0f47cca38d14d..798c27fe817c4 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -117,6 +117,7 @@ com.amazonaws:aws-java-sdk-dynamodb com.amazonaws:aws-java-sdk-core + com.github.davidmoten:guava-mini com.github.davidmoten:hilbert-curve 
com.twitter:bijection-avro_${scala.binary.version} com.twitter:bijection-core_${scala.binary.version} From 503ebc7ff871791004511bae1bc79465e341062d Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 26 Nov 2021 18:56:53 -0800 Subject: [PATCH 11/12] Fixes from xiarixiaoyao --- .../hudi/optimize/HilbertCurveUtils.java | 22 +++++++++++++++++-- .../org/apache/spark/OrderingIndexHelper.java | 2 +- .../sql/hudi/execution/RangeSample.scala | 3 +-- .../SpaceCurveOptimizeBenchMark.scala | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java index f3acc521b2c53..71413c6294255 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java @@ -27,8 +27,26 @@ * Utils for Hilbert Curve. */ public class HilbertCurveUtils { - public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points) { + public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) { BigInteger index = hilbertCurve.index(points); - return index.toByteArray(); + return paddingToNByte(index.toByteArray(), paddingNum); + } + + public static byte[] paddingToNByte(byte[] a, int paddingNum) { + if (a.length == paddingNum) { + return a; + } + if (a.length > paddingNum) { + byte[] result = new byte[paddingNum]; + System.arraycopy(a, 0, result, 0, paddingNum); + return result; + } + int paddingSize = 8 - a.length; + byte[] result = new byte[paddingNum]; + for (int i = 0; i < paddingSize; i++) { + result[i] = 0; + } + System.arraycopy(a, 0, result, paddingSize, a.length); + return result; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java index 8e83f81d9f938..67b1c672ec86b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java @@ -217,7 +217,7 @@ public Row next() { }).filter(f -> f != null).collect(Collectors.toList()); byte[] hilbertValue = HilbertCurveUtils.indexBytes( - hilbertCurve, longList.stream().mapToLong(l -> l).toArray()); + hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63); List values = new ArrayList<>(); values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); values.add(hilbertValue); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index b5898a6ffb97a..a168e55b785ff 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -30,7 +30,6 @@ import org.apache.spark.util.MutablePair import org.apache.spark.util.random.SamplingUtils import org.davidmoten.hilbert.HilbertCurve -import java.util import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.reflect.{ClassTag, classTag} @@ -517,7 +516,7 @@ object RangeSampleSort { } }.filter(v => v != -1) val mapValues = if 
(hilbertCurve.isDefined) { - HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray) + HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) } else { ZOrderingUtil.interleaving(values.map(ZOrderingUtil.intTo8Byte(_)).toArray, 8) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala index 0568037b4ae2f..c8263b3a2373a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/SpaceCurveOptimizeBenchMark.scala @@ -96,7 +96,7 @@ object SpaceCurveOptimizeBenchMark extends TestHoodieSqlBase { def prepareInterTypeTable(tablePath: Path, numRows: Int, col1Range: Int = 1000000, col2Range: Int = 1000000, skewed: Boolean = false): Unit = { import spark.implicits._ - val df = spark.range(numRows).map(_ => (Random.nextInt(1000000), Random.nextInt(1000000))).toDF("c1_int", "c2_int") + val df = spark.range(numRows).map(_ => (Random.nextInt(col1Range), Random.nextInt(col2Range))).toDF("c1_int", "c2_int") val dfOptimizeByMap = OrderingIndexHelper.createOptimizedDataFrameByMapValue(df, "c1_int, c2_int", 200, "z-order") val dfOptimizeBySample = OrderingIndexHelper.createOptimizeDataFrameBySample(df, "c1_int, c2_int", 200, "z-order") From 18d2357845c09748e5cd7297b3d3f4569baad7d8 Mon Sep 17 00:00:00 2001 From: xiarixiaoyao Date: Sat, 27 Nov 2021 12:32:38 +0800 Subject: [PATCH 12/12] Update HilbertCurveUtils.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit fix the bug, Abnormal numerical calculation --- .../main/java/org/apache/hudi/optimize/HilbertCurveUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java index 71413c6294255..0f216abeee748 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java @@ -41,7 +41,7 @@ public static byte[] paddingToNByte(byte[] a, int paddingNum) { System.arraycopy(a, 0, result, 0, paddingNum); return result; } - int paddingSize = 8 - a.length; + int paddingSize = paddingNum - a.length; byte[] result = new byte[paddingNum]; for (int i = 0; i < paddingSize; i++) { result[i] = 0;
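
The padding change in this last commit is easiest to see in isolation. Below is a minimal, self-contained sketch of the behavior the final version of paddingToNByte settles on; class and method names are illustrative, not the Hudi source. A big-endian byte array is truncated if it is longer than the target width, otherwise left-padded with zero bytes so that unsigned byte-wise comparison of the padded keys agrees with numeric order.

    import java.math.BigInteger;
    import java.util.Arrays;

    public class PaddingSketch {

      // Pad (or truncate) a big-endian byte array to exactly paddingNum bytes.
      static byte[] paddingToNByte(byte[] a, int paddingNum) {
        if (a.length == paddingNum) {
          return a;
        }
        if (a.length > paddingNum) {
          // keep only the first paddingNum bytes, as the patched helper does
          return Arrays.copyOfRange(a, 0, paddingNum);
        }
        byte[] result = new byte[paddingNum];      // zero-initialized by the JVM
        int paddingSize = paddingNum - a.length;   // the PATCH 12 fix: previously hard-coded as 8 - a.length
        System.arraycopy(a, 0, result, paddingSize, a.length);
        return result;
      }

      public static void main(String[] args) {
        byte[] small = BigInteger.valueOf(639).toByteArray();   // {2, 127}
        System.out.println(Arrays.toString(paddingToNByte(small, 8)));
        // prints [0, 0, 0, 0, 0, 0, 2, 127]
      }
    }

With the hard-coded 8, any paddingNum other than 8 (such as the 63 used in OrderingIndexHelper or the 32 used in RangeSample) would either misplace the copied bytes within the key or throw an IndexOutOfBoundsException for inputs longer than 8 bytes, which appears to be the "abnormal numerical calculation" the final commit message refers to.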
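
Why pad at all: BigInteger.toByteArray() drops leading zero bytes, so the Hilbert indices of two rows come back as arrays of different lengths, and comparing unequal-length big-endian arrays byte-wise no longer tracks numeric order. The following is a hedged illustration using the com.github.davidmoten:hilbert-curve builder API whitelisted in the bundle poms above; the curve parameters are arbitrary for the demo, not the ones Hudi configures.

    import java.math.BigInteger;
    import org.davidmoten.hilbert.HilbertCurve;

    public class HilbertKeyWidthSketch {
      public static void main(String[] args) {
        // 2-D curve, 63 bits per ordinate; parameters are illustrative only.
        HilbertCurve curve = HilbertCurve.bits(63).dimensions(2);

        byte[] near = curve.index(1L, 2L).toByteArray();               // short array
        byte[] far  = curve.index(1_000_000L, 999_999L).toByteArray(); // longer array

        // Different lengths, so the raw arrays are not directly usable as sort
        // keys; padding every key to one fixed width (63 bytes in
        // OrderingIndexHelper, 32 in RangeSample) restores a consistent ordering.
        System.out.println(near.length + " bytes vs " + far.length + " bytes");
      }
    }

The specific widths (63 versus 32) come straight from the hunks above; the essential property the fix guarantees is only that every row in a given code path produces a key of the same length.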
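
The new testConvertBytesToLongWithPadding in PATCH 08 pins down the companion conversion in ZOrderingUtil: an array shorter than eight bytes is read big-endian, i.e. as if left-padded with zero bytes. A minimal sketch of that contract, inferred from the test expectation rather than copied from ZOrderingUtil:

    public class BytesToLongSketch {
      // Interpret up to 8 bytes as an unsigned big-endian number; shorter
      // arrays behave as if left-padded with zeros. This is an assumption
      // inferred from the test, not the actual ZOrderingUtil implementation.
      static long convertBytesToLong(byte[] bytes) {
        long result = 0L;
        for (byte b : bytes) {
          result = (result << 8) | (b & 0xFF);
        }
        return result;
      }

      public static void main(String[] args) {
        byte[] bytes = {2, 127};
        System.out.println(convertBytesToLong(bytes));  // 639 == 2 * 256 + 127
      }
    }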