ParquetMetadataUtils.java
@@ -0,0 +1,67 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.parquet.writer;

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.io.api.Binary;

import static com.google.common.base.Verify.verify;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;

public final class ParquetMetadataUtils
{
private ParquetMetadataUtils() {}

public static <T extends Comparable<T>> Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
{
// TODO Utilize https://github.com/apache/parquet-format/pull/216 when available to populate is_max_value_exact/is_min_value_exact
if (isTruncationPossible(stats, truncateLength)) {
// parquet-mr drops statistics larger than MAX_STATS_SIZE rather than truncating them.
// To guarantee truncation rather than dropped stats, truncateLength must be small enough that the
// truncated min and max combined can never exceed ParquetMetadataConverter.MAX_STATS_SIZE
verify(
2L * truncateLength < MAX_STATS_SIZE,
"Twice of truncateLength %s must be less than MAX_STATS_SIZE %s",
truncateLength,
MAX_STATS_SIZE);
// We need to take a lock here because CharsetValidator inside BinaryTruncator modifies a reusable dummyBuffer in-place,
// and DEFAULT_UTF8_TRUNCATOR is a static instance, which makes this method not thread-safe.
// The isTruncationPossible check ensures that we lock only when truncation is expected, which is an uncommon scenario.
// TODO remove synchronization when we use a release with the fix https://github.com/apache/parquet-mr/pull/1154
synchronized (ParquetMetadataUtils.class) {
return ParquetMetadataConverter.toParquetStatistics(stats, truncateLength);
}
}
return ParquetMetadataConverter.toParquetStatistics(stats);
}

private static <T extends Comparable<T>> boolean isTruncationPossible(org.apache.parquet.column.statistics.Statistics<T> stats, int truncateLength)
{
PrimitiveTypeName primitiveType = stats.type().getPrimitiveTypeName();
if (!primitiveType.equals(BINARY) && !primitiveType.equals(FIXED_LEN_BYTE_ARRAY)) {
return false;
}
if (stats.isEmpty() || !stats.hasNonNullValue() || !(stats instanceof BinaryStatistics binaryStatistics)) {
return false;
}
// non-null value exists, so min and max can't be null
Binary min = binaryStatistics.genericGetMin();
Binary max = binaryStatistics.genericGetMax();
return min.length() > truncateLength || max.length() > truncateLength;
}
}
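
A minimal usage sketch of the truncation path (hypothetical values and column name; assumes parquet-mr's Statistics.createStats, BinaryStatistics.updateStats and Binary.fromString, which the writer already depends on):

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class TruncationUsageSketch
{
    public static void main(String[] args)
    {
        PrimitiveType column = Types.required(BINARY)
                .as(LogicalTypeAnnotation.stringType())
                .named("col");
        BinaryStatistics stats = (BinaryStatistics) Statistics.createStats(column);
        stats.updateStats(Binary.fromString("b")); // short value, kept exact
        stats.updateStats(Binary.fromString("y".repeat(5000))); // longer than truncateLength

        // isTruncationPossible is true here (BINARY stats with a value over 1024 bytes),
        // so min and max are each cut to at most 1024 bytes and the serialized
        // statistics stay well under ParquetMetadataConverter.MAX_STATS_SIZE (4096)
        org.apache.parquet.format.Statistics thriftStats =
                ParquetMetadataUtils.toParquetStatistics(stats, 1024);
        System.out.println(thriftStats);
    }
}

For values that fit within truncateLength, the method falls through to the plain toParquetStatistics overload and the stats are stored exactly.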
PrimitiveColumnWriter.java
@@ -60,6 +60,9 @@ public class PrimitiveColumnWriter
private static final int INSTANCE_SIZE = instanceSize(PrimitiveColumnWriter.class);
private static final int MINIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 8 * 1024;
private static final int MAXIMUM_OUTPUT_BUFFER_CHUNK_SIZE = 2 * 1024 * 1024;
// ParquetMetadataConverter.MAX_STATS_SIZE is 4096, so we need a value that guarantees min and max
// together never add up to 4096, i.e. each must be less than 2048. 1K is used as it is big enough for most use cases.
private static final int MAX_STATISTICS_LENGTH_IN_BYTES = 1024;

private final ColumnDescriptor columnDescriptor;
private final CompressionCodec compressionCodec;
@@ -180,7 +183,7 @@ private ColumnMetaData getColumnMetaData()
totalUnCompressedSize,
totalCompressedSize,
-1);
- columnMetaData.setStatistics(ParquetMetadataConverter.toParquetStatistics(columnStatistics));
+ columnMetaData.setStatistics(ParquetMetadataUtils.toParquetStatistics(columnStatistics, MAX_STATISTICS_LENGTH_IN_BYTES));
ImmutableList.Builder<PageEncodingStats> pageEncodingStats = ImmutableList.builder();
dataPagesWithEncoding.entrySet().stream()
.map(encodingAndCount -> new PageEncodingStats(PageType.DATA_PAGE, encodingAndCount.getKey(), encodingAndCount.getValue()))
TestParquetWriter.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableListMultimap;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.parquet.DataPage;
import io.trino.parquet.DiskRange;
@@ -26,6 +27,8 @@
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.PageReader;
import io.trino.parquet.reader.TestingParquetDataSource;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;
import org.apache.parquet.VersionParser;
@@ -49,13 +52,15 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.operator.scalar.CharacterStringCasts.varcharToVarcharSaturatedFloorCast;
import static io.trino.parquet.ParquetCompressionUtils.decompress;
import static io.trino.parquet.ParquetTestUtils.createParquetWriter;
import static io.trino.parquet.ParquetTestUtils.generateInputPages;
import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.TinyintType.TINYINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
@@ -126,6 +131,52 @@ public void testWrittenPageSize()
assertThat(pagesRead).isGreaterThan(10);
}

@Test
public void testLargeStringTruncation()
throws IOException
{
List<String> columnNames = ImmutableList.of("columnA", "columnB");
List<Type> types = ImmutableList.of(VARCHAR, VARCHAR);

Slice minA = Slices.utf8Slice("abc".repeat(300)); // within truncation threshold
Block blockA = VARCHAR.createBlockBuilder(null, 2)
.writeEntry(minA)
.writeEntry(Slices.utf8Slice("y".repeat(3200))) // bigger than truncation threshold
.build();

String threeByteCodePoint = new String(Character.toChars(0x20AC));
String maxCodePoint = new String(Character.toChars(Character.MAX_CODE_POINT));
Slice minB = Slices.utf8Slice(threeByteCodePoint.repeat(300)); // truncation in middle of unicode bytes
Block blockB = VARCHAR.createBlockBuilder(null, 2)
.writeEntry(minB)
// start with maxCodePoint to make this the max value in stats;
// the last character, at the truncation point, is also maxCodePoint
.writeEntry(Slices.utf8Slice(maxCodePoint + "d".repeat(1017) + maxCodePoint))
.build();

ParquetDataSource dataSource = new TestingParquetDataSource(
writeParquetFile(
ParquetWriterOptions.builder().build(),
types,
columnNames,
ImmutableList.of(new Page(2, blockA, blockB))),
new ParquetReaderOptions());

ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
BlockMetaData blockMetaData = getOnlyElement(parquetMetadata.getBlocks());

ColumnChunkMetaData chunkMetaData = blockMetaData.getColumns().get(0);
assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(minA.getBytes());
Slice truncatedMax = Slices.utf8Slice("y".repeat(1023) + "z");
assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());

chunkMetaData = blockMetaData.getColumns().get(1);
Slice truncatedMin = varcharToVarcharSaturatedFloorCast(1024, minB);
assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(truncatedMin.getBytes());
truncatedMax = Slices.utf8Slice(maxCodePoint + "d".repeat(1016) + "e");
assertThat(chunkMetaData.getStatistics().getMaxBytes()).isEqualTo(truncatedMax.getBytes());
}

@Test
public void testColumnReordering()
throws IOException
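
For context on the expected values in testLargeStringTruncation: truncated statistics must remain valid bounds, so a minimum is truncated by simply dropping trailing bytes (rounding the value down), while a maximum has its last retained character incremented (rounding the value up). That is why 3200 'y' characters become "y".repeat(1023) + "z". A sketch of that behavior (assuming parquet-mr's BinaryTruncator, the class the truncating toParquetStatistics path uses internally, is accessible with its getTruncator/truncateMin/truncateMax methods):

import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

public class TruncatorSketch
{
    public static void main(String[] args)
    {
        PrimitiveType type = Types.required(BINARY)
                .as(LogicalTypeAnnotation.stringType())
                .named("col");
        BinaryTruncator truncator = BinaryTruncator.getTruncator(type);

        Binary value = Binary.fromString("y".repeat(3200));
        // max: cut to 1024 bytes with the last byte incremented -> "y" * 1023 + "z"
        Binary truncatedMax = truncator.truncateMax(value, 1024);
        // min: cut to 1024 bytes with no increment -> "y" * 1024
        Binary truncatedMin = truncator.truncateMin(value, 1024);

        System.out.println(truncatedMax.toStringUsingUTF8().endsWith("yz")); // true
        System.out.println(truncatedMin.length()); // 1024
    }
}

The UTF-8 truncator also respects character boundaries, which is what the second test column exercises: the trailing 4-byte maxCodePoint straddling the 1024-byte limit is dropped entirely, and the preceding 'd' is incremented to 'e'.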