Filter Iceberg splits based on $path column predicates #13012
|
|
@@ -115,6 +115,8 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.net.URI; | ||
| import java.net.URLEncoder; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
|
|
@@ -145,6 +147,7 @@ | |
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CANNOT_OPEN_SPLIT; | ||
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CURSOR_ERROR; | ||
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; | ||
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; | ||
| import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_MISSING_DATA; | ||
| import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_MODIFIED_TIME; | ||
| import static io.trino.plugin.iceberg.IcebergMetadataColumn.FILE_PATH; | ||
|
|
@@ -179,6 +182,7 @@ | |
| import static io.trino.spi.type.VarbinaryType.VARBINARY; | ||
| import static io.trino.spi.type.VarcharType.VARCHAR; | ||
| import static java.lang.String.format; | ||
| import static java.nio.charset.StandardCharsets.UTF_8; | ||
| import static java.util.Locale.ENGLISH; | ||
| import static java.util.Objects.requireNonNull; | ||
| import static java.util.function.Predicate.not; | ||
|
|
@@ -319,7 +323,7 @@ public ConnectorPageSource createPageSource( | |
| ReaderPageSource dataPageSource = createDataPageSource( | ||
| session, | ||
| hdfsContext, | ||
| new Path(split.getPath()), | ||
| split.getPath(), | ||
| split.getStart(), | ||
| split.getLength(), | ||
| fileSize, | ||
|
|
@@ -448,7 +452,7 @@ private ConnectorPageSource openDeletes( | |
| return createDataPageSource( | ||
| session, | ||
| new HdfsContext(session), | ||
| new Path(delete.path().toString()), | ||
| delete.path().toString(), | ||
| 0, | ||
| delete.fileSizeInBytes(), | ||
| delete.fileSizeInBytes(), | ||
|
|
@@ -465,7 +469,7 @@ private ConnectorPageSource openDeletes( | |
| public ReaderPageSource createDataPageSource( | ||
| ConnectorSession session, | ||
| HdfsContext hdfsContext, | ||
| Path path, | ||
| String path, | ||
| long start, | ||
| long length, | ||
| long fileSize, | ||
|
|
@@ -477,13 +481,15 @@ public ReaderPageSource createDataPageSource( | |
| Optional<NameMapping> nameMapping, | ||
| Map<Integer, Optional<String>> partitionKeys) | ||
| { | ||
| Path hadoopPath = new Path(hadoopPath(path)); | ||
| switch (fileFormat) { | ||
| case ORC: | ||
| return createOrcPageSource( | ||
| hdfsEnvironment, | ||
| session.getIdentity(), | ||
| hdfsEnvironment.getConfiguration(hdfsContext, path), | ||
| hdfsEnvironment.getConfiguration(hdfsContext, hadoopPath), | ||
| path, | ||
| hadoopPath, | ||
| start, | ||
| length, | ||
| fileSize, | ||
|
|
@@ -507,8 +513,9 @@ public ReaderPageSource createDataPageSource( | |
| return createParquetPageSource( | ||
| hdfsEnvironment, | ||
| session.getIdentity(), | ||
| hdfsEnvironment.getConfiguration(hdfsContext, path), | ||
| hdfsEnvironment.getConfiguration(hdfsContext, hadoopPath), | ||
| path, | ||
| hadoopPath, | ||
| start, | ||
| length, | ||
| fileSize, | ||
|
|
@@ -524,6 +531,7 @@ public ReaderPageSource createDataPageSource( | |
| return createAvroPageSource( | ||
| fileIoProvider.createFileIo(hdfsContext, session.getQueryId()), | ||
| path, | ||
| hadoopPath, | ||
| start, | ||
| length, | ||
| fileModifiedTime, | ||
|
|
@@ -539,7 +547,8 @@ private static ReaderPageSource createOrcPageSource( | |
| HdfsEnvironment hdfsEnvironment, | ||
| ConnectorIdentity identity, | ||
| Configuration configuration, | ||
| Path path, | ||
| String path, | ||
| Path hadoopPath, | ||
| long start, | ||
| long length, | ||
| long fileSize, | ||
|
|
@@ -554,10 +563,10 @@ private static ReaderPageSource createOrcPageSource( | |
| { | ||
| OrcDataSource orcDataSource = null; | ||
| try { | ||
| FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration); | ||
| FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path)); | ||
| FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, hadoopPath, configuration); | ||
| FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(hadoopPath)); | ||
| orcDataSource = new HdfsOrcDataSource( | ||
| new OrcDataSourceId(path.toString()), | ||
| new OrcDataSourceId(hadoopPath.toString()), | ||
| fileSize, | ||
| options, | ||
| inputStream, | ||
|
|
@@ -608,7 +617,7 @@ else if (partitionKeys.containsKey(column.getId())) { | |
| deserializePartitionValue(trinoType, partitionKeys.get(column.getId()).orElse(null), column.getName())))); | ||
| } | ||
| else if (column.isPathColumn()) { | ||
| columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString())))); | ||
|
Member: Q: are we changing the behavior here? i.e. before this PR, effectively …

Member (Author): Yes, it's changing. I will mention it in a release note. Relates to #13012 (comment).
||
| columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path)))); | ||
| } | ||
| else if (column.isFileModifiedTimeColumn()) { | ||
| columnAdaptations.add(ColumnAdaptation.constantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)))); | ||
|
|
@@ -690,7 +699,7 @@ else if (orcColumn != null) { | |
| if (e instanceof TrinoException) { | ||
| throw (TrinoException) e; | ||
| } | ||
| String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage()); | ||
| String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", hadoopPath, start, length, e.getMessage()); | ||
| if (e instanceof BlockMissingException) { | ||
| throw new TrinoException(ICEBERG_MISSING_DATA, message, e); | ||
| } | ||
|
|
@@ -886,7 +895,8 @@ private static ReaderPageSource createParquetPageSource( | |
| HdfsEnvironment hdfsEnvironment, | ||
| ConnectorIdentity identity, | ||
| Configuration configuration, | ||
| Path path, | ||
| String path, | ||
| Path hadoopPath, | ||
| long start, | ||
| long length, | ||
| long fileSize, | ||
|
|
@@ -902,9 +912,9 @@ private static ReaderPageSource createParquetPageSource( | |
|
|
||
| ParquetDataSource dataSource = null; | ||
| try { | ||
| FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, path, configuration); | ||
| FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(path)); | ||
| dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), fileSize, inputStream, fileFormatDataSourceStats, options); | ||
| FileSystem fileSystem = hdfsEnvironment.getFileSystem(identity, hadoopPath, configuration); | ||
| FSDataInputStream inputStream = hdfsEnvironment.doAs(identity, () -> fileSystem.open(hadoopPath)); | ||
| dataSource = new HdfsParquetDataSource(new ParquetDataSourceId(hadoopPath.toString()), fileSize, inputStream, fileFormatDataSourceStats, options); | ||
| ParquetDataSource theDataSource = dataSource; // extra variable required for lambda below | ||
| ParquetMetadata parquetMetadata = hdfsEnvironment.doAs(identity, () -> MetadataReader.readFooter(theDataSource)); | ||
| FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); | ||
|
|
@@ -975,7 +985,7 @@ else if (partitionKeys.containsKey(column.getId())) { | |
| deserializePartitionValue(trinoType, partitionKeys.get(column.getId()).orElse(null), column.getName()))); | ||
| } | ||
| else if (column.isPathColumn()) { | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString()))); | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path))); | ||
| } | ||
| else if (column.isFileModifiedTimeColumn()) { | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY))); | ||
|
|
@@ -1033,7 +1043,7 @@ else if (column.isRowPositionColumn()) { | |
| if (e instanceof TrinoException) { | ||
| throw (TrinoException) e; | ||
| } | ||
| String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage()); | ||
| String message = format("Error opening Iceberg split %s (offset=%s, length=%s): %s", hadoopPath, start, length, e.getMessage()); | ||
|
|
||
| if (e instanceof ParquetCorruptionException) { | ||
| throw new TrinoException(ICEBERG_BAD_DATA, message, e); | ||
|
|
@@ -1048,7 +1058,8 @@ else if (column.isRowPositionColumn()) { | |
|
|
||
| private ReaderPageSource createAvroPageSource( | ||
| FileIO fileIo, | ||
| Path path, | ||
| String path, | ||
| Path hadoopPath, | ||
| long start, | ||
| long length, | ||
| OptionalLong fileModifiedTime, | ||
|
|
@@ -1066,7 +1077,7 @@ private ReaderPageSource createAvroPageSource( | |
| .orElse(columns); | ||
|
|
||
| // The column orders in the generated schema might be different from the original order | ||
| try (DataFileStream<GenericRecord> avroFileReader = new DataFileStream<>(fileIo.newInputFile(path.toString()).newStream(), new GenericDatumReader<>())) { | ||
| try (DataFileStream<GenericRecord> avroFileReader = new DataFileStream<>(fileIo.newInputFile(hadoopPath.toString()).newStream(), new GenericDatumReader<>())) { | ||
| org.apache.avro.Schema avroSchema = avroFileReader.getSchema(); | ||
| List<org.apache.avro.Schema.Field> fileFields = avroSchema.getFields(); | ||
| if (nameMapping.isPresent() && fileFields.stream().noneMatch(IcebergPageSourceProvider::hasId)) { | ||
|
|
@@ -1086,7 +1097,7 @@ private ReaderPageSource createAvroPageSource( | |
| org.apache.avro.Schema.Field field = fileColumnsByIcebergId.get(column.getId()); | ||
|
|
||
| if (column.isPathColumn()) { | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path.toString()))); | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_PATH.getType(), utf8Slice(path))); | ||
| } | ||
| else if (column.isFileModifiedTimeColumn()) { | ||
| constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(FILE_MODIFIED_TIME.getType(), packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY))); | ||
|
|
@@ -1114,7 +1125,7 @@ else if (field == null) { | |
| return new ReaderPageSource( | ||
| constantPopulatingPageSourceBuilder.build(new IcebergAvroPageSource( | ||
| fileIo, | ||
| path.toString(), | ||
| hadoopPath.toString(), | ||
| start, | ||
| length, | ||
| fileSchema, | ||
|
|
@@ -1291,4 +1302,18 @@ private static TrinoException handleException(OrcDataSourceId dataSourceId, Exce | |
| } | ||
| return new TrinoException(ICEBERG_CURSOR_ERROR, format("Failed to read ORC file: %s", dataSourceId), exception); | ||
| } | ||
|
|
||
| private static String hadoopPath(String path) | ||
| { | ||
| // hack to preserve the original path for S3 if necessary | ||
| Path hadoopPath = new Path(path); | ||
| if ("s3".equals(hadoopPath.toUri().getScheme()) && !path.equals(hadoopPath.toString())) { | ||
| if (hadoopPath.toUri().getFragment() != null) { | ||
| throw new TrinoException(ICEBERG_INVALID_METADATA, "Unexpected URI fragment in path: " + path); | ||
| } | ||
| URI uri = URI.create(path); | ||
| return uri + "#" + URLEncoder.encode(uri.getPath(), UTF_8); | ||
| } | ||
| return path; | ||
| } | ||
| } | ||
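For context on the new hadoopPath() helper above, here is a minimal standalone sketch of what it produces for an S3 location containing consecutive slashes. The class name, bucket, and file names are made up, and it assumes (per the helper's guard) that org.apache.hadoop.fs.Path collapses the double slash, so the original location has to be preserved by appending it URL-encoded as a URI fragment.

```java
import java.net.URI;
import java.net.URLEncoder;

import static java.nio.charset.StandardCharsets.UTF_8;

public class HadoopPathFragmentSketch
{
    public static void main(String[] args)
    {
        // Hypothetical S3 location whose key contains a double slash, stored verbatim in Iceberg metadata
        String original = "s3://bucket/warehouse//part-00000.parquet";

        // new org.apache.hadoop.fs.Path(original).toString() normalizes consecutive slashes to
        // "s3://bucket/warehouse/part-00000.parquet", so the !path.equals(...) guard fires

        // The helper then appends the URL-encoded original path as a URI fragment so the
        // S3 file system can recover the exact object key later
        URI uri = URI.create(original);
        String rewritten = uri + "#" + URLEncoder.encode(uri.getPath(), UTF_8);

        // Prints: s3://bucket/warehouse//part-00000.parquet#%2Fwarehouse%2F%2Fpart-00000.parquet
        System.out.println(rewritten);
    }
}
```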
Should the "Return $path without URL encoding in Iceberg" commit have any test changes/additions?

Ideally it should, but let me handle that in #13457.
url encoding (or lack of) should be exercisable independently from double slashes, e.g. a path containing %.

Not sure if I understood your suggestion correctly, but just adding % to a file or directory name wouldn't work, because it doesn't pass the !path.equals(hadoopPath.toString()) check in hadoopPath().
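To illustrate that point, a small hypothetical check (class and path names invented, hadoop-common assumed on the classpath) of which locations trigger the rewrite: a literal % round-trips through org.apache.hadoop.fs.Path unchanged, so the equality guard passes and no fragment is added, while a double slash is normalized away and does trigger it.

```java
import org.apache.hadoop.fs.Path;

public class HadoopPathGuardSketch
{
    public static void main(String[] args)
    {
        // A literal '%' in the file name survives the Path round trip unchanged,
        // so path.equals(new Path(path).toString()) holds and hadoopPath() returns the path as-is
        String percentPath = "s3://bucket/dir/file%201.parquet";
        System.out.println(percentPath.equals(new Path(percentPath).toString())); // true -> no rewrite

        // Consecutive slashes are collapsed by Path normalization, so the strings differ
        // and the fragment-encoding branch is taken
        String doubleSlashPath = "s3://bucket/dir//file.parquet";
        System.out.println(doubleSlashPath.equals(new Path(doubleSlashPath).toString())); // false -> rewrite
    }
}
```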