From f8c5580447cb2f4f873a86d20178df12b9c2d82c Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Thu, 20 Oct 2022 22:28:14 +0800 Subject: [PATCH 1/6] make SerDeUtils independent of kryo class --- .../apache/spark/sql/hudi/SerDeUtils.scala | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 32d1960ee13ee..287fc51195741 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -17,39 +17,26 @@ package org.apache.spark.sql.hudi -import java.io.ByteArrayOutputStream - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.SparkConf -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} + +import java.nio.ByteBuffer object SerDeUtils { - private val kryoLocal = new ThreadLocal[Kryo] { + private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] { - override protected def initialValue: Kryo = { - val serializer = new KryoSerializer(new SparkConf(true)) - serializer.newKryo() + override protected def initialValue: SerializerInstance = { + new KryoSerializer(new SparkConf(true)).newInstance() } } def toBytes(o: Any): Array[Byte] = { - val outputStream = new ByteArrayOutputStream(4096 * 5) - val output = new Output(outputStream) - try { - kryoLocal.get.writeClassAndObject(output, o) - output.flush() - } finally { - output.clear() - output.close() - } - outputStream.toByteArray + SERIALIZER_THREAD_LOCAL.get.serialize(o).array() } def toObject(bytes: Array[Byte]): Any = { - val input = new Input(bytes) - kryoLocal.get.readClassAndObject(input) + SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes)) } } From 7f8175827ab7c31d8cf849069cae693047dcd486 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 21 Oct 2022 01:03:25 +0800 Subject: [PATCH 2/6] fix byte buffer --- .../main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 287fc51195741..19d0a0a98ba39 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -33,7 +33,10 @@ object SerDeUtils { } def toBytes(o: Any): Array[Byte] = { - SERIALIZER_THREAD_LOCAL.get.serialize(o).array() + val bb: ByteBuffer = SERIALIZER_THREAD_LOCAL.get.serialize(o) + val bytes = new Array[Byte](bb.capacity()) + bb.get(bytes) + bytes } def toObject(bytes: Array[Byte]): Any = { From f3b6bba2ea8b01c4e5e714c526ee2756311f86b2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 20 Oct 2022 12:59:38 -0700 Subject: [PATCH 3/6] `BinaryUtil` > `BinaryUtils`; Added utility to extract bytes from `ByteBuffer` --- .../hudi/sort/SpaceCurveSortingHelper.java | 34 +++++++++---------- .../sql/hudi/execution/RangeSample.scala | 10 +++--- .../hudi/common/table/HoodieTableConfig.java | 4 +-- .../{BinaryUtil.java => BinaryUtils.java} | 12 ++++++- .../hudi/common/util/SpillableMapUtils.java | 2 +- .../util/collection/BitCaskDiskMap.java | 2 +- ...stBinaryUtil.java => TestBinaryUtils.java} | 22 ++++++------ 7 files changed, 48 insertions(+), 38 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/common/util/{BinaryUtil.java => BinaryUtils.java} (95%) rename hudi-common/src/test/java/org/apache/hudi/common/util/{TestBinaryUtil.java => TestBinaryUtils.java} (87%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java index 496168e844276..1ff54773c466a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java @@ -18,7 +18,7 @@ package org.apache.hudi.sort; -import org.apache.hudi.common.util.BinaryUtil; +import org.apache.hudi.common.util.BinaryUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.optimize.HilbertCurveUtils; @@ -158,7 +158,7 @@ private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); @@ -206,30 +206,30 @@ private static Row appendToRow(Row row, Object value) { @Nonnull private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) { if (dataType instanceof LongType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); } else if (dataType instanceof DoubleType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); } else if (dataType instanceof IntegerType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); } else if (dataType instanceof FloatType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); } else if (dataType instanceof StringType) { - return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + return BinaryUtils.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); } else if (dataType instanceof DateType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); } else if (dataType instanceof TimestampType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); } else if (dataType instanceof ByteType) { - return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + return BinaryUtils.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); } else if (dataType instanceof ShortType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); } else if (dataType instanceof DecimalType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); } else if (dataType instanceof BooleanType) { boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return BinaryUtil.intTo8Byte(value ? 1 : 0); + return BinaryUtils.intTo8Byte(value ? 1 : 0); } else if (dataType instanceof BinaryType) { - return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + return BinaryUtils.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); @@ -245,13 +245,13 @@ private static long mapColumnValueToLong(Row row, int index, DataType dataType) } else if (dataType instanceof FloatType) { return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); } else if (dataType instanceof StringType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertStringToLong(row.getString(index)); } else if (dataType instanceof DateType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); } else if (dataType instanceof TimestampType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); } else if (dataType instanceof ByteType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong(new byte[] {row.getByte(index)}); } else if (dataType instanceof ShortType) { return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getShort(index); } else if (dataType instanceof DecimalType) { @@ -260,7 +260,7 @@ private static long mapColumnValueToLong(Row row, int index, DataType dataType) boolean value = row.isNullAt(index) ? false : row.getBoolean(index); return value ? Long.MAX_VALUE : 0; } else if (dataType instanceof BinaryType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong((byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index 7c39ce2546f26..757964c218607 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.execution -import org.apache.hudi.common.util.BinaryUtil +import org.apache.hudi.common.util.BinaryUtils import org.apache.hudi.config.HoodieClusteringConfig import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy import org.apache.hudi.optimize.HilbertCurveUtils @@ -240,7 +240,7 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S case class ByteArraySorting(b: Array[Byte]) extends Ordered[ByteArraySorting] with Serializable { override def compare(that: ByteArraySorting): Int = { val len = this.b.length - BinaryUtil.compareTo(this.b, 0, len, that.b, 0, len) + BinaryUtils.compareTo(this.b, 0, len, that.b, 0, len) } } @@ -430,7 +430,7 @@ object RangeSampleSort { case LayoutOptimizationStrategy.HILBERT => HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) case LayoutOptimizationStrategy.ZORDER => - BinaryUtil.interleaving(values.map(BinaryUtil.intTo8Byte(_)).toArray, 8) + BinaryUtils.interleaving(values.map(BinaryUtils.intTo8Byte(_)).toArray, 8) } Row.fromSeq(row.toSeq ++ Seq(mapValues)) @@ -525,8 +525,8 @@ object RangeSampleSort { decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]]) } } - }.toArray.map(BinaryUtil.intTo8Byte(_)) - val zValues = BinaryUtil.interleaving(interleaveValues, 8) + }.toArray.map(BinaryUtils.intTo8Byte(_)) + val zValues = BinaryUtils.interleaving(interleaveValues, 8) val mutablePair = new MutablePair[InternalRow, Array[Byte]]() mutablePair.update(unsafeRow, zValues) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index ac3608fc003dc..3239cc213c64a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -34,7 +34,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.util.BinaryUtil; +import org.apache.hudi.common.util.BinaryUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -460,7 +460,7 @@ public static long generateChecksum(Properties props) { } String table = props.getProperty(NAME.key()); String database = props.getProperty(DATABASE_NAME.key(), ""); - return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); + return BinaryUtils.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); } public static boolean validateChecksum(Properties props) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java index 9fec2c8cf5924..96410a619e104 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java @@ -18,10 +18,11 @@ package org.apache.hudi.common.util; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.zip.CRC32; -public class BinaryUtil { +public class BinaryUtils { /** * Lexicographically compare two arrays. @@ -117,6 +118,15 @@ public static byte updatePos(byte a, int apos, byte b, int bpos) { return (byte) (a ^ (1 << (7 - apos))); } + /** + * Copies {@link ByteBuffer} into allocated {@code byte[]} array + */ + public static byte[] toBytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + public static byte[] toBytes(int val) { byte[] b = new byte[4]; for (int i = 3; i > 0; i--) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index d4bafd9c9feee..41fd3b6951a6c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -35,7 +35,7 @@ import java.io.RandomAccessFile; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; -import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; /** * A utility class supports spillable map. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 9fb0b20e74f2c..d5a4559848ad3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -56,7 +56,7 @@ import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; /** * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java similarity index 87% rename from hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java index 1efe5a06865d8..fa0140cbc33ca 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestBinaryUtil { +public class TestBinaryUtils { @Test public void testIntConvert() { @@ -37,12 +37,12 @@ public void testIntConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testInt.length; i++) { valueWrappers.add(new OrginValueWrapper<>(i, testInt[i])); - convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtil.intTo8Byte(testInt[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtils.intTo8Byte(testInt[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testInt.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -57,12 +57,12 @@ public void testLongConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testLong.length; i++) { valueWrappers.add(new OrginValueWrapper<>((long)i, testLong[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtil.longTo8Byte(testLong[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtils.longTo8Byte(testLong[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testLong.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -77,12 +77,12 @@ public void testDoubleConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((Double)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtil.doubleTo8Byte(testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtils.doubleTo8Byte(testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -97,12 +97,12 @@ public void testFloatConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((float)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtil.doubleTo8Byte((double) testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtils.doubleTo8Byte((double) testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -131,7 +131,7 @@ public OrginValueWrapper(T index, T originValue) { public void testConvertBytesToLong() { long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; for (int i = 0; i < tests.length; i++) { - assertEquals(BinaryUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + assertEquals(BinaryUtils.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); } } @@ -140,7 +140,7 @@ public void testConvertBytesToLongWithPadding() { byte[] bytes = new byte[2]; bytes[0] = 2; bytes[1] = 127; - assertEquals(BinaryUtil.convertBytesToLong(bytes), 2 * 256 + 127); + assertEquals(BinaryUtils.convertBytesToLong(bytes), 2 * 256 + 127); } private byte[] convertLongToBytes(long num) { From cebd168d9ec4aab77e87e553e2e4bce2995d46f0 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 20 Oct 2022 13:01:13 -0700 Subject: [PATCH 4/6] Rebased `ByteBuffer` cloning onto the new utility --- .../main/java/org/apache/hudi/common/util/AvroOrcUtils.java | 4 ++-- .../src/main/java/org/apache/hudi/common/util/OrcUtils.java | 5 +++-- .../scala/org/apache/hudi/ColumnStatsIndexSupport.scala | 6 ++---- .../main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala | 6 ++---- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index ca59c301c887e..c83ec68976491 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -54,6 +54,7 @@ import org.apache.orc.TypeDescription; import static org.apache.avro.JsonProperties.NULL_VALUE; +import static org.apache.hudi.common.util.BinaryUtils.toBytes; /** * Methods including addToVector, addUnionValue, createOrcSchema are originally from @@ -221,8 +222,7 @@ public static void addToVector(TypeDescription type, ColumnVector colVector, Sch binaryBytes = ((GenericData.Fixed)value).bytes(); } else if (value instanceof ByteBuffer) { final ByteBuffer byteBuffer = (ByteBuffer) value; - binaryBytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(binaryBytes); + binaryBytes = toBytes(byteBuffer); } else if (value instanceof byte[]) { binaryBytes = (byte[]) value; } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 0cc40591972a0..4cb55f379042e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -52,6 +52,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.BinaryUtils.toBytes; + /** * Utility functions for ORC files. */ @@ -238,8 +240,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { if (reader.hasMetadataValue("orc.avro.schema")) { ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema"); - byte[] bytes = new byte[metadataValue.remaining()]; - metadataValue.get(bytes); + byte[] bytes = toBytes(metadataValue); return new Schema.Parser().parse(new String(bytes)); } else { TypeDescription orcSchema = reader.getSchema(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 58511f791ed78..dc413afff1d27 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.BinaryUtils.toBytes import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.collection import org.apache.hudi.common.util.hash.ColumnIndexID @@ -469,10 +470,7 @@ object ColumnStatsIndexSupport { } case BinaryType => value match { - case b: ByteBuffer => - val bytes = new Array[Byte](b.remaining) - b.get(bytes) - bytes + case b: ByteBuffer => toBytes(b) case other => other } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 19d0a0a98ba39..294d282e3d909 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -33,10 +33,8 @@ object SerDeUtils { } def toBytes(o: Any): Array[Byte] = { - val bb: ByteBuffer = SERIALIZER_THREAD_LOCAL.get.serialize(o) - val bytes = new Array[Byte](bb.capacity()) - bb.get(bytes) - bytes + val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) + toBytes(buf) } def toObject(bytes: Array[Byte]): Any = { From 836e03bd2e07dc5ad11450c618c42fb3d5fededc Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 20 Oct 2022 13:03:23 -0700 Subject: [PATCH 5/6] Reverted rename --- .../hudi/sort/SpaceCurveSortingHelper.java | 34 +++++++++---------- .../sql/hudi/execution/RangeSample.scala | 10 +++--- .../hudi/common/table/HoodieTableConfig.java | 4 +-- .../apache/hudi/common/util/AvroOrcUtils.java | 2 +- .../{BinaryUtils.java => BinaryUtil.java} | 2 +- .../org/apache/hudi/common/util/OrcUtils.java | 2 +- .../hudi/common/util/SpillableMapUtils.java | 2 +- .../util/collection/BitCaskDiskMap.java | 2 +- ...stBinaryUtils.java => TestBinaryUtil.java} | 22 ++++++------ .../apache/hudi/ColumnStatsIndexSupport.scala | 2 +- 10 files changed, 41 insertions(+), 41 deletions(-) rename hudi-common/src/main/java/org/apache/hudi/common/util/{BinaryUtils.java => BinaryUtil.java} (99%) rename hudi-common/src/test/java/org/apache/hudi/common/util/{TestBinaryUtils.java => TestBinaryUtil.java} (87%) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java index 1ff54773c466a..496168e844276 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java @@ -18,7 +18,7 @@ package org.apache.hudi.sort; -import org.apache.hudi.common.util.BinaryUtils; +import org.apache.hudi.common.util.BinaryUtil; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.optimize.HilbertCurveUtils; @@ -158,7 +158,7 @@ private static JavaRDD createZCurveSortedRDD(JavaRDD originRDD, Map new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); @@ -206,30 +206,30 @@ private static Row appendToRow(Row row, Object value) { @Nonnull private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) { if (dataType instanceof LongType) { - return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); } else if (dataType instanceof DoubleType) { - return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); } else if (dataType instanceof IntegerType) { - return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); } else if (dataType instanceof FloatType) { - return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); } else if (dataType instanceof StringType) { - return BinaryUtils.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); } else if (dataType instanceof DateType) { - return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); } else if (dataType instanceof TimestampType) { - return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); } else if (dataType instanceof ByteType) { - return BinaryUtils.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); } else if (dataType instanceof ShortType) { - return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); } else if (dataType instanceof DecimalType) { - return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); } else if (dataType instanceof BooleanType) { boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return BinaryUtils.intTo8Byte(value ? 1 : 0); + return BinaryUtil.intTo8Byte(value ? 1 : 0); } else if (dataType instanceof BinaryType) { - return BinaryUtils.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); @@ -245,13 +245,13 @@ private static long mapColumnValueToLong(Row row, int index, DataType dataType) } else if (dataType instanceof FloatType) { return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); } else if (dataType instanceof StringType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertStringToLong(row.getString(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); } else if (dataType instanceof DateType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); } else if (dataType instanceof TimestampType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); } else if (dataType instanceof ByteType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong(new byte[] {row.getByte(index)}); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); } else if (dataType instanceof ShortType) { return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getShort(index); } else if (dataType instanceof DecimalType) { @@ -260,7 +260,7 @@ private static long mapColumnValueToLong(Row row, int index, DataType dataType) boolean value = row.isNullAt(index) ? false : row.getBoolean(index); return value ? Long.MAX_VALUE : 0; } else if (dataType instanceof BinaryType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong((byte[]) row.get(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index 757964c218607..7c39ce2546f26 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.execution -import org.apache.hudi.common.util.BinaryUtils +import org.apache.hudi.common.util.BinaryUtil import org.apache.hudi.config.HoodieClusteringConfig import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy import org.apache.hudi.optimize.HilbertCurveUtils @@ -240,7 +240,7 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S case class ByteArraySorting(b: Array[Byte]) extends Ordered[ByteArraySorting] with Serializable { override def compare(that: ByteArraySorting): Int = { val len = this.b.length - BinaryUtils.compareTo(this.b, 0, len, that.b, 0, len) + BinaryUtil.compareTo(this.b, 0, len, that.b, 0, len) } } @@ -430,7 +430,7 @@ object RangeSampleSort { case LayoutOptimizationStrategy.HILBERT => HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) case LayoutOptimizationStrategy.ZORDER => - BinaryUtils.interleaving(values.map(BinaryUtils.intTo8Byte(_)).toArray, 8) + BinaryUtil.interleaving(values.map(BinaryUtil.intTo8Byte(_)).toArray, 8) } Row.fromSeq(row.toSeq ++ Seq(mapValues)) @@ -525,8 +525,8 @@ object RangeSampleSort { decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]]) } } - }.toArray.map(BinaryUtils.intTo8Byte(_)) - val zValues = BinaryUtils.interleaving(interleaveValues, 8) + }.toArray.map(BinaryUtil.intTo8Byte(_)) + val zValues = BinaryUtil.interleaving(interleaveValues, 8) val mutablePair = new MutablePair[InternalRow, Array[Byte]]() mutablePair.update(unsafeRow, zValues) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index 3239cc213c64a..ac3608fc003dc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -34,7 +34,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.util.BinaryUtils; +import org.apache.hudi.common.util.BinaryUtil; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -460,7 +460,7 @@ public static long generateChecksum(Properties props) { } String table = props.getProperty(NAME.key()); String database = props.getProperty(DATABASE_NAME.key(), ""); - return BinaryUtils.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); + return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); } public static boolean validateChecksum(Properties props) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index c83ec68976491..c31184244390f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -54,7 +54,7 @@ import org.apache.orc.TypeDescription; import static org.apache.avro.JsonProperties.NULL_VALUE; -import static org.apache.hudi.common.util.BinaryUtils.toBytes; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; /** * Methods including addToVector, addUnionValue, createOrcSchema are originally from diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java similarity index 99% rename from hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 96410a619e104..9d8f6c8e90cf3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -22,7 +22,7 @@ import java.nio.charset.Charset; import java.util.zip.CRC32; -public class BinaryUtils { +public class BinaryUtil { /** * Lexicographically compare two arrays. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 4cb55f379042e..5afe354d0e755 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -52,7 +52,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.hudi.common.util.BinaryUtils.toBytes; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; /** * Utility functions for ORC files. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index 41fd3b6951a6c..d4bafd9c9feee 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -35,7 +35,7 @@ import java.io.RandomAccessFile; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; -import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; /** * A utility class supports spillable map. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index d5a4559848ad3..9fb0b20e74f2c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -56,7 +56,7 @@ import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; /** * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java similarity index 87% rename from hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java index fa0140cbc33ca..1efe5a06865d8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestBinaryUtils { +public class TestBinaryUtil { @Test public void testIntConvert() { @@ -37,12 +37,12 @@ public void testIntConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testInt.length; i++) { valueWrappers.add(new OrginValueWrapper<>(i, testInt[i])); - convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtils.intTo8Byte(testInt[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtil.intTo8Byte(testInt[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testInt.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -57,12 +57,12 @@ public void testLongConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testLong.length; i++) { valueWrappers.add(new OrginValueWrapper<>((long)i, testLong[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtils.longTo8Byte(testLong[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtil.longTo8Byte(testLong[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testLong.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -77,12 +77,12 @@ public void testDoubleConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((Double)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtils.doubleTo8Byte(testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtil.doubleTo8Byte(testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -97,12 +97,12 @@ public void testFloatConvert() { List> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((float)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtils.doubleTo8Byte((double) testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtil.doubleTo8Byte((double) testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -131,7 +131,7 @@ public OrginValueWrapper(T index, T originValue) { public void testConvertBytesToLong() { long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; for (int i = 0; i < tests.length; i++) { - assertEquals(BinaryUtils.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + assertEquals(BinaryUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); } } @@ -140,7 +140,7 @@ public void testConvertBytesToLongWithPadding() { byte[] bytes = new byte[2]; bytes[0] = 2; bytes[1] = 127; - assertEquals(BinaryUtils.convertBytesToLong(bytes), 2 * 256 + 127); + assertEquals(BinaryUtil.convertBytesToLong(bytes), 2 * 256 + 127); } private byte[] convertLongToBytes(long num) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index dc413afff1d27..5cf7a5ec035ab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -29,7 +29,7 @@ import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.FileSystemViewStorageConfig -import org.apache.hudi.common.util.BinaryUtils.toBytes +import org.apache.hudi.common.util.BinaryUtil.toBytes import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.collection import org.apache.hudi.common.util.hash.ColumnIndexID From 06b6d3765064021bee3013ce073a788aec5fe720 Mon Sep 17 00:00:00 2001 From: Raymond Xu <2701446+xushiyan@users.noreply.github.com> Date: Fri, 21 Oct 2022 10:32:17 +0800 Subject: [PATCH 6/6] fix import --- .../src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 294d282e3d909..631644121c133 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.common.util.BinaryUtil import org.apache.spark.SparkConf import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} @@ -34,7 +35,7 @@ object SerDeUtils { def toBytes(o: Any): Array[Byte] = { val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) - toBytes(buf) + BinaryUtil.toBytes(buf) } def toObject(bytes: Array[Byte]): Any = {