diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
index bf1f20cc77be..00cb658020e0 100644
--- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
+++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ParquetWriter.java
@@ -29,6 +29,7 @@
 import io.trino.parquet.writer.ColumnWriter.BufferData;
 import io.trino.spi.Page;
 import io.trino.spi.type.Type;
+import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.CompressionCodec;
@@ -45,6 +46,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -317,16 +319,33 @@ private void flush()
 
         // update stats
         long stripeStartOffset = outputStream.longSize();
-        List<ColumnMetaData> metadatas = bufferDataList.stream()
+        List<ColumnMetaData> columns = bufferDataList.stream()
                 .map(BufferData::getMetaData)
                 .collect(toImmutableList());
-        updateRowGroups(updateColumnMetadataOffset(metadatas, stripeStartOffset));
+
+        // Since the reader coalesces nearby small reads, it is beneficial to
+        // reorder data streams to group columns with small size together
+        int[] indexes = new int[columns.size()];
+        Arrays.setAll(indexes, index -> index);
+        IntArrays.quickSort(indexes, (index, otherIndex) ->
+                Long.compare(columns.get(index).getTotal_compressed_size(), columns.get(otherIndex).getTotal_compressed_size()));
+
+        // Ordering of columns in the metadata should remain unchanged.
+        // Only the offsets in file at which the columns start may change as a result
+        // of reordering column data streams by their compressed size
+        long currentOffset = stripeStartOffset;
+        for (int index : indexes) {
+            ColumnMetaData columnMetaData = columns.get(index);
+            columnMetaData.setData_page_offset(currentOffset);
+            currentOffset += columnMetaData.getTotal_compressed_size();
+        }
+        updateRowGroups(columns);
 
         // flush pages
-        bufferDataList.stream()
-                .map(BufferData::getData)
-                .flatMap(List::stream)
-                .forEach(data -> data.writeData(outputStream));
+        for (int index : indexes) {
+            bufferDataList.get(index).getData()
+                    .forEach(data -> data.writeData(outputStream));
+        }
     }
 
     private void writeFooter()
@@ -379,20 +398,6 @@ private static org.apache.parquet.format.ColumnChunk toColumnChunk(ColumnMetaDat
         return columnChunk;
     }
 
-    private List<ColumnMetaData> updateColumnMetadataOffset(List<ColumnMetaData> columns, long offset)
-    {
-        ImmutableList.Builder<ColumnMetaData> builder = ImmutableList.builder();
-        long currentOffset = offset;
-        for (ColumnMetaData column : columns) {
-            ColumnMetaData columnMetaData = new ColumnMetaData(column.type, column.encodings, column.path_in_schema, column.codec, column.num_values, column.total_uncompressed_size, column.total_compressed_size, currentOffset);
-            columnMetaData.setStatistics(column.getStatistics());
-            columnMetaData.setEncoding_stats(column.getEncoding_stats());
-            builder.add(columnMetaData);
-            currentOffset += column.getTotal_compressed_size();
-        }
-        return builder.build();
-    }
-
     @VisibleForTesting
     static String formatCreatedBy(String trinoVersion)
     {
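The writer change above is, at its core, an argsort of a row group's column chunks by compressed size, followed by a single pass that hands out contiguous file offsets in the new physical order; the logical column order recorded in the footer metadata never changes. A minimal, self-contained sketch of that pattern using only the JDK (the `chunkSizes` values and `stripeStartOffset` here are made-up stand-ins for `ColumnMetaData.getTotal_compressed_size()` and the writer's current output position):

```java
import java.util.Arrays;
import java.util.Comparator;

public class ReorderSketch
{
    public static void main(String[] args)
    {
        // Made-up compressed sizes (bytes) of four column chunks in one row group
        long[] chunkSizes = {400_000L, 120L, 35_000L, 900L};
        long stripeStartOffset = 4L; // e.g. right after the "PAR1" magic

        // Argsort: order column indexes by ascending compressed size,
        // so small chunks end up adjacent in the file
        Integer[] indexes = new Integer[chunkSizes.length];
        Arrays.setAll(indexes, i -> i);
        Arrays.sort(indexes, Comparator.comparingLong(i -> chunkSizes[i]));

        // Walk the chunks in their new physical order, assigning contiguous
        // offsets; the logical column order (array position) stays untouched
        long[] offsets = new long[chunkSizes.length];
        long currentOffset = stripeStartOffset;
        for (int index : indexes) {
            offsets[index] = currentOffset;
            currentOffset += chunkSizes[index];
        }

        // Columns 1 (120 B) and 3 (900 B) now sit first, back to back
        for (int i = 0; i < offsets.length; i++) {
            System.out.printf("column %d: offset=%d size=%d%n", i, offsets[i], chunkSizes[i]);
        }
    }
}
```

Sorting an index array rather than the chunks themselves is what lets the metadata keep its original column order while only the data streams move.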
diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
index 65adbfb6ed5f..5e88c5e42faf 100644
--- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
+++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java
@@ -24,24 +24,29 @@
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.PageReader;
 import io.trino.parquet.reader.TestingParquetDataSource;
+import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Type;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.parquet.ParquetTestUtils.generateInputPages;
 import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.TinyintType.TINYINT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -110,4 +115,37 @@ public void testWrittenPageSize()
         }
         assertThat(pagesRead).isGreaterThan(10);
     }
+
+    @Test
+    public void testColumnReordering()
+            throws IOException
+    {
+        List<String> columnNames = ImmutableList.of("columnA", "columnB", "columnC", "columnD");
+        List<Type> types = ImmutableList.of(BIGINT, TINYINT, INTEGER, DecimalType.createDecimalType(12));
+
+        // Write a file with many row groups
+        ParquetDataSource dataSource = new TestingParquetDataSource(
+                writeParquetFile(
+                        ParquetWriterOptions.builder()
+                                .setMaxBlockSize(DataSize.ofBytes(20 * 1024))
+                                .build(),
+                        types,
+                        columnNames,
+                        generateInputPages(types, 100, 100)),
+                new ParquetReaderOptions());
+
+        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+        assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(10);
+        for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
+            // Sort columns by size in file
+            List<ColumnChunkMetaData> columns = blockMetaData.getColumns().stream()
+                    .sorted(Comparator.comparingLong(ColumnChunkMetaData::getTotalUncompressedSize))
+                    .collect(toImmutableList());
+            // Verify that the column data streams are laid out in the file in that same ascending-size order
+            List<Long> offsets = columns.stream()
+                    .map(ColumnChunkMetaData::getFirstDataPageOffset)
+                    .collect(toImmutableList());
+            assertThat(offsets).isSorted();
+        }
+    }
 }
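The test pins down the on-disk layout; the payoff comes on the read side, where (as the comment in `flush()` notes) the reader coalesces nearby small reads. The sketch below illustrates that kind of range coalescing under assumed names — the `DiskRange` record, `coalesce` helper, and `maxGapBytes` threshold are illustrative, not Trino's actual reader API:

```java
import java.util.ArrayList;
import java.util.List;

public class CoalesceSketch
{
    // Illustrative byte range in a file; Trino's reader has its own representation
    record DiskRange(long offset, long length) {}

    // Merge ranges separated by at most maxGapBytes into a single read.
    // Assumes the input list is sorted by offset.
    static List<DiskRange> coalesce(List<DiskRange> sorted, long maxGapBytes)
    {
        List<DiskRange> merged = new ArrayList<>();
        for (DiskRange range : sorted) {
            if (!merged.isEmpty()) {
                DiskRange last = merged.get(merged.size() - 1);
                long gap = range.offset() - (last.offset() + last.length());
                if (gap <= maxGapBytes) {
                    // Extend the previous read to cover this range as well
                    merged.set(merged.size() - 1,
                            new DiskRange(last.offset(), range.offset() + range.length() - last.offset()));
                    continue;
                }
            }
            merged.add(range);
        }
        return merged;
    }

    public static void main(String[] args)
    {
        // Small chunks first (as the patched writer lays them out), then a large one
        List<DiskRange> ranges = List.of(
                new DiskRange(4, 120),
                new DiskRange(124, 900),
                new DiskRange(10_000, 400_000));
        // Prints two ranges: [4, 1020] merged from the small chunks, plus the large chunk
        System.out.println(coalesce(ranges, 512));
    }
}
```

With the small chunks at offsets 4 and 124 adjacent, the first two ranges merge into a single 1020-byte read while the large chunk stays its own I/O; in the pre-patch layout a small chunk sandwiched between large ones would cost a separate round trip.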