Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public OrcWriter(
compression,
maxCompressionBufferSize,
options.getMaxStringStatisticsLimit(),
getBloomFilterBuilder(options, columnNames.get(fieldId)));
getBloomFilterBuilder(options, columnNames.get(fieldId)),
options.isShouldCompactMinMax());
columnWriters.add(columnWriter);

if (columnWriter instanceof SliceDictionaryColumnWriter) {
Expand Down
31 changes: 28 additions & 3 deletions lib/trino-orc/src/main/java/io/trino/orc/OrcWriterOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public enum WriterIdentification
private final DataSize maxCompressionBufferSize;
private final Set<String> bloomFilterColumns;
private final double bloomFilterFpp;
private final boolean shouldCompactMinMax;

public OrcWriterOptions()
{
Expand All @@ -74,7 +75,8 @@ public OrcWriterOptions()
DEFAULT_MAX_STRING_STATISTICS_LIMIT,
DEFAULT_MAX_COMPRESSION_BUFFER_SIZE,
ImmutableSet.of(),
DEFAULT_BLOOM_FILTER_FPP);
DEFAULT_BLOOM_FILTER_FPP,
true);
}

private OrcWriterOptions(
Expand All @@ -87,7 +89,8 @@ private OrcWriterOptions(
DataSize maxStringStatisticsLimit,
DataSize maxCompressionBufferSize,
Set<String> bloomFilterColumns,
double bloomFilterFpp)
double bloomFilterFpp,
boolean shouldCompactMinMax)
{
requireNonNull(stripeMinSize, "stripeMinSize is null");
requireNonNull(stripeMaxSize, "stripeMaxSize is null");
Expand All @@ -109,6 +112,7 @@ private OrcWriterOptions(
this.maxCompressionBufferSize = maxCompressionBufferSize;
this.bloomFilterColumns = ImmutableSet.copyOf(bloomFilterColumns);
this.bloomFilterFpp = bloomFilterFpp;
this.shouldCompactMinMax = shouldCompactMinMax;
}

public WriterIdentification getWriterIdentification()
Expand Down Expand Up @@ -231,6 +235,18 @@ public OrcWriterOptions withBloomFilterFpp(double bloomFilterFpp)
.build();
}

/**
 * Returns the min/max compaction flag carried by these options.
 * <p>
 * The ORC writer forwards this value to the column writers, which hand it to
 * the string statistics builders; there it selects between truncating and
 * dropping a string minimum/maximum that exceeds the statistics size limit.
 *
 * @return {@code true} when over-long string min/max values should be compacted
 */
public boolean isShouldCompactMinMax()
{
    return this.shouldCompactMinMax;
}

/**
 * Derives a new options object that differs from this one only in the
 * min/max compaction flag.
 *
 * @param shouldCompactMinMax the flag value for the derived options
 * @return options built from a builder seeded with this instance
 */
public OrcWriterOptions withShouldCompactMinMax(boolean shouldCompactMinMax)
{
    Builder derived = builderFrom(this);
    derived.setShouldCompactMinMax(shouldCompactMinMax);
    return derived.build();
}

@Override
public String toString()
{
Expand Down Expand Up @@ -269,6 +285,7 @@ public static final class Builder
private DataSize maxCompressionBufferSize;
private Set<String> bloomFilterColumns;
private double bloomFilterFpp;
private boolean shouldCompactMinMax;

private Builder(OrcWriterOptions options)
{
Expand All @@ -284,6 +301,7 @@ private Builder(OrcWriterOptions options)
this.maxCompressionBufferSize = options.maxCompressionBufferSize;
this.bloomFilterColumns = ImmutableSet.copyOf(options.bloomFilterColumns);
this.bloomFilterFpp = options.bloomFilterFpp;
this.shouldCompactMinMax = options.shouldCompactMinMax;
}

public Builder setWriterIdentification(WriterIdentification writerIdentification)
Expand Down Expand Up @@ -346,6 +364,12 @@ public Builder setBloomFilterFpp(double bloomFilterFpp)
return this;
}

/**
 * Records the min/max compaction flag that {@code build()} passes to the
 * {@code OrcWriterOptions} constructor.
 *
 * @param shouldCompactMinMax the flag value to store in this builder
 * @return this builder, so calls can be chained
 */
public Builder setShouldCompactMinMax(boolean shouldCompactMinMax)
{
this.shouldCompactMinMax = shouldCompactMinMax;
return this;
}

