diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f965511dae..b173239332 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -47,6 +47,7 @@ public class ParquetProperties { public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64; public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory(); @@ -83,10 +84,11 @@ public static WriterVersion fromString(String name) { private final boolean estimateNextSizeCheck; private final ByteBufferAllocator allocator; private final ValuesWriterFactory valuesWriterFactory; + private final int columnIndexTruncateLength; private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator, - ValuesWriterFactory writerFactory) { + ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) { this.pageSizeThreshold = pageSize; this.initialSlabSize = CapacityByteArrayOutputStream .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10); @@ -99,6 +101,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag this.allocator = allocator; this.valuesWriterFactory = writerFactory; + this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength; } public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { @@ -183,6 +186,10 @@ public ValuesWriterFactory getValuesWriterFactory() { return valuesWriterFactory; } + public int getColumnIndexTruncateLength() { + return columnIndexTruncateLength; + } + public boolean estimateNextSizeCheck() { return estimateNextSizeCheck; } @@ -205,6 +212,7 @@ public static class Builder { private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK; private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY; + private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; private Builder() { } @@ -299,11 +307,17 @@ public Builder withValuesWriterFactory(ValuesWriterFactory factory) { return this; } + public Builder withColumnIndexTruncateLength(int length) { + Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length); + this.columnIndexTruncateLength = length; + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck, - estimateNextSizeCheck, allocator, valuesWriterFactory); + estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength); // we pass a constructed but uninitialized factory to ParquetProperties above as currently // creation of ValuesWriters is invoked from within ParquetProperties.
In the future // we'd like to decouple that and won't need to pass an object to properties and then pass the diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java index 12ed7b4f87..950b70f54e 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java @@ -58,6 +58,8 @@ String getMaxValueAsString(int pageIndex) { private final List minValues = new ArrayList<>(); private final List maxValues = new ArrayList<>(); + private final BinaryTruncator truncator; + private final int truncateLength; private static Binary convert(ByteBuffer buffer) { return Binary.fromReusedByteBuffer(buffer); @@ -67,6 +69,11 @@ private static ByteBuffer convert(Binary value) { return value.toByteBuffer(); } + BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) { + truncator = BinaryTruncator.getTruncator(type); + this.truncateLength = truncateLength; + } + @Override void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) { minValues.add(min == null ? null : convert(min)); @@ -75,8 +82,8 @@ void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) { @Override void addMinMax(Object min, Object max) { - minValues.add((Binary) min); - maxValues.add((Binary) max); + minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength)); + maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength)); } @Override diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java new file mode 100644 index 0000000000..bcc43fb866 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.internal.column.columnindex; + +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; +import java.nio.charset.StandardCharsets; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +/** + * Class for truncating min/max values for binary types. 
+ */ +abstract class BinaryTruncator { + enum Validity { + VALID, MALFORMED, UNMAPPABLE; + } + + private static class CharsetValidator { + private final CharBuffer dummyBuffer = CharBuffer.allocate(1024); + private final CharsetDecoder decoder; + + CharsetValidator(Charset charset) { + decoder = charset.newDecoder(); + decoder.onMalformedInput(CodingErrorAction.REPORT); + decoder.onUnmappableCharacter(CodingErrorAction.REPORT); + } + + Validity checkValidity(ByteBuffer buffer) { + int pos = buffer.position(); + CoderResult result = CoderResult.OVERFLOW; + while (result.isOverflow()) { + dummyBuffer.clear(); + result = decoder.decode(buffer, dummyBuffer, true); + } + buffer.position(pos); + if (result.isUnderflow()) { + return Validity.VALID; + } else if (result.isMalformed()) { + return Validity.MALFORMED; + } else { + return Validity.UNMAPPABLE; + } + } + } + + private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() { + @Override + Binary truncateMin(Binary minValue, int length) { + return minValue; + } + + @Override + Binary truncateMax(Binary maxValue, int length) { + return maxValue; + } + }; + + private static final BinaryTruncator DEFAULT_UTF8_TRUNCATOR = new BinaryTruncator() { + private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8); + + @Override + Binary truncateMin(Binary minValue, int length) { + if (minValue.length() <= length) { + return minValue; + } + ByteBuffer buffer = minValue.toByteBuffer(); + byte[] array; + if (validator.checkValidity(buffer) == Validity.VALID) { + array = truncateUtf8(buffer, length); + } else { + array = truncate(buffer, length); + } + return array == null ? minValue : Binary.fromConstantByteArray(array); + } + + @Override + Binary truncateMax(Binary maxValue, int length) { + if (maxValue.length() <= length) { + return maxValue; + } + byte[] array; + ByteBuffer buffer = maxValue.toByteBuffer(); + if (validator.checkValidity(buffer) == Validity.VALID) { + array = incrementUtf8(truncateUtf8(buffer, length)); + } else { + array = increment(truncate(buffer, length)); + } + return array == null ? 
maxValue : Binary.fromConstantByteArray(array); + } + + // Simply truncate to length + private byte[] truncate(ByteBuffer buffer, int length) { + assert length < buffer.remaining(); + byte[] array = new byte[length]; + buffer.get(array); + return array; + } + + // Trying to increment the bytes from the last one to the beginning + private byte[] increment(byte[] array) { + for (int i = array.length - 1; i >= 0; --i) { + byte elem = array[i]; + ++elem; + array[i] = elem; + if (elem != 0) { // Did not overflow: 0xFF -> 0x00 + return array; + } + } + return null; + } + + // Truncates the buffer to length or less so the remaining bytes form a valid UTF-8 string + private byte[] truncateUtf8(ByteBuffer buffer, int length) { + assert length < buffer.remaining(); + ByteBuffer newBuffer = buffer.slice(); + newBuffer.limit(newBuffer.position() + length); + while (validator.checkValidity(newBuffer) != Validity.VALID) { + newBuffer.limit(newBuffer.limit() - 1); + if (newBuffer.remaining() == 0) { + return null; + } + } + byte[] array = new byte[newBuffer.remaining()]; + newBuffer.get(array); + return array; + } + + // Trying to increment the bytes from the last one to the beginning until the bytes form a valid UTF-8 string + private byte[] incrementUtf8(byte[] array) { + if (array == null) { + return null; + } + ByteBuffer buffer = ByteBuffer.wrap(array); + for (int i = array.length - 1; i >= 0; --i) { + byte prev = array[i]; + byte inc = prev; + while (++inc != 0) { // Until overflow: 0xFF -> 0x00 + array[i] = inc; + switch (validator.checkValidity(buffer)) { + case VALID: + return array; + case UNMAPPABLE: + continue; // Increment the i byte once more + case MALFORMED: + break; // Stop incrementing the i byte; go to the i-1 + } + break; // MALFORMED + } + array[i] = prev; + } + return null; // All characters are the largest possible; unable to increment + } + }; + + static BinaryTruncator getTruncator(PrimitiveType type) { + if (type == null) { + return NO_OP_TRUNCATOR; + } + switch (type.getPrimitiveTypeName()) { + case INT96: + return NO_OP_TRUNCATOR; + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + OriginalType originalType = type.getOriginalType(); + if (originalType == null) { + return DEFAULT_UTF8_TRUNCATOR; + } + switch (originalType) { + case UTF8: + case ENUM: + case JSON: + case BSON: + return DEFAULT_UTF8_TRUNCATOR; + default: + return NO_OP_TRUNCATOR; + } + default: + throw new IllegalArgumentException("No truncator is available for the type: " + type); + } + } + + abstract Binary truncateMin(Binary minValue, int length); + + abstract Binary truncateMax(Binary maxValue, int length); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index aa0502ba16..6d05558ec8 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -220,20 +220,22 @@ public static ColumnIndexBuilder getNoOpBuilder() { /** * @param type * the type this builder is to be created for + * @param truncateLength + * the length to be used for truncating binary values if possible * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects */ - public static ColumnIndexBuilder getBuilder(PrimitiveType type) { - ColumnIndexBuilder builder = 
createNewBuilder(type.getPrimitiveTypeName()); + public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) { + ColumnIndexBuilder builder = createNewBuilder(type, truncateLength); builder.type = type; return builder; } - private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) { - switch (type) { + private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) { + switch (type.getPrimitiveTypeName()) { case BINARY: case FIXED_LEN_BYTE_ARRAY: case INT96: - return new BinaryColumnIndexBuilder(); + return new BinaryColumnIndexBuilder(type, truncateLength); case BOOLEAN: return new BooleanColumnIndexBuilder(); case DOUBLE: @@ -276,7 +278,7 @@ public static ColumnIndex build( PrimitiveTypeName typeName = type.getPrimitiveTypeName(); ColumnIndexBuilder builder = BUILDERS.get(typeName); if (builder == null) { - builder = createNewBuilder(typeName); + builder = createNewBuilder(type, Integer.MAX_VALUE); BUILDERS.put(typeName, builder); } diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java new file mode 100644 index 0000000000..c3e3d85749 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.internal.column.columnindex; + +import static org.apache.parquet.schema.OriginalType.BSON; +import static org.apache.parquet.schema.OriginalType.DECIMAL; +import static org.apache.parquet.schema.OriginalType.ENUM; +import static org.apache.parquet.schema.OriginalType.INTERVAL; +import static org.apache.parquet.schema.OriginalType.JSON; +import static org.apache.parquet.schema.OriginalType.UTF8; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; +import java.nio.charset.StandardCharsets; +import java.util.Comparator; +import java.util.Random; + +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.PrimitiveStringifier; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for {@link BinaryTruncator} + */ +public class TestBinaryTruncator { + + private static final Logger LOG = LoggerFactory.getLogger(TestBinaryTruncator.class); + private static final PrimitiveStringifier HEXA_STRINGIFIER = Types.required(BINARY) + .named("dummy_type").stringifier(); + private static final Random RANDOM = new Random(42); + private static final CharsetDecoder UTF8_DECODER = StandardCharsets.UTF_8.newDecoder(); + static { + UTF8_DECODER.onMalformedInput(CodingErrorAction.REPORT); + UTF8_DECODER.onUnmappableCharacter(CodingErrorAction.REPORT); + } + + // The maximum values in UTF-8 for the 1, 2, 3 and 4 bytes representations + private static final String UTF8_1BYTE_MAX_CHAR = "\u007F"; + private static final String UTF8_2BYTES_MAX_CHAR = "\u07FF"; + private static final String UTF8_3BYTES_MAX_CHAR = "\uFFFF"; + private static final String UTF8_4BYTES_MAX_CHAR = "\uDBFF\uDFFF"; + + @Test + public void testNonStringTruncate() { + BinaryTruncator truncator = BinaryTruncator + .getTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal")); + assertEquals(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), + truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 2)); + assertEquals(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06), + truncator.truncateMax(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06), 2)); + } + + @Test + public void testContractNonStringTypes() { + testTruncator( + Types.required(FIXED_LEN_BYTE_ARRAY).length(8).as(DECIMAL).precision(18).scale(4).named("test_fixed_decimal"), + false); + testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false); + testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false); + testTruncator(Types.required(INT96).named("test_int96"), false); + } + + @Test + public void testStringTruncate() { + BinaryTruncator truncator = BinaryTruncator.getTruncator(Types.required(BINARY).as(UTF8).named("test_utf8")); + + // Truncate 1 byte characters + assertEquals(Binary.fromString("abc"), truncator.truncateMin(Binary.fromString("abcdef"), 3)); + 
assertEquals(Binary.fromString("abd"), truncator.truncateMax(Binary.fromString("abcdef"), 3)); + + // Truncate 1-2 bytes characters; the target length is "inside" a UTF-8 character + assertEquals(Binary.fromString("árvízt"), truncator.truncateMin(Binary.fromString("árvíztűrő"), 9)); + assertEquals(Binary.fromString("árvízu"), truncator.truncateMax(Binary.fromString("árvíztűrő"), 9)); + + // Truncate highest UTF-8 values -> unable to increment + assertEquals( + Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR), + truncator.truncateMin(Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR), + 5)); + assertEquals( + Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR), + truncator.truncateMax(Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR), + 5)); + + // Truncate highest UTF-8 values at the end -> increment the first possible character + assertEquals( + Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + "b" + + UTF8_3BYTES_MAX_CHAR), + truncator.truncateMax(Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + "a" + + UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR), + 10)); + + // Truncate invalid UTF-8 values -> truncate without validity check + assertEquals(binary(0xFF, 0xFE, 0xFD), truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3)); + assertEquals(binary(0xFF, 0xFE, 0xFE), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3)); + assertEquals(binary(0xFF, 0xFE, 0xFE, 0x00, 0x00), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF), 5)); + } + + @Test + public void testContractStringTypes() { + testTruncator(Types.required(BINARY).named("test_binary"), true); + testTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"), true); + testTruncator(Types.required(BINARY).as(ENUM).named("test_enum"), true); + testTruncator(Types.required(BINARY).as(JSON).named("test_json"), true); + testTruncator(Types.required(BINARY).as(BSON).named("test_bson"), true); + testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true); + } + + private void testTruncator(PrimitiveType type, boolean strict) { + BinaryTruncator truncator = BinaryTruncator.getTruncator(type); + Comparator comparator = type.comparator(); + + checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa"), strict, strict); + checkContract(truncator, comparator, Binary.fromString("árvíztűrő tükörfúrógép"), strict, strict); + checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa" + UTF8_3BYTES_MAX_CHAR), strict, strict); + checkContract(truncator, comparator, Binary.fromString("a" + UTF8_3BYTES_MAX_CHAR + UTF8_1BYTE_MAX_CHAR), strict, + strict); + + checkContract(truncator, comparator, + Binary.fromConstantByteArray(new byte[] { (byte) 0xFE, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, (byte) 0xFF }), strict, + strict); + + // Edge case: zero length -> unable to truncate + checkContract(truncator, comparator, Binary.fromString(""), false, false); + // Edge case: containing only UTF-8 max characters -> unable to truncate for max + checkContract(truncator, comparator, Binary.fromString( + UTF8_1BYTE_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_3BYTES_MAX_CHAR + + UTF8_1BYTE_MAX_CHAR + + UTF8_2BYTES_MAX_CHAR + + 
UTF8_3BYTES_MAX_CHAR + + UTF8_4BYTES_MAX_CHAR), strict, false); + // Edge case: non-UTF-8; max bytes -> unable to truncate for max + checkContract( + truncator, comparator, + binary(0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF), + strict, false); + } + + // Checks the contract of the truncator + // strict means actual truncation is required and the truncated value is a valid UTF-8 string + private void checkContract(BinaryTruncator truncator, Comparator comparator, Binary value, boolean strictMin, + boolean strictMax) { + int length = value.length(); + + // Edge cases: returning the original value if no truncation is required + assertSame(value, truncator.truncateMin(value, length)); + assertSame(value, truncator.truncateMax(value, length)); + assertSame(value, truncator.truncateMin(value, random(length + 1, length * 2 + 1))); + assertSame(value, truncator.truncateMax(value, random(length + 1, length * 2 + 1))); + + if (length > 1) { + checkMinContract(truncator, comparator, value, length - 1, strictMin); + checkMaxContract(truncator, comparator, value, length - 1, strictMax); + checkMinContract(truncator, comparator, value, random(1, length - 1), strictMin); + checkMaxContract(truncator, comparator, value, random(1, length - 1), strictMax); + } + + // Edge case: possible to truncate min value to 0 length if original value is not empty + checkMinContract(truncator, comparator, value, 0, strictMin); + // Edge case: impossible to truncate max value to 0 length -> returning the original value + assertSame(value, truncator.truncateMax(value, 0)); + } + + private void checkMinContract(BinaryTruncator truncator, Comparator comparator, Binary value, int length, + boolean strict) { + Binary truncated = truncator.truncateMin(value, length); + LOG.debug("\"{}\" --truncMin({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(), + HEXA_STRINGIFIER.stringify(truncated)); + assertTrue("truncateMin(value) should be <= value", comparator.compare(truncated, value) <= 0); + assertFalse("length of truncateMin(value) should not be > the length of value", + truncated.length() > value.length()); + if (isValidUtf8(value)) { + checkValidUtf8(truncated); + } + if (strict) { + assertTrue("length of truncateMin(value) should be < the length of value", + truncated.length() < value.length()); + } + } + + private void checkMaxContract(BinaryTruncator truncator, Comparator comparator, Binary value, int length, + boolean strict) { + Binary truncated = truncator.truncateMax(value, length); + LOG.debug("\"{}\" --truncMax({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(), + HEXA_STRINGIFIER.stringify(truncated)); + assertTrue("truncateMax(value) should be >= value", comparator.compare(truncated, value) >= 0); + assertFalse("length of truncateMax(value) should not be > the length of value", + truncated.length() > value.length()); + if (isValidUtf8(value)) { + checkValidUtf8(truncated); + } + if (strict) { + assertTrue("length of truncateMax(value) should be < the length of value", + truncated.length() < value.length()); + } + } + + private static boolean isValidUtf8(Binary binary) { + try { + UTF8_DECODER.decode(binary.toByteBuffer()); + return true; + } catch (CharacterCodingException e) { + return false; + } + } + + private static void checkValidUtf8(Binary binary) { + try { + UTF8_DECODER.decode(binary.toByteBuffer()); + } catch (CharacterCodingException e) { + throw new AssertionError("Truncated value should be
a valid UTF-8 string", e); + } + } + + private static int random(int min, int max) { + return RANDOM.nextInt(max - min + 1) + min; + } + + private static Binary binary(int... unsignedBytes) { + byte[] byteArray = new byte[unsignedBytes.length]; + for (int i = 0, n = byteArray.length; i < n; ++i) { + int b = unsignedBytes[i]; + assert (0xFFFFFF00 & b) == 0; + byteArray[i] = (byte) b; + } + return Binary.fromConstantByteArray(byteArray); + } + +} diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java index 5acae97c9c..7a5745e7b8 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java @@ -65,7 +65,7 @@ public class TestColumnIndexBuilder { @Test public void testBuildBinaryDecimal() { PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class)); assertNull(builder.build()); @@ -103,7 +103,7 @@ public void testBuildBinaryDecimal() { null, decimalBinary("87656273")); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null)); builder.add(sb.stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23"))); @@ -138,7 +138,7 @@ public void testBuildBinaryDecimal() { decimalBinary("1234567890.12"), null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null)); builder.add(sb.stats(type, null, null)); @@ -177,7 +177,7 @@ public void testBuildBinaryDecimal() { @Test public void testBuildBinaryUtf8() { PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class)); assertNull(builder.build()); @@ -215,7 +215,7 @@ public void testBuildBinaryUtf8() { stringBinary("Beeblebrox"), null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null)); builder.add(sb.stats(type, null, null)); @@ -250,7 +250,7 @@ public void testBuildBinaryUtf8() { stringBinary("Slartibartfast"), null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, stringBinary("Slartibartfast"))); @@ -337,11 +337,11 @@ public void testStaticBuildBinary() { @Test public void testBuildBoolean() { PrimitiveType type = Types.required(BOOLEAN).named("test_boolean"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); 
assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class)); assertNull(builder.build()); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); StatsBuilder sb = new StatsBuilder(); builder.add(sb.stats(type, false, true)); builder.add(sb.stats(type, true, false, null)); @@ -357,7 +357,7 @@ public void testBuildBoolean() { assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false); assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, false, false)); @@ -375,7 +375,7 @@ public void testBuildBoolean() { assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null); assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, true, true)); @@ -413,7 +413,7 @@ public void testStaticBuildBoolean() { @Test public void testBuildDouble() { PrimitiveType type = Types.required(DOUBLE).named("test_double"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class)); assertNull(builder.build()); @@ -433,7 +433,7 @@ public void testBuildDouble() { assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1); assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, -532.3, -345.2, null, null)); @@ -453,7 +453,7 @@ public void testBuildDouble() { assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null); assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null, null)); builder.add(sb.stats(type, 532.3, 345.2)); @@ -493,7 +493,7 @@ public void testStaticBuildDouble() { @Test public void testBuildFloat() { PrimitiveType type = Types.required(FLOAT).named("test_float"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(FloatColumnIndexBuilder.class)); assertNull(builder.build()); @@ -513,7 +513,7 @@ public void testBuildFloat() { assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f); assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, -532.3f, -345.2f, null, null)); @@ -533,7 +533,7 @@ public void 
testBuildFloat() { assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null); assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null, null)); builder.add(sb.stats(type, 532.3f, 345.2f)); @@ -573,7 +573,7 @@ public void testStaticBuildFloat() { @Test public void testBuildInt32() { PrimitiveType type = Types.required(INT32).named("test_int32"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(IntColumnIndexBuilder.class)); assertNull(builder.build()); @@ -593,7 +593,7 @@ public void testBuildInt32() { assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8); assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, -532, -345, null, null)); @@ -613,7 +613,7 @@ public void testBuildInt32() { assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null); assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null, null)); builder.add(sb.stats(type, 532, 345)); @@ -653,7 +653,7 @@ public void testStaticBuildInt32() { @Test public void testBuildUInt8() { PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(IntColumnIndexBuilder.class)); assertNull(builder.build()); @@ -673,7 +673,7 @@ public void testBuildUInt8() { assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA); assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, 0, 0, null, null)); @@ -693,7 +693,7 @@ public void testBuildUInt8() { assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null); assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null, null)); builder.add(sb.stats(type, 0xFF, 0xFF)); @@ -717,7 +717,7 @@ public void testBuildUInt8() { @Test public void testBuildInt64() { PrimitiveType type = Types.required(INT64).named("test_int64"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); assertThat(builder, instanceOf(LongColumnIndexBuilder.class)); assertNull(builder.build()); @@ 
-737,7 +737,7 @@ public void testBuildInt64() { assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l); assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null)); builder.add(sb.stats(type, -532l, -345l, null, null)); @@ -757,7 +757,7 @@ public void testBuildInt64() { assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null); assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null); - builder = ColumnIndexBuilder.getBuilder(type); + builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); sb = new StatsBuilder(); builder.add(sb.stats(type, null, null, null, null, null)); builder.add(sb.stats(type, 532l, 345l)); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 0646493342..29a353fd29 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -76,12 +76,13 @@ private static final class ColumnChunkPageWriter implements PageWriter { private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, - ByteBufferAllocator allocator) { + ByteBufferAllocator allocator, + int columnIndexTruncateLength) { this.path = path; this.compressor = compressor; this.allocator = allocator; this.buf = new ConcatenatingByteArrayCollector(); - this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType()); + this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength); this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); } @@ -273,10 +274,11 @@ public String memUsageString(String prefix) { private final Map writers = new HashMap(); private final MessageType schema; - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) { + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator, + int columnIndexTruncateLength) { this.schema = schema; for (ColumnDescriptor path : schema.getColumns()) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator)); + writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength)); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d9e9b5e15e..1f8a093b19 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -20,7 +20,6 @@ import static java.lang.Math.max; import static java.lang.Math.min; -import static java.lang.String.format; import static org.apache.parquet.Preconditions.checkNotNull; import java.io.IOException; @@ -102,7 +101,8 @@ public ParquetMetadata getFooter() { } private void initStore() { - pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator()); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, 
props.getAllocator(), + props.getColumnIndexTruncateLength()); columnStore = props.newColumnWriteStore(schema, pageStore); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 3c85b02ec2..3a65624b06 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -47,6 +47,7 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; @@ -100,6 +101,7 @@ public static enum Mode { private final MessageType schema; private final PositionOutputStream out; private final AlignmentStrategy alignment; + private final int columnIndexTruncateLength; // file data private List blocks = new ArrayList(); @@ -244,10 +246,27 @@ public ParquetFileWriter(Configuration configuration, MessageType schema, * @param rowGroupSize the row group size * @param maxPaddingSize the maximum padding * @throws IOException if the file can not be created + * @deprecated will be removed in 2.0.0 */ + @Deprecated public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long rowGroupSize, int maxPaddingSize) throws IOException { + this(file, schema, mode, rowGroupSize, maxPaddingSize, + ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); + } + /** + * @param file OutputFile to create or overwrite + * @param schema the schema of the data + * @param mode file creation mode + * @param rowGroupSize the row group size + * @param maxPaddingSize the maximum padding + * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to + * @throws IOException if the file can not be created + */ + public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, + long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength) + throws IOException { TypeUtil.checkValidWriteSchema(schema); this.schema = schema; @@ -267,6 +286,7 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, } this.encodingStatsBuilder = new EncodingStats.Builder(); + this.columnIndexTruncateLength = columnIndexTruncateLength; } /** @@ -289,6 +309,8 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, this.out = HadoopStreams.wrap( fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize)); this.encodingStatsBuilder = new EncodingStats.Builder(); + // no truncation is needed for testing + this.columnIndexTruncateLength = Integer.MAX_VALUE; } /** * start the file @@ -342,7 +364,7 @@ public void startColumn(ColumnDescriptor descriptor, // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one currentStatistics = null; - columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType); + columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); firstPageOffset = -1; } diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index ff5bab397d..0789bf50d4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -143,6 +143,7 @@ public static enum JobSummaryLevel { public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min"; public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max"; public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate"; + public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -312,6 +313,18 @@ private static int getMaxPaddingSize(Configuration conf) { return conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT); } + public static void setColumnIndexTruncateLength(JobContext jobContext, int length) { + setColumnIndexTruncateLength(getConfiguration(jobContext), length); + } + + public static void setColumnIndexTruncateLength(Configuration conf, int length) { + conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length); + } + + private static int getColumnIndexTruncateLength(Configuration conf) { + return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH); + } + private WriteSupport writeSupport; private ParquetOutputCommitter committer; @@ -366,6 +379,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf)) .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf)) .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf)) + .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf)) .build(); long blockSize = getLongBlockSize(conf); @@ -383,11 +397,12 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? 
"estimated" : "constant")); LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck()); LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck()); + LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength()); } WriteContext init = writeSupport.init(conf); ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf), - init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize); + init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength()); w.start(); float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index a32df39a5d..5b0e4f82d1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -278,7 +278,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport MessageType schema = writeContext.getSchema(); ParquetFileWriter fileWriter = new ParquetFileWriter( - file, schema, mode, rowGroupSize, maxPaddingSize); + file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength()); fileWriter.start(); this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index b9382fc0b7..90f4a5bdd3 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -918,7 +918,7 @@ public void testOffsetIndexConversion() { @Test public void testColumnIndexConversion() { PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64"); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE); Statistics stats = Statistics.createStats(type); stats.incrementNumNulls(16); stats.updateStats(-100l); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index b78726838c..9a27defe15 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -160,7 +160,8 @@ public void test() throws Exception { writer.startBlock(rowCount); pageOffset = outputFile.out().getPos(); { - ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator()); + ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema, + new HeapByteBufferAllocator(), Integer.MAX_VALUE); PageWriter pageWriter = store.getPageWriter(col); pageWriter.writePageV2( rowCount, nullCount, valueCount, @@ -235,7 +236,7 @@ public void testColumnOrderV1() throws IOException { // TODO - look back at this, an allocator was being passed here in the ByteBuffer changes // see comment at this constructor 
ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( - compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator()); + compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator(), Integer.MAX_VALUE); for (ColumnDescriptor col : schema.getColumns()) { PageWriter pageWriter = store.getPageWriter(col);
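For reviewers, a minimal usage sketch (not part of this patch) showing how the new truncation length is exposed: either through the Hadoop property added to ParquetOutputFormat, or programmatically via ParquetProperties. It assumes the existing ParquetProperties.builder() factory; the variable names and the value 16 are illustrative only.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class ColumnIndexTruncateLengthExample {
  public static void main(String[] args) {
    // MapReduce path: set the new property on the job configuration
    // (falls back to DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64 when unset).
    Configuration conf = new Configuration();
    ParquetOutputFormat.setColumnIndexTruncateLength(conf, 16);
    // equivalent to: conf.setInt("parquet.columnindex.truncate.length", 16);

    // Programmatic path: ParquetProperties carries the value down to
    // ColumnChunkPageWriteStore/ParquetFileWriter via InternalParquetRecordWriter.
    ParquetProperties props = ParquetProperties.builder()
        .withColumnIndexTruncateLength(16)
        .build();
    System.out.println("column index truncate length: " + props.getColumnIndexTruncateLength());
  }
}
```

Note that binary min/max values longer than the configured length are shortened by BinaryTruncator only for string-like types (UTF8, ENUM, JSON, BSON, or untyped BINARY/FIXED_LEN_BYTE_ARRAY); other binary types keep their full values, as the tests above exercise.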