diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java index aa5d49c7ab65..115269f4b7e0 100644 --- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java @@ -113,6 +113,15 @@ default RewriteDataFiles sort(SortOrder sortOrder) { throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework"); } + /** + * Choose Z-ORDER as a strategy for this rewrite operation with a specified list of columns to use + * @param columns Columns to be used to generate Z-Values + * @return this for method chaining + */ + default RewriteDataFiles zOrder(String... columns) { + throw new UnsupportedOperationException("Z-ORDER Rewrite Strategy not implemented for this framework"); + } + /** * A user provided filter for determining which files will be considered by the rewrite strategy. This will be used * in addition to whatever rules the rewrite strategy generates. For example this would be used for providing a diff --git a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java index 213b222dc507..efc05f179f82 100644 --- a/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java +++ b/api/src/main/java/org/apache/iceberg/util/ByteBuffers.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; public class ByteBuffers { @@ -46,6 +47,15 @@ public static byte[] toByteArray(ByteBuffer buffer) { } } + public static ByteBuffer reuse(ByteBuffer reuse, int length) { + Preconditions.checkArgument(reuse.hasArray() && reuse.arrayOffset() == 0 && reuse.capacity() == length, + "Cannot reuse buffer: Should be an array %s, should have an offset of 0 %s, should be of size %s was %s", + reuse.hasArray(), reuse.arrayOffset(), length, reuse.capacity()); + reuse.position(0); + reuse.limit(length); + return reuse; + } + public static ByteBuffer copy(ByteBuffer buffer) { if (buffer == null) { return null; diff --git a/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java new file mode 100644 index 000000000000..39ef0dcc14d3 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/ZOrderByteUtils.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.util; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Within Z-Ordering, the byte representations of the objects being compared must be ordered; + * this requires several types to be transformed when converted to bytes. The goal is to + * map objects whose byte representations are not lexicographically ordered into representations + * that are lexicographically ordered. Bytes produced should be compared lexicographically as + * unsigned bytes, big-endian. + *

+ * Most of these techniques are derived from + * https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/ + *

+ * Some implementation is taken from + * https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java + */ +public class ZOrderByteUtils { + + private ZOrderByteUtils() { + + } + + /** + * Signed ints do not have their bytes in magnitude order because of the sign bit. + * To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially + * shifts the 0 value so that we don't break our ordering when we cross the new 0 value. + */ + public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Integer.BYTES); + bytes.putInt(val ^ 0x80000000); + return bytes; + } + + /** + * Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Long.BYTES); + bytes.putLong(val ^ 0x8000000000000000L); + return bytes; + } + + /** + * Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Short.BYTES); + bytes.putShort((short) (val ^ (0x8000))); + return bytes; + } + + /** + * Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)} + */ + public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Byte.BYTES); + bytes.put((byte) (val ^ (0x80))); + return bytes; + } + + /** + * IEEE 754 : + * “If two floating-point numbers in the same format are ordered (say, x {@literal <} y), + * they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.” + * + * Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically + * comparable bytes + */ + public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Float.BYTES); + int ival = Float.floatToIntBits(val); + ival ^= ((ival >> (Integer.SIZE - 1)) | Integer.MIN_VALUE); + bytes.putInt(ival); + return bytes; + } + + /** + * Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)} + */ + public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) { + ByteBuffer bytes = ByteBuffers.reuse(reuse, Double.BYTES); + long lng = Double.doubleToLongBits(val); + lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE); + bytes.putLong(lng); + return bytes; + } + + /** + * Strings are lexicographically sortable BUT if different byte array lengths will + * ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time). + * This implementation just uses a set size to for all output byte representations. Truncating longer strings + * and right padding 0 for shorter strings. 
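+ * For example, with a length of 4 the value "iceberg" would contribute the UTF-8 bytes of "iceb", while "ab" would contribute 'a', 'b', 0x00, 0x00.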
+ */ + public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse, CharsetEncoder encoder) { + Preconditions.checkArgument(encoder.charset().equals(StandardCharsets.UTF_8), + "Cannot use an encoder not using UTF_8 as it's Charset"); + + ByteBuffer bytes = ByteBuffers.reuse(reuse, length); + Arrays.fill(bytes.array(), 0, length, (byte) 0x00); + if (val != null) { + CharBuffer inputBuffer = CharBuffer.wrap(val); + encoder.encode(inputBuffer, bytes, true); + } + return bytes; + } + + /** + * For Testing interleave all available bytes + */ + static byte[] interleaveBits(byte[][] columnsBinary) { + return interleaveBits(columnsBinary, + Arrays.stream(columnsBinary).mapToInt(column -> column.length).sum()); + } + + public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) { + return interleaveBits(columnsBinary, interleavedSize, ByteBuffer.allocate(interleavedSize)); + } + + /** + * Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is + * required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all + * columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the + * interleaving. + * @param columnsBinary an array of ordered byte representations of the columns being ZOrdered + * @param interleavedSize the number of bytes to use in the output + * @return the columnbytes interleaved + */ + public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) { + byte[] interleavedBytes = reuse.array(); + int sourceColumn = 0; + int sourceByte = 0; + int sourceBit = 7; + int interleaveByte = 0; + int interleaveBit = 7; + + while (interleaveByte < interleavedSize) { + // Take the source bit from source byte and move it to the output bit position + interleavedBytes[interleaveByte] |= + (columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit; + --interleaveBit; + + // Check if an output byte has been completed + if (interleaveBit == -1) { + // Move to the next output byte + interleaveByte++; + // Move to the highest order bit of the new output byte + interleaveBit = 7; + } + + // Check if the last output byte has been completed + if (interleaveByte == interleavedSize) { + break; + } + + // Find the next source bit to interleave + do { + // Move to next column + ++sourceColumn; + if (sourceColumn == columnsBinary.length) { + // If the last source column was used, reset to next bit of first column + sourceColumn = 0; + --sourceBit; + if (sourceBit == -1) { + // If the last bit of the source byte was used, reset to the highest bit of the next byte + sourceByte++; + sourceBit = 7; + } + } + } while (columnsBinary[sourceColumn].length <= sourceByte); + } + return interleavedBytes; + } + +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java new file mode 100644 index 000000000000..bf84319d0d45 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestZOrderByteUtil.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.util; + +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Random; +import org.apache.iceberg.relocated.com.google.common.primitives.UnsignedBytes; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; + +public class TestZOrderByteUtil { + private static final byte IIIIIIII = (byte) 255; + private static final byte IOIOIOIO = (byte) 170; + private static final byte OIOIOIOI = (byte) 85; + private static final byte OOOOIIII = (byte) 15; + private static final byte OOOOOOOI = (byte) 1; + private static final byte OOOOOOOO = (byte) 0; + + private static final int NUM_TESTS = 100000; + private static final int NUM_INTERLEAVE_TESTS = 1000; + + private final Random random = new Random(42); + + private String bytesToString(byte[] bytes) { + StringBuilder result = new StringBuilder(); + for (byte b : bytes) { + result.append(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0')); + } + return result.toString(); + } + + /** + * Returns a non-0 length byte array + */ + private byte[] generateRandomBytes() { + int length = Math.abs(random.nextInt(100) + 1); + byte[] result = new byte[length]; + random.nextBytes(result); + return result; + } + + /** + * Test method to ensure correctness of byte interleaving code + */ + private String interleaveStrings(String[] strings) { + StringBuilder result = new StringBuilder(); + int totalLength = Arrays.stream(strings).mapToInt(String::length).sum(); + int substringIndex = 0; + int characterIndex = 0; + while (characterIndex < totalLength) { + for (String str : strings) { + if (substringIndex < str.length()) { + result.append(str.charAt(substringIndex)); + characterIndex++; + } + } + substringIndex++; + } + return result.toString(); + } + + /** + * Compares the result of a string based interleaving algorithm implemented above + * versus the binary bit-shifting algorithm used in ZOrderByteUtils. Either both + * algorithms are identically wrong or are both identically correct. 
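+ * For example, the character-by-character interleave of "0011" and "1100" is "01011010".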
+ */ + @Test + public void testInterleaveRandomExamples() { + for (int test = 0; test < NUM_INTERLEAVE_TESTS; test++) { + int numByteArrays = Math.abs(random.nextInt(6)) + 1; + byte[][] testBytes = new byte[numByteArrays][]; + String[] testStrings = new String[numByteArrays]; + for (int byteIndex = 0; byteIndex < numByteArrays; byteIndex++) { + testBytes[byteIndex] = generateRandomBytes(); + testStrings[byteIndex] = bytesToString(testBytes[byteIndex]); + } + byte[] byteResult = ZOrderByteUtils.interleaveBits(testBytes); + String byteResultAsString = bytesToString(byteResult); + + String stringResult = interleaveStrings(testStrings); + + Assert.assertEquals("String interleave didn't match byte interleave", stringResult, byteResultAsString); + } + } + + @Test + public void testInterleaveEmptyBits() { + byte[][] test = new byte[4][10]; + byte[] expected = new byte[40]; + + Assert.assertArrayEquals("Should combine empty arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testInterleaveFullBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{IIIIIIII, IIIIIIII}; + test[1] = new byte[]{IIIIIIII}; + test[2] = new byte[0]; + test[3] = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII}; + byte[] expected = new byte[]{IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII, IIIIIIII}; + + Assert.assertArrayEquals("Should combine full arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testInterleaveMixedBits() { + byte[][] test = new byte[4][]; + test[0] = new byte[]{OOOOOOOI, IIIIIIII, OOOOOOOO, OOOOIIII}; + test[1] = new byte[]{OOOOOOOI, OOOOOOOO, IIIIIIII}; + test[2] = new byte[]{OOOOOOOI}; + test[3] = new byte[]{OOOOOOOI}; + byte[] expected = new byte[]{ + OOOOOOOO, OOOOOOOO, OOOOOOOO, OOOOIIII, + IOIOIOIO, IOIOIOIO, + OIOIOIOI, OIOIOIOI, + OOOOIIII}; + Assert.assertArrayEquals("Should combine mixed byte arrays", + expected, ZOrderByteUtils.interleaveBits(test)); + } + + @Test + public void testIntOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Integer.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Integer.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + int aInt = random.nextInt(); + int bInt = random.nextInt(); + int intCompare = Integer.signum(Integer.compare(aInt, bInt)); + byte[] aBytes = ZOrderByteUtils.intToOrderedBytes(aInt, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.intToOrderedBytes(bInt, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of ints should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aInt, bInt, intCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + intCompare, byteCompare); + } + } + + @Test + public void testLongOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Long.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Long.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + long aLong = random.nextInt(); + long bLong = random.nextInt(); + int longCompare = Integer.signum(Long.compare(aLong, bLong)); + byte[] aBytes = ZOrderByteUtils.longToOrderedBytes(aLong, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.longToOrderedBytes(bLong, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aLong, bLong, longCompare, 
Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testShortOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Short.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Short.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + short aShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + short bShort = (short) (random.nextInt() % (Short.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aShort, bShort)); + byte[] aBytes = ZOrderByteUtils.shortToOrderedBytes(aShort, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.shortToOrderedBytes(bShort, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aShort, bShort, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testTinyOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Byte.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Byte.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + byte aByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + byte bByte = (byte) (random.nextInt() % (Byte.MAX_VALUE + 1)); + int longCompare = Integer.signum(Long.compare(aByte, bByte)); + byte[] aBytes = ZOrderByteUtils.tinyintToOrderedBytes(aByte, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.tinyintToOrderedBytes(bByte, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of longs should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aByte, bByte, longCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + longCompare, byteCompare); + } + } + + @Test + public void testFloatOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Float.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Float.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + float aFloat = random.nextFloat(); + float bFloat = random.nextFloat(); + int floatCompare = Integer.signum(Float.compare(aFloat, bFloat)); + byte[] aBytes = ZOrderByteUtils.floatToOrderedBytes(aFloat, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.floatToOrderedBytes(bFloat, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of floats should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aFloat, bFloat, floatCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + floatCompare, byteCompare); + } + } + + @Test + public void testDoubleOrdering() { + ByteBuffer aBuffer = ByteBuffer.allocate(Double.BYTES); + ByteBuffer bBuffer = ByteBuffer.allocate(Double.BYTES); + for (int i = 0; i < NUM_TESTS; i++) { + double aDouble = random.nextDouble(); + double bDouble = random.nextDouble(); + int doubleCompare = Integer.signum(Double.compare(aDouble, bDouble)); + byte[] aBytes = ZOrderByteUtils.doubleToOrderedBytes(aDouble, aBuffer).array(); + byte[] bBytes = ZOrderByteUtils.doubleToOrderedBytes(bDouble, bBuffer).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of doubles should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", 
+ aDouble, bDouble, doubleCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + doubleCompare, byteCompare); + } + } + + @Test + public void testStringOrdering() { + CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder(); + ByteBuffer aBuffer = ByteBuffer.allocate(128); + ByteBuffer bBuffer = ByteBuffer.allocate(128); + for (int i = 0; i < NUM_TESTS; i++) { + String aString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + String bString = (String) RandomUtil.generatePrimitive(Types.StringType.get(), random); + int stringCompare = Integer.signum(aString.compareTo(bString)); + byte[] aBytes = ZOrderByteUtils.stringToOrderedBytes(aString, 128, aBuffer, encoder).array(); + byte[] bBytes = ZOrderByteUtils.stringToOrderedBytes(bString, 128, bBuffer, encoder).array(); + int byteCompare = Integer.signum(UnsignedBytes.lexicographicalComparator().compare(aBytes, bBytes)); + + Assert.assertEquals(String.format( + "Ordering of strings should match ordering of bytes, %s ~ %s -> %s != %s ~ %s -> %s ", + aString, bString, stringCompare, Arrays.toString(aBytes), Arrays.toString(bBytes), byteCompare), + stringCompare, byteCompare); + } + } +} diff --git a/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java new file mode 100644 index 000000000000..69b21a5a89ef --- /dev/null +++ b/spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + +package org.apache.iceberg.spark.action; + +import java.io.IOException; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.io.Files; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkSessionCatalog; +import org.apache.iceberg.spark.actions.SparkActions; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.expressions.Transform; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Timeout; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; + +@Fork(1) +@State(Scope.Benchmark) +@Measurement(iterations = 3) +@BenchmarkMode(Mode.SingleShotTime) +@Timeout(time = 1000, timeUnit = TimeUnit.HOURS) +public class IcebergSortCompactionBenchmark { + + private static final String[] NAMESPACE = new String[] {"default"}; + private static final String NAME = "sortbench"; + private static final Identifier IDENT = Identifier.of(NAMESPACE, NAME); + private static final int NUM_FILES = 8; + private static final long NUM_ROWS = 10000000L; + + + private final Configuration hadoopConf = initHadoopConf(); + private SparkSession spark; + + @Setup + public void setupBench() { + setupSpark(); + } + + @TearDown + public void teardownBench() { + tearDownSpark(); + } + + @Setup(Level.Iteration) + public void setupIteration() { + initTable(); + appendData(); + } + + @TearDown(Level.Iteration) + public void cleanUpIteration() throws IOException { + cleanupFiles(); + } + + @Benchmark + @Threads(1) + public void sortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", 
SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol2", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol3", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol4", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void sortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .sort(SortOrder + .builderFor(table().schema()) + .sortBy("stringCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("intCol", SortDirection.ASC, NullOrder.NULLS_FIRST) + .sortBy("dateCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("timestampCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("doubleCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .sortBy("longCol", SortDirection.DESC, NullOrder.NULLS_FIRST) + .build()) + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("intCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt2() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("intCol", "intCol2") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt3() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("intCol", "intCol2", "intCol3") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortInt4() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("intCol", "intCol2", "intCol3", "intCol4") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortString() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("stringCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortFourColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("stringCol", "intCol", "dateCol", "doubleCol") + .execute(); + } + + @Benchmark + @Threads(1) + public void zSortSixColumns() { + SparkActions.get() + .rewriteDataFiles(table()) + .zOrder("stringCol", "intCol", "dateCol", "timestampCol", "doubleCol", "longCol") + .execute(); + } + + protected Configuration initHadoopConf() { + return new Configuration(); + } + + protected final void initTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "intCol2", Types.IntegerType.get()), + required(4, "intCol3", Types.IntegerType.get()), + required(5, 
"intCol4", Types.IntegerType.get()), + required(6, "floatCol", Types.FloatType.get()), + optional(7, "doubleCol", Types.DoubleType.get()), + optional(8, "dateCol", Types.DateType.get()), + optional(9, "timestampCol", Types.TimestampType.withZone()), + optional(10, "stringCol", Types.StringType.get())); + + SparkSessionCatalog catalog = null; + try { + catalog = (SparkSessionCatalog) + Spark3Util.catalogAndIdentifier(spark(), "spark_catalog").catalog(); + catalog.dropTable(IDENT); + catalog.createTable(IDENT, SparkSchemaUtil.convert(schema), new Transform[0], Collections.emptyMap()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void appendData() { + Dataset df = spark().range(0, NUM_ROWS * NUM_FILES, 1, NUM_FILES) + .withColumnRenamed("id", "longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("intCol2", expr("CAST(longCol AS INT)")) + .withColumn("intCol3", expr("CAST(longCol AS INT)")) + .withColumn("intCol4", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), col("intCol").mod(NUM_FILES))) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")); + writeData(df); + } + + private void writeData(Dataset df) { + df.write().format("iceberg").mode(SaveMode.Append).save(NAME); + } + + protected final Table table() { + try { + return Spark3Util.loadIcebergTable(spark(), NAME); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected final SparkSession spark() { + return spark; + } + + protected String getCatalogWarehouse() { + String location = Files.createTempDir().getAbsolutePath() + "/" + UUID.randomUUID() + "/"; + return location; + } + + protected void cleanupFiles() throws IOException { + spark.sql("DROP TABLE IF EXISTS " + NAME); + } + + protected void setupSpark() { + SparkSession.Builder builder = + SparkSession.builder() + .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) + .master("local[*]"); + spark = builder.getOrCreate(); + Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); + hadoopConf.forEach(entry -> sparkHadoopConf.set(entry.getKey(), entry.getValue())); + } + + protected void tearDownSpark() { + spark.stop(); + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java index b1c08e607de8..be9be465cba9 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSpark3Action.java @@ -23,6 +23,7 @@ import org.apache.iceberg.actions.BinPackStrategy; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.SortStrategy; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.spark.sql.SparkSession; public class BaseRewriteDataFilesSpark3Action extends BaseRewriteDataFilesSparkAction { @@ -41,6 +42,11 @@ protected SortStrategy sortStrategy() { return new Spark3SortStrategy(table(), spark()); } + @Override + protected SortStrategy 
zOrderStrategy(String... columnNames) { + return new Spark3ZOrderStrategy(table(), spark(), Lists.newArrayList(columnNames)); + } + @Override protected RewriteDataFiles self() { return this; diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java index c03847450a57..337b579edeac 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -111,6 +111,11 @@ protected Table table() { */ protected abstract SortStrategy sortStrategy(); + /** + * The framework specific ZOrder Strategy + */ + protected abstract SortStrategy zOrderStrategy(String... columnNames); + @Override public RewriteDataFiles binPack() { Preconditions.checkArgument(this.strategy == null, @@ -135,6 +140,12 @@ public RewriteDataFiles sort() { return this; } + @Override + public RewriteDataFiles zOrder(String... columnNames) { + this.strategy = zOrderStrategy(columnNames); + return this; + } + @Override public RewriteDataFiles filter(Expression expression) { filter = Expressions.and(filter, expression); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java index 8a213e52c1e5..9023b877bd54 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3SortStrategy.java @@ -155,4 +155,8 @@ protected SparkSession spark() { protected LogicalPlan sortPlan(Distribution distribution, SortOrder[] ordering, LogicalPlan plan, SQLConf conf) { return DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf); } + + protected double sizeEstimateMultiple() { + return this.sizeEstimateMultiple; + } } diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java new file mode 100644 index 000000000000..42495a9c0572 --- /dev/null +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderStrategy.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.FileScanTaskSetManager; +import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.util.SortOrderUtil; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.api.java.function.MapPartitionsFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.connector.distributions.Distribution; +import org.apache.spark.sql.connector.distributions.Distributions; +import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.TimestampType; +import org.sparkproject.jetty.server.Authentication; +import scala.collection.Seq; + +public class Spark3ZOrderStrategy extends Spark3SortStrategy { + + private static final String Z_COLUMN = "ICEZVALUE"; + private static final Schema Z_SCHEMA = new Schema(NestedField.required(0, Z_COLUMN, Types.LongType.get())); + private static final org.apache.iceberg.SortOrder Z_SORT_ORDER = org.apache.iceberg.SortOrder.builderFor(Z_SCHEMA) + .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST) + .build(); + private static final int STRING_KEY_LENGTH = 128; + + private final List zOrderColNames; + private transient FileScanTaskSetManager manager = FileScanTaskSetManager.get(); + private transient FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); + + private final SparkZOrder orderHelper; + + public Spark3ZOrderStrategy(Table table, SparkSession spark, List zOrderColNames) { + super(table, spark); + + Stream identityPartitionColumns = table.spec().fields().stream() + .filter(f -> 
f.transform().isIdentity()) + .map(PartitionField::name); + List partZOrderCols = identityPartitionColumns + .filter(zOrderColNames::contains) + .collect(Collectors.toList()); + Preconditions.checkArgument( + partZOrderCols.isEmpty(), + "Cannot ZOrder on an Identity partition column as these values are constant within a partition, " + + "ZOrdering requested on %s", + partZOrderCols); + + this.orderHelper = new SparkZOrder(zOrderColNames.size()); + + this.zOrderColNames = zOrderColNames; + } + + @Override + public String name() { + return "Z-ORDER"; + } + + @Override + protected void validateOptions() { + // TODO implement Zorder Strategy in API Module + return; + } + + @Override + public Set rewriteFiles(List filesToRewrite) { + String groupID = UUID.randomUUID().toString(); + boolean requiresRepartition = !filesToRewrite.get(0).spec().equals(table().spec()); + + SortOrder[] ordering; + if (requiresRepartition) { + ordering = SparkDistributionAndOrderingUtil.convert(SortOrderUtil.buildSortOrder(table(), sortOrder())); + } else { + ordering = SparkDistributionAndOrderingUtil.convert(sortOrder()); + } + + Distribution distribution = Distributions.ordered(ordering); + + try { + manager.stageTasks(table(), groupID, filesToRewrite); + + // Disable Adaptive Query Execution as this may change the output partitioning of our write + SparkSession cloneSession = spark().cloneSession(); + cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false); + + // Reset Shuffle Partitions for our sort + long numOutputFiles = numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple())); + cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles)); + + Dataset scanDF = cloneSession.read().format("iceberg") + .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID) + .load(table().name()); + + Column[] originalColumns = Arrays.stream(scanDF.schema().names()) + .map(n -> functions.col(n)) + .toArray(Column[]::new); + + List zOrderColumns = zOrderColNames.stream() + .map(scanDF.schema()::apply) + .collect(Collectors.toList()); + + Column zvalueArray = functions.array(zOrderColumns.stream().map(colStruct -> + orderHelper.sortedLexicographically(functions.col(colStruct.name()), colStruct.dataType()) + ).toArray(Column[]::new)); + + Dataset zvalueDF = scanDF.withColumn(Z_COLUMN, orderHelper.interleaveBytes(zvalueArray)); + + SQLConf sqlConf = cloneSession.sessionState().conf(); + LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf); + Dataset sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder()); + sortedDf + .select(originalColumns) + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupID) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) + .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .mode("append") + .save(table().name()); + + return rewriteCoordinator.fetchNewDataFiles(table(), groupID); + } finally { + manager.removeTasks(table(), groupID); + rewriteCoordinator.clearRewrite(table(), groupID); + } + } + + @Override + protected org.apache.iceberg.SortOrder sortOrder() { + return Z_SORT_ORDER; + } +} diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java new file mode 100644 index 000000000000..a50195189931 --- /dev/null +++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/Spark3ZOrderUDF.java @@ -0,0 +1,242 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.spark.actions; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.expressions.UserDefinedFunction; +import org.apache.spark.sql.functions; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.TimestampType; +import scala.collection.Seq; + +class SparkZOrder implements Serializable { + private final int STRING_KEY_LENGTH = 8; + + private final byte[] TINY_EMPTY = new byte[Byte.BYTES]; + private final byte[] SHORT_EMPTY = new byte[Short.BYTES]; + private final byte[] INT_EMPTY = new byte[Integer.BYTES]; + private final byte[] LONG_EMPTY = new byte[Long.BYTES]; + private final byte[] FLOAT_EMPTY = new byte[Float.BYTES]; + private final byte[] DOUBLE_EMPTY = new byte[Double.BYTES]; + + transient private ThreadLocal outputBuffer; + transient private ThreadLocal inputHolder; + transient private ThreadLocal[] inputBuffers; + transient private ThreadLocal encoder; + + private final int numCols; + + private int inputCol = 0; + private int totalBytes = 0; + + SparkZOrder(int numCols) { + this.numCols = numCols; + } + + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + inputBuffers = new ThreadLocal[numCols]; + inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]); + encoder = ThreadLocal.withInitial(() -> StandardCharsets.UTF_8.newEncoder()); + } + + + private ByteBuffer outputBuffer(int size) { + if (outputBuffer == null) { + // May over allocate on concurrent calls + outputBuffer = ThreadLocal.withInitial(() -> ByteBuffer.allocate(size)); + } + return outputBuffer.get().position(0); + } + + private ByteBuffer inputBuffer(int position, int size){ + if (inputBuffers[position] == null) { + // May over allocate on concurrent calls + inputBuffers[position] = ThreadLocal.withInitial(() -> ByteBuffer.allocate(size)); + } + return inputBuffers[position].get(); + } + + long interleaveBits(Seq scalaBinary) { + byte[][] columnsBinary = scala.collection.JavaConverters.seqAsJavaList(scalaBinary) + .toArray(inputHolder.get()); + 
ZOrderByteUtils.interleaveBits(columnsBinary, 8, outputBuffer(8)); + return outputBuffer(8).getLong(); + } + + private UserDefinedFunction tinyToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Byte value) -> { + if (value == null) { + return TINY_EMPTY; + } + return ZOrderByteUtils.tinyintToOrderedBytes(value, inputBuffer(position, Byte.BYTES)).array(); + }, DataTypes.BinaryType).withName("TINY_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes+= Byte.BYTES; + + return udf; + } + + private UserDefinedFunction shortToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Short value) -> { + if (value == null) { + return SHORT_EMPTY; + } + return ZOrderByteUtils.shortToOrderedBytes(value, inputBuffer(position, Short.BYTES)).array(); + }, DataTypes.BinaryType) + .withName("SHORT_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes+= Short.BYTES; + + return udf; + } + + private UserDefinedFunction intToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Integer value) -> { + if (value == null) { + return INT_EMPTY; + } + return ZOrderByteUtils.intToOrderedBytes(value, inputBuffer(position, Integer.BYTES)).array(); + }, DataTypes.BinaryType) + .withName("INT_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes += Integer.BYTES; + + return udf; + } + + private UserDefinedFunction longToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Long value) -> { + if (value == null) { + return LONG_EMPTY; + } + return ZOrderByteUtils.longToOrderedBytes(value, inputBuffer(position, Long.BYTES)).array(); + }, DataTypes.BinaryType) + .withName("LONG_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes += Long.BYTES; + + return udf; + } + + private UserDefinedFunction floatToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Float value) -> { + if (value == null) { + return FLOAT_EMPTY; + } + return ZOrderByteUtils.floatToOrderedBytes(value, inputBuffer(position, Float.BYTES)).array(); + }, DataTypes.BinaryType) + .withName("FLOAT_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes += Float.BYTES; + + return udf; + } + + private UserDefinedFunction doubleToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((Double value) -> { + if (value == null) { + return DOUBLE_EMPTY; + } + return ZOrderByteUtils.doubleToOrderedBytes(value, inputBuffer(position, Double.BYTES)).array(); + }, DataTypes.BinaryType) + .withName("FLOAT_ORDERED_BYTES"); + + this.inputCol++; + this.totalBytes += Double.BYTES; + + return udf; + } + + private UserDefinedFunction stringToOrderedBytesUDF() { + int position = inputCol; + UserDefinedFunction udf = functions.udf((String value) -> + ZOrderByteUtils.stringToOrderedBytes(value, STRING_KEY_LENGTH, inputBuffer(position, STRING_KEY_LENGTH), + encoder.get()).array(), DataTypes.BinaryType).withName("STRING-LEXICAL-BYTES"); + + this.inputCol++; + this.totalBytes += STRING_KEY_LENGTH; + + return udf; + } + + private final UserDefinedFunction INTERLEAVE_UDF = + functions.udf((Seq arrayBinary) -> interleaveBits(arrayBinary), DataTypes.LongType) + .withName("INTERLEAVE_BYTES"); + + Column interleaveBytes(Column arrayBinary) { + return INTERLEAVE_UDF.apply(arrayBinary); + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + Column sortedLexicographically(Column column, DataType type) { + if (type instanceof ByteType) { + return 
tinyToOrderedBytesUDF().apply(column); + } else if (type instanceof ShortType) { + return shortToOrderedBytesUDF().apply(column); + } else if (type instanceof IntegerType) { + return intToOrderedBytesUDF().apply(column); + } else if (type instanceof LongType) { + return longToOrderedBytesUDF().apply(column); + } else if (type instanceof FloatType) { + return floatToOrderedBytesUDF().apply(column); + } else if (type instanceof DoubleType) { + return doubleToOrderedBytesUDF().apply(column); + } else if (type instanceof StringType) { + return stringToOrderedBytesUDF().apply(column); + } else if (type instanceof BinaryType) { + return stringToOrderedBytesUDF().apply(column); + } else if (type instanceof BooleanType) { + return column.cast(DataTypes.BinaryType); + } else if (type instanceof TimestampType) { + return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType)); + } else if (type instanceof DateType) { + return longToOrderedBytesUDF().apply(column.cast(DataTypes.LongType)); + } else { + throw new IllegalArgumentException( + String.format("Cannot use column %s of type %s in ZOrdering, the type is unsupported", + column, type)); + } + } +} \ No newline at end of file diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 564c501ed05b..c6e8c572b992 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -61,10 +61,12 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -92,6 +94,10 @@ import org.mockito.Mockito; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.spark.sql.functions.current_date; +import static org.apache.spark.sql.functions.date_add; +import static org.apache.spark.sql.functions.expr; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doAnswer; @@ -994,6 +1000,83 @@ public void testCommitStateUnknownException() { shouldHaveSnapshots(table, 2); // Commit actually Succeeded } + @Test + public void testZOrderSort() { + int originalFiles = 20; + Table table = createTable(originalFiles); + shouldHaveLastCommitUnsorted(table, "c2"); + shouldHaveFiles(table, originalFiles); + + List originalData = currentData(); + double originalFilesC2 = percentFilesRequired(table, "c2", "foo23"); + double originalFilesC3 = percentFilesRequired(table, "c3", "bar21"); + double originalFilesC2C3 = percentFilesRequired(table, new String[]{"c2", "c3"}, new String[]{"foo23", "bar23"}); + + Assert.assertTrue("Should require all 
files to scan c2", originalFilesC2 > 0.99); + Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 0.99); + + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder("c2", "c3") + .option(SortStrategy.MAX_FILE_SIZE_BYTES, Integer.toString((averageFileSize(table) / 2) + 2)) + // Divide files in 2 + .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, Integer.toString(averageFileSize(table) / 2)) + .option(SortStrategy.MIN_INPUT_FILES, "1") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedFiles()); + Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 40); + + table.refresh(); + + List postRewriteData = currentData(); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + + double filesScannedC2 = percentFilesRequired(table, "c2", "foo23"); + double filesScannedC3 = percentFilesRequired(table, "c3", "bar21"); + double filesScannedC2C3 = percentFilesRequired(table, new String[]{"c2", "c3"}, new String[]{"foo23", "bar23"}); + + Assert.assertTrue("Should have reduced the number of files required for c2", + filesScannedC2 < originalFilesC2); + Assert.assertTrue("Should have reduced the number of files required for c3", + filesScannedC3 < originalFilesC3); + Assert.assertTrue("Should have reduced the number of files required for a c2,c3 predicate", + filesScannedC2C3 < originalFilesC2C3); + } + + @Test + public void testZOrderAllTypesSort() { + Table table = createTypeTestTable(); + shouldHaveFiles(table, 10); + + List originalRaw = spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List originalData = rowsToJava(originalRaw); + + RewriteDataFiles.Result result = + basicRewrite(table) + .zOrder("longCol", "intCol", "floatCol", "doubleCol", "dateCol", "timestampCol", "stringCol") + .option(SortStrategy.MIN_INPUT_FILES, "1") + .option(SortStrategy.REWRITE_ALL, "true") + .execute(); + + Assert.assertEquals("Should have 1 fileGroups", 1, result.rewriteResults().size()); + int zOrderedFilesTotal = Iterables.size(table.currentSnapshot().addedFiles()); + Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal); + + table.refresh(); + + List postRaw = spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList(); + List postRewriteData = rowsToJava(postRaw); + assertEquals("We shouldn't have changed the data", originalData, postRewriteData); + + shouldHaveSnapshots(table, 2); + shouldHaveACleanCache(table); + } + @Test public void testInvalidAPIUsage() { Table table = createTable(1); @@ -1143,6 +1226,35 @@ protected Table createTablePartitioned(int partitions, int files) { return createTablePartitioned(partitions, files, SCALE, Maps.newHashMap()); } + private Table createTypeTestTable() { + Schema schema = new Schema( + required(1, "longCol", Types.LongType.get()), + required(2, "intCol", Types.IntegerType.get()), + required(3, "floatCol", Types.FloatType.get()), + optional(4, "doubleCol", Types.DoubleType.get()), + optional(6, "dateCol", Types.DateType.get()), + optional(7, "timestampCol", Types.TimestampType.withZone()), + optional(8, "stringCol", Types.StringType.get())); + + Map options = Maps.newHashMap(); + Table table = TABLES.create(schema, PartitionSpec.unpartitioned(), options, tableLocation); + + spark.range(0, 10, 1, 10) + .withColumnRenamed("id", 
"longCol") + .withColumn("intCol", expr("CAST(longCol AS INT)")) + .withColumn("floatCol", expr("CAST(longCol AS FLOAT)")) + .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)")) + .withColumn("dateCol", date_add(current_date(), 1)) + .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)")) + .withColumn("stringCol", expr("CAST(dateCol AS STRING)")) + .write() + .format("iceberg") + .mode("append") + .save(tableLocation); + + return table; + } + protected int averageFileSize(Table table) { table.refresh(); return (int) Streams.stream(table.newScan().planFiles()).mapToLong(FileScanTask::length).average().getAsDouble(); @@ -1228,6 +1340,21 @@ private Set cacheContents(Table table) { .build(); } + private double percentFilesRequired(Table table, String col, String value) { + return percentFilesRequired(table, new String[]{col}, new String[]{value}); + } + + private double percentFilesRequired(Table table, String[] cols, String[] values) { + Preconditions.checkArgument(cols.length == values.length); + Expression restriction = Expressions.alwaysTrue(); + for (int i = 0; i < cols.length; i++) { + restriction = Expressions.and(restriction, Expressions.equal(cols[i], values[i])); + } + int totalFiles = Iterables.size(table.newScan().planFiles()); + int filteredFiles = Iterables.size(table.newScan().filter(restriction).planFiles()); + return (double) filteredFiles / (double) totalFiles; + } + class GroupInfoMatcher implements ArgumentMatcher { private final Set groupIDs;