Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ParquetProperties {
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

Expand Down Expand Up @@ -83,10 +84,11 @@ public static WriterVersion fromString(String name) {
private final boolean estimateNextSizeCheck;
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory) {
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -99,6 +101,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.allocator = allocator;

this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
Expand Down Expand Up @@ -183,6 +186,10 @@ public ValuesWriterFactory getValuesWriterFactory() {
return valuesWriterFactory;
}

/**
 * @return the configured column index min/max truncate length
 */
public int getColumnIndexTruncateLength() {
  return this.columnIndexTruncateLength;
}

public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}
Expand All @@ -205,6 +212,7 @@ public static class Builder {
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;

private Builder() {
}
Expand Down Expand Up @@ -299,11 +307,17 @@ public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
return this;
}

/**
 * Sets the column index min/max truncate length.
 *
 * @param length
 *          the truncate length; must be strictly positive (zero is rejected as well as
 *          negative values, as checked via {@code Preconditions.checkArgument})
 * @return this builder for method chaining
 */
public Builder withColumnIndexTruncateLength(int length) {
  // The condition rejects 0 too, so the message must not claim the value was merely negative.
  Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
  this.columnIndexTruncateLength = length;
  return this;
}

public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory);
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ String getMaxValueAsString(int pageIndex) {

private final List<Binary> minValues = new ArrayList<>();
private final List<Binary> maxValues = new ArrayList<>();
private final BinaryTruncator truncator;
private final int truncateLength;

private static Binary convert(ByteBuffer buffer) {
return Binary.fromReusedByteBuffer(buffer);
Expand All @@ -67,6 +69,11 @@ private static ByteBuffer convert(Binary value) {
return value.toByteBuffer();
}

/**
 * @param type
 *          the type used to look up the matching {@link BinaryTruncator}
 * @param truncateLength
 *          the length passed to the truncator for every min/max value added via
 *          {@code addMinMax}
 */
BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) {
  this.truncateLength = truncateLength;
  this.truncator = BinaryTruncator.getTruncator(type);
}

@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
minValues.add(min == null ? null : convert(min));
Expand All @@ -75,8 +82,8 @@ void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {

/**
 * Records the min/max pair of one page.
 *
 * Each value is appended exactly once: a {@code null} is stored as {@code null}, any other
 * value is cast to {@link Binary} and passed through the truncator with the configured
 * truncate length before being stored.
 *
 * @param min
 *          the minimum value of the page as a {@link Binary}, or {@code null}
 * @param max
 *          the maximum value of the page as a {@link Binary}, or {@code null}
 */
@Override
void addMinMax(Object min, Object max) {
  // Only the truncated form is stored; adding the raw value as well would put two entries
  // per page into the lists.
  minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength));
  maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.internal.column.columnindex;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;

/**
* Class for truncating min/max values for binary types.
*/
abstract class BinaryTruncator {
/**
 * Result of checking a byte sequence against a charset.
 */
enum Validity {
  /** The decoder consumed the whole input without reporting an error. */
  VALID,
  /** The decoder reported malformed input. */
  MALFORMED,
  /** The decoder reported an error other than malformed input. */
  UNMAPPABLE
}

/**
 * Checks whether the remaining bytes of a buffer can be decoded with a given charset.
 *
 * NOTE(review): an instance owns a mutable decoder and a scratch buffer, so it does not look
 * safe to use from several threads at once - confirm how the owning truncator is shared.
 */
private static class CharsetValidator {
  // Scratch output for the decoder; the decoded characters are never read.
  private final CharBuffer dummyBuffer = CharBuffer.allocate(1024);
  private final CharsetDecoder decoder;

  CharsetValidator(Charset charset) {
    // REPORT makes the decoder return an error result instead of replacing or skipping input.
    decoder = charset.newDecoder()
        .onMalformedInput(CodingErrorAction.REPORT)
        .onUnmappableCharacter(CodingErrorAction.REPORT);
  }

  /**
   * Decodes the remaining bytes of the buffer and classifies the outcome. The position of
   * the buffer is restored before returning.
   *
   * @param buffer
   *          the bytes to check, from its current position to its limit
   * @return {@link Validity#VALID} if everything was decoded, {@link Validity#MALFORMED} if
   *         the decoder reported malformed input, {@link Validity#UNMAPPABLE} otherwise
   */
  Validity checkValidity(ByteBuffer buffer) {
    int pos = buffer.position();
    // The decoder is reused for every check; start each decoding operation from a clean
    // state so nothing carries over from the previous input.
    decoder.reset();
    CoderResult result = CoderResult.OVERFLOW;
    while (result.isOverflow()) {
      // The scratch buffer filled up: discard its content and keep decoding.
      dummyBuffer.clear();
      result = decoder.decode(buffer, dummyBuffer, true);
    }
    buffer.position(pos);
    if (result.isUnderflow()) {
      return Validity.VALID;
    } else if (result.isMalformed()) {
      return Validity.MALFORMED;
    } else {
      return Validity.UNMAPPABLE;
    }
  }
}

// Truncator that performs no truncation: both methods hand back the value they were given
// and ignore the requested length.
private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
@Override
Binary truncateMin(Binary minValue, int length) {
// length is intentionally unused
return minValue;
}

@Override
Binary truncateMax(Binary maxValue, int length) {
// length is intentionally unused
return maxValue;
}
};

private static final BinaryTruncator DEFAULT_UTF8_TRUNCATOR = new BinaryTruncator() {
private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);

/**
 * Shortens a minimum value to at most {@code length} bytes. Values that validate as UTF-8
 * are cut so the result is still valid UTF-8; anything else is cut at the byte level. The
 * original value is returned when it is already short enough or no truncated form exists.
 */
@Override
Binary truncateMin(Binary minValue, int length) {
  // Already within the limit: nothing to do
  if (minValue.length() <= length) {
    return minValue;
  }
  ByteBuffer bytes = minValue.toByteBuffer();
  boolean wellFormed = validator.checkValidity(bytes) == Validity.VALID;
  byte[] shortened = wellFormed ? truncateUtf8(bytes, length) : truncate(bytes, length);
  if (shortened == null) {
    // No truncated form could be produced; keep the full value
    return minValue;
  }
  return Binary.fromConstantByteArray(shortened);
}

/**
 * Shortens a maximum value to at most {@code length} bytes and then increments the
 * shortened bytes. Values that validate as UTF-8 use the UTF-8 aware truncate/increment
 * pair; anything else uses the byte-level pair. The original value is returned when it is
 * already short enough or when truncating/incrementing yields no result.
 */
@Override
Binary truncateMax(Binary maxValue, int length) {
  // Already within the limit: nothing to do
  if (maxValue.length() <= length) {
    return maxValue;
  }
  ByteBuffer bytes = maxValue.toByteBuffer();
  byte[] bumped;
  if (validator.checkValidity(bytes) != Validity.VALID) {
    bumped = increment(truncate(bytes, length));
  } else {
    bumped = incrementUtf8(truncateUtf8(bytes, length));
  }
  if (bumped == null) {
    // Neither helper produced a usable value; keep the full one
    return maxValue;
  }
  return Binary.fromConstantByteArray(bumped);
}

// Copies the first length remaining bytes of the buffer into a new array, without looking at
// character boundaries
private byte[] truncate(ByteBuffer buffer, int length) {
  assert length < buffer.remaining();
  byte[] prefix = new byte[length];
  buffer.get(prefix, 0, length);
  return prefix;
}

// Increments the array in place as if it were one big-endian number: walks from the last byte
// towards the first, adding one to each byte and stopping at the first byte that does not wrap.
// Returns the same (modified) array on success, or null if every byte wrapped around; in that
// case the array has been left with all bytes set to 0x00.
private byte[] increment(byte[] array) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that this only increments bytes if they do not overflow. I think they should be incremented even if they overflow.

An example for the difference between the two approaches:

Description Value
Input 00 42 FF FF
Only incrementing bytes even if they overflow 00 43 00 00
Only incrementing bytes that do not overflow 00 43 FF FF

Even though 00 43 FF FF is a valid max value as well, 00 43 00 00 is closer to the original value thus it results in better filtering.

for (int i = array.length - 1; i >= 0; --i) {
byte elem = array[i];
++elem;
// The incremented byte is written back even when it wrapped to 0x00, so the carry moves on
// to the previous byte with this position already zeroed
array[i] = elem;
if (elem != 0) { // Did not overflow: 0xFF -> 0x00
return array;
}
}
// Every byte wrapped around: there is no larger value of the same length
return null;
}

