Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.Comparator;
import org.apache.iceberg.util.UnicodeUtil;

public class Comparators {

Expand Down Expand Up @@ -182,13 +183,31 @@ private static class CharSeqComparator implements Comparator<CharSequence> {
private CharSeqComparator() {
}

/**
* Java characters support only up to 3 byte UTF-8 characters. A 4 byte UTF-8 character is represented using two
* Java characters (a UTF-16 surrogate pair). Character by character comparison may yield incorrect results
* while comparing a 4 byte UTF-8 character to a Java char. Character by character comparison works as expected
* if both characters are <= 3 byte UTF-8 characters or both characters are 4 byte UTF-8 characters.
* The isCharHighSurrogate method detects a 4-byte character and considers that character to be
* lexicographically greater than any 3 byte or lower UTF-8 character.
*/
@Override
public int compare(CharSequence s1, CharSequence s2) {
int len = Math.min(s1.length(), s2.length());

// find the first difference and return
for (int i = 0; i < len; i += 1) {
int cmp = Character.compare(s1.charAt(i), s2.charAt(i));
char c1 = s1.charAt(i);
char c2 = s2.charAt(i);
boolean isC1HighSurrogate = UnicodeUtil.isCharHighSurrogate(c1);
boolean isC2HighSurrogate = UnicodeUtil.isCharHighSurrogate(c2);
if (isC1HighSurrogate && !isC2HighSurrogate) {
return 1;
}
if (!isC1HighSurrogate && isC2HighSurrogate) {
return -1;
}
int cmp = Character.compare(c1, c2);
if (cmp != 0) {
return cmp;
}
Expand Down
79 changes: 79 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BinaryUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import org.apache.iceberg.expressions.Literal;

public class BinaryUtil {
  // not meant to be instantiated
  private BinaryUtil() {
  }

  /**
   * Truncates the input byte buffer to the given length.
   *
   * @param input buffer to truncate; its position and contents are left untouched
   * @param length number of bytes to keep; must be positive and strictly smaller
   *               than {@code input.remaining()}
   * @return a new buffer wrapping a copy of the first {@code length} remaining bytes
   */
  public static ByteBuffer truncateBinary(ByteBuffer input, int length) {
    Preconditions.checkArgument(length > 0 && length < input.remaining(),
        "Truncate length should be positive and lower than the number of remaining elements");
    byte[] prefix = new byte[length];
    // read through a duplicate so the caller's position is not disturbed
    input.duplicate().get(prefix);
    return ByteBuffer.wrap(prefix);
  }

  /**
   * Returns a byte buffer whose length is lesser than or equal to truncateLength and is lower than the given input.
   */
  public static Literal<ByteBuffer> truncateBinaryMin(Literal<ByteBuffer> input, int length) {
    ByteBuffer value = input.value();
    // a prefix of the input sorts lower than (or equal to) the input itself
    return value.remaining() <= length ? input : Literal.of(truncateBinary(value, length));
  }

  /**
   * Returns a byte buffer whose length is lesser than or equal to truncateLength and is greater than the given input.
   * Returns null when no such upper bound exists (every truncated byte is 0xFF).
   */
  public static Literal<ByteBuffer> truncateBinaryMax(Literal<ByteBuffer> input, int length) {
    ByteBuffer value = input.value();
    if (value.remaining() <= length) {
      return input;
    }

    // Truncate the input to the specified truncate length.
    ByteBuffer candidate = truncateBinary(value, length);

    // Scan backwards, incrementing the last byte; a byte that wraps to 0 overflowed,
    // so move one position left. If every byte overflows there is no valid bound.
    int pos = length - 1;
    while (pos >= 0) {
      byte incremented = (byte) (candidate.get(pos) + 1);
      if (incremented != 0) { // no overflow at this position
        candidate.put(pos, incremented);
        // expose only the bytes up to and including the incremented one
        candidate.position(0);
        candidate.limit(pos + 1);
        return Literal.of(candidate);
      }
      pos -= 1;
    }
    return null; // cannot find a valid upper bound
  }
}
95 changes: 95 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.util;

import com.google.common.base.Preconditions;
import org.apache.iceberg.expressions.Literal;

public class UnicodeUtil {
  // not meant to be instantiated
  private UnicodeUtil() {
  }

  /**
   * Determines if the given character value is a unicode high-surrogate code unit.
   * The range of high-surrogates is 0xD800 - 0xDBFF.
   */
  public static boolean isCharHighSurrogate(char ch) {
    return (ch & '\uFC00') == '\uD800'; // 0xDC00 - 0xDFFF shouldn't match
  }

  /**
   * Truncates the input charSequence such that the truncated charSequence is a valid unicode string
   * and the number of unicode characters in the truncated charSequence is lesser than or equal to length.
   *
   * @param input the sequence to truncate
   * @param length maximum number of unicode code points to keep; must be positive
   * @return the input itself when it already fits, otherwise a code-point-aligned prefix
   */
  public static CharSequence truncateString(CharSequence input, int length) {
    Preconditions.checkArgument(length > 0, "Truncate length should be positive");
    StringBuffer sb = new StringBuffer(input);
    // Get the number of unicode characters in the input
    int numUniCodeCharacters = sb.codePointCount(0, sb.length());
    // No need to truncate if the number of unicode characters in the char sequence is <= truncate length
    if (length >= numUniCodeCharacters) {
      return input;
    }
    // Get the char offset in the input charSequence where the number of unicode characters = truncate length
    int offsetByCodePoint = sb.offsetByCodePoints(0, length);
    return input.subSequence(0, offsetByCodePoint);
  }

  /**
   * Returns a valid unicode charsequence that is lower than the given input such that the
   * number of unicode characters in the truncated charSequence is lesser than or equal to length.
   */
  public static Literal<CharSequence> truncateStringMin(Literal<CharSequence> input, int length) {
    // A truncated prefix always sorts lower than or equal to the input.
    CharSequence truncatedInput = truncateString(input.value(), length);
    return Literal.of(truncatedInput);
  }

  /**
   * Returns a valid unicode charsequence that is greater than the given input such that the
   * number of unicode characters in the truncated charSequence is lesser than or equal to length.
   * Returns null when no such upper bound exists (all truncated code points are at U+10FFFF).
   */
  public static Literal<CharSequence> truncateStringMax(Literal<CharSequence> input, int length) {
    CharSequence inputCharSeq = input.value();
    // Truncate the input to the specified truncate length.
    StringBuffer truncatedStringBuffer = new StringBuffer(truncateString(inputCharSeq, length));

    // No need to increment if the input was not actually truncated
    if (inputCharSeq.length() == truncatedStringBuffer.length()) {
      return input;
    }

    // Try incrementing the code points from the end
    for (int i = length - 1; i >= 0; i--) {
      // BUG FIX: codePointAt expects a char offset, not a code-point index. Convert the
      // code-point index i to its char offset first; calling codePointAt(i) directly reads
      // the wrong character whenever the prefix contains surrogate pairs (4-byte UTF-8).
      int offsetByCodePoint = truncatedStringBuffer.offsetByCodePoints(0, i);
      int nextCodePoint = truncatedStringBuffer.codePointAt(offsetByCodePoint) + 1;
      // Skip the surrogate block: an unpaired surrogate code point is not a valid unicode
      // scalar value and would make the returned string ill-formed.
      if (nextCodePoint == Character.MIN_SURROGATE) {
        nextCodePoint = Character.MAX_SURROGATE + 1;
      }
      // isValidCodePoint is false once the increment passes U+10FFFF (overflow)
      if (Character.isValidCodePoint(nextCodePoint)) {
        // Drop everything from this code point onward
        truncatedStringBuffer.setLength(offsetByCodePoint);
        // Append the incremented code point to the truncated substring
        truncatedStringBuffer.appendCodePoint(nextCodePoint);
        return Literal.of(truncatedStringBuffer.toString());
      }
    }
    return null; // Cannot find a valid upper bound
  }
}
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,7 @@ private TableProperties() {}

public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = true;

public static final String WRITE_METADATA_TRUNCATE_BYTES = "write.metadata.truncate-length";
public static final int WRITE_METADATA_TRUNCATE_BYTES_DEFAULT = 16;
}
11 changes: 9 additions & 2 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT;


public class Parquet {
private Parquet() {
Expand Down Expand Up @@ -165,6 +168,9 @@ public <D> FileAppender<D> build() throws IOException {
PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT));
int dictionaryPageSize = Integer.parseInt(config.getOrDefault(
PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT));
int statsTruncateLength = Integer.parseInt(config.getOrDefault(
WRITE_METADATA_TRUNCATE_BYTES, String.valueOf(WRITE_METADATA_TRUNCATE_BYTES_DEFAULT)));


WriterVersion writerVersion = WriterVersion.PARQUET_1_0;

Expand Down Expand Up @@ -192,7 +198,8 @@ public <D> FileAppender<D> build() throws IOException {
.build();

return new org.apache.iceberg.parquet.ParquetWriter<>(
conf, file, schema, rowGroupSize, metadata, createWriterFunc, codec(), parquetProperties);
conf, file, schema, rowGroupSize, statsTruncateLength, metadata,
createWriterFunc, codec(), parquetProperties);
} else {
return new ParquetWriteAdapter<>(new ParquetWriteBuilder<D>(ParquetIO.file(file))
.withWriterVersion(writerVersion)
Expand All @@ -205,7 +212,7 @@ public <D> FileAppender<D> build() throws IOException {
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.build());
.build(), statsTruncateLength);
}
}
}
Expand Down
59 changes: 47 additions & 12 deletions parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Set;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.io.InputFile;
Expand All @@ -47,21 +48,30 @@
import org.apache.parquet.schema.MessageType;

import static org.apache.iceberg.parquet.ParquetConversions.fromParquetPrimitive;
import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMax;
import static org.apache.iceberg.util.BinaryUtil.truncateBinaryMin;
import static org.apache.iceberg.util.UnicodeUtil.truncateStringMax;
import static org.apache.iceberg.util.UnicodeUtil.truncateStringMin;

public class ParquetUtil {
// not meant to be instantiated
private ParquetUtil() {
}

public static Metrics fileMetrics(InputFile file) {
// Access modifier is package-private, to only allow use from existing tests
static Metrics fileMetrics(InputFile file) {
return fileMetrics(file, TableProperties.WRITE_METADATA_TRUNCATE_BYTES_DEFAULT);
}

public static Metrics fileMetrics(InputFile file, int statsTruncateLength) {
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file))) {
return footerMetrics(reader.getFooter());
return footerMetrics(reader.getFooter(), statsTruncateLength);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read footer of file: %s", file);
}
}

