diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java
index 57cfc28ae9cf..1b09fe2e766e 100644
--- a/api/src/main/java/org/apache/iceberg/types/Comparators.java
+++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
 import java.util.Comparator;
+import org.apache.iceberg.util.UnicodeUtil;
 
 public class Comparators {
 
@@ -182,13 +183,31 @@ private static class CharSeqComparator implements Comparator<CharSequence> {
    private CharSeqComparator() {
    }

+    /**
+     * Java chars hold only UTF-8 characters of up to 3 bytes; a 4-byte UTF-8 character is
+     * represented as two Java chars (a UTF-16 surrogate pair). Comparing char by char can yield
+     * incorrect results when a 4-byte UTF-8 character is compared to a shorter one; it works as
+     * expected only when both characters are at most 3 bytes, or both are 4 bytes.
+     * isCharHighSurrogate detects the first char of a 4-byte character so that character can be
+     * treated as lexicographically greater than any UTF-8 character of 3 bytes or fewer.
+     */
    @Override
    public int compare(CharSequence s1, CharSequence s2) {
      int len = Math.min(s1.length(), s2.length());

      // find the first difference and return
      for (int i = 0; i < len; i += 1) {
-        int cmp = Character.compare(s1.charAt(i), s2.charAt(i));
+        char c1 = s1.charAt(i);
+        char c2 = s2.charAt(i);
+        boolean isC1HighSurrogate = UnicodeUtil.isCharHighSurrogate(c1);
+        boolean isC2HighSurrogate = UnicodeUtil.isCharHighSurrogate(c2);
+        if (isC1HighSurrogate && !isC2HighSurrogate) {
+          return 1;
+        }
+        if (!isC1HighSurrogate && isC2HighSurrogate) {
+          return -1;
+        }
+        int cmp = Character.compare(c1, c2);
        if (cmp != 0) {
          return cmp;
        }
diff --git a/api/src/main/java/org/apache/iceberg/util/BinaryUtil.java b/api/src/main/java/org/apache/iceberg/util/BinaryUtil.java
new file mode 100644
index 000000000000..fdfe751d296b
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/BinaryUtil.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.expressions.Literal;
+
+public class BinaryUtil {
+  // not meant to be instantiated
+  private BinaryUtil() {
+  }
+
+  /**
+   * Truncates the input byte buffer to the given length.
+   */
+  public static ByteBuffer truncateBinary(ByteBuffer input, int length) {
+    Preconditions.checkArgument(length > 0 && length < input.remaining(),
+        "Truncate length must be positive and lower than the number of remaining bytes");
+    byte[] array = new byte[length];
+    input.duplicate().get(array);
+    return ByteBuffer.wrap(array);
+  }
+
+  /**
+   * Returns a byte buffer whose length is less than or equal to the given length and that
+   * compares lower than or equal to the given input.
+   */
+  public static Literal<ByteBuffer> truncateBinaryMin(Literal<ByteBuffer> input, int length) {
+    ByteBuffer inputBuffer = input.value();
+    if (length >= inputBuffer.remaining()) {
+      return input;
+    }
+    return Literal.of(truncateBinary(inputBuffer, length));
+  }
+
+  /**
+   * Returns a byte buffer whose length is less than or equal to the given length and that
+   * compares greater than or equal to the given input, or null if no such upper bound exists.
+   */
+  public static Literal<ByteBuffer> truncateBinaryMax(Literal<ByteBuffer> input, int length) {
+    ByteBuffer inputBuffer = input.value();
+    if (length >= inputBuffer.remaining()) {
+      return input;
+    }
+
+    // Truncate the input to the specified truncate length.
+    ByteBuffer truncatedInput = truncateBinary(inputBuffer, length);
+
+    // Try incrementing the bytes from the end. If all bytes overflow after incrementing, then return null
+    for (int i = length - 1; i >= 0; --i) {
+      byte element = truncatedInput.get(i);
+      element = (byte) (element + 1);
+      if (element != 0) { // No overflow
+        truncatedInput.put(i, element);
+        // Return a byte buffer whose position is zero and whose limit is i + 1
+        truncatedInput.position(0);
+        truncatedInput.limit(i + 1);
+        return Literal.of(truncatedInput);
+      }
+    }
+    return null; // Cannot find a valid upper bound
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
new file mode 100644
index 000000000000..1eaed21df6d2
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.expressions.Literal;
+
+public class UnicodeUtil {
+  // not meant to be instantiated
+  private UnicodeUtil() {
+  }
+
+  /**
+   * Determines if the given character value is a Unicode high-surrogate code unit.
+   * The range of high surrogates is 0xD800 - 0xDBFF.
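+   * For example, U+10400 is encoded as the surrogate pair U+D801 U+DC00: this method returns
+   * true for the high surrogate (U+D801) and false for the low surrogate (U+DC00).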
+   */
+  public static boolean isCharHighSurrogate(char ch) {
+    return (ch & '\uFC00') == '\uD800'; // 0xDC00 - 0xDFFF shouldn't match
+  }
+
+  /**
+   * Truncates the input CharSequence such that the result is a valid Unicode string and
+   * contains at most the given number of Unicode characters (code points).
+   */
+  public static CharSequence truncateString(CharSequence input, int length) {
+    Preconditions.checkArgument(length > 0, "Truncate length should be positive");
+    StringBuffer sb = new StringBuffer(input);
+    // Get the number of Unicode characters in the input
+    int numUniCodeCharacters = sb.codePointCount(0, sb.length());
+    // No need to truncate if the number of Unicode characters in the input is <= truncate length
+    if (length >= numUniCodeCharacters) {
+      return input;
+    }
+    // Get the offset in the input CharSequence where the number of Unicode characters = truncate length
+    int offsetByCodePoint = sb.offsetByCodePoints(0, length);
+    return input.subSequence(0, offsetByCodePoint);
+  }
+
+  /**
+   * Returns a valid Unicode CharSequence that compares lower than or equal to the given input
+   * and contains at most the given number of Unicode characters.
+   */
+  public static Literal<CharSequence> truncateStringMin(Literal<CharSequence> input, int length) {
+    // Truncate the input to the specified truncate length.
+    CharSequence truncatedInput = truncateString(input.value(), length);
+    return Literal.of(truncatedInput);
+  }
+
+  /**
+   * Returns a valid Unicode CharSequence that compares greater than or equal to the given input
+   * and contains at most the given number of Unicode characters, or null if no such upper bound
+   * exists.
+   */
+  public static Literal<CharSequence> truncateStringMax(Literal<CharSequence> input, int length) {
+    CharSequence inputCharSeq = input.value();
+    // Truncate the input to the specified truncate length.
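+    // For example, truncating "iceberg" to 3 code points yields "ice"; the increment step below
+    // then produces "icf", which is an upper bound for every string that begins with "ice".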
+    StringBuffer truncatedStringBuffer = new StringBuffer(truncateString(inputCharSeq, length));
+
+    // No need to increment if the input was not actually truncated
+    if (inputCharSeq.length() == truncatedStringBuffer.length()) {
+      return input;
+    }
+
+    // Try incrementing the code points from the end
+    for (int i = length - 1; i >= 0; i--) {
+      // Get the char offset in the truncated string buffer where the number of Unicode characters = i
+      int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
+      // codePointAt expects a char index, not a code point index, so use the converted offset
+      int nextCodePoint = truncatedStringBuffer.codePointAt(offsetByCodePoint) + 1;
+      // No overflow
+      if (nextCodePoint != 0 && Character.isValidCodePoint(nextCodePoint)) {
+        truncatedStringBuffer.setLength(offsetByCodePoint);
+        // Append the next code point to the truncated substring
+        truncatedStringBuffer.appendCodePoint(nextCodePoint);
+        return Literal.of(truncatedStringBuffer.toString());
+      }
+    }
+    return null; // Cannot find a valid upper bound
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index c705b776bbe9..9481118f6046 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -85,4 +85,7 @@ private TableProperties() {}
 
   public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
   public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;
+
+  public static final String WRITE_METADATA_TRUNCATE_BYTES = "write.metadata.truncate-length";
+  public static final int WRITE_METADATA_TRUNCATE_BYTES_DEFAULT = 16;
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 3763a289b3af..2782eea1ec8a 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -63,6 +63,9 @@
 import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT;
+
 
 public class Parquet {
   private Parquet() {
@@ -165,6 +168,9 @@ public FileAppender<D> build() throws IOException {
          PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
      int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
          PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
+      int statsTruncateLength = Integer.parseInt(config.getOrDefault(
+          WRITE_METADATA_TRUNCATE_BYTES, String.valueOf(WRITE_METADATA_TRUNCATE_BYTES_DEFAULT)));
+

      WriterVersion writerVersion = WriterVersion.PARQUET_1_0;

@@ -192,7 +198,8 @@ public FileAppender<D> build() throws IOException {
            .build();

        return new org.apache.iceberg.parquet.ParquetWriter<>(
-            conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(), parquetProperties);
+            conf, file, schema, rowGroupSize, statsTruncateLength, metadata,
+            createWriterFunc, codec(), parquetProperties);
      } else {
        return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
            .withWriterVersion(writerVersion)
@@ -205,7 +212,7 @@ public FileAppender<D> build() throws IOException {
            .withRowGroupSize(rowGroupSize)
            .withPageSize(pageSize)
            .withDictionaryPageSize(dictionaryPageSize)
-            .build());
+            .build(),
+            statsTruncateLength);
      }
    }
  }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
index 0bd4154a19c4..e1b4d39084b3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.io.InputFile;
@@ -47,21 +48,30 @@
 import org.apache.parquet.schema.MessageType;
 
 import static org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
+import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
+import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
+import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
+import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;
 
 public class ParquetUtil {
   // not meant to be instantiated
   private ParquetUtil() {
   }
 
-  public static Metrics fileMetrics(InputFile file) {
+  // Package-private so that only existing tests keep using the default truncate length
+  static Metrics fileMetrics(InputFile file) {
+    return fileMetrics(file, TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT);
+  }
+
+  public static Metrics fileMetrics(InputFile file, int statsTruncateLength) {
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
-      return footerMetrics(reader.getFooter());
+      return footerMetrics(reader.getFooter(), statsTruncateLength);
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to read footer of file: %s", file);
    }
  }

-  public static Metrics footerMetrics(ParquetMetadata metadata) {
+  public static Metrics footerMetrics(ParquetMetadata metadata, int statsTruncateLength) {
     long rowCount = 0;
     Map<Integer, Long> columnSizes = Maps.newHashMap();
     Map<Integer, Long> valueCounts = Maps.newHashMap();
@@ -89,11 +99,14 @@
         increment(nullValueCounts, fieldId, stats.getNumNulls());
 
         Types.NestedField field = fileSchema.findField(fieldId);
-        if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
-          updateMin(lowerBounds, fieldId,
-              fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
-          updateMax(upperBounds, fieldId,
-              fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax()));
+        if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)
+            && statsTruncateLength > 0) {
+          updateMin(lowerBounds, fieldId, field.type(),
+              fromParquetPrimitive(field.type(), column.getPrimitiveType(),
+                  stats.genericGetMin()), statsTruncateLength);
+          updateMax(upperBounds, fieldId, field.type(),
+              fromParquetPrimitive(field.type(), column.getPrimitiveType(),
+                  stats.genericGetMax()), statsTruncateLength);
        }
      }
    }
@@ -151,18 +164,40 @@ private static void increment(Map<Integer, Long> columns, int fieldId, long amou
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Literal<T> min) {
+  private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
+                                    Literal<T> min, int truncateLength) {
     Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
     if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
-      lowerBounds.put(id, min);
+      switch (type.typeId()) {
+        case STRING:
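+          // A truncated String prefix still sorts at or below the true minimum, so it remains a valid lower bound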
+          lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
+          break;
+        case FIXED:
+        case BINARY:
+          lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
+          break;
+        default:
+          lowerBounds.put(id, min);
+      }
    }
  }

  @SuppressWarnings("unchecked")
-  private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Literal<T> max) {
+  private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
+                                    Literal<T> max, int truncateLength) {
     Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
     if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
-      upperBounds.put(id, max);
+      switch (type.typeId()) {
+        case STRING:
+          upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
+          break;
+        case FIXED:
+        case BINARY:
+          upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
+          break;
+        default:
+          upperBounds.put(id, max);
+      }
    }
  }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
index db51788bc7b4..40bde82df6b5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteAdapter.java
@@ -31,9 +31,11 @@ public class ParquetWriteAdapter<D> implements FileAppender<D> {
   private ParquetWriter<D> writer = null;
   private ParquetMetadata footer = null;
+  private int statsTruncateLength;
 
-  public ParquetWriteAdapter(ParquetWriter writer) {
+  public ParquetWriteAdapter(ParquetWriter<D> writer, int statsTruncateLength) {
     this.writer = writer;
+    this.statsTruncateLength = statsTruncateLength;
  }

  @Override
@@ -48,7 +50,7 @@ public void add(D datum) {
   @Override
   public Metrics metrics() {
     Preconditions.checkState(footer != null, "Cannot produce metrics until closed");
-    return ParquetUtil.footerMetrics(footer);
+    return ParquetUtil.footerMetrics(footer, statsTruncateLength);
  }

  @Override
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index c7bd6e216f7b..e80920e4059b 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -74,9 +74,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private long nextRowGroupSize = 0;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
+  private int statsTruncateLength;
 
   @SuppressWarnings("unchecked")
   ParquetWriter(Configuration conf, OutputFile output, Schema schema, long rowGroupSize,
+                int statsTruncateLength,
                 Map<String, String> metadata,
                 Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
                 CompressionCodecName codec,
@@ -84,6 +86,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.output = output;
     this.targetRowGroupSize = rowGroupSize;
     this.props = properties;
+    this.statsTruncateLength = statsTruncateLength;
     this.metadata = ImmutableMap.copyOf(metadata);
     this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
     this.parquetSchema = convert(schema, "table");
@@ -115,7 +118,7 @@ public void add(T value) {
 
   @Override
   public Metrics metrics() {
-    return ParquetUtil.footerMetrics(writer.getFooter());
+    return ParquetUtil.footerMetrics(writer.getFooter(), statsTruncateLength);
  }

  @Override
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java
new file mode 100644
index 000000000000..a887e19cff53
--- /dev/null
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetMetricsTruncation.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import org.apache.iceberg.expressions.Literal;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
+import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
+import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
+import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;
+
+public class TestParquetMetricsTruncation {
+  @Test
+  public void testTruncateBinaryMin() throws IOException {
+    ByteBuffer test1 = ByteBuffer.wrap(new byte[] {1, 1, (byte) 0xFF, 2});
+    // Output of test1 when truncated to 2 bytes
+    ByteBuffer test1_2_expected = ByteBuffer.wrap(new byte[] {1, 1});
+    ByteBuffer test2 = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 2});
+    ByteBuffer test2_2 = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF});
+
+    Comparator<ByteBuffer> cmp = Literal.of(test1).comparator();
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateBinaryMin(Literal.of(test1), 2).value(), test1) <= 0);
+    Assert.assertTrue("Output must have the first two bytes of the input",
+        cmp.compare(truncateBinaryMin(Literal.of(test1), 2).value(), test1_2_expected) == 0);
+    Assert.assertTrue("No truncation required as truncate length is greater than the input size",
+        cmp.compare(truncateBinaryMin(Literal.of(test1), 5).value(), test1) == 0);
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateBinaryMin(Literal.of(test2), 2).value(), test2) <= 0);
+    Assert.assertTrue("Output must have the first two bytes of the input. A lower bound exists " +
+        "even though the first two bytes are the max value",
+        cmp.compare(truncateBinaryMin(Literal.of(test2), 2).value(), test2_2) == 0);
+  }
+
+  @Test
+  public void testTruncateBinaryMax() throws IOException {
+    ByteBuffer test1 = ByteBuffer.wrap(new byte[] {1, 1, 2});
+    ByteBuffer test2 = ByteBuffer.wrap(new byte[] {1, 1, (byte) 0xFF, 2});
+    ByteBuffer test3 = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 2});
+    ByteBuffer test4 = ByteBuffer.wrap(new byte[] {1, 1, 0});
+    ByteBuffer expectedOutput = ByteBuffer.wrap(new byte[] {1, 2});
+
+    Comparator<ByteBuffer> cmp = Literal.of(test1).comparator();
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateBinaryMax(Literal.of(test1), 2).value(), test1) >= 0);
+    Assert.assertTrue("Output must have two bytes and the second byte of the input must be incremented",
+        cmp.compare(truncateBinaryMax(Literal.of(test1), 2).value(), expectedOutput) == 0);
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateBinaryMax(Literal.of(test2), 2).value(), test2) >= 0);
+    Assert.assertTrue("Since the third byte is already the max value, output must have two bytes " +
+        "with the second byte incremented", cmp.compare(
+        truncateBinaryMax(Literal.of(test2), 3).value(), expectedOutput) == 0);
+    Assert.assertTrue("No truncation required as truncate length is greater than the input size",
+        cmp.compare(truncateBinaryMax(Literal.of(test3), 5).value(), test3) == 0);
+    Assert.assertNull("An upper bound doesn't exist since the first two bytes are the max value",
+        truncateBinaryMax(Literal.of(test3), 2));
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateBinaryMax(Literal.of(test4), 2).value(), test4) >= 0);
+    Assert.assertTrue("Since a shorter sequence is considered smaller, output must have two bytes " +
+        "and the second byte of the input must be incremented",
+        cmp.compare(truncateBinaryMax(Literal.of(test4), 2).value(), expectedOutput) == 0);
+  }
+
+  @Test
+  public void testTruncateStringMin() throws IOException {
+    String test1 = "イロハニホヘト";
+    // Output of test1 when truncated to 2 Unicode characters
+    String test1_2_expected = "イロ";
+    String test1_3_expected = "イロハ";
+    String test2 = "щщаεはчωいにπάほхεろへσκζ";
+    String test2_7_expected = "щщаεはчω";
+    // U+FFFF is the max 3-byte UTF-8 character
+    String test3 = "\uFFFF\uFFFF";
+    // test4 consists of two 4-byte UTF-8 characters
+    String test4 = "\uD800\uDC00\uD800\uDC00";
+    String test4_1_expected = "\uD800\uDC00";
+
+    Comparator<CharSequence> cmp = Literal.of(test1).comparator();
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateStringMin(Literal.of(test1), 3).value(), test1) <= 0);
+    Assert.assertTrue("No truncation required as truncate length is greater than the input size",
+        cmp.compare(truncateStringMin(Literal.of(test1), 8).value(), test1) == 0);
+    Assert.assertTrue("Output must have the first two characters of the input",
+        cmp.compare(truncateStringMin(Literal.of(test1), 2).value(), test1_2_expected) == 0);
+    Assert.assertTrue("Output must have the first three characters of the input",
+        cmp.compare(truncateStringMin(Literal.of(test1), 3).value(), test1_3_expected) == 0);
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateStringMin(Literal.of(test2), 16).value(), test2) <= 0);
+    Assert.assertTrue("Output must have the first seven characters of the input",
+        cmp.compare(truncateStringMin(Literal.of(test2), 7).value(), test2_7_expected) == 0);
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateStringMin(Literal.of(test3), 2).value(), test3) <= 0);
+    Assert.assertTrue("No truncation required as truncate length is equal to the input size",
+        cmp.compare(truncateStringMin(Literal.of(test3), 2).value(), test3) == 0);
+    Assert.assertTrue("Truncated lower bound should be lower than or equal to the actual lower bound",
+        cmp.compare(truncateStringMin(Literal.of(test4), 1).value(), test4) <= 0);
+    Assert.assertTrue("Output must have the first 4-byte UTF-8 character of the input",
+        cmp.compare(truncateStringMin(Literal.of(test4), 1).value(), test4_1_expected) == 0);
+  }
+
+  @Test
+  public void testTruncateStringMax() throws IOException {
+    String test1 = "イロハニホヘト";
+    // Output of test1 when truncated to 2 Unicode characters
+    String test1_2_expected = "イヮ";
+    String test1_3_expected = "イロバ";
+    String test2 = "щщаεはчωいにπάほхεろへσκζ";
+    String test2_7_expected = "щщаεはчϊ";
+    String test3 = "aनि\uFFFF\uFFFF";
+    String test3_3_expected = "aनी";
+    // U+FFFF is the max 3-byte UTF-8 character
+    String test4 = "\uFFFF\uFFFF";
+    String test4_1_expected = "\uD800\uDC00";
+    // test5 consists of two maximal 4-byte UTF-8 characters
+    String test5 = "\uDBFF\uDFFF\uDBFF\uDFFF";
+    String test6 = "\uD800\uDFFF\uD800\uDFFF";
+    // Expected output of test6 truncated to one character: the surrogate pair incremented by one code point
+    String test6_2_expected = "\uD801\uDC00";
+
+    Comparator<CharSequence> cmp = Literal.of(test1).comparator();
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateStringMax(Literal.of(test1), 4).value(), test1) >= 0);
+    Assert.assertTrue("No truncation required as truncate length is equal to the input size",
+        cmp.compare(truncateStringMax(Literal.of(test1), 7).value(), test1) == 0);
+    Assert.assertTrue("Output must have two characters and the second character of the input must " +
+        "be incremented", cmp.compare(
+        truncateStringMax(Literal.of(test1), 2).value(), test1_2_expected) == 0);
+    Assert.assertTrue("Output must have three characters and the third character of the input must " +
+        "be incremented", cmp.compare(
+        truncateStringMax(Literal.of(test1), 3).value(), test1_3_expected) == 0);
+    Assert.assertTrue("No truncation required as truncate length is greater than the input size",
+        cmp.compare(truncateStringMax(Literal.of(test1), 8).value(), test1) == 0);
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper " +
+        "bound", cmp.compare(truncateStringMax(Literal.of(test2), 8).value(), test2) >= 0);
+    Assert.assertTrue("Output must have seven characters and the seventh character of the input " +
+        "must be incremented", cmp.compare(
+        truncateStringMax(Literal.of(test2), 7).value(), test2_7_expected) == 0);
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper " +
+        "bound", cmp.compare(truncateStringMax(Literal.of(test3), 3).value(), test3) >= 0);
+    Assert.assertTrue("Output must have three characters and the third character of the input must " +
+        "be incremented. The second perceivable character in this string is actually a glyph. It consists of " +
+        "two unicode characters", cmp.compare(
+        truncateStringMax(Literal.of(test3), 3).value(), test3_3_expected) == 0);
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateStringMax(Literal.of(test4), 1).value(), test4) >= 0);
+    Assert.assertTrue("Output must have one character. Since the first character is the max 3-byte " +
+        "UTF-8 character, it should be incremented to the lowest 4-byte UTF-8 character",
+        cmp.compare(truncateStringMax(Literal.of(test4), 1).value(), test4_1_expected) == 0);
+    Assert.assertNull("An upper bound doesn't exist since the first two characters are max UTF-8 " +
+        "characters", truncateStringMax(Literal.of(test5), 1));
+    Assert.assertTrue("Truncated upper bound should be greater than or equal to the actual upper bound",
+        cmp.compare(truncateStringMax(Literal.of(test6), 2).value(), test6) >= 0);
+    Assert.assertTrue("Test 4-byte UTF-8 character increment. Output must have one character with " +
+        "the first character incremented", cmp.compare(
+        truncateStringMax(Literal.of(test6), 1).value(), test6_2_expected) == 0);
+  }
+}
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
index c0635e314842..d7bb2ed9e02e 100644
--- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
+++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
 import java.util
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.iceberg.{DataFile, DataFiles, Metrics, PartitionSpec}
+import org.apache.iceberg.{DataFile, DataFiles, Metrics, PartitionSpec, TableProperties}
 import org.apache.iceberg.hadoop.HadoopInputFile
 import org.apache.iceberg.orc.OrcMetrics
 import org.apache.iceberg.parquet.ParquetUtil
@@ -238,7 +238,8 @@ object SparkTableUtil {
     val fs = partition.getFileSystem(conf)
 
     fs.listStatus(partition, HiddenPathFilter).filter(_.isFile).map { stat =>
-      val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat))
+      val metrics = ParquetUtil.footerMetrics(ParquetFileReader.readFooter(conf, stat),
+        TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT)
 
       SparkDataFile(
         stat.getPath.toString,
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
index 2faca60a5311..fcecb17c5b4d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
@@ -49,9 +49,7 @@
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
-import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.Files.localOutput;
-import static org.apache.iceberg.parquet.ParquetUtil.fileMetrics;
 
 public class TestParquetScan extends AvroDataTest {
   private static final Configuration CONF = new Configuration();
@@ -106,7 +104,7 @@ protected void writeAndValidate(Schema schema) throws IOException {
     DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
         .withFileSizeInBytes(parquetFile.length())
         .withPath(parquetFile.toString())
-        .withMetrics(fileMetrics(localInput(parquetFile)))
+        .withRecordCount(100)
        .build();

    table.newAppend().appendFile(file).commit();
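
Reviewer note: a minimal sketch of how the new truncation helpers fit together, assuming only the existing Literal.of factories from the Iceberg API; the TruncationDemo class name and sample values are illustrative, not part of this patch.

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.util.BinaryUtil;
import org.apache.iceberg.util.UnicodeUtil;

public class TruncationDemo {
  public static void main(String[] args) {
    // Lower bound: a plain prefix, which always sorts at or below the original value
    System.out.println(UnicodeUtil.truncateStringMin(Literal.of("iceberg"), 3).value()); // ice
    // Upper bound: the last kept code point is incremented so the result sorts above the original
    System.out.println(UnicodeUtil.truncateStringMax(Literal.of("iceberg"), 3).value()); // icf

    // Binary bounds work the same way on raw bytes; a truncated prefix of all 0xFF bytes
    // cannot be incremented, so no valid upper bound exists and null is returned
    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {(byte) 0xFF, (byte) 0xFF, 0x01});
    System.out.println(BinaryUtil.truncateBinaryMax(Literal.of(bytes), 2)); // null
  }
}
```

With the default write.metadata.truncate-length of 16, these bounds stay small in manifest metadata while remaining safe for file pruning.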