Flink: refactor DataIterator to use composition (instead of inheritance) #2905
Changes from all commits: a6a25fc, 2b123ca, ced59a9, 00f18d8, 7db0ec9
New file: `InputFilesDecryptor.java` (package `org.apache.iceberg.encryption`), +61 lines:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.encryption;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class InputFilesDecryptor {

  private final Map<String, InputFile> decryptedInputFiles;

  public InputFilesDecryptor(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption) {
    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
    combinedTask.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));

    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
    Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);

    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(keyMetadata.size());
    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
    this.decryptedInputFiles = Collections.unmodifiableMap(files);
  }

  public InputFile getInputFile(FileScanTask task) {
    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
    return decryptedInputFiles.get(task.file().path().toString());
  }

  public InputFile getInputFile(String location) {
    return decryptedInputFiles.get(location);
  }
}
```
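To make the decryptor's role concrete, here is a minimal usage sketch; the `consume(...)` driver and its name are hypothetical scaffolding, and only the `InputFilesDecryptor` calls come from the file above. The constructor gathers key metadata for every data and delete file in the combined task so `EncryptionManager.decrypt` can run as one batch, and each later lookup is a plain map read.

```java
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;

class InputFilesDecryptorUsageSketch {
  // Hypothetical caller: decrypt all files of a combined task once up front,
  // then look each one up while opening per-file readers.
  static void consume(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption) {
    InputFilesDecryptor decryptor = new InputFilesDecryptor(combinedTask, io, encryption);

    for (FileScanTask fileTask : combinedTask.files()) {
      // Returns the already-decrypted InputFile for the task's data file.
      InputFile inputFile = decryptor.getInputFile(fileTask);
      // ... hand inputFile to a format-specific reader (Parquet/Avro/ORC)
    }
  }
}
```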
New file: `FileScanTaskReader.java` (package `org.apache.iceberg.flink.source`), +36 lines:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink.source;

import java.io.Serializable;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.io.CloseableIterator;

/**
 * Read a {@link FileScanTask} into a {@link CloseableIterator}
 *
 * @param <T> is the output data type returned by this iterator.
 */
@Internal
public interface FileScanTaskReader<T> extends Serializable {
  CloseableIterator<T> open(FileScanTask fileScanTask, InputFilesDecryptor inputFilesDecryptor);
}
```
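The refactored `DataIterator` itself is not part of this excerpt. Below is a rough sketch of the composition the PR title describes, under assumed names (`DataIteratorSketch` and its members are illustrative, not the PR's actual code): a single concrete iterator holds an injected `FileScanTaskReader` plus one `InputFilesDecryptor` and chains the per-file iterators, instead of requiring a subclass such as the old `RowDataIterator` to override an `openTaskIterator` hook.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;

// Illustrative sketch only; the PR's actual DataIterator is not shown in this excerpt.
class DataIteratorSketch<T> implements Iterator<T> {
  private final FileScanTaskReader<T> fileScanTaskReader; // injected reading strategy
  private final InputFilesDecryptor inputFilesDecryptor;  // shared across all file tasks
  private final Iterator<FileScanTask> tasks;
  private CloseableIterator<T> currentIterator =
      CloseableIterator.withClose(Collections.emptyIterator());

  DataIteratorSketch(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
      FileIO io, EncryptionManager encryption) {
    this.fileScanTaskReader = fileScanTaskReader;
    // One decryptor per combined task, handed to the reader on every open() call.
    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
    this.tasks = task.files().iterator();
  }

  @Override
  public boolean hasNext() {
    // Chain per-file iterators: advance to the next file task once the current one is drained.
    while (!currentIterator.hasNext() && tasks.hasNext()) {
      try {
        currentIterator.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      currentIterator = fileScanTaskReader.open(tasks.next(), inputFilesDecryptor);
    }
    return currentIterator.hasNext();
  }

  @Override
  public T next() {
    return currentIterator.next();
  }
}
```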
Modified (renamed): `RowDataIterator.java` → `RowDataFileScanTaskReader.java` (package `org.apache.iceberg.flink.source`):

```diff
@@ -20,15 +20,15 @@
 package org.apache.iceberg.flink.source;
 
 import java.util.Map;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.FlinkAvroReader;
@@ -37,7 +37,6 @@
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
@@ -47,51 +46,52 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
-class RowDataIterator extends DataIterator<RowData> {
+@Internal
+public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
 
   private final Schema tableSchema;
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
 
-  RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
-                  Schema projectedSchema, String nameMapping, boolean caseSensitive) {
-    super(task, io, encryption);
+  public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
+                                   String nameMapping, boolean caseSensitive) {
     this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
     this.caseSensitive = caseSensitive;
   }
 
   @Override
-  protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
+  public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
     Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
 
     Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
         PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
 
-    FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);
-    CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));
-
-    return iterable.iterator();
+    FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
+    return deletes
+        .filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor))
+        .iterator();
   }
 
-  private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
+  private CloseableIterable<RowData> newIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
     CloseableIterable<RowData> iter;
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
       switch (task.file().format()) {
         case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant);
+          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant);
+          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         case ORC:
-          iter = newOrcIterable(task, schema, idToConstant);
+          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         default:
@@ -103,8 +103,9 @@ private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema,
     return iter;
   }
 
-  private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
-    Avro.ReadBuilder builder = Avro.read(getInputFile(task))
+  private CloseableIterable<RowData> newAvroIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
+    Avro.ReadBuilder builder = Avro.read(inputFilesDecryptor.getInputFile(task))
         .reuseContainers()
         .project(schema)
         .split(task.start(), task.length())
@@ -117,9 +118,9 @@ private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Schema sch
     return builder.build();
   }
 
-  private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Schema schema,
-                                                        Map<Integer, ?> idToConstant) {
-    Parquet.ReadBuilder builder = Parquet.read(getInputFile(task))
+  private CloseableIterable<RowData> newParquetIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
+    Parquet.ReadBuilder builder = Parquet.read(inputFilesDecryptor.getInputFile(task))
         .reuseContainers()
         .split(task.start(), task.length())
         .project(schema)
@@ -135,11 +136,12 @@ private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Schema
     return builder.build();
   }
 
-  private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
+  private CloseableIterable<RowData> newOrcIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
    Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(schema,
        Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
 
-    ORC.ReadBuilder builder = ORC.read(getInputFile(task))
+    ORC.ReadBuilder builder = ORC.read(inputFilesDecryptor.getInputFile(task))
         .project(readSchemaWithoutConstantAndMetadataFields)
         .split(task.start(), task.length())
         .createReaderFunc(readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
@@ -153,12 +155,15 @@ private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema sche
     return builder.build();
   }
 
-  private class FlinkDeleteFilter extends DeleteFilter<RowData> {
+  private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowDataWrapper asStructLike;
+    private final InputFilesDecryptor inputFilesDecryptor;
 
-    FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+    FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
+                      InputFilesDecryptor inputFilesDecryptor) {
       super(task, tableSchema, requestedSchema);
       this.asStructLike = new RowDataWrapper(FlinkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
+      this.inputFilesDecryptor = inputFilesDecryptor;
     }
 
     @Override
@@ -168,7 +173,7 @@ protected StructLike asStructLike(RowData row) {
 
     @Override
     protected InputFile getInputFile(String location) {
-      return RowDataIterator.this.getInputFile(location);
+      return inputFilesDecryptor.getInputFile(location);
     }
   }
 }
```

Review discussion on the new `open(FileScanTask, InputFilesDecryptor)` signature:

> **Member:** If we construct the […]
>
> **Contributor (author):** This is also tied to the other comment, where we need […]. What you described above is similar to the current status, where we have […]. If we switch to the composition model, we need to pass in […].
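Putting the pieces together, here is a hedged end-to-end sketch of driving the renamed reader directly; the driver class and all of its parameters are illustrative, not code from this PR.

```java
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;

class RowDataReaderUsageSketch {
  // Hypothetical driver showing the new call sequence end to end.
  static void readAll(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption,
                      Schema tableSchema, Schema projectedSchema, String nameMapping) throws Exception {
    FileScanTaskReader<RowData> reader =
        new RowDataFileScanTaskReader(tableSchema, projectedSchema, nameMapping, true /* caseSensitive */);
    InputFilesDecryptor decryptor = new InputFilesDecryptor(combinedTask, io, encryption);

    for (FileScanTask fileTask : combinedTask.files()) {
      // open() resolves each (possibly encrypted) input file through the decryptor.
      try (CloseableIterator<RowData> rows = reader.open(fileTask, decryptor)) {
        while (rows.hasNext()) {
          RowData row = rows.next();
          // ... emit row downstream
        }
      }
    }
  }
}
```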