public static Metrics footerMetrics(ParquetMetadata metadata) {
public static Metrics footerMetrics(ParquetMetadata metadata, int statsTruncateLength) {
long rowCount = 0;
Map<Integer, Long> columnSizes = Maps.newHashMap();
Map<Integer, Long> valueCounts = Maps.newHashMap();
Expand Down Expand Up @@ -89,11 +99,14 @@ public static Metrics footerMetrics(ParquetMetadata metadata) {
increment(nullValueCounts, fieldId, stats.getNumNulls());

Types.NestedField field = fileSchema.findField(fieldId);
if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)) {
updateMin(lowerBounds, fieldId,
fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMin()));
updateMax(upperBounds, fieldId,
fromParquetPrimitive(field.type(), column.getPrimitiveType(), stats.genericGetMax()));
if (field != null && stats.hasNonNullValue() && shouldStoreBounds(path, fileSchema)
&& statsTruncateLength > 0) {
updateMin(lowerBounds, fieldId, field.type(),
fromParquetPrimitive(field.type(), column.getPrimitiveType(),
stats.genericGetMin()), statsTruncateLength);
updateMax(upperBounds, fieldId, field.type(),
fromParquetPrimitive(field.type(), column.getPrimitiveType(),
stats.genericGetMax()), statsTruncateLength);
}
}
}
Expand Down Expand Up @@ -151,18 +164,40 @@ private static void increment(Map<Integer, Long> columns, int fieldId, long amou
}

