ParquetWriter.java
@@ -29,6 +29,7 @@
 import io.trino.parquet.writer.ColumnWriter.BufferData;
 import io.trino.spi.Page;
 import io.trino.spi.type.Type;
+import it.unimi.dsi.fastutil.ints.IntArrays;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.format.ColumnMetaData;
 import org.apache.parquet.format.CompressionCodec;
@@ -45,6 +46,7 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -317,16 +319,33 @@ private void flush()

         // update stats
         long stripeStartOffset = outputStream.longSize();
-        List<ColumnMetaData> metadatas = bufferDataList.stream()
+        List<ColumnMetaData> columns = bufferDataList.stream()
                 .map(BufferData::getMetaData)
                 .collect(toImmutableList());
-        updateRowGroups(updateColumnMetadataOffset(metadatas, stripeStartOffset));
 
+        // Since the reader coalesces nearby small reads, it is beneficial to
+        // reorder data streams to group columns with small size together
+        int[] indexes = new int[columns.size()];
+        Arrays.setAll(indexes, index -> index);
+        IntArrays.quickSort(indexes, (index, otherIndex) ->
+                Long.compare(columns.get(index).getTotal_compressed_size(), columns.get(otherIndex).getTotal_compressed_size()));
+
+        // Ordering of columns in the metadata should remain unchanged.
+        // Only the offsets in file at which the columns start may change as a result
+        // of reordering column data streams by their compressed size
+        long currentOffset = stripeStartOffset;
+        for (int index : indexes) {
+            ColumnMetaData columnMetaData = columns.get(index);
+            columnMetaData.setData_page_offset(currentOffset);
+            currentOffset += columnMetaData.getTotal_compressed_size();
+        }
+        updateRowGroups(columns);
 
         // flush pages
-        bufferDataList.stream()
-                .map(BufferData::getData)
-                .flatMap(List::stream)
-                .forEach(data -> data.writeData(outputStream));
+        for (int index : indexes) {
+            bufferDataList.get(index).getData()
+                    .forEach(data -> data.writeData(outputStream));
+        }
     }
 
     private void writeFooter()
@@ -379,20 +398,6 @@ private static org.apache.parquet.format.ColumnChunk toColumnChunk(ColumnMetaData
         return columnChunk;
     }
 
-    private List<ColumnMetaData> updateColumnMetadataOffset(List<ColumnMetaData> columns, long offset)
-    {
-        ImmutableList.Builder<ColumnMetaData> builder = ImmutableList.builder();
-        long currentOffset = offset;
-        for (ColumnMetaData column : columns) {
-            ColumnMetaData columnMetaData = new ColumnMetaData(column.type, column.encodings, column.path_in_schema, column.codec, column.num_values, column.total_uncompressed_size, column.total_compressed_size, currentOffset);
-            columnMetaData.setStatistics(column.getStatistics());
-            columnMetaData.setEncoding_stats(column.getEncoding_stats());
-            builder.add(columnMetaData);
-            currentOffset += column.getTotal_compressed_size();
-        }
-        return builder.build();
-    }
-
     @VisibleForTesting
     static String formatCreatedBy(String trinoVersion)
     {
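The writer change is a sort-by-proxy: the ColumnMetaData list must keep its schema order for the footer, so instead of reordering it, an index array is sorted by compressed size and then drives both the offset assignment and the order in which the buffered streams are flushed. Below is a minimal, self-contained sketch of the same pattern with plain long sizes in place of Parquet metadata; the class and variable names are illustrative, and only fastutil is assumed on the classpath.

import it.unimi.dsi.fastutil.ints.IntArrays;

import java.util.Arrays;

public class StreamReorderSketch
{
    public static void main(String[] args)
    {
        // Pretend compressed sizes (bytes) of four buffered column chunks
        long[] compressedSizes = {4096L, 128L, 512L, 2048L};
        // File offset at which this row group's data begins
        long stripeStartOffset = 1000L;

        // Identity permutation, then sort the indexes (not the sizes) so the
        // logical column order stays untouched, mirroring the writer's
        // schema-ordered ColumnMetaData list
        int[] indexes = new int[compressedSizes.length];
        Arrays.setAll(indexes, i -> i);
        IntArrays.quickSort(indexes, (a, b) -> Long.compare(compressedSizes[a], compressedSizes[b]));

        // Walk the permutation to assign contiguous offsets in write order;
        // each column keeps its logical position, only its recorded offset changes
        long[] dataPageOffsets = new long[compressedSizes.length];
        long currentOffset = stripeStartOffset;
        for (int index : indexes) {
            dataPageOffsets[index] = currentOffset;
            currentOffset += compressedSizes[index];
        }

        // The data streams would now be flushed in indexes order: 1, 2, 3, 0
        for (int i = 0; i < compressedSizes.length; i++) {
            System.out.printf("column %d: size=%d, offset=%d%n", i, compressedSizes[i], dataPageOffsets[i]);
        }
    }
}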
TestParquetWriter.java
@@ -24,24 +24,29 @@
 import io.trino.parquet.reader.MetadataReader;
 import io.trino.parquet.reader.PageReader;
 import io.trino.parquet.reader.TestingParquetDataSource;
+import io.trino.spi.type.DecimalType;
 import io.trino.spi.type.Type;
 import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.PrimitiveType;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
 import static io.trino.parquet.ParquetTestUtils.generateInputPages;
 import static io.trino.parquet.ParquetTestUtils.writeParquetFile;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.TinyintType.TINYINT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -110,4 +115,37 @@ public void testWrittenPageSize()
         }
         assertThat(pagesRead).isGreaterThan(10);
     }
+
+    @Test
+    public void testColumnReordering()
+            throws IOException
+    {
+        List<String> columnNames = ImmutableList.of("columnA", "columnB", "columnC", "columnD");
+        List<Type> types = ImmutableList.of(BIGINT, TINYINT, INTEGER, DecimalType.createDecimalType(12));
+
+        // Write a file with many row groups
+        ParquetDataSource dataSource = new TestingParquetDataSource(
+                writeParquetFile(
+                        ParquetWriterOptions.builder()
+                                .setMaxBlockSize(DataSize.ofBytes(20 * 1024))
+                                .build(),
+                        types,
+                        columnNames,
+                        generateInputPages(types, 100, 100)),
+                new ParquetReaderOptions());
+
+        ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
+        assertThat(parquetMetadata.getBlocks().size()).isGreaterThanOrEqualTo(10);
+        for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
+            // Sort the column chunks of this row group by their uncompressed size
+            List<ColumnChunkMetaData> columns = blockMetaData.getColumns().stream()
+                    .sorted(Comparator.comparingLong(ColumnChunkMetaData::getTotalUncompressedSize))
+                    .collect(toImmutableList());
+            // Verify that the data streams were laid out in the file in that same
+            // order, i.e. file offsets increase with column size
+            List<Long> offsets = columns.stream()
+                    .map(ColumnChunkMetaData::getFirstDataPageOffset)
+                    .collect(toImmutableList());
+            assertThat(offsets).isSorted();
+        }
+    }
 }
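The test asserts only that offsets increase with chunk size. Because flush() writes the reordered streams back to back and stamps each chunk's start offset into its metadata, a stronger invariant could also be checked: chunks ordered by file position should be contiguous. A sketch of such a check follows; assertChunksAreContiguous is a hypothetical helper, not part of this PR, and it assumes each chunk's starting position is exactly the offset assigned by the writer above (in particular, that no separate dictionary page offset shifts getStartingPos()).

import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;

import java.util.Comparator;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

final class ChunkLayoutChecks
{
    private ChunkLayoutChecks() {}

    // Hypothetical helper (not part of this PR): verifies that the column
    // chunks of a row group sit back to back in the file, i.e. that the
    // reordering introduced no gaps between data streams
    static void assertChunksAreContiguous(BlockMetaData blockMetaData)
    {
        List<ColumnChunkMetaData> byPosition = blockMetaData.getColumns().stream()
                .sorted(Comparator.comparingLong(ColumnChunkMetaData::getStartingPos))
                .toList();
        for (int i = 1; i < byPosition.size(); i++) {
            ColumnChunkMetaData previous = byPosition.get(i - 1);
            assertThat(byPosition.get(i).getStartingPos())
                    .isEqualTo(previous.getStartingPos() + previous.getTotalSize());
        }
    }
}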