diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java index 2e6185f3e026..f0c640bd0dd8 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/metadata/ParquetMetadata.java @@ -111,17 +111,15 @@ public List getBlocks(long splitStart, long splitLength) long fileRowCountOffset = fileRowCount; fileRowCount += rowGroup.getNum_rows(); // Update fileRowCount for all row groups - if (rowGroup.isSetFile_offset()) { - long rowGroupStart = rowGroup.getFile_offset(); - boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength; - if (!splitContainsRowGroup) { - continue; - } - } - List columns = rowGroup.getColumns(); validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); String filePath = columns.get(0).getFile_path(); + long rowGroupStart = getRowGroupStart(columns, messageType); + boolean splitContainsRowGroup = splitStart <= rowGroupStart && rowGroupStart < splitStart + splitLength; + if (!splitContainsRowGroup) { + continue; + } + ImmutableList.Builder columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size()); for (ColumnChunk columnChunk : columns) { validateParquet( @@ -129,27 +127,7 @@ public List getBlocks(long splitStart, long splitLength) || (filePath != null && filePath.equals(columnChunk.getFile_path())), dataSourceId, "all column chunks of the same row group must be in the same file"); - ColumnMetaData metaData = columnChunk.meta_data; - String[] path = metaData.path_in_schema.stream() - .map(value -> value.toLowerCase(Locale.ENGLISH)) - .toArray(String[]::new); - ColumnPath columnPath = ColumnPath.get(path); - PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); - ColumnChunkMetadata column = ColumnChunkMetadata.get( - columnPath, - primitiveType, - CompressionCodecName.fromParquet(metaData.codec), - convertEncodingStats(metaData.encoding_stats), - readEncodings(metaData.encodings), - MetadataReader.readStats(Optional.ofNullable(parquetMetadata.getCreated_by()), Optional.ofNullable(metaData.statistics), primitiveType), - metaData.data_page_offset, - metaData.dictionary_page_offset, - metaData.num_values, - metaData.total_compressed_size, - metaData.total_uncompressed_size); - column.setColumnIndexReference(toColumnIndexReference(columnChunk)); - column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); - column.setBloomFilterOffset(metaData.bloom_filter_offset); + ColumnChunkMetadata column = toColumnChunkMetadata(columnChunk, parquetMetadata.getCreated_by(), messageType); columnMetadataBuilder.add(column); } blocks.add(new BlockMetadata(fileRowCountOffset, rowGroup.getNum_rows(), columnMetadataBuilder.build())); @@ -165,6 +143,40 @@ public FileMetaData getParquetMetadata() return parquetMetadata; } + private static long getRowGroupStart(List columns, MessageType messageType) + { + // Note: Do not rely on org.apache.parquet.format.RowGroup.getFile_offset or org.apache.parquet.format.ColumnChunk.getFile_offset + // because some versions of parquet-cpp-arrow (and potentially other writers) set it incorrectly + ColumnChunkMetadata columnChunkMetadata = toColumnChunkMetadata(columns.getFirst(), null, messageType); + return columnChunkMetadata.getStartingPos(); + } + + private static ColumnChunkMetadata toColumnChunkMetadata(ColumnChunk columnChunk, String createdBy, MessageType messageType) + { + ColumnMetaData metaData = columnChunk.meta_data; + String[] path = metaData.path_in_schema.stream() + .map(value -> value.toLowerCase(Locale.ENGLISH)) + .toArray(String[]::new); + ColumnPath columnPath = ColumnPath.get(path); + PrimitiveType primitiveType = messageType.getType(columnPath.toArray()).asPrimitiveType(); + ColumnChunkMetadata column = ColumnChunkMetadata.get( + columnPath, + primitiveType, + CompressionCodecName.fromParquet(metaData.codec), + convertEncodingStats(metaData.encoding_stats), + readEncodings(metaData.encodings), + MetadataReader.readStats(Optional.ofNullable(createdBy), Optional.ofNullable(metaData.statistics), primitiveType), + metaData.data_page_offset, + metaData.dictionary_page_offset, + metaData.num_values, + metaData.total_compressed_size, + metaData.total_uncompressed_size); + column.setColumnIndexReference(toColumnIndexReference(columnChunk)); + column.setOffsetIndexReference(toOffsetIndexReference(columnChunk)); + column.setBloomFilterOffset(metaData.bloom_filter_offset); + return column; + } + private static MessageType readParquetSchema(List schema) { Iterator schemaIterator = schema.iterator(); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java index 0d71513a6dcc..3230c7190a0a 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/predicate/PredicateUtils.java @@ -195,28 +195,24 @@ public static List getFilteredRowGroups( { ImmutableList.Builder rowGroupInfoBuilder = ImmutableList.builder(); for (BlockMetadata block : parquetMetadata.getBlocks(splitStart, splitLength)) { - long blockStart = block.getStartingPos(); - boolean splitContainsBlock = splitStart <= blockStart && blockStart < splitStart + splitLength; - if (splitContainsBlock) { - for (int i = 0; i < parquetTupleDomains.size(); i++) { - TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); - TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); - Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); - Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); - PrunedBlockMetadata columnsMetadata = createPrunedColumnsMetadata(block, dataSource.getId(), descriptorsByPath); - if (predicateMatches( - parquetPredicate, - columnsMetadata, - dataSource, - descriptorsByPath, - parquetTupleDomain, - columnIndex, - bloomFilterStore, - timeZone, - domainCompactionThreshold)) { - rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, block.fileRowCountOffset(), columnIndex)); - break; - } + for (int i = 0; i < parquetTupleDomains.size(); i++) { + TupleDomain parquetTupleDomain = parquetTupleDomains.get(i); + TupleDomainParquetPredicate parquetPredicate = parquetPredicates.get(i); + Optional columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options); + Optional bloomFilterStore = getBloomFilterStore(dataSource, block, parquetTupleDomain, options); + PrunedBlockMetadata columnsMetadata = createPrunedColumnsMetadata(block, dataSource.getId(), descriptorsByPath); + if (predicateMatches( + parquetPredicate, + columnsMetadata, + dataSource, + descriptorsByPath, + parquetTupleDomain, + columnIndex, + bloomFilterStore, + timeZone, + domainCompactionThreshold)) { + rowGroupInfoBuilder.add(new RowGroupInfo(columnsMetadata, block.fileRowCountOffset(), columnIndex)); + break; } } }