diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java index 9363add2ca6c..1ac70600bd51 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java @@ -13,32 +13,303 @@ */ package io.trino.parquet.metadata; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.log.Logger; +import io.trino.parquet.DiskRange; +import io.trino.parquet.ParquetCorruptionException; +import io.trino.parquet.ParquetDataSource; +import io.trino.parquet.ParquetDataSourceId; +import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; +import io.trino.parquet.reader.TrinoColumnIndexStore; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.format.ColumnChunk; +import org.apache.parquet.format.ColumnMetaData; +import org.apache.parquet.format.FileMetaData; +import org.apache.parquet.format.KeyValue; +import org.apache.parquet.format.RowGroup; +import org.apache.parquet.format.SchemaElement; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; +import static io.trino.parquet.ParquetMetadataConverter.getEncoding; +import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; +import static io.trino.parquet.ParquetMetadataConverter.getPrimitive; +import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference; +import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference; +import static io.trino.parquet.ParquetValidationUtils.validateParquet; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; public class ParquetMetadata { - private final FileMetadata fileMetaData; - private final List blocks; + private static final Logger log = Logger.get(ParquetMetadata.class); - public ParquetMetadata(FileMetadata fileMetaData, List blocks) - { - this.fileMetaData = fileMetaData; - this.blocks = blocks; - } + private final FileMetaData fileMetaData; + private final MessageType messageType; + private final ParquetDataSourceId dataSourceId; + private final FileMetadata parquetMetadata; + private final Optional diskRange; - public List getBlocks() + public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId, Optional diskRange) + throws ParquetCorruptionException { - return blocks; + this.fileMetaData = 
requireNonNull(fileMetaData, "fileMetaData is null");
+        this.messageType = readMessageType();
+        this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null");
+        this.diskRange = requireNonNull(diskRange, "range is null");
+        this.parquetMetadata = new FileMetadata(messageType, keyValueMetaData(fileMetaData), fileMetaData.getCreated_by());
     }
 
     public FileMetadata getFileMetaData()
     {
-        return fileMetaData;
+        return parquetMetadata;
     }
 
     @Override
     public String toString()
     {
-        return "ParquetMetaData{" + fileMetaData + ", blocks: " + blocks + "}";
+        return toStringHelper(this)
+                .add("dataSourceId", dataSourceId)
+                .add("fileMetaData", fileMetaData)
+                .add("diskRange", diskRange)
+                .toString();
+    }
+
+    private List<RowGroupOffset> getRowGroups()
+    {
+        List<RowGroup> rowGroups = fileMetaData.getRow_groups();
+        if (rowGroups == null) {
+            return ImmutableList.of();
+        }
+        ImmutableList.Builder<RowGroupOffset> builder = ImmutableList.builder();
+        long lastRowCount = 0;
+        long fileRowCount = 0;
+        for (RowGroup rowGroup : rowGroups) {
+            fileRowCount += lastRowCount;
+            lastRowCount = rowGroup.getNum_rows();
+            if (diskRange.isPresent()) {
+                /*
+                 * Return a row group only when the disk range covers its blockStart.
+                 * If blockStart is not covered, the row group is skipped even when the disk range overlaps it,
+                 * which prevents the same row group from being returned by multiple splits.
+                 */
+                long blockStart = rowGroup.getColumns().getFirst().meta_data.data_page_offset;
+                boolean splitContainsBlock = diskRange.get().getOffset() <= blockStart && blockStart < diskRange.get().getEnd();
+                if (!splitContainsBlock) {
+                    continue;
+                }
+            }
+
+            builder.add(new RowGroupOffset(rowGroup, fileRowCount));
+        }
+
+        return builder.build();
+    }
+
+    private ColumnChunkMetadata toColumnChunkMetadata(MessageType messageType, ColumnChunk columnChunk, ColumnPath columnPath)
+    {
+        ColumnMetaData metaData = columnChunk.meta_data;
+        PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType();
+        ColumnChunkMetadata column = ColumnChunkMetadata.get(
+                columnPath,
+                primitiveType,
+                CompressionCodecName.fromParquet(metaData.codec),
+                convertEncodingStats(metaData.encoding_stats),
+                readEncodings(metaData.encodings),
+                MetadataReader.readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType),
+                metaData.data_page_offset,
+                metaData.dictionary_page_offset,
+                metaData.num_values,
+                metaData.total_compressed_size,
+                metaData.total_uncompressed_size);
+        column.setColumnIndexReference(toColumnIndexReference(columnChunk));
+        column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
+        column.setBloomFilterOffset(metaData.bloom_filter_offset);
+
+        return column;
+    }
+
+    public List<RowGroupInfo> getRowGroupInfo()
+            throws ParquetCorruptionException
+    {
+        return getRowGroupInfo(Optional.empty(), Optional.empty());
+    }
+
+    public List<RowGroupInfo> getRowGroupInfo(Optional<ParquetDataSource> dataSource, Optional<Map<List<String>, ColumnDescriptor>> descriptorsByPath)
+            throws ParquetCorruptionException
+    {
+        Optional<Set<ColumnPath>> filterColumnPaths = descriptorsByPath.map(dp ->
+                dp.keySet().stream()
+                        .map(p -> p.toArray(new String[0]))
+                        .map(ColumnPath::get)
+                        .collect(toImmutableSet()));
+        ImmutableList.Builder<RowGroupInfo> rowGroupInfoBuilder = ImmutableList.builder();
+        for (RowGroupOffset rowGroupOffset : getRowGroups()) {
+            List<ColumnChunk> columns = rowGroupOffset.rowGroup.getColumns();
+            validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroupOffset.rowGroup);
+            String filePath = columns.getFirst().getFile_path();
+
+            ImmutableMap.Builder<ColumnPath, ColumnChunkMetadata> columnMetadataBuilder 
= ImmutableMap.builderWithExpectedSize(columns.size()); + + for (ColumnChunk columnChunk : columns) { + checkState((filePath == null && columnChunk.getFile_path() == null) + || (filePath != null && filePath.equals(columnChunk.getFile_path())), + "all column chunks of the same row group must be in the same file [%s]", dataSourceId); + ColumnPath columnPath = toColumnPath(columnChunk); + if (filterColumnPaths.isEmpty() || filterColumnPaths.get().contains(columnPath)) { + ColumnChunkMetadata chunkMetadata = toColumnChunkMetadata(messageType, columnChunk, columnPath); + columnMetadataBuilder.put(columnPath, chunkMetadata); + } + } + Map columnChunkMetadata = columnMetadataBuilder.buildOrThrow(); + + if (filterColumnPaths.isPresent() && filterColumnPaths.get().size() != columnChunkMetadata.size()) { + Set> existingPaths = columns.stream() + .map(ParquetMetadata::toColumnPath) + .map(p -> ImmutableList.copyOf(p.toArray())) + .collect(toImmutableSet()); + for (Map.Entry, ColumnDescriptor> entry : descriptorsByPath.get().entrySet()) { + if (!existingPaths.contains(entry.getKey())) { + throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", entry.getValue()); + } + } + } + + PrunedBlockMetadata columnsMetadata = new PrunedBlockMetadata(rowGroupOffset.rowGroup.getNum_rows(), dataSourceId, columnChunkMetadata); + Optional indexStore = Optional.empty(); + if (filterColumnPaths.isPresent() && dataSource.isPresent()) { + indexStore = Optional.of(new TrinoColumnIndexStore(dataSource.get(), columnsMetadata.getBlockMetadata(), filterColumnPaths.get(), ImmutableSet.of())); + } + rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, rowGroupOffset.offset, indexStore)); + } + + return rowGroupInfoBuilder.build(); + } + + private MessageType readMessageType() + throws ParquetCorruptionException + { + List schema = fileMetaData.getSchema(); + validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); + + Iterator schemaIterator = schema.iterator(); + SchemaElement rootSchema = schemaIterator.next(); + Types.MessageTypeBuilder builder = Types.buildMessage(); + readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); + return builder.named(rootSchema.name); + } + + private static ColumnPath toColumnPath(ColumnChunk columnChunk) + { + String[] paths = columnChunk.meta_data.path_in_schema.stream() + .map(value -> value.toLowerCase(Locale.ENGLISH)) + .toArray(String[]::new); + return ColumnPath.get(paths); + } + + private static void readTypeSchema(Types.GroupBuilder builder, Iterator schemaIterator, int typeCount) + { + for (int i = 0; i < typeCount; i++) { + SchemaElement element = schemaIterator.next(); + Types.Builder typeBuilder; + if (element.type == null) { + typeBuilder = builder.group(Type.Repetition.valueOf(element.repetition_type.name())); + readTypeSchema((Types.GroupBuilder) typeBuilder, schemaIterator, element.num_children); + } + else { + Types.PrimitiveBuilder primitiveBuilder = builder.primitive(getPrimitive(element.type), Type.Repetition.valueOf(element.repetition_type.name())); + if (element.isSetType_length()) { + primitiveBuilder.length(element.type_length); + } + if (element.isSetPrecision()) { + primitiveBuilder.precision(element.precision); + } + if (element.isSetScale()) { + primitiveBuilder.scale(element.scale); + } + typeBuilder = primitiveBuilder; + } + + // Reading of element.logicalType and element.converted_type corresponds to parquet-mr's code at + // 
https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1568-L1582 + LogicalTypeAnnotation annotationFromLogicalType = null; + if (element.isSetLogicalType()) { + annotationFromLogicalType = getLogicalTypeAnnotation(element.logicalType); + typeBuilder.as(annotationFromLogicalType); + } + if (element.isSetConverted_type()) { + LogicalTypeAnnotation annotationFromConvertedType = getLogicalTypeAnnotation(element.converted_type, element); + if (annotationFromLogicalType != null) { + // Both element.logicalType and element.converted_type set + if (annotationFromLogicalType.toOriginalType() == annotationFromConvertedType.toOriginalType()) { + // element.converted_type matches element.logicalType, even though annotationFromLogicalType may differ from annotationFromConvertedType + // Following parquet-mr behavior, we favor LogicalTypeAnnotation derived from element.logicalType, as potentially containing more information. + } + else { + // Following parquet-mr behavior, issue warning and let converted_type take precedence. + log.warn("Converted type and logical type metadata map to different OriginalType (convertedType: %s, logical type: %s). Using value in converted type.", + element.converted_type, element.logicalType); + // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation + // 1. for compatibility, as previous Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. + // 2. so that we override LogicalTypeAnnotation annotation read from element.logicalType in case of mismatch detected. + typeBuilder.as(annotationFromConvertedType); + } + } + else { + // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation for compatibility, as previous + // Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. 
+ typeBuilder.as(annotationFromConvertedType); + } + } + + if (element.isSetField_id()) { + typeBuilder.id(element.field_id); + } + typeBuilder.named(element.name.toLowerCase(Locale.ENGLISH)); + } + } + + private static Set readEncodings(List encodings) + { + Set columnEncodings = new HashSet<>(); + for (org.apache.parquet.format.Encoding encoding : encodings) { + columnEncodings.add(getEncoding(encoding)); + } + return Collections.unmodifiableSet(columnEncodings); + } + + private static Map keyValueMetaData(FileMetaData fileMetaData) + { + if (fileMetaData.getKey_value_metadata() == null) { + return ImmutableMap.of(); + } + return fileMetaData.getKey_value_metadata().stream().collect(toMap(KeyValue::getKey, KeyValue::getValue)); + } + + private record RowGroupOffset(RowGroup rowGroup, long offset) + { } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java index 63004cc2ca9f..78c30443cb29 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/PrunedBlockMetadata.java @@ -14,57 +14,29 @@ package io.trino.parquet.metadata; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSourceId; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.hadoop.metadata.ColumnPath; import java.util.List; import java.util.Map; -import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static java.util.Arrays.asList; -import static java.util.function.Function.identity; public final class PrunedBlockMetadata { - /** - * Stores only the necessary columns metadata from BlockMetadata and indexes them by path for efficient look-ups - */ - public static PrunedBlockMetadata createPrunedColumnsMetadata(BlockMetadata blockMetadata, ParquetDataSourceId dataSourceId, Map, ColumnDescriptor> descriptorsByPath) - throws ParquetCorruptionException - { - Set> requiredPaths = descriptorsByPath.keySet(); - Map, ColumnChunkMetadata> columnMetadataByPath = blockMetadata.columns().stream() - .collect(toImmutableMap( - column -> asList(column.getPath().toArray()), - identity(), - // Same column name may occur more than once when the file is written by case-sensitive tools - (oldValue, _) -> oldValue)); - ImmutableMap.Builder, ColumnChunkMetadata> columnMetadataByPathBuilder = ImmutableMap.builderWithExpectedSize(requiredPaths.size()); - for (Map.Entry, ColumnDescriptor> entry : descriptorsByPath.entrySet()) { - List requiredPath = entry.getKey(); - ColumnDescriptor columnDescriptor = entry.getValue(); - ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(requiredPath); - if (columnChunkMetadata == null) { - throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor); - } - columnMetadataByPathBuilder.put(requiredPath, columnChunkMetadata); - } - return new PrunedBlockMetadata(blockMetadata.rowCount(), dataSourceId, columnMetadataByPathBuilder.buildOrThrow()); - } - private final long rowCount; private final ParquetDataSourceId dataSourceId; - private final Map, ColumnChunkMetadata> columnMetadataByPath; + private final Map columnMetadataByPath; + private final BlockMetadata 
blockMetadata; - private PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map, ColumnChunkMetadata> columnMetadataByPath) + public PrunedBlockMetadata(long rowCount, ParquetDataSourceId dataSourceId, Map columnMetadataByPath) { this.rowCount = rowCount; this.dataSourceId = dataSourceId; this.columnMetadataByPath = columnMetadataByPath; + this.blockMetadata = new BlockMetadata(rowCount, ImmutableList.copyOf(columnMetadataByPath.values())); } public long getRowCount() @@ -77,10 +49,15 @@ public List getColumns() return ImmutableList.copyOf(columnMetadataByPath.values()); } + public BlockMetadata getBlockMetadata() + { + return blockMetadata; + } + public ColumnChunkMetadata getColumnChunkMetaData(ColumnDescriptor columnDescriptor) throws ParquetCorruptionException { - ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(asList(columnDescriptor.getPath())); + ColumnChunkMetadata columnChunkMetadata = columnMetadataByPath.get(ColumnPath.get(columnDescriptor.getPath())); if (columnChunkMetadata == null) { throw new ParquetCorruptionException(dataSourceId, "Metadata is missing for column: %s", columnDescriptor); } @@ -93,6 +70,7 @@ public String toString() return toStringHelper(this) .add("rowCount", rowCount) .add("columnMetadataByPath", columnMetadataByPath) + .add("blockMetadata", blockMetadata) .toString(); } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 6901bb23a4e6..5ab083b39add 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -27,6 +27,7 @@ import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; +import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.RowGroupInfo; import io.trino.spi.predicate.TupleDomain; @@ -39,6 +40,7 @@ import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.PageType; import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.internal.column.columnindex.OffsetIndex; import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; import org.apache.parquet.io.ParquetDecodingException; @@ -54,11 +56,11 @@ import java.util.Optional; import java.util.Set; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.parquet.BloomFilterStore.getBloomFilterStore; import static io.trino.parquet.ParquetCompressionUtils.decompress; import static io.trino.parquet.ParquetReaderUtils.isOnlyDictionaryEncodingPages; import static io.trino.parquet.ParquetTypeUtils.getParquetEncoding; -import static io.trino.parquet.metadata.PrunedBlockMetadata.createPrunedColumnsMetadata; import static io.trino.parquet.reader.TrinoColumnIndexStore.getColumnIndexStore; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; @@ -180,10 +182,8 @@ public static boolean predicateMatches( } public static List getFilteredRowGroups( - long splitStart, - long splitLength, + ParquetMetadata parquetMetadata, ParquetDataSource dataSource, - List blocksMetaData, List> parquetTupleDomains, List parquetPredicates, Map, ColumnDescriptor> descriptorsByPath, @@ -192,35 +192,37 @@ public static 
List getFilteredRowGroups( ParquetReaderOptions options) throws IOException { - long fileRowCount = 0; + Set columnPaths = descriptorsByPath.keySet().stream() + .map(p -> p.toArray(new String[0])) + .map(ColumnPath::get) + .collect(toImmutableSet()); + + List rowGroupInfos = parquetMetadata.getRowGroupInfo(Optional.of(dataSource), Optional.of(descriptorsByPath)); ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : blocksMetaData) { - long blockStart = block.getStartingPos(); - boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength; - if (splitContainsBlock) { - for (int i = 0; i < parquetTupleDomains.size(); i++) { - TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); - TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); - Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); - Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); - PrunedBlockMetadata columnsMetadata = createPrunedColumnsMetadata(block, dataSource.getId(), descriptorsByPath); - if (predicateMatches( - parquetPredicate, - columnsMetadata, - dataSource, - descriptorsByPath, - parquetTupleDomain, - columnIndex, - bloomFilterStore, - timeZone, - domainCompactionThreshold)) { - rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, fileRowCount, columnIndex)); - break; - } + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); + + for (int i = 0; i < parquetTupleDomains.size(); i++) { + TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); + TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); + Optional columnIndex = getColumnIndexStore(dataSource, block, columnPaths, parquetTupleDomain, options); + Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); + if (predicateMatches( + parquetPredicate, + rowGroupInfo.prunedBlockMetadata(), + dataSource, + descriptorsByPath, + parquetTupleDomain, + columnIndex, + bloomFilterStore, + timeZone, + domainCompactionThreshold)) { + rowGroupInfoBuilder.add(new RowGroupInfo(rowGroupInfo.prunedBlockMetadata(), rowGroupInfo.fileRowOffset(), columnIndex)); + break; } } - fileRowCount += block.rowCount(); } + return rowGroupInfoBuilder.build(); } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java index fe0635646f98..5ed5cad8e0e9 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/MetadataReader.java @@ -13,57 +13,29 @@ */ package io.trino.parquet.reader; -import com.google.common.collect.ImmutableList; -import io.airlift.log.Logger; +import com.google.common.annotations.VisibleForTesting; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.ParquetWriteValidation; -import io.trino.parquet.metadata.BlockMetadata; -import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.FileMetadata; import io.trino.parquet.metadata.ParquetMetadata; import org.apache.parquet.CorruptStatistics; import 
org.apache.parquet.column.statistics.BinaryStatistics; -import org.apache.parquet.format.ColumnChunk; -import org.apache.parquet.format.ColumnMetaData; -import org.apache.parquet.format.Encoding; import org.apache.parquet.format.FileMetaData; -import org.apache.parquet.format.KeyValue; -import org.apache.parquet.format.RowGroup; -import org.apache.parquet.format.SchemaElement; import org.apache.parquet.format.Statistics; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.LogicalTypeAnnotation; -import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type.Repetition; -import org.apache.parquet.schema.Types; import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; import static io.trino.parquet.ParquetMetadataConverter.fromParquetStatistics; -import static io.trino.parquet.ParquetMetadataConverter.getEncoding; -import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; -import static io.trino.parquet.ParquetMetadataConverter.getPrimitive; -import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference; -import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference; import static io.trino.parquet.ParquetValidationUtils.validateParquet; import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; @@ -73,16 +45,15 @@ public final class MetadataReader { - private static final Logger log = Logger.get(MetadataReader.class); - private static final Slice MAGIC = Slices.utf8Slice("PAR1"); - private static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length(); + @VisibleForTesting + static final int POST_SCRIPT_SIZE = Integer.BYTES + MAGIC.length(); // Typical 1GB files produced by Trino were found to have footer size between 30-40KB private static final int EXPECTED_FOOTER_SIZE = 48 * 1024; private MetadataReader() {} - public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional parquetWriteValidation) + public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional parquetWriteValidation, Optional diskRange) throws IOException { // Parquet File Layout: @@ -119,144 +90,11 @@ public static ParquetMetadata readFooter(ParquetDataSource dataSource, Optional< InputStream metadataStream = buffer.slice(buffer.length() - completeFooterSize, metadataLength).getInput(); FileMetaData fileMetaData = readFileMetaData(metadataStream); - ParquetMetadata parquetMetadata = createParquetMetadata(fileMetaData, dataSource.getId()); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, dataSource.getId(), diskRange); validateFileMetadata(dataSource.getId(), parquetMetadata.getFileMetaData(), parquetWriteValidation); return parquetMetadata; } - public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId) - throws ParquetCorruptionException - { - List schema = fileMetaData.getSchema(); - validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); - - MessageType messageType = readParquetSchema(schema); 
- List blocks = new ArrayList<>(); - List rowGroups = fileMetaData.getRow_groups(); - if (rowGroups != null) { - for (RowGroup rowGroup : rowGroups) { - List columns = rowGroup.getColumns(); - validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); - String filePath = columns.get(0).getFile_path(); - ImmutableList.Builder columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size()); - for (ColumnChunk columnChunk : columns) { - validateParquet( - (filePath == null && columnChunk.getFile_path() == null) - || (filePath != null && filePath.equals(columnChunk.getFile_path())), - dataSourceId, - "all column chunks of the same row group must be in the same file"); - ColumnMetaData metaData = columnChunk.meta_data; - String[] path = metaData.path_in_schema.stream() - .map(value -> value.toLowerCase(Locale.ENGLISH)) - .toArray(String[]::new); - ColumnPath columnPath = ColumnPath.get(path); - PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); - ColumnChunkMetadata column = ColumnChunkMetadata.get( - columnPath, - primitiveType, - CompressionCodecName.fromParquet(metaData.codec), - convertEncodingStats(metaData.encoding_stats), - readEncodings(metaData.encodings), - readStats(Optional.ofNullable(fileMetaData.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), - metaData.data_page_offset, - metaData.dictionary_page_offset, - metaData.num_values, - metaData.total_compressed_size, - metaData.total_uncompressed_size); - column.setColumnIndexReference(toColumnIndexReference(columnChunk)); - column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); - column.setBloomFilterOffset(metaData.bloom_filter_offset); - columnMetadataBuilder.add(column); - } - blocks.add(new BlockMetadata(rowGroup.getNum_rows(), columnMetadataBuilder.build())); - } - } - - Map keyValueMetaData = new HashMap<>(); - List keyValueList = fileMetaData.getKey_value_metadata(); - if (keyValueList != null) { - for (KeyValue keyValue : keyValueList) { - keyValueMetaData.put(keyValue.key, keyValue.value); - } - } - FileMetadata parquetFileMetadata = new FileMetadata( - messageType, - keyValueMetaData, - fileMetaData.getCreated_by()); - return new ParquetMetadata(parquetFileMetadata, blocks); - } - - private static MessageType readParquetSchema(List schema) - { - Iterator schemaIterator = schema.iterator(); - SchemaElement rootSchema = schemaIterator.next(); - Types.MessageTypeBuilder builder = Types.buildMessage(); - readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); - return builder.named(rootSchema.name); - } - - private static void readTypeSchema(Types.GroupBuilder builder, Iterator schemaIterator, int typeCount) - { - for (int i = 0; i < typeCount; i++) { - SchemaElement element = schemaIterator.next(); - Types.Builder typeBuilder; - if (element.type == null) { - typeBuilder = builder.group(Repetition.valueOf(element.repetition_type.name())); - readTypeSchema((Types.GroupBuilder) typeBuilder, schemaIterator, element.num_children); - } - else { - Types.PrimitiveBuilder primitiveBuilder = builder.primitive(getPrimitive(element.type), Repetition.valueOf(element.repetition_type.name())); - if (element.isSetType_length()) { - primitiveBuilder.length(element.type_length); - } - if (element.isSetPrecision()) { - primitiveBuilder.precision(element.precision); - } - if (element.isSetScale()) { - primitiveBuilder.scale(element.scale); - } - typeBuilder = primitiveBuilder; - } - - // Reading of 
element.logicalType and element.converted_type corresponds to parquet-mr's code at - // https://github.com/apache/parquet-mr/blob/apache-parquet-1.12.0/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L1568-L1582 - LogicalTypeAnnotation annotationFromLogicalType = null; - if (element.isSetLogicalType()) { - annotationFromLogicalType = getLogicalTypeAnnotation(element.logicalType); - typeBuilder.as(annotationFromLogicalType); - } - if (element.isSetConverted_type()) { - LogicalTypeAnnotation annotationFromConvertedType = getLogicalTypeAnnotation(element.converted_type, element); - if (annotationFromLogicalType != null) { - // Both element.logicalType and element.converted_type set - if (annotationFromLogicalType.toOriginalType() == annotationFromConvertedType.toOriginalType()) { - // element.converted_type matches element.logicalType, even though annotationFromLogicalType may differ from annotationFromConvertedType - // Following parquet-mr behavior, we favor LogicalTypeAnnotation derived from element.logicalType, as potentially containing more information. - } - else { - // Following parquet-mr behavior, issue warning and let converted_type take precedence. - log.warn("Converted type and logical type metadata map to different OriginalType (convertedType: %s, logical type: %s). Using value in converted type.", - element.converted_type, element.logicalType); - // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation - // 1. for compatibility, as previous Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. - // 2. so that we override LogicalTypeAnnotation annotation read from element.logicalType in case of mismatch detected. - typeBuilder.as(annotationFromConvertedType); - } - } - else { - // parquet-mr reads only OriginalType from converted_type. We retain full LogicalTypeAnnotation for compatibility, as previous - // Trino reader code would read LogicalTypeAnnotation from element.converted_type and some additional fields. 
- typeBuilder.as(annotationFromConvertedType); - } - } - - if (element.isSetField_id()) { - typeBuilder.id(element.field_id); - } - typeBuilder.named(element.name.toLowerCase(Locale.ENGLISH)); - } - } - public static org.apache.parquet.column.statistics.Statistics readStats(Optional fileCreatedBy, Optional statisticsFromFile, PrimitiveType type) { Statistics statistics = statisticsFromFile.orElse(null); @@ -352,15 +190,6 @@ private static int commonPrefix(byte[] a, byte[] b) return commonPrefixLength; } - private static Set readEncodings(List encodings) - { - Set columnEncodings = new HashSet<>(); - for (Encoding encoding : encodings) { - columnEncodings.add(getEncoding(encoding)); - } - return Collections.unmodifiableSet(columnEncodings); - } - private static void validateFileMetadata(ParquetDataSourceId dataSourceId, FileMetadata fileMetaData, Optional parquetWriteValidation) throws ParquetCorruptionException { diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java index fa9b7ae142d5..88383d07dd09 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/TrinoColumnIndexStore.java @@ -35,7 +35,6 @@ import java.io.IOException; import java.io.InputStream; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -140,7 +139,7 @@ public OffsetIndex getOffsetIndex(ColumnPath column) public static Optional getColumnIndexStore( ParquetDataSource dataSource, BlockMetadata blockMetadata, - Map, ColumnDescriptor> descriptorsByPath, + Set columnsReadPaths, TupleDomain parquetTupleDomain, ParquetReaderOptions options) { @@ -160,11 +159,6 @@ public static Optional getColumnIndexStore( return Optional.empty(); } - Set columnsReadPaths = new HashSet<>(descriptorsByPath.size()); - for (List path : descriptorsByPath.keySet()) { - columnsReadPaths.add(ColumnPath.get(path.toArray(new String[0]))); - } - Map parquetDomains = parquetTupleDomain.getDomains() .orElseThrow(() -> new IllegalStateException("Predicate other than none should have domains")); Set columnsFilteredPaths = parquetDomains.keySet().stream() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java index 1eed8ba73ef1..55ec3236fe70 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java @@ -24,12 +24,10 @@ import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.ParquetWriteValidation; -import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.FileMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.ParquetReader; -import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ColumnWriter.BufferData; import io.trino.spi.Page; import io.trino.spi.type.Type; @@ -77,7 +75,6 @@ import static io.trino.parquet.ParquetTypeUtils.getDescriptors; import static io.trino.parquet.ParquetTypeUtils.lookupColumnByName; import static io.trino.parquet.ParquetWriteValidation.ParquetWriteValidationBuilder; -import static io.trino.parquet.metadata.PrunedBlockMetadata.createPrunedColumnsMetadata; import 
static io.trino.parquet.writer.ParquetDataOutput.createDataOutput; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DoubleType.DOUBLE; @@ -240,7 +237,7 @@ public void validate(ParquetDataSource input) checkState(validationBuilder.isPresent(), "validation is not enabled"); ParquetWriteValidation writeValidation = validationBuilder.get().build(); try { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation)); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(input, Optional.of(writeValidation), Optional.empty()); try (ParquetReader parquetReader = createParquetReader(input, parquetMetadata, writeValidation)) { for (Page page = parquetReader.nextPage(); page != null; page = parquetReader.nextPage()) { // fully load the page @@ -277,16 +274,10 @@ private ParquetReader createParquetReader(ParquetDataSource input, ParquetMetada .orElseThrow())); } Map, ColumnDescriptor> descriptorsByPath = getDescriptors(fileMetaData.getSchema(), fileMetaData.getSchema()); - long nextStart = 0; - ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); - for (BlockMetadata block : parquetMetadata.getBlocks()) { - rowGroupInfoBuilder.add(new RowGroupInfo(createPrunedColumnsMetadata(block, input.getId(), descriptorsByPath), nextStart, Optional.empty())); - nextStart += block.rowCount(); - } return new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), columnFields.build(), - rowGroupInfoBuilder.build(), + parquetMetadata.getRowGroupInfo(Optional.of(input), Optional.of(descriptorsByPath)), input, parquetTimeZone.orElseThrow(), newSimpleAggregatedMemoryContext(), @@ -350,7 +341,7 @@ private void flush() columnMetaDataBuilder.add(columnMetaData); currentOffset += columnMetaData.getTotal_compressed_size(); } - updateRowGroups(columnMetaDataBuilder.build()); + updateRowGroups(columnMetaDataBuilder.build(), outputStream.longSize()); // flush pages for (BufferData bufferData : bufferDataList) { @@ -409,12 +400,14 @@ private void writeBloomFilters(List rowGroups, List columnMetaData) + private void updateRowGroups(List columnMetaData, long fileOffset) { long totalCompressedBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_compressed_size).sum(); long totalBytes = columnMetaData.stream().mapToLong(ColumnMetaData::getTotal_uncompressed_size).sum(); ImmutableList columnChunks = columnMetaData.stream().map(ParquetWriter::toColumnChunk).collect(toImmutableList()); - fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows).setTotal_compressed_size(totalCompressedBytes)); + fileFooter.addRowGroup(new RowGroup(columnChunks, totalBytes, rows) + .setTotal_compressed_size(totalCompressedBytes) + .setFile_offset(fileOffset)); } private static Slice serializeFooter(FileMetaData fileMetaData) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java b/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java index 9f7918115838..e6cdd9825e77 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/BenchmarkColumnarFilterParquetData.java @@ -225,7 +225,7 @@ public void setup() testData.getColumnNames(), testData.getPages()), new ParquetReaderOptions()); - parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); 
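
[Reviewer note, illustrative only — not part of this patch] A minimal sketch of how a caller scopes footer processing to a single split after this change, assuming only what the diff above introduces: MetadataReader.readFooter now takes an Optional<DiskRange>, ParquetMetadata.getRowGroupInfo() returns the row groups already restricted to that range, and DiskRange is constructed as (offset, length) the way the new test does. The class name, helper name, and splitStart/splitLength parameters are hypothetical.

import io.trino.parquet.DiskRange;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.RowGroupInfo;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

final class SplitScopedFooterRead
{
    private SplitScopedFooterRead() {}

    // Hypothetical helper: read the footer once and let ParquetMetadata keep only the row groups
    // whose first data page offset falls inside [splitStart, splitStart + splitLength).
    static List<RowGroupInfo> rowGroupsForSplit(ParquetDataSource dataSource, long splitStart, long splitLength)
            throws IOException
    {
        ParquetMetadata metadata = MetadataReader.readFooter(
                dataSource,
                Optional.empty(),                                     // no write validation
                Optional.of(new DiskRange(splitStart, splitLength))); // split boundaries
        return metadata.getRowGroupInfo();                            // already filtered to this split
    }
}

A split whose byte range contains no row group's first data page offset simply gets an empty list back, which is how duplicate row-group assignment across overlapping splits is avoided.
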
columnNames = columns.stream() .map(TpchColumn::getColumnName) .collect(toImmutableList()); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java index aeda237c642f..2ece87c326bd 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/ParquetTestUtils.java @@ -152,16 +152,15 @@ public static ParquetReader createParquetReader( columnName -> descriptorsByPath.get(ImmutableList.of(columnName.toLowerCase(ENGLISH)))); TupleDomainParquetPredicate parquetPredicate = buildPredicate(fileSchema, parquetTupleDomain, descriptorsByPath, UTC); List rowGroups = getFilteredRowGroups( - 0, - input.getEstimatedSize(), + parquetMetadata, input, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, UTC, 1000, options); + return new ParquetReader( Optional.ofNullable(fileMetaData.getCreatedBy()), columnFields.build(), diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java index d42725e5acb2..7f448bdbed2d 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestByteStreamSplitEncoding.java @@ -50,7 +50,7 @@ public void testReadFloatDouble() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("byte_stream_split_float_and_double.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); readAndCompare(reader, getExpectedValues()); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java index aabb734e5b0c..d9ca113e5625 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestInt96Timestamp.java @@ -112,7 +112,7 @@ public void testNanosOutsideDayRange() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("int96_timestamps_nanos_outside_day_range.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); Page page = reader.nextPage(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java index 00ecd8388857..f7fda503028c 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestParquetReader.java @@ -18,9 +18,12 @@ import com.google.common.io.Resources; import 
io.airlift.units.DataSize; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; +import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -36,6 +39,8 @@ import io.trino.spi.type.ArrayType; import io.trino.spi.type.Type; import io.trino.testing.TestingConnectorSession; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.Types; import org.junit.jupiter.api.Test; import java.io.File; @@ -52,11 +57,13 @@ import static io.trino.parquet.ParquetTestUtils.createParquetReader; import static io.trino.parquet.ParquetTestUtils.generateInputPages; import static io.trino.parquet.ParquetTestUtils.writeParquetFile; +import static io.trino.parquet.reader.MetadataReader.POST_SCRIPT_SIZE; import static io.trino.parquet.reader.ParquetReader.COLUMN_INDEX_ROWS_FILTERED; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.VarcharType.VARCHAR; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -79,13 +86,13 @@ public void testColumnReaderMemoryUsage() columnNames, generateInputPages(types, 100, 5)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThan(1); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThan(1); // Verify file has only non-dictionary encodings as dictionary memory usage is already tested in TestFlatColumnReader#testMemoryUsage - parquetMetadata.getBlocks().forEach(block -> { - block.columns() + parquetMetadata.getRowGroupInfo().forEach(rowGroupInfo -> { + rowGroupInfo.prunedBlockMetadata().getBlockMetadata().columns() .forEach(columnChunkMetaData -> assertThat(columnChunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isFalse()); - assertThat(block.rowCount()).isEqualTo(100); + assertThat(rowGroupInfo.prunedBlockMetadata().getBlockMetadata().rowCount()).isEqualTo(100); }); AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); @@ -105,7 +112,7 @@ public void testColumnReaderMemoryUsage() assertThat(currentMemoryUsage).isGreaterThan(initialMemoryUsage); // Memory usage does not change until next row group (1 page per row-group) - long rowGroupRowCount = parquetMetadata.getBlocks().get(0).rowCount(); + long rowGroupRowCount = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount(); int rowsRead = page.getPositionCount(); while (rowsRead < rowGroupRowCount) { rowsRead += reader.nextPage().getPositionCount(); @@ -132,8 +139,8 @@ public void testEmptyRowRangesWithColumnIndex() ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("lineitem_sorted_by_shipdate/data.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, 
Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(2); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(2); // The predicate and the file are prepared so that page indexes will result in non-overlapping row ranges and eliminate the entire first row group // while the second row group still has to be read TupleDomain predicate = TupleDomain.withColumnDomains( @@ -153,7 +160,7 @@ public void testEmptyRowRangesWithColumnIndex() assertThat(metrics).containsKey(COLUMN_INDEX_ROWS_FILTERED); // Column index should filter at least the first row group assertThat(((Count) metrics.get(COLUMN_INDEX_ROWS_FILTERED)).getTotal()) - .isGreaterThanOrEqualTo(parquetMetadata.getBlocks().get(0).rowCount()); + .isGreaterThanOrEqualTo(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount()); } } @@ -186,6 +193,52 @@ public void testBackwardsCompatibleRepeatedPrimitiveFieldDefinedAsPrimitive() .isInstanceOf(TrinoException.class); } + @Test + public void testOffsetColumnFilter() + throws IOException + { + // Write a file with 100 rows per row-group + List columnNames = ImmutableList.of("columna", "columnb"); + List types = ImmutableList.of(INTEGER, BIGINT); + + ParquetDataSource dataSource = new TestingParquetDataSource( + writeParquetFile( + ParquetWriterOptions.builder() + .setMaxBlockSize(DataSize.ofBytes(1000)) + .build(), + types, + columnNames, + generateInputPages(types, 100, 5)), + new ParquetReaderOptions()); + + long estimatedSize = dataSource.getEstimatedSize(); + long estimatedDataSize = estimatedSize - POST_SCRIPT_SIZE - dataSource.readFully(0, (int) estimatedSize).getInt((int) estimatedSize - POST_SCRIPT_SIZE); + + // Read single column, 1 row group + ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"columnb"}, Types.optional(INT64).named(""), 0, 0); + ParquetMetadata parquetMetadata1 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(100, 101))); + List rowGroupInfo1 = parquetMetadata1.getRowGroupInfo(Optional.of(dataSource), Optional.of(ImmutableMap.of(ImmutableList.of("columnb"), columnDescriptor))); + + assertThat(rowGroupInfo1.stream().allMatch(rg -> rg.prunedBlockMetadata().getColumns().size() == 1)).isTrue(); + assertThat(rowGroupInfo1.stream().map(RowGroupInfo::prunedBlockMetadata).mapToLong(PrunedBlockMetadata::getRowCount).sum()).isEqualTo(100); + + // Read both columns, half row groups + ParquetMetadata parquetMetadata2 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(0, estimatedDataSize / 2))); + List rowGroupInfo2 = parquetMetadata2.getRowGroupInfo(); + + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getColumns).allMatch(c -> c.size() == 2)).isTrue(); + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getBlockMetadata).mapToLong(BlockMetadata::rowCount).sum()) + .isEqualTo(300); + + // Read both columns, all row groups + ParquetMetadata parquetMetadata3 = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + List rowGroupInfo3 = parquetMetadata3.getRowGroupInfo(); + + assertThat(rowGroupInfo2.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getColumns).allMatch(c -> c.size() == 2)).isTrue(); + 
assertThat(rowGroupInfo3.stream().map(RowGroupInfo::prunedBlockMetadata).map(PrunedBlockMetadata::getBlockMetadata).mapToLong(BlockMetadata::rowCount).sum()) + .isEqualTo(500); + } + private void testReadingOldParquetFiles(File file, List columnNames, Type columnType, List expectedValues) throws IOException { @@ -193,7 +246,7 @@ private void testReadingOldParquetFiles(File file, List columnNames, Typ file, new ParquetReaderOptions()); ConnectorSession session = TestingConnectorSession.builder().build(); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), ImmutableList.of(columnType), columnNames)) { Page page = reader.nextPage(); Iterator expected = expectedValues.iterator(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java index 390608f445a9..99ae226bca08 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestTimeMillis.java @@ -60,7 +60,7 @@ private void testTimeMillsInt32(TimeType timeType) ParquetDataSource dataSource = new FileParquetDataSource( new File(Resources.getResource("time_millis_int32.snappy.parquet").toURI()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); ParquetReader reader = createParquetReader(dataSource, parquetMetadata, newSimpleAggregatedMemoryContext(), types, columnNames); Page page = reader.nextPage(); diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java index a80cbbcd00d7..5a248553afd2 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java @@ -33,6 +33,7 @@ import io.trino.parquet.reader.MetadataReader; import io.trino.parquet.reader.PageReader; import io.trino.parquet.reader.ParquetReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.reader.TestingParquetDataSource; import io.trino.spi.Page; import io.trino.spi.block.Block; @@ -128,11 +129,11 @@ public void testWrittenPageSize() columnNames, generateInputPages(types, 100, 1000)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(1); - assertThat(parquetMetadata.getBlocks().get(0).rowCount()).isEqualTo(100 * 1000); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(1); + assertThat(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getBlockMetadata().rowCount()).isEqualTo(100 * 1000); - ColumnChunkMetadata chunkMetaData = parquetMetadata.getBlocks().get(0).columns().get(0); + ColumnChunkMetadata chunkMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().getFirst(); DiskRange range = new 
DiskRange(chunkMetaData.getStartingPos(), chunkMetaData.getTotalSize()); Map chunkReader = dataSource.planRead(ImmutableListMultimap.of(0, range), newSimpleAggregatedMemoryContext()); @@ -177,12 +178,12 @@ public void testWrittenPageValueCount() columnNames, generateInputPages(types, 100, 1000)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - assertThat(parquetMetadata.getBlocks()).hasSize(1); - assertThat(parquetMetadata.getBlocks().get(0).rowCount()).isEqualTo(100 * 1000); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo()).hasSize(1); + assertThat(parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getRowCount()).isEqualTo(100 * 1000); - ColumnChunkMetadata columnAMetaData = parquetMetadata.getBlocks().get(0).columns().get(0); - ColumnChunkMetadata columnBMetaData = parquetMetadata.getBlocks().get(0).columns().get(1); + ColumnChunkMetadata columnAMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().getFirst(); + ColumnChunkMetadata columnBMetaData = parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata().getColumns().get(1); Map chunkReader = dataSource.planRead( ImmutableListMultimap.of( 0, new DiskRange(columnAMetaData.getStartingPos(), columnAMetaData.getTotalSize()), @@ -258,8 +259,8 @@ public void testLargeStringTruncation() ImmutableList.of(new Page(2, blockA, blockB))), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getBlocks()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getRowGroupInfo()).prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata chunkMetaData = blockMetaData.columns().get(0); assertThat(chunkMetaData.getStatistics().getMinBytes()).isEqualTo(minA.getBytes()); @@ -291,11 +292,11 @@ public void testColumnReordering() generateInputPages(types, 100, 100)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(10); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThanOrEqualTo(10); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { // Verify that the columns are stored in the same order as the metadata - List offsets = blockMetaData.columns().stream() + List offsets = rowGroupInfo.prunedBlockMetadata().getColumns().stream() .map(ColumnChunkMetadata::getFirstDataPageOffset) .collect(toImmutableList()); assertThat(offsets).isSorted(); @@ -348,10 +349,10 @@ public void testDictionaryPageOffset() generateInputPages(types, 100, 100)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(1); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { - ColumnChunkMetadata chunkMetaData = getOnlyElement(blockMetaData.columns()); + ParquetMetadata parquetMetadata = 
MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + assertThat(parquetMetadata.getRowGroupInfo().size()).isGreaterThanOrEqualTo(1); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + ColumnChunkMetadata chunkMetaData = getOnlyElement(rowGroupInfo.prunedBlockMetadata().getColumns()); assertThat(chunkMetaData.getDictionaryPageOffset()).isGreaterThan(0); int dictionaryPageSize = toIntExact(chunkMetaData.getFirstDataPageOffset() - chunkMetaData.getDictionaryPageOffset()); assertThat(dictionaryPageSize).isGreaterThan(0); @@ -397,10 +398,11 @@ public void testWriteBloomFilters(Type type, List data) generateInputPages(types, 100, data)), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); // Check that bloom filters are right after each other int bloomFilterSize = Integer.highestOneBit(BlockSplitBloomFilter.optimalNumOfBits(BLOOM_FILTER_EXPECTED_ENTRIES, DEFAULT_BLOOM_FILTER_FPP) / 8) << 1; - for (BlockMetadata block : parquetMetadata.getBlocks()) { + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); for (int i = 0; i < block.columns().size(); i++) { ColumnChunkMetadata chunkMetaData = block.columns().get(i); assertThat(hasBloomFilter(chunkMetaData)).isTrue(); @@ -414,7 +416,7 @@ public void testWriteBloomFilters(Type type, List data) } } } - int rowGroupCount = parquetMetadata.getBlocks().size(); + int rowGroupCount = parquetMetadata.getRowGroupInfo().size(); assertThat(rowGroupCount).isGreaterThanOrEqualTo(2); TupleDomain predicate = TupleDomain.withColumnDomains( @@ -462,8 +464,8 @@ void testBloomFilterWithDictionaryFallback() .build()), new ParquetReaderOptions()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getBlocks()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getRowGroupInfo()).prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata chunkMetaData = getOnlyElement(blockMetaData.columns()); assertThat(chunkMetaData.getEncodingStats().hasDictionaryPages()).isTrue(); assertThat(chunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isTrue(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java index 329039d98e22..8089b73407b4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMergeSink.java @@ -25,9 +25,10 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetReaderOptions; -import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.deltalake.delete.RoaringBitmapArray; @@ 
-381,8 +382,11 @@ private Slice writeMergeResult(Slice path, FileDeletion deletion) } TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(path.toStringUtf8())); try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, parquetReaderOptions, fileFormatDataSourceStats)) { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - long rowCount = parquetMetadata.getBlocks().stream().map(BlockMetadata::rowCount).mapToLong(Long::longValue).sum(); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + long rowCount = parquetMetadata.getRowGroupInfo(Optional.of(dataSource), Optional.empty()).stream() + .map(RowGroupInfo::prunedBlockMetadata) + .mapToLong(PrunedBlockMetadata::getRowCount) + .sum(); RoaringBitmapArray rowsRetained = new RoaringBitmapArray(); rowsRetained.addRange(0, rowCount - 1); rowsRetained.andNot(deletedRows); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java index 80d32276fb8a..ecf664d3b177 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java @@ -303,7 +303,7 @@ private PositionDeleteFilter readDeletes( public Map loadParquetIdAndNameMapping(TrinoInputFile inputFile, ParquetReaderOptions options) { try (ParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats)) { - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java index 8f686205e239..82f752f97221 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeWriter.java @@ -22,7 +22,7 @@ import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; -import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.deltalake.DataFileInfo.DataFileType; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeJsonFileStatistics; import io.trino.plugin.hive.FileWriter; @@ -184,7 +184,7 @@ public DataFileInfo getDataFileInfo() { Location path = rootTableLocation.appendPath(relativeFilePath); FileMetaData fileMetaData = fileWriter.getFileMetadata(); - ParquetMetadata parquetMetadata = MetadataReader.createParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString())); + ParquetMetadata parquetMetadata = new ParquetMetadata(fileMetaData, new ParquetDataSourceId(path.toString()), Optional.empty()); return new DataFileInfo( relativeFilePath, @@ -204,7 +204,8 @@ public static DeltaLakeJsonFileStatistics readStatistics(ParquetMetadata parquet .collect(toImmutableMap(column -> column.basePhysicalColumnName().toLowerCase(ENGLISH), 
DeltaLakeColumnHandle::basePhysicalType)); ImmutableMultimap.Builder metadataForColumn = ImmutableMultimap.builder(); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata blockMetaData = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); for (ColumnChunkMetadata columnChunkMetaData : blockMetaData.columns()) { if (columnChunkMetaData.getPath().size() != 1) { continue; // Only base column stats are supported diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java index 8d1db3984a13..8cc837c1b2fb 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java @@ -448,7 +448,7 @@ private void testOptimizeWithColumnMappingMode(String columnMappingMode) TrinoInputFile inputFile = new LocalInputFile(tableLocation.resolve(addFileEntry.getPath()).toFile()); ParquetMetadata parquetMetadata = MetadataReader.readFooter( new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()), - Optional.empty()); + Optional.empty(), Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); PrimitiveType physicalType = getOnlyElement(fileMetaData.getSchema().getColumns().iterator()).getPrimitiveType(); assertThat(physicalType.getName()).isEqualTo(physicalName); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java index dc072f0967a0..2d2ac8183ad5 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/parquet/ParquetPageSourceFactory.java @@ -221,7 +221,7 @@ public static ReaderPageSource createPageSource( AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); dataSource = createDataSource(inputFile, estimatedFileSize, options, memoryContext, stats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, parquetWriteValidation, Optional.empty()); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); fileSchema = fileMetaData.getSchema(); @@ -250,10 +250,8 @@ public static ReaderPageSource createPageSource( } List rowGroups = getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(), parquetTupleDomains, parquetPredicates, descriptorsByPath, diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java index 6b43072da91e..1761797a99e6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/parquet/TestBloomFilterStore.java @@ -22,6 +22,7 @@ import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.predicate.TupleDomainParquetPredicate; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import 
io.trino.spi.predicate.Domain; import io.trino.spi.predicate.Range; @@ -308,10 +309,11 @@ private static BloomFilterStore generateBloomFilterStore(ParquetTester.TempFile TrinoInputFile inputFile = new LocalInputFile(tempFile.getFile()); TrinoParquetDataSource dataSource = new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); - ColumnChunkMetadata columnChunkMetaData = getOnlyElement(getOnlyElement(parquetMetadata.getBlocks()).columns()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + RowGroupInfo rowGroupInfo = getOnlyElement(parquetMetadata.getRowGroupInfo()); + ColumnChunkMetadata columnChunkMetaData = getOnlyElement(rowGroupInfo.prunedBlockMetadata().getBlockMetadata().columns()); - return new BloomFilterStore(dataSource, getOnlyElement(parquetMetadata.getBlocks()), Set.of(columnChunkMetaData.getPath())); + return new BloomFilterStore(dataSource, rowGroupInfo.prunedBlockMetadata().getBlockMetadata(), Set.of(columnChunkMetaData.getPath())); } private static class BloomFilterTypeTestCase diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index bee386c71553..37b294dc5c21 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -20,6 +20,7 @@ import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; import io.trino.memory.context.AggregatedMemoryContext; +import io.trino.parquet.DiskRange; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; import io.trino.parquet.ParquetDataSourceId; @@ -198,7 +199,7 @@ private static ConnectorPageSource createPageSource( try { AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext(); dataSource = createDataSource(inputFile, OptionalLong.of(hudiSplit.getFileSize()), options, memoryContext, dataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(start, length))); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); @@ -215,10 +216,8 @@ private static ConnectorPageSource createPageSource( TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, timeZone); List rowGroups = getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java index 4b5abd936d98..1d5c0d323687 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergPageSourceProvider.java @@ -35,6 +35,7 @@ import io.trino.orc.TupleDomainOrcPredicate; import io.trino.orc.TupleDomainOrcPredicate.TupleDomainOrcPredicateBuilder; 
import io.trino.parquet.Column; +import io.trino.parquet.DiskRange; import io.trino.parquet.Field; import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetDataSource; @@ -862,7 +863,7 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( ParquetDataSource dataSource = null; try { dataSource = createDataSource(inputFile, OptionalLong.of(fileSize), options, memoryContext, fileFormatDataSourceStats); - ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.of(new DiskRange(start, length))); FileMetadata fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) { @@ -888,10 +889,8 @@ private static ReaderPageSourceWithRowPositions createParquetPageSource( TupleDomainParquetPredicate parquetPredicate = buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, UTC); List rowGroups = getFilteredRowGroups( - start, - length, + parquetMetadata, dataSource, - parquetMetadata.getBlocks(), ImmutableList.of(parquetTupleDomain), ImmutableList.of(parquetPredicate), descriptorsByPath, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java index 7f0716b66188..f9b2d303648d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergParquetFileWriter.java @@ -17,6 +17,7 @@ import io.trino.filesystem.TrinoOutputFile; import io.trino.parquet.ParquetDataSourceId; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.parquet.writer.ParquetWriterOptions; import io.trino.plugin.hive.parquet.ParquetFileWriter; import io.trino.spi.Page; @@ -33,7 +34,6 @@ import java.util.Optional; import java.util.stream.Stream; -import static io.trino.parquet.reader.MetadataReader.createParquetMetadata; import static io.trino.plugin.iceberg.util.ParquetUtil.footerMetrics; import static io.trino.plugin.iceberg.util.ParquetUtil.getSplitOffsets; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; @@ -83,12 +83,13 @@ public FileMetrics getFileMetrics() { ParquetMetadata parquetMetadata; try { - parquetMetadata = createParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString())); + parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString()), Optional.empty()); + List rowGroupInfos = parquetMetadata.getRowGroupInfo(); + return new FileMetrics(footerMetrics(parquetMetadata, rowGroupInfos, Stream.empty(), metricsConfig, null), Optional.of(getSplitOffsets(rowGroupInfos))); } catch (IOException e) { throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e); } - return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java index f869928de41d..0eb3afe1968d 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/MigrationUtils.java @@ -152,8 +152,8 @@ public static Metrics loadMetrics(TrinoInputFile file, HiveStorageFormat storage private static Metrics parquetMetrics(TrinoInputFile file, MetricsConfig metricsConfig, NameMapping nameMapping) { try (ParquetDataSource dataSource = new TrinoParquetDataSource(file, new ParquetReaderOptions(), new FileFormatDataSourceStats())) { - ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty()); - return ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig, nameMapping); + ParquetMetadata metadata = MetadataReader.readFooter(dataSource, Optional.empty(), Optional.empty()); + return ParquetUtil.footerMetrics(metadata, metadata.getRowGroupInfo(), Stream.empty(), metricsConfig, nameMapping); } catch (IOException e) { throw new UncheckedIOException("Failed to read file footer: " + file.location(), e); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java index 0a676ca339ca..06209a1ecac9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/util/ParquetUtil.java @@ -14,10 +14,12 @@ package io.trino.plugin.iceberg.util; -import com.google.common.collect.ImmutableList; +import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; +import io.trino.parquet.metadata.PrunedBlockMetadata; +import io.trino.parquet.reader.RowGroupInfo; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Metrics; import org.apache.iceberg.MetricsConfig; @@ -45,8 +47,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -57,6 +57,7 @@ import java.util.function.Function; import java.util.stream.Stream; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toMap; @@ -68,13 +69,9 @@ public final class ParquetUtil // based on org.apache.iceberg.parquet.ParquetUtil and on org.apache.iceberg.parquet.ParquetConversions private ParquetUtil() {} - public static Metrics footerMetrics(ParquetMetadata metadata, Stream> fieldMetrics, MetricsConfig metricsConfig) - { - return footerMetrics(metadata, fieldMetrics, metricsConfig, null); - } - public static Metrics footerMetrics( ParquetMetadata metadata, + List rowGroupInfos, Stream> fieldMetrics, MetricsConfig metricsConfig, NameMapping nameMapping) @@ -95,8 +92,8 @@ public static Metrics footerMetrics( Map> fieldMetricsMap = fieldMetrics.collect(toMap(FieldMetrics::id, identity())); - List blocks = metadata.getBlocks(); - for (BlockMetadata block : blocks) { + for (RowGroupInfo rowGroupInfo : rowGroupInfos) { + BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); rowCount += block.rowCount(); for (ColumnChunkMetadata column : block.columns()) { Integer fieldId = 
fileSchema.aliasToId(column.getPath().toDotString()); @@ -155,14 +152,15 @@ public static Metrics footerMetrics( toBufferMap(fileSchema, upperBounds)); } - public static List getSplitOffsets(ParquetMetadata metadata) + public static List getSplitOffsets(List rowGroupInfos) + throws ParquetCorruptionException { - List splitOffsets = new ArrayList<>(metadata.getBlocks().size()); - for (BlockMetadata blockMetaData : metadata.getBlocks()) { - splitOffsets.add(blockMetaData.getStartingPos()); - } - Collections.sort(splitOffsets); - return ImmutableList.copyOf(splitOffsets); + return rowGroupInfos.stream() + .map(RowGroupInfo::prunedBlockMetadata) + .map(PrunedBlockMetadata::getBlockMetadata) + .map(BlockMetadata::getStartingPos) + .sorted() + .collect(toImmutableList()); } private static void updateFromFieldMetrics( diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index c9fef25627a5..a07c1f6c8bde 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -20,6 +20,7 @@ import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.testing.BaseConnectorSmokeTest; import io.trino.testing.TestingConnectorBehavior; @@ -522,6 +523,7 @@ public void testCreateTableWithNonExistingSchemaVerifyLocation() @Test public void testSortedNationTable() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -538,6 +540,7 @@ public void testSortedNationTable() @Test public void testFileSortingWithLargerTable() + throws ParquetCorruptionException { // Using a larger table forces buffered data to be written to disk Session withSmallRowGroups = Session.builder(getSession()) @@ -734,7 +737,8 @@ public void testPartitionFilterRequired() assertUpdate(session, "DROP TABLE " + tableName); } - protected abstract boolean isFileSorted(Location path, String sortColumnName); + protected abstract boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException; @Test public void testTableChangesFunction() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 670200247be5..253dea8edbc4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -30,6 +30,7 @@ import io.trino.metadata.QualifiedObjectName; import io.trino.metadata.TableHandle; import io.trino.operator.OperatorStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.hive.HiveCompressionCodec; import io.trino.plugin.hive.TestingHivePlugin; import io.trino.plugin.iceberg.fileio.ForwardingFileIo; @@ -1513,6 +1514,7 @@ private void testCreateSortedTableWithSortTransform(String columnName, String so @Test public void testSortOrderChange() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new 
TestTable( @@ -1542,6 +1544,7 @@ public void testSortOrderChange() @Test public void testSortingDisabled() + throws ParquetCorruptionException { Session withSortingDisabled = Session.builder(withSmallRowGroups(getSession())) .setCatalogSessionProperty(ICEBERG_CATALOG, "sorted_writing_enabled", "false") @@ -1560,6 +1563,7 @@ public void testSortingDisabled() @Test public void testOptimizeWithSortOrder() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -1582,6 +1586,7 @@ public void testOptimizeWithSortOrder() @Test public void testUpdateWithSortOrder() + throws ParquetCorruptionException { Session withSmallRowGroups = withSmallRowGroups(getSession()); try (TestTable table = new TestTable( @@ -1602,7 +1607,8 @@ public void testUpdateWithSortOrder() } } - protected abstract boolean isFileSorted(String path, String sortColumnName); + protected abstract boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException; @Test public void testSortingOnNestedField() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java index 9d70f271f4a2..dd5bfa6e9b80 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergTestUtils.java @@ -26,11 +26,13 @@ import io.trino.orc.metadata.OrcColumnId; import io.trino.orc.metadata.statistics.StringStatistics; import io.trino.orc.metadata.statistics.StripeStatistics; +import io.trino.parquet.ParquetCorruptionException; import io.trino.parquet.ParquetReaderOptions; import io.trino.parquet.metadata.BlockMetadata; import io.trino.parquet.metadata.ColumnChunkMetadata; import io.trino.parquet.metadata.ParquetMetadata; import io.trino.parquet.reader.MetadataReader; +import io.trino.parquet.reader.RowGroupInfo; import io.trino.plugin.base.metrics.FileFormatDataSourceStats; import io.trino.plugin.hive.TrinoViewHiveMetastore; import io.trino.plugin.hive.metastore.HiveMetastoreFactory; @@ -128,20 +130,22 @@ private static boolean checkOrcFileSorting(Supplier dataSourceSup @SuppressWarnings({"unchecked", "rawtypes"}) public static boolean checkParquetFileSorting(TrinoInputFile inputFile, String sortColumnName) + throws ParquetCorruptionException { ParquetMetadata parquetMetadata; try { parquetMetadata = MetadataReader.readFooter( new TrinoParquetDataSource(inputFile, new ParquetReaderOptions(), new FileFormatDataSourceStats()), - Optional.empty()); + Optional.empty(), Optional.empty()); } catch (IOException e) { throw new UncheckedIOException(e); } Comparable previousMax = null; - verify(parquetMetadata.getBlocks().size() > 1, "Test must produce at least two row groups"); - for (BlockMetadata blockMetaData : parquetMetadata.getBlocks()) { + verify(parquetMetadata.getRowGroupInfo().size() > 1, "Test must produce at least two row groups"); + for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) { + BlockMetadata blockMetaData = rowGroupInfo.prunedBlockMetadata().getBlockMetadata(); ColumnChunkMetadata columnMetadata = blockMetaData.columns().stream() .filter(column -> getOnlyElement(column.getPath().iterator()).equalsIgnoreCase(sortColumnName)) .collect(onlyElement()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java index addd4954ab4b..5ba0726f08a3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetCachingConnectorSmokeTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import org.apache.iceberg.FileFormat; import org.junit.jupiter.api.AfterAll; @@ -60,6 +61,7 @@ public ImmutableMap getAdditionalIcebergProperties() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java index 06073417e602..b681c6ed3019 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMinioParquetConnectorSmokeTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import static io.trino.plugin.iceberg.IcebergTestUtils.checkParquetFileSorting; import static org.apache.iceberg.FileFormat.PARQUET; @@ -28,6 +29,7 @@ public TestIcebergMinioParquetConnectorSmokeTest() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java index 99a2b2220dee..b992d39a9df0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetConnectorTest.java @@ -16,6 +16,7 @@ import io.trino.Session; import io.trino.filesystem.Location; import io.trino.operator.OperatorStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.testing.QueryRunner.MaterializedResultWithPlan; @@ -151,6 +152,7 @@ public void testPushdownPredicateToParquetAfterColumnRename() @Override protected boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(Location.of(path)), sortColumnName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 9dec58090118..a4d79d5ccd19 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -34,6 +34,7 @@ import io.trino.hdfs.HdfsEnvironment; import io.trino.hdfs.TrinoHdfsFileSystemStats; import io.trino.hdfs.authentication.NoHdfsAuthentication; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; @@ -248,6 +249,7 @@ protected void deleteDirectory(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { TrinoFileSystem fileSystem = fileSystemFactory.create(SESSION); return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java index c78160d77d58..3f8d7e5baeea 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/jdbc/TestIcebergJdbcCatalogConnectorSmokeTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -242,6 +243,7 @@ protected void deleteDirectory(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java index 5ad8d045335c..6d215e079f40 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -366,6 +367,7 @@ public void testDropTableWithMissingDataFile() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java index 9c578e94921f..14d2e6b443c8 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergPolarisCatalogConnectorSmokeTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg.catalog.rest; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergConnector; @@ -132,6 +133,7 @@ protected boolean locationExists(String location) @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java index a6dea1e1a757..03e3fd44116a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergRestCatalogNestedNamespaceConnectorSmokeTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.http.server.testing.TestingHttpServer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.SchemaInitializer; @@ -253,6 +254,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java index 831e945240fa..aa3f71483081 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergTrinoRestCatalogConnectorSmokeTest.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableMap; import io.airlift.http.server.testing.TestingHttpServer; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -249,6 +250,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java index c2efa55daa8d..ee355ebd86bb 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestIcebergVendingRestCatalogConnectorSmokeTest.java @@ -19,6 +19,7 @@ import io.trino.filesystem.s3.S3FileSystemConfig; import io.trino.filesystem.s3.S3FileSystemFactory; import io.trino.filesystem.s3.S3FileSystemStats; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.IcebergQueryRunner; @@ -309,6 +310,7 @@ public void testDropTableWithNonExistentTableLocation() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java index 394e4c0b32a9..24b22253aba5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/snowflake/TestIcebergSnowflakeCatalogConnectorSmokeTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; @@ -686,6 +687,7 @@ public void testSnowflakeNativeTable() @Override protected boolean isFileSorted(Location path, String sortColumnName) + throws ParquetCorruptionException { if (format == PARQUET) { return checkParquetFileSorting(fileSystem.newInputFile(path), sortColumnName); diff --git a/testing/trino-faulttolerant-tests/pom.xml b/testing/trino-faulttolerant-tests/pom.xml index de6f1fb27f72..2961207e5362 100644 --- a/testing/trino-faulttolerant-tests/pom.xml +++ b/testing/trino-faulttolerant-tests/pom.xml @@ -256,6 +256,12 @@ test + + io.trino + trino-parquet + test + + io.trino trino-plugin-toolkit diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java index c4a6e5a38f6e..4e1cf34a0e17 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/iceberg/TestIcebergParquetFaultTolerantExecutionConnectorTest.java @@ -14,6 +14,7 @@ package io.trino.faulttolerant.iceberg; import io.trino.filesystem.Location; +import io.trino.parquet.ParquetCorruptionException; import io.trino.plugin.exchange.filesystem.FileSystemExchangePlugin; import io.trino.plugin.exchange.filesystem.containers.MinioStorage; import 
io.trino.plugin.iceberg.IcebergQueryRunner; @@ -80,6 +81,7 @@ public void testStatsBasedRepartitionDataOnInsert() @Override protected boolean isFileSorted(String path, String sortColumnName) + throws ParquetCorruptionException { return checkParquetFileSorting(fileSystem.newInputFile(Location.of(path)), sortColumnName); }
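
The hunks above repeat one caller-side migration: MetadataReader.readFooter now takes an extra Optional<DiskRange> describing the split being read, and callers obtain row groups through ParquetMetadata.getRowGroupInfo() (a list of RowGroupInfo exposing PrunedBlockMetadata) instead of the removed getBlocks(). The sketch below is illustrative only and not part of the patch; the class and method names are hypothetical, and the signatures are assumed from the hunks in this change rather than from a published API.

// Hypothetical sketch of the new RowGroupInfo-based call path, assuming the
// signatures visible in this diff and an already-open ParquetDataSource.
import io.trino.parquet.DiskRange;
import io.trino.parquet.ParquetCorruptionException;
import io.trino.parquet.ParquetDataSource;
import io.trino.parquet.metadata.BlockMetadata;
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.parquet.reader.RowGroupInfo;

import java.io.IOException;
import java.util.Optional;

final class RowGroupInfoUsageSketch
{
    private RowGroupInfoUsageSketch() {}

    // Count the rows of the row groups that belong to one split. Passing the split as a
    // DiskRange prunes the footer to the row groups starting inside that split;
    // Optional.empty() keeps the old "all row groups" behavior.
    static long countRowsInSplit(ParquetDataSource dataSource, long splitStart, long splitLength)
            throws IOException, ParquetCorruptionException
    {
        ParquetMetadata parquetMetadata = MetadataReader.readFooter(
                dataSource,
                Optional.empty(),                                     // no write validation
                Optional.of(new DiskRange(splitStart, splitLength))); // prune to this split

        long rowCount = 0;
        for (RowGroupInfo rowGroupInfo : parquetMetadata.getRowGroupInfo()) {
            BlockMetadata block = rowGroupInfo.prunedBlockMetadata().getBlockMetadata();
            rowCount += block.rowCount();
        }
        return rowCount;
    }
}

The tests in this change follow the same chain, replacing parquetMetadata.getBlocks().get(0) with parquetMetadata.getRowGroupInfo().getFirst().prunedBlockMetadata(), and ParquetUtil.getSplitOffsets now derives its sorted starting positions from the same RowGroupInfo list.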