diff --git a/core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java b/core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
new file mode 100644
index 000000000000..6c1e0eb8b250
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/encryption/InputFilesDecryptor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.encryption;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+public class InputFilesDecryptor {
+
+  private final Map<String, InputFile> decryptedInputFiles;
+
+  public InputFilesDecryptor(CombinedScanTask combinedTask, FileIO io, EncryptionManager encryption) {
+    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
+    combinedTask.files().stream()
+        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
+    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
+        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+
+    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+    Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
+
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(keyMetadata.size());
+    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+    this.decryptedInputFiles = Collections.unmodifiableMap(files);
+  }
+
+  public InputFile getInputFile(FileScanTask task) {
+    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
+    return decryptedInputFiles.get(task.file().path().toString());
+  }
+
+  public InputFile getInputFile(String location) {
+    return decryptedInputFiles.get(location);
+  }
+}
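Note: InputFilesDecryptor collects the data and delete files of a whole CombinedScanTask before decrypting, so the single batch EncryptionManager.decrypt() call can replace one key-server RPC per file. A minimal usage sketch (the `table` and `combinedTask` values are assumed to exist already; only the InputFilesDecryptor calls come from this change):

    // Hypothetical caller: resolve decrypted inputs for one combined task.
    InputFilesDecryptor decryptor =
        new InputFilesDecryptor(combinedTask, table.io(), table.encryption());
    for (FileScanTask fileTask : combinedTask.files()) {
      InputFile input = decryptor.getInputFile(fileTask); // already decrypted
      // hand `input` to a format-specific reader (Parquet, Avro, ORC)
    }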
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
index f74a8968fab8..d470b0752304 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
@@ -21,64 +21,38 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.stream.Stream;
+import org.apache.flink.annotation.Internal;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 
 /**
- * Base class of Flink iterators.
+ * Flink data iterator that reads {@link CombinedScanTask} into a {@link CloseableIterator}
  *
- * @param <T> is the Java class returned by this iterator whose objects contain one or more rows.
+ * @param <T> is the output data type returned by this iterator.
  */
-abstract class DataIterator<T> implements CloseableIterator<T> {
+@Internal
+public class DataIterator<T> implements CloseableIterator<T> {
 
-  private Iterator<FileScanTask> tasks;
-  private final Map<String, InputFile> inputFiles;
+  private final FileScanTaskReader<T> fileScanTaskReader;
+  private final InputFilesDecryptor inputFilesDecryptor;
 
+  private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
 
-  DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
-    this.tasks = task.files().iterator();
-
-    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
-    task.files().stream()
-        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
-        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
-        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
-
-    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
-    Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);
-
-    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
-    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
-    this.inputFiles = ImmutableMap.copyOf(files);
+  public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
+                      FileIO io, EncryptionManager encryption) {
+    this.fileScanTaskReader = fileScanTaskReader;
+    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
   }
 
-  InputFile getInputFile(FileScanTask task) {
-    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
-
-    return inputFiles.get(task.file().path().toString());
-  }
-
-  InputFile getInputFile(String location) {
-    return inputFiles.get(location);
-  }
-
   @Override
   public boolean hasNext() {
     updateCurrentIterator();
@@ -106,7 +80,9 @@ private void updateCurrentIterator() {
     }
   }
 
-  abstract CloseableIterator<T> openTaskIterator(FileScanTask scanTask) throws IOException;
+  private CloseableIterator<T> openTaskIterator(FileScanTask scanTask) {
+    return fileScanTaskReader.open(scanTask, inputFilesDecryptor);
+  }
 
   @Override
   public void close() throws IOException {
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FileScanTaskReader.java b/flink/src/main/java/org/apache/iceberg/flink/source/FileScanTaskReader.java
new file mode 100644
index 000000000000..04273016ee2d
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FileScanTaskReader.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.Serializable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Read a {@link FileScanTask} into a {@link CloseableIterator}
+ *
+ * @param <T> is the output data type returned by this iterator.
+ */
+@Internal
+public interface FileScanTaskReader<T> extends Serializable {
+  CloseableIterator<T> open(FileScanTask fileScanTask, InputFilesDecryptor inputFilesDecryptor);
+}
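FileScanTaskReader takes the InputFilesDecryptor as an open() argument rather than holding it as a field, which keeps reader implementations stateless and cheaply Serializable for shipping to Flink tasks; DataIterator rebuilds the decryptor per CombinedScanTask. A hypothetical implementation, only to illustrate the contract (this class is not part of the change; RowDataFileScanTaskReader below is the real one):

    // Sketch: a trivial reader that opens nothing and emits no records.
    @Internal
    class EmptyReader implements FileScanTaskReader<Long> {
      @Override
      public CloseableIterator<Long> open(FileScanTask task, InputFilesDecryptor decryptor) {
        // A real reader would open decryptor.getInputFile(task) with a
        // format-specific builder and map each record to the output type.
        return CloseableIterator.empty();
      }
    }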
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
index 1bad1c25952e..8b757ac31606 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java
@@ -42,21 +42,22 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
   private static final long serialVersionUID = 1L;
 
   private final TableLoader tableLoader;
-  private final Schema tableSchema;
   private final FileIO io;
   private final EncryptionManager encryption;
   private final ScanContext context;
+  private final RowDataFileScanTaskReader rowDataReader;
 
-  private transient RowDataIterator iterator;
+  private transient DataIterator<RowData> iterator;
   private transient long currentReadCount = 0L;
 
   FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
                    ScanContext context) {
     this.tableLoader = tableLoader;
-    this.tableSchema = tableSchema;
     this.io = io;
     this.encryption = encryption;
     this.context = context;
+    this.rowDataReader = new RowDataFileScanTaskReader(tableSchema,
+        context.project(), context.nameMapping(), context.caseSensitive());
   }
 
   @VisibleForTesting
@@ -91,9 +92,7 @@ public void configure(Configuration parameters) {
 
   @Override
   public void open(FlinkInputSplit split) {
-    this.iterator = new RowDataIterator(
-        split.getTask(), io, encryption, tableSchema, context.project(), context.nameMapping(),
-        context.caseSensitive());
+    this.iterator = new DataIterator<>(rowDataReader, split.getTask(), io, encryption);
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
similarity index 69%
rename from flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
rename to flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index 5a568144d1f7..fbdb7bf3cc02 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -20,15 +20,15 @@
 package org.apache.iceberg.flink.source;
 
 import java.util.Map;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
-import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.RowDataWrapper;
 import org.apache.iceberg.flink.data.FlinkAvroReader;
@@ -37,7 +37,6 @@
 import org.apache.iceberg.flink.data.RowDataUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
@@ -47,16 +46,16 @@
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
-class RowDataIterator extends DataIterator<RowData> {
+@Internal
+public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
 
   private final Schema tableSchema;
   private final Schema projectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
 
-  RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
-                  Schema projectedSchema, String nameMapping, boolean caseSensitive) {
-    super(task, io, encryption);
+  public RowDataFileScanTaskReader(Schema tableSchema, Schema projectedSchema,
+                                   String nameMapping, boolean caseSensitive) {
     this.tableSchema = tableSchema;
     this.projectedSchema = projectedSchema;
     this.nameMapping = nameMapping;
@@ -64,34 +63,35 @@ class RowDataIterator extends DataIterator<RowData> {
   }
 
   @Override
-  protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {
+  public CloseableIterator<RowData> open(FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
     Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
 
     Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
        PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
 
-    FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);
-    CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));
-
-    return iterable.iterator();
+    FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
+    return deletes
+        .filter(newIterable(task, deletes.requiredSchema(), idToConstant, inputFilesDecryptor))
+        .iterator();
   }
 
-  private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
+  private CloseableIterable<RowData> newIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
     CloseableIterable<RowData> iter;
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
       switch (task.file().format()) {
         case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant);
+          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant);
+          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         case ORC:
-          iter = newOrcIterable(task, schema, idToConstant);
+          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
           break;
 
         default:
@@ -103,8 +103,9 @@ private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema,
     return iter;
   }
 
-  private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
-    Avro.ReadBuilder builder = Avro.read(getInputFile(task))
+  private CloseableIterable<RowData> newAvroIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
+    Avro.ReadBuilder builder = Avro.read(inputFilesDecryptor.getInputFile(task))
         .reuseContainers()
         .project(schema)
         .split(task.start(), task.length())
@@ -117,9 +118,9 @@ private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Schema schema,
     return builder.build();
   }
 
-  private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Schema schema,
-                                                        Map<Integer, ?> idToConstant) {
-    Parquet.ReadBuilder builder = Parquet.read(getInputFile(task))
+  private CloseableIterable<RowData> newParquetIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
+    Parquet.ReadBuilder builder = Parquet.read(inputFilesDecryptor.getInputFile(task))
         .reuseContainers()
         .split(task.start(), task.length())
         .project(schema)
@@ -135,11 +136,12 @@ private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Schema schema,
     return builder.build();
   }
 
-  private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
+  private CloseableIterable<RowData> newOrcIterable(
+      FileScanTask task, Schema schema, Map<Integer, ?> idToConstant, InputFilesDecryptor inputFilesDecryptor) {
     Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(schema,
         Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
 
-    ORC.ReadBuilder builder = ORC.read(getInputFile(task))
+    ORC.ReadBuilder builder = ORC.read(inputFilesDecryptor.getInputFile(task))
         .project(readSchemaWithoutConstantAndMetadataFields)
         .split(task.start(), task.length())
         .createReaderFunc(readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
@@ -153,12 +155,15 @@ private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema schema,
     return builder.build();
   }
 
-  private class FlinkDeleteFilter extends DeleteFilter<RowData> {
+  private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowDataWrapper asStructLike;
+    private final InputFilesDecryptor inputFilesDecryptor;
 
-    FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+    FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema,
+                      InputFilesDecryptor inputFilesDecryptor) {
       super(task, tableSchema, requestedSchema);
       this.asStructLike = new RowDataWrapper(FlinkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
+      this.inputFilesDecryptor = inputFilesDecryptor;
    }
 
     @Override
@@ -168,7 +173,7 @@ protected StructLike asStructLike(RowData row) {
 
     @Override
     protected InputFile getInputFile(String location) {
-      return RowDataIterator.this.getInputFile(location);
+      return inputFilesDecryptor.getInputFile(location);
     }
   }
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index a6cd374c3044..752035e4ea3b 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -99,6 +99,7 @@ public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
     private final boolean caseSensitive;
     private final EncryptionManager encryptionManager;
     private final TaskWriterFactory<RowData> taskWriterFactory;
+    private final RowDataFileScanTaskReader rowDataReader;
 
     public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
                       EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
@@ -108,6 +109,7 @@ public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
       this.caseSensitive = caseSensitive;
       this.encryptionManager = encryptionManager;
       this.taskWriterFactory = taskWriterFactory;
+      this.rowDataReader = new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
     }
 
     @Override
@@ -122,8 +124,8 @@ public void open(Configuration parameters) {
     public List<DataFile> map(CombinedScanTask task) throws Exception {
       // Initialize the task writer.
      this.writer = taskWriterFactory.create();
-      try (RowDataIterator iterator =
-          new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
+      try (DataIterator<RowData> iterator =
+          new DataIterator<>(rowDataReader, task, io, encryptionManager)) {
        while (iterator.hasNext()) {
          RowData rowData = iterator.next();
          writer.write(rowData);
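With the reader extracted, the rewrite path above and FlinkInputFormat now compose the same two pieces. A hedged end-to-end sketch, assuming `task`, `io`, and `encryption` come from a planned scan as in the surrounding code:

    RowDataFileScanTaskReader reader =
        new RowDataFileScanTaskReader(schema, schema, nameMapping, caseSensitive);
    // DataIterator is a CloseableIterator, so try-with-resources applies.
    try (DataIterator<RowData> iterator = new DataIterator<>(reader, task, io, encryption)) {
      while (iterator.hasNext()) {
        RowData row = iterator.next();
        // process each row
      }
    }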