-
Notifications
You must be signed in to change notification settings - Fork 3.4k
optimize parquet footer reader #24007
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
b5f0d0c
e888f39
a166f98
e69f6b8
933ccdd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| package io.trino.parquet.metadata; | ||
|
|
||
raunaqmorarka marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableMap; | ||
| import io.airlift.log.Logger; | ||
| import io.trino.parquet.ParquetCorruptionException; | ||
| import io.trino.parquet.ParquetDataSourceId; | ||
|
|
@@ -35,7 +36,6 @@ | |
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
|
|
@@ -44,64 +44,61 @@ | |
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
||
| import static com.google.common.base.Preconditions.checkState; | ||
| import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; | ||
| import static io.trino.parquet.ParquetMetadataConverter.getEncoding; | ||
| import static io.trino.parquet.ParquetMetadataConverter.getLogicalTypeAnnotation; | ||
| import static io.trino.parquet.ParquetMetadataConverter.getPrimitive; | ||
| import static io.trino.parquet.ParquetMetadataConverter.toColumnIndexReference; | ||
| import static io.trino.parquet.ParquetMetadataConverter.toOffsetIndexReference; | ||
| import static io.trino.parquet.ParquetValidationUtils.validateParquet; | ||
| import static java.util.Objects.requireNonNull; | ||
| import static java.util.stream.Collectors.toMap; | ||
|
|
||
| public class ParquetMetadata | ||
| { | ||
| private static final Logger log = Logger.get(ParquetMetadata.class); | ||
|
|
||
| private final FileMetadata fileMetaData; | ||
| private final List<BlockMetadata> blocks; | ||
| private final FileMetaData fileMetaData; | ||
| private final ParquetDataSourceId dataSourceId; | ||
| private final FileMetadata parquetMetadata; | ||
|
|
||
| public ParquetMetadata(FileMetadata fileMetaData, List<BlockMetadata> blocks) | ||
| { | ||
| this.fileMetaData = fileMetaData; | ||
| this.blocks = blocks; | ||
| } | ||
|
|
||
| public List<BlockMetadata> getBlocks() | ||
| public ParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId) | ||
| throws ParquetCorruptionException | ||
| { | ||
| return blocks; | ||
| this.fileMetaData = requireNonNull(fileMetaData, "fileMetaData is null"); | ||
| this.dataSourceId = requireNonNull(dataSourceId, "dataSourceId is null"); | ||
| this.parquetMetadata = new FileMetadata(readMessageType(), keyValueMetaData(fileMetaData), fileMetaData.getCreated_by()); | ||
| } | ||
|
|
||
| public FileMetadata getFileMetaData() | ||
| { | ||
| return fileMetaData; | ||
| return parquetMetadata; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() | ||
| { | ||
| return "ParquetMetaData{" + fileMetaData + ", blocks: " + blocks + "}"; | ||
| return "ParquetMetaData{" + fileMetaData + "}"; | ||
| } | ||
|
|
||
| public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, ParquetDataSourceId dataSourceId) | ||
| throws ParquetCorruptionException | ||
| public List<BlockMetadata> getBlocks() | ||
| { | ||
| List<SchemaElement> schema = fileMetaData.getSchema(); | ||
| validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); | ||
|
|
||
| MessageType messageType = readParquetSchema(schema); | ||
| List<BlockMetadata> blocks = new ArrayList<>(); | ||
| List<RowGroup> rowGroups = fileMetaData.getRow_groups(); | ||
| if (rowGroups != null) { | ||
| for (RowGroup rowGroup : rowGroups) { | ||
| List<ColumnChunk> columns = rowGroup.getColumns(); | ||
| validateParquet(!columns.isEmpty(), dataSourceId, "No columns in row group: %s", rowGroup); | ||
| checkState(!columns.isEmpty(), "No columns in row group: %s [%s]", rowGroup, dataSourceId); | ||
|
||
| String filePath = columns.get(0).getFile_path(); | ||
| ImmutableList.Builder<ColumnChunkMetadata> columnMetadataBuilder = ImmutableList.builderWithExpectedSize(columns.size()); | ||
| for (ColumnChunk columnChunk : columns) { | ||
| validateParquet( | ||
| checkState( | ||
| (filePath == null && columnChunk.getFile_path() == null) | ||
| || (filePath != null && filePath.equals(columnChunk.getFile_path())), | ||
| dataSourceId, | ||
| "all column chunks of the same row group must be in the same file"); | ||
| "all column chunks of the same row group must be in the same file [%s]", dataSourceId); | ||
| ColumnMetaData metaData = columnChunk.meta_data; | ||
| String[] path = metaData.path_in_schema.stream() | ||
| .map(value -> value.toLowerCase(Locale.ENGLISH)) | ||
|
|
@@ -129,18 +126,7 @@ public static ParquetMetadata createParquetMetadata(FileMetaData fileMetaData, P | |
| } | ||
| } | ||
|
|
||
| Map<String, String> keyValueMetaData = new HashMap<>(); | ||
| List<KeyValue> keyValueList = fileMetaData.getKey_value_metadata(); | ||
| if (keyValueList != null) { | ||
| for (KeyValue keyValue : keyValueList) { | ||
| keyValueMetaData.put(keyValue.key, keyValue.value); | ||
| } | ||
| } | ||
| FileMetadata parquetFileMetadata = new FileMetadata( | ||
| messageType, | ||
| keyValueMetaData, | ||
| fileMetaData.getCreated_by()); | ||
| return new ParquetMetadata(parquetFileMetadata, blocks); | ||
| return blocks; | ||
| } | ||
|
|
||
| private static MessageType readParquetSchema(List<SchemaElement> schema) | ||
|
|
@@ -222,4 +208,25 @@ private static Set<Encoding> readEncodings(List<org.apache.parquet.format.Encodi | |
| } | ||
| return Collections.unmodifiableSet(columnEncodings); | ||
| } | ||
|
|
||
| private MessageType readMessageType() | ||
| throws ParquetCorruptionException | ||
| { | ||
| List<SchemaElement> schema = fileMetaData.getSchema(); | ||
| validateParquet(!schema.isEmpty(), dataSourceId, "Schema is empty"); | ||
|
|
||
| Iterator<SchemaElement> schemaIterator = schema.iterator(); | ||
| SchemaElement rootSchema = schemaIterator.next(); | ||
| Types.MessageTypeBuilder builder = Types.buildMessage(); | ||
| readTypeSchema(builder, schemaIterator, rootSchema.getNum_children()); | ||
| return builder.named(rootSchema.name); | ||
| } | ||
|
|
||
| private static Map<String, String> keyValueMetaData(FileMetaData fileMetaData) | ||
| { | ||
| if (fileMetaData.getKey_value_metadata() == null) { | ||
| return ImmutableMap.of(); | ||
| } | ||
| return fileMetaData.getKey_value_metadata().stream().collect(toMap(KeyValue::getKey, KeyValue::getValue)); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,12 +82,12 @@ public FileMetrics getFileMetrics() | |
| { | ||
| ParquetMetadata parquetMetadata; | ||
| try { | ||
| parquetMetadata = ParquetMetadata.createParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString())); | ||
| parquetMetadata = new ParquetMetadata(parquetFileWriter.getFileMetadata(), new ParquetDataSourceId(location.toString())); | ||
| return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); | ||
|
||
| } | ||
| catch (IOException e) { | ||
| throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Error creating metadata for Parquet file %s", location), e); | ||
| } | ||
| return new FileMetrics(footerMetrics(parquetMetadata, Stream.empty(), metricsConfig), Optional.of(getSplitOffsets(parquetMetadata))); | ||
| } | ||
|
|
||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.