@SuppressWarnings("unchecked")
private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Literal<T> min) {
private static <T> void updateMin(Map<Integer, Literal<?>> lowerBounds, int id, Type type,
Literal<T> min, int truncateLength) {
Literal<T> currentMin = (Literal<T>) lowerBounds.get(id);
if (currentMin == null || min.comparator().compare(min.value(), currentMin.value()) < 0) {
lowerBounds.put(id, min);
switch (type.typeId()) {
case STRING:
lowerBounds.put(id, truncateStringMin((Literal<CharSequence>) min, truncateLength));
break;
case FIXED:
case BINARY:
lowerBounds.put(id, truncateBinaryMin((Literal<ByteBuffer>) min, truncateLength));
break;
default:
lowerBounds.put(id, min);
}
}
}

@SuppressWarnings("unchecked")
private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Literal<T> max) {
private static <T> void updateMax(Map<Integer, Literal<?>> upperBounds, int id, Type type,
Literal<T> max, int truncateLength) {
Literal<T> currentMax = (Literal<T>) upperBounds.get(id);
if (currentMax == null || max.comparator().compare(max.value(), currentMax.value()) > 0) {
upperBounds.put(id, max);
switch (type.typeId()) {
case STRING:
upperBounds.put(id, truncateStringMax((Literal<CharSequence>) max, truncateLength));
break;
case FIXED:
case BINARY:
upperBounds.put(id, truncateBinaryMax((Literal<ByteBuffer>) max, truncateLength));
break;
default:
upperBounds.put(id, max);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
public class ParquetWriteAdapter<D> implements FileAppender<D> {
private ParquetWriter<D> writer = null;
private ParquetMetadata footer = null;
private int statsTruncateLength;

public ParquetWriteAdapter(ParquetWriter<D> writer) {
public ParquetWriteAdapter(ParquetWriter<D> writer, int statsTruncateLength) {
this.writer = writer;
this.statsTruncateLength = statsTruncateLength;
}

@Override
Expand All @@ -48,7 +50,7 @@ public void add(D datum) {
@Override
public Metrics metrics() {
Preconditions.checkState(footer != null, "Cannot produce metrics until closed");
return ParquetUtil.footerMetrics(footer);
return ParquetUtil.footerMetrics(footer, statsTruncateLength);
}

@Override
Expand Down
Loading