diff --git a/NOTICE b/NOTICE
index 9b249331f3435..437b974ac217b 100644
--- a/NOTICE
+++ b/NOTICE
@@ -159,3 +159,9 @@ its NOTICE file:
This product includes software developed at
StreamSets (http://www.streamsets.com/).
+--------------------------------------------------------------------------------
+
+This product includes code from the hilbert-curve project
+ * Copyright https://github.com/davidmoten/hilbert-curve
+ * Licensed under the Apache-2.0 License
+
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index d30ee298b1827..22ad8ec0b9d30 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -64,6 +64,13 @@
       <artifactId>parquet-avro</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>com.github.davidmoten</groupId>
+      <artifactId>hilbert-curve</artifactId>
+      <version>0.2.2</version>
+    </dependency>
+
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9a10965427e07..676f2fff44e89 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -542,4 +542,32 @@ public static BuildCurveStrategyType fromValue(String value) {
}
}
}
+
+ /**
+ * Strategy types for optimizing the layout of Hudi data.
+ */
+ public enum BuildLayoutOptimizationStrategy {
+ ZORDER("z-order"),
+ HILBERT("hilbert");
+ private final String value;
+
+ BuildLayoutOptimizationStrategy(String value) {
+ this.value = value;
+ }
+
+ public String toCustomString() {
+ return value;
+ }
+
+ public static BuildLayoutOptimizationStrategy fromValue(String value) {
+ switch (value.toLowerCase(Locale.ROOT)) {
+ case "z-order":
+ return ZORDER;
+ case "hilbert":
+ return HILBERT;
+ default:
+ throw new HoodieException("Invalid value of Type.");
+ }
+ }
+ }
}
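A minimal usage sketch of the new enum (names taken from the hunk above; the assert-style checks are illustrative only):

    HoodieClusteringConfig.BuildLayoutOptimizationStrategy strategy =
        HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue("hilbert");
    assert strategy == HoodieClusteringConfig.BuildLayoutOptimizationStrategy.HILBERT;
    assert "hilbert".equals(strategy.toCustomString());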
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
new file mode 100644
index 0000000000000..0f216abeee748
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/HilbertCurveUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.math.BigInteger;
+
+/**
+ * Utils for Hilbert Curve.
+ */
+public class HilbertCurveUtils {
+ public static byte[] indexBytes(HilbertCurve hilbertCurve, long[] points, int paddingNum) {
+ BigInteger index = hilbertCurve.index(points);
+ return paddingToNByte(index.toByteArray(), paddingNum);
+ }
+
+ public static byte[] paddingToNByte(byte[] a, int paddingNum) {
+ if (a.length == paddingNum) {
+ return a;
+ }
+ if (a.length > paddingNum) {
+ byte[] result = new byte[paddingNum];
+ System.arraycopy(a, 0, result, 0, paddingNum);
+ return result;
+ }
+ int paddingSize = paddingNum - a.length;
+ byte[] result = new byte[paddingNum];
+ for (int i = 0; i < paddingSize; i++) {
+ result[i] = 0;
+ }
+ System.arraycopy(a, 0, result, paddingSize, a.length);
+ return result;
+ }
+}
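A minimal sketch of how these utilities combine (the expected index value comes from the test added later in this diff; the 8-byte width is an assumption for illustration):

    import org.apache.hudi.optimize.HilbertCurveUtils;
    import org.davidmoten.hilbert.HilbertCurve;

    // map a 2-D point onto a 5-bit Hilbert curve, then left-pad the index
    // bytes to a fixed-width, big-endian 8-byte sort key
    HilbertCurve curve = HilbertCurve.bits(5).dimensions(2);
    byte[] key = HilbertCurveUtils.indexBytes(curve, new long[] {1, 2}, 8);
    // index(1, 2) == 13 on this curve, so key == {0, 0, 0, 0, 0, 0, 0, 13}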
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
index 3aa808075d330..50827cc2efa69 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/optimize/ZOrderingUtil.java
@@ -176,9 +176,14 @@ public static byte[] utf8To8Byte(String a) {
public static Long convertStringToLong(String a) {
byte[] bytes = utf8To8Byte(a);
+ return convertBytesToLong(bytes);
+ }
+
+ public static long convertBytesToLong(byte[] bytes) {
+ byte[] paddedBytes = paddingTo8Byte(bytes);
long temp = 0L;
for (int i = 7; i >= 0; i--) {
- temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8);
+ temp = temp | (((long) paddedBytes[i] & 0xff) << (7 - i) * 8);
}
return temp;
}
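A worked example of the extracted helper (values taken from the test added later in this diff): the input is left-padded to 8 bytes, then folded big-endian.

    // {2, 127} pads to {0, 0, 0, 0, 0, 0, 2, 127};
    // the fold yields (2L << 8) | 127 == 639
    long v = ZOrderingUtil.convertBytesToLong(new byte[] {2, 127});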
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
new file mode 100644
index 0000000000000..5bb482e6d67fe
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.optimize;
+
+import org.davidmoten.hilbert.HilbertCurve;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHilbertCurveUtils {
+
+ private static final HilbertCurve INSTANCE = HilbertCurve.bits(5).dimensions(2);
+
+ @Test
+ public void testIndex() {
+ long[] t = {1, 2};
+ assertEquals(13, INSTANCE.index(t).intValue());
+ long[] t1 = {0, 16};
+ assertEquals(256, INSTANCE.index(t1).intValue());
+ }
+}
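The locality property these indexes are used for can be checked with the same library: consecutive Hilbert indexes decode to adjacent points, which is why sorting rows by the index clusters nearby values into the same files. A small sketch (point() is the inverse of index() in the davidmoten library):

    import java.math.BigInteger;
    import org.davidmoten.hilbert.HilbertCurve;

    HilbertCurve curve = HilbertCurve.bits(5).dimensions(2);
    long[] p = curve.point(BigInteger.valueOf(13)); // {1, 2}, per the test above
    long[] q = curve.point(BigInteger.valueOf(14)); // an adjacent cell in the plane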
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
index 7dab6c2057c77..a22485ff9d415 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java
@@ -126,4 +126,29 @@ public OrginValueWrapper(T index, T originValue) {
this.originValue = originValue;
}
}
+
+ @Test
+ public void testConvertBytesToLong() {
+ long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE};
+ for (int i = 0; i < tests.length; i++) {
assertEquals(tests[i], ZOrderingUtil.convertBytesToLong(convertLongToBytes(tests[i])));
+ }
+ }
+
+ @Test
+ public void testConvertBytesToLongWithPadding() {
+ byte[] bytes = new byte[2];
+ bytes[0] = 2;
+ bytes[1] = 127;
assertEquals(2 * 256 + 127, ZOrderingUtil.convertBytesToLong(bytes));
+ }
+
+ private byte[] convertLongToBytes(long num) {
+ byte[] byteNum = new byte[8];
+ for (int i = 0; i < 8; i++) {
+ int offset = 64 - (i + 1) * 8;
+ byteNum[i] = (byte) ((num >> offset) & 0xff);
+ }
+ return byteNum;
+ }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
index 03fdf5a5b8813..51526fc4dfeb5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
@@ -33,7 +33,7 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.spark.OrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -79,10 +79,12 @@ private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inp
switch (config.getLayoutOptimizationCurveBuildMethod()) {
case DIRECT:
- zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
case SAMPLE:
- zDataFrame = ZOrderingIndexHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
+ zDataFrame = OrderingIndexHelper
+ .createOptimizeDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups, config.getLayoutOptimizationStrategy());
break;
default:
throw new HoodieException("Not a valid build curve method for doWriteOperation: ");
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
index 248c15c40a9fd..934d1b9c89f69 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/zorder/ZOrderingIndexHelper.java
@@ -18,17 +18,19 @@
package org.apache.hudi.index.zorder;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.io.api.Binary;
@@ -62,10 +64,10 @@
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.util.SerializableConfiguration;
-import scala.collection.JavaConversions;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -77,6 +79,8 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import scala.collection.JavaConversions;
+
import static org.apache.hudi.util.DataTypeUtils.areCompatible;
public class ZOrderingIndexHelper {
@@ -189,7 +193,8 @@ public static Dataset<Row> createZIndexedDataFrameByMapValue(Dataset<Row> df, St
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, List<String> zCols, int fileNum) {
- return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum);
+ return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum,
+ HoodieClusteringConfig.BuildLayoutOptimizationStrategy.ZORDER.toCustomString());
}
public static Dataset<Row> createZIndexedDataFrameBySample(Dataset<Row> df, String zCols, int fileNum) {
@@ -584,7 +589,7 @@ private static String composeZIndexColName(String col, String statName) {
* @VisibleForTesting
*/
@Nonnull
- static String createIndexMergeSql(
+ public static String createIndexMergeSql(
@Nonnull String originalIndexTable,
@Nonnull String newIndexTable,
@Nonnull List<String> columns
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 280d24f718e65..7d2fbd32c4d49 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -18,8 +18,6 @@
package org.apache.hudi.table;
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -49,6 +47,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
@@ -76,12 +75,15 @@
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.savepoint.SavepointActionExecutor;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
import org.apache.spark.api.java.JavaRDD;
import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
new file mode 100644
index 0000000000000..67b1c672ec86b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/OrderingIndexHelper.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.util.BaseFileUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.zorder.ZOrderingIndexHelper;
+import org.apache.hudi.optimize.HilbertCurveUtils;
+import org.apache.hudi.optimize.ZOrderingUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.api.Binary;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Row$;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
+import org.apache.spark.sql.hudi.execution.ZorderingBinarySort;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.util.SerializableConfiguration;
+import org.davidmoten.hilbert.HilbertCurve;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+
+public class OrderingIndexHelper {
+
+ private static final String SPARK_JOB_DESCRIPTION = "spark.job.description";
+
+ /**
+ * Creates an optimized DataFrame sorted directly by curve values.
+ * Only primitive column types are supported: long, int, short, double, float, string, timestamp, decimal, date, byte.
+ * This method is more efficient than createOptimizeDataFrameBySample.
+ *
+ * @param df a Spark DataFrame holding the parquet files to be read.
+ * @param sortCols ordering columns for the curve
+ * @param fileNum Spark partition count
+ * @param sortMode layout optimization strategy
+ * @return a DataFrame ordered by the curve.
+ */
+ public static Dataset<Row> createOptimizedDataFrameByMapValue(Dataset<Row> df, List<String> sortCols, int fileNum, String sortMode) {
+ Map<String, StructField> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e));
+ int fieldNum = df.schema().fields().length;
+ List<String> checkCols = sortCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList());
+ if (sortCols.size() != checkCols.size()) {
+ return df;
+ }
+ // only one column to sort, no need for a space-filling curve
+ if (sortCols.size() == 1) {
+ return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(sortCols.get(0)));
+ }
+ Map<Integer, StructField> fieldMap = sortCols
+ .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e)));
+ // do optimize
+ JavaRDD<Row> sortedRDD = null;
+ switch (HoodieClusteringConfig.BuildLayoutOptimizationStrategy.fromValue(sortMode)) {
+ case ZORDER:
+ sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+ break;
+ case HILBERT:
+ sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, fileNum);
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("only z-order/hilbert layout optimization is supported, but found: %s", sortMode));
+ }
+ // create new StructType
+ List<StructField> newFields = new ArrayList<>();
+ newFields.addAll(Arrays.asList(df.schema().fields()));
+ newFields.add(new StructField("Index", BinaryType$.MODULE$, true, Metadata.empty()));
+
+ // create new DataFrame
+ return df.sparkSession().createDataFrame(sortedRDD, StructType$.MODULE$.apply(newFields)).drop("Index");
+ }
+
+ private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
+ return originRDD.map(row -> {
+ List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
+ int index = entry.getKey();
+ StructField field = entry.getValue();
+ DataType dataType = field.dataType();
+ if (dataType instanceof LongType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
+ } else if (dataType instanceof DoubleType) {
+ return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
+ } else if (dataType instanceof IntegerType) {
+ return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
+ } else if (dataType instanceof FloatType) {
+ return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
+ } else if (dataType instanceof StringType) {
+ return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
+ } else if (dataType instanceof DateType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
+ } else if (dataType instanceof TimestampType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
+ } else if (dataType instanceof ByteType) {
+ return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
+ } else if (dataType instanceof ShortType) {
+ return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
+ } else if (dataType instanceof DecimalType) {
+ return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
+ } else if (dataType instanceof BooleanType) {
+ boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
+ return ZOrderingUtil.intTo8Byte(value ? 1 : 0);
+ } else if (dataType instanceof BinaryType) {
+ return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
+ }
+ return null;
+ }).filter(f -> f != null).collect(Collectors.toList());
+ byte[][] zBytes = new byte[zBytesList.size()][];
+ for (int i = 0; i < zBytesList.size(); i++) {
+ zBytes[i] = zBytesList.get(i);
+ }
+ List