public OrcWriterOptions build()
{
return new OrcWriterOptions(
Expand All @@ -358,7 +382,8 @@ public OrcWriterOptions build()
maxStringStatisticsLimit,
maxCompressionBufferSize,
bloomFilterColumns,
bloomFilterFpp);
bloomFilterFpp,
shouldCompactMinMax);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.SliceUtf8.codePointToUtf8;
import static io.airlift.slice.SliceUtf8.getCodePointAt;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static io.trino.orc.metadata.statistics.StringStatistics.STRING_VALUE_BYTES_OVERHEAD;
import static java.lang.Character.MAX_CODE_POINT;
import static java.lang.Math.addExact;
import static java.lang.System.arraycopy;
import static java.util.Objects.requireNonNull;

public class StringStatisticsBuilder
Expand All @@ -36,20 +41,34 @@ public class StringStatisticsBuilder
private Slice maximum;
private long sum;
private final BloomFilterBuilder bloomFilterBuilder;
private final boolean shouldCompactMinMax;

public StringStatisticsBuilder(int stringStatisticsLimitInBytes, BloomFilterBuilder bloomFilterBuilder)
{
this(stringStatisticsLimitInBytes, 0, null, null, 0, bloomFilterBuilder);
this(stringStatisticsLimitInBytes, 0, null, null, 0, bloomFilterBuilder, true);
}

private StringStatisticsBuilder(int stringStatisticsLimitInBytes, long nonNullValueCount, Slice minimum, Slice maximum, long sum, BloomFilterBuilder bloomFilterBuilder)
/**
 * Creates an empty builder: zero non-null values, no minimum, no maximum and a sum of zero.
 *
 * @param stringStatisticsLimitInBytes size limit, in bytes, applied to the recorded minimum and maximum
 * @param bloomFilterBuilder bloom filter builder used by this statistics builder; must not be null
 * @param shouldCompactMinMax when {@code true}, a minimum/maximum longer than the limit is truncated
 *        instead of being dropped
 */
public StringStatisticsBuilder(int stringStatisticsLimitInBytes, BloomFilterBuilder bloomFilterBuilder, boolean shouldCompactMinMax)
{
this(stringStatisticsLimitInBytes, 0, null, null, 0, bloomFilterBuilder, shouldCompactMinMax);
}

/**
 * Canonical constructor: stores the configuration and the initial running state.
 *
 * @param stringStatisticsLimitInBytes size limit, in bytes, applied to the minimum and maximum
 * @param nonNullValueCount initial count of non-null values
 * @param minimum initial minimum, or {@code null} when there is none yet
 * @param maximum initial maximum, or {@code null} when there is none yet
 * @param sum initial value of the running sum
 * @param bloomFilterBuilder bloom filter builder; must not be null
 * @param shouldCompactMinMax whether an over-long minimum/maximum is truncated rather than dropped
 */
private StringStatisticsBuilder(
        int stringStatisticsLimitInBytes,
        long nonNullValueCount,
        Slice minimum,
        Slice maximum,
        long sum,
        BloomFilterBuilder bloomFilterBuilder,
        boolean shouldCompactMinMax)
{
    // Configuration: validate the only reference that must be present, then store the settings.
    this.bloomFilterBuilder = requireNonNull(bloomFilterBuilder, "bloomFilterBuilder");
    this.shouldCompactMinMax = shouldCompactMinMax;
    this.stringStatisticsLimitInBytes = stringStatisticsLimitInBytes;

    // Running state.
    this.minimum = minimum;
    this.maximum = maximum;
    this.nonNullValueCount = nonNullValueCount;
    this.sum = sum;
}