// Returns the longest prefix of the buffer that is at most length bytes long and passes the
// UTF-8 validity check, or null if no non-empty prefix does
private byte[] truncateUtf8(ByteBuffer buffer, int length) {
  assert length < buffer.remaining();
  // Work on a slice so the limit of the caller's buffer is left untouched
  ByteBuffer window = buffer.slice();
  int end = window.position() + length;
  window.limit(end);
  while (validator.checkValidity(window) != Validity.VALID) {
    // Drop one trailing byte and try again
    --end;
    window.limit(end);
    if (!window.hasRemaining()) {
      return null;
    }
  }
  byte[] prefix = new byte[window.remaining()];
  window.get(prefix);
  return prefix;
}

// Tries to turn the array (modified in place) into a larger byte sequence that still passes
// the UTF-8 validity check. Positions are tried from the last byte towards the first; at each
// position the byte is incremented step by step until the whole array validates. A position
// that cannot be fixed is restored to its original byte before moving to the previous one.
// Returns the modified array on success, null for a null input or when no position works.
private byte[] incrementUtf8(byte[] array) {
if (array == null) {
return null;
}
// Wraps (does not copy) the array, so every write to array[i] is visible to the validator
ByteBuffer buffer = ByteBuffer.wrap(array);
for (int i = array.length - 1; i >= 0; --i) {
byte prev = array[i];
byte inc = prev;
while (++inc != 0) { // Until overflow: 0xFF -> 0x00
array[i] = inc;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one seems to increment bytes up to FF, but then not write the overflow back to the array. I.e., if I understand correctly, the array may go through the following changes:

42 FB
42 FC
42 FD
42 FE
42 FF
43 FF
...

I think the last one should be 43 00

switch (validator.checkValidity(buffer)) {
case VALID:
return array;
case UNMAPPABLE:
// continue targets the enclosing while loop, not the switch
continue; // Increment the i byte once more
case MALFORMED:
// this break only leaves the switch; the break right after it leaves the while loop
break; // Stop incrementing the i byte; go to the i-1
}
break; // MALFORMED
}
// No value worked at this position: put the original byte back before trying position i-1
array[i] = prev;
}
return null; // All characters are the largest possible; unable to increment
}
};