public long getNonNullValueCount()
Expand Down Expand Up @@ -112,8 +131,8 @@ private Optional<StringStatistics> buildStringStatistics()
if (nonNullValueCount == 0) {
return Optional.empty();
}
minimum = dropStringMinMaxIfNecessary(minimum);
maximum = dropStringMinMaxIfNecessary(maximum);
minimum = computeStringMinMax(minimum, true);
maximum = computeStringMinMax(maximum, false);
if (minimum == null && maximum == null) {
// Create string stats only when min or max is not null.
// This corresponds to the behavior of metadata reader.
Expand Down Expand Up @@ -158,16 +177,87 @@ public static Optional<StringStatistics> mergeStringStatistics(List<ColumnStatis
return stringStatisticsBuilder.buildStringStatistics();
}

private Slice dropStringMinMaxIfNecessary(Slice minOrMax)
private Slice computeStringMinMax(Slice minOrMax, boolean isMin)
{
if (minOrMax == null || minOrMax.length() > stringStatisticsLimitInBytes) {
if (minOrMax == null || (!shouldCompactMinMax && minOrMax.length() > stringStatisticsLimitInBytes)) {
return null;
}
if (minOrMax.length() > stringStatisticsLimitInBytes) {
if (isMin) {
return StringCompactor.truncateMin(minOrMax, stringStatisticsLimitInBytes);
}
else {
return StringCompactor.truncateMax(minOrMax, stringStatisticsLimitInBytes);
}
}

// Do not hold the entire slice where the actual stats could be small
if (minOrMax.isCompact()) {
return minOrMax;
}
return Slices.copyOf(minOrMax);
}

static final class StringCompactor
{
private static final int INDEX_NOT_FOUND = -1;

private StringCompactor() {}

/**
 * Shortens {@code slice} to a prefix of at most {@code maxBytes} bytes that ends
 * on a UTF-8 character boundary.
 * <p>
 * The cut is made at the start of the character that contains byte index
 * {@code maxBytes}, so no multi-byte character is split. A prefix never compares
 * greater than the full value byte-wise, which is what makes it usable as a lower bound.
 *
 * @param slice the value to shorten; must be longer than {@code maxBytes}
 * @param maxBytes the maximum length, in bytes, of the result
 * @return the prefix, or {@code EMPTY_SLICE} when no character start is found
 * @throws IllegalArgumentException when {@code slice} is not longer than {@code maxBytes}
 */
public static Slice truncateMin(Slice slice, int maxBytes)
{
    checkArgument(slice.length() > maxBytes);
    int cutPosition = findLastCharacterInRange(slice, maxBytes);
    return cutPosition == INDEX_NOT_FOUND ? EMPTY_SLICE : slice.slice(0, cutPosition);
}

/**
 * Builds a shortened upper bound for {@code slice}.
 * <p>
 * Steps performed by the code below:
 * <ol>
 * <li>Locate the start of the character that contains byte index {@code maxBytes}
 *     (the first removed character) and the start of the character before it
 *     (the last retained character). If either cannot be found, return {@code EMPTY_SLICE}.</li>
 * <li>While the last retained character is {@code MAX_CODE_POINT} it cannot be incremented,
 *     so step back one character at a time. If the search reaches index 0 and that character
 *     is still {@code MAX_CODE_POINT}, return {@code EMPTY_SLICE}.</li>
 * <li>Increment the code point of the last retained character and return the bytes that
 *     precede it followed by the UTF-8 encoding of the incremented code point.</li>
 * </ol>
 * NOTE(review): unlike {@code truncateMin}, there is no {@code checkArgument(slice.length() > maxBytes)}
 * here; {@code maxBytes} is used as a byte index into {@code slice}, so callers presumably
 * guarantee that precondition. Confirm.
 * <p>
 * NOTE(review): incrementing 0xD7FF yields 0xD800, which is in the surrogate range. Confirm that
 * {@code codePointToUtf8} accepts such a value or that it cannot occur here.
 * <p>
 * NOTE(review): the incremented code point can need more bytes than the original (for example
 * 0x7F is one byte in UTF-8 and 0x80 is two), so the result is not guaranteed by this code to
 * fit in {@code maxBytes}. Confirm this is acceptable.
 *
 * @param slice the UTF-8 value to shorten
 * @param maxBytes the byte limit that the original value exceeds
 * @return the shortened upper bound, or {@code EMPTY_SLICE} when none could be produced
 */
public static Slice truncateMax(Slice slice, int maxBytes)
{
int firstRemovedCharacterIndex = findLastCharacterInRange(slice, maxBytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will throw (IllegalArgumentException("Provided byte array is not a valid utf8 string")) when input is eg 4-byte utf8 sequence, and maxBytes=3 (please add a test)

let's

  • change so that findLastCharacterInRange returns -1 (or perhaps Optional.empty) in such case
    • return early when we get -1 here
  • remove if (maxBytes == 0) above, as it becomes obsolete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok but this will look very ugly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

propose something else?

int lastRetainedCharacterIndex = findLastCharacterInRange(slice, firstRemovedCharacterIndex - 1);
if (firstRemovedCharacterIndex == INDEX_NOT_FOUND || lastRetainedCharacterIndex == INDEX_NOT_FOUND) {
return EMPTY_SLICE;
}
int lastRetainedCharacter = getCodePointAt(slice, lastRetainedCharacterIndex);
while (lastRetainedCharacter == MAX_CODE_POINT && lastRetainedCharacterIndex > 0) {
lastRetainedCharacterIndex = findLastCharacterInRange(slice, lastRetainedCharacterIndex - 1);
if (lastRetainedCharacterIndex == INDEX_NOT_FOUND) {
return EMPTY_SLICE;
}
lastRetainedCharacter = getCodePointAt(slice, lastRetainedCharacterIndex);
}

if (lastRetainedCharacterIndex == 0 && lastRetainedCharacter == MAX_CODE_POINT) {
// whole string is made of MAX_CODE_POINT characters, we cannot provide upper bound that is shorter than maxBytes
return EMPTY_SLICE;
}

lastRetainedCharacter++;
Slice sliceToAppend = codePointToUtf8(lastRetainedCharacter);
byte[] result = new byte[lastRetainedCharacterIndex + sliceToAppend.length()];
arraycopy(slice.byteArray(), slice.byteArrayOffset(), result, 0, lastRetainedCharacterIndex);
arraycopy(sliceToAppend.byteArray(), 0, result, lastRetainedCharacterIndex, sliceToAppend.length());
return Slices.wrappedBuffer(result);
}

/**
 * Scans backwards from {@code toInclusive} for the nearest byte that starts a UTF-8 character.
 *
 * @param slice the UTF-8 bytes to scan
 * @param toInclusive the index at which the backward scan begins
 * @return the index of the nearest character-start byte at or before {@code toInclusive},
 *         or {@code INDEX_NOT_FOUND} when there is none (including when {@code toInclusive} is negative)
 */
private static int findLastCharacterInRange(Slice slice, int toInclusive)
{
    for (int index = toInclusive; index >= 0; index--) {
        if (isUtfBlockStartChar(slice.getByte(index))) {
            return index;
        }
    }
    return INDEX_NOT_FOUND;
}

private static boolean isUtfBlockStartChar(byte b)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

! io.airlift.slice.SliceUtf8#isContinuationByte instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a private method

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. let's copy it and mark with a comment it's copied from there

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it cleaner to have a method that checks the condition we want, instead of one whose result has to be inverted/negated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this one was copied from orc-core.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted isContinuationByte because it's a somewhat familiar thing for our codebase (even if it's a private method in airlift).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, will do

{
return (b & 0xC0) != 0x80;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public static ColumnWriter createColumnWriter(
CompressionKind compression,
int bufferSize,
DataSize stringStatisticsLimit,
Supplier<BloomFilterBuilder> bloomFilterBuilder)
Supplier<BloomFilterBuilder> bloomFilterBuilder,
boolean shouldCompactMinMax)
{
requireNonNull(type, "type is null");
OrcType orcType = orcTypes.get(columnId);
Expand Down Expand Up @@ -92,12 +93,12 @@ public static ColumnWriter createColumnWriter(
case CHAR:
case VARCHAR:
case STRING:
return new SliceDictionaryColumnWriter(columnId, type, compression, bufferSize, () -> new StringStatisticsBuilder(toIntExact(stringStatisticsLimit.toBytes()), bloomFilterBuilder.get()));
return new SliceDictionaryColumnWriter(columnId, type, compression, bufferSize, () -> new StringStatisticsBuilder(toIntExact(stringStatisticsLimit.toBytes()), bloomFilterBuilder.get(), shouldCompactMinMax));

case LIST: {
OrcColumnId fieldColumnIndex = orcType.getFieldTypeIndex(0);
Type fieldType = type.getTypeParameters().get(0);
ColumnWriter elementWriter = createColumnWriter(fieldColumnIndex, orcTypes, fieldType, compression, bufferSize, stringStatisticsLimit, bloomFilterBuilder);
ColumnWriter elementWriter = createColumnWriter(fieldColumnIndex, orcTypes, fieldType, compression, bufferSize, stringStatisticsLimit, bloomFilterBuilder, shouldCompactMinMax);
return new ListColumnWriter(columnId, compression, bufferSize, elementWriter);
}

Expand All @@ -109,15 +110,17 @@ public static ColumnWriter createColumnWriter(
compression,
bufferSize,
stringStatisticsLimit,
bloomFilterBuilder);
bloomFilterBuilder,
shouldCompactMinMax);
ColumnWriter valueWriter = createColumnWriter(
orcType.getFieldTypeIndex(1),
orcTypes,
type.getTypeParameters().get(1),
compression,
bufferSize,
stringStatisticsLimit,
bloomFilterBuilder);
bloomFilterBuilder,
shouldCompactMinMax);
return new MapColumnWriter(columnId, compression, bufferSize, keyWriter, valueWriter);
}

Expand All @@ -126,7 +129,7 @@ public static ColumnWriter createColumnWriter(
for (int fieldId = 0; fieldId < orcType.getFieldCount(); fieldId++) {
OrcColumnId fieldColumnIndex = orcType.getFieldTypeIndex(fieldId);
Type fieldType = type.getTypeParameters().get(fieldId);
fieldWriters.add(createColumnWriter(fieldColumnIndex, orcTypes, fieldType, compression, bufferSize, stringStatisticsLimit, bloomFilterBuilder));
fieldWriters.add(createColumnWriter(fieldColumnIndex, orcTypes, fieldType, compression, bufferSize, stringStatisticsLimit, bloomFilterBuilder, shouldCompactMinMax));
}
return new StructColumnWriter(columnId, compression, bufferSize, fieldWriters.build());
}
Expand Down
Loading