/**
 * Selects the truncator for the given type.
 *
 * A {@code null} type and INT96 get the no-op truncator. BINARY and FIXED_LEN_BYTE_ARRAY get
 * the UTF-8 truncator when they have no original type or the original type is UTF8, ENUM,
 * JSON or BSON, and the no-op truncator for any other original type.
 *
 * @param type
 *          the type to select a truncator for; may be {@code null}
 * @return the truncator to use, never {@code null}
 * @throws IllegalArgumentException
 *           if the primitive type is none of INT96, BINARY or FIXED_LEN_BYTE_ARRAY
 */
static BinaryTruncator getTruncator(PrimitiveType type) {
  if (type == null) {
    return NO_OP_TRUNCATOR;
  }
  switch (type.getPrimitiveTypeName()) {
    case INT96:
      return NO_OP_TRUNCATOR;
    case BINARY:
    case FIXED_LEN_BYTE_ARRAY:
      OriginalType annotation = type.getOriginalType();
      boolean utf8Truncatable = annotation == null
          || annotation == OriginalType.UTF8
          || annotation == OriginalType.ENUM
          || annotation == OriginalType.JSON
          || annotation == OriginalType.BSON;
      return utf8Truncatable ? DEFAULT_UTF8_TRUNCATOR : NO_OP_TRUNCATOR;
    default:
      throw new IllegalArgumentException("No truncator is available for the type: " + type);
  }
}

/**
 * Produces the value to store in place of the given minimum.
 *
 * @param minValue
 *          the original minimum value
 * @param length
 *          the requested maximum length of the result, in bytes
 * @return a value of at most {@code length} bytes where the implementation can produce one,
 *         otherwise {@code minValue} itself (presumably the result is meant to stay a lower
 *         bound of the original under the column's ordering - confirm)
 */
abstract Binary truncateMin(Binary minValue, int length);

/**
 * Produces the value to store in place of the given maximum.
 *
 * @param maxValue
 *          the original maximum value
 * @param length
 *          the requested maximum length of the result, in bytes
 * @return a value of at most {@code length} bytes where the implementation can produce one,
 *         otherwise {@code maxValue} itself (presumably the result is meant to stay an upper
 *         bound of the original under the column's ordering - confirm)
 */
abstract Binary truncateMax(Binary maxValue, int length);
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,22 @@ public static ColumnIndexBuilder getNoOpBuilder() {
/**
* @param type
* the type this builder is to be created for
* @param truncateLength
* the length to be used for truncating binary values if possible
* @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
*/
public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) {
ColumnIndexBuilder builder = createNewBuilder(type, truncateLength);
builder.type = type;
return builder;
}

private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
switch (type) {
private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) {
switch (type.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96:
return new BinaryColumnIndexBuilder();
return new BinaryColumnIndexBuilder(type, truncateLength);
case BOOLEAN:
return new BooleanColumnIndexBuilder();
case DOUBLE:
Expand Down Expand Up @@ -276,7 +278,7 @@ public static ColumnIndex build(
PrimitiveTypeName typeName = type.getPrimitiveTypeName();
ColumnIndexBuilder builder = BUILDERS.get(typeName);
if (builder == null) {
builder = createNewBuilder(typeName);
builder = createNewBuilder(type, Integer.MAX_VALUE);
BUILDERS.put(typeName, builder);
}

Expand Down
Loading