diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 95c492385c80..f11a702aff9b 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -62,14 +62,16 @@ public interface ManifestFile {
   Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
       Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE), "Summary for each partition");
-  // next ID to assign: 519
+  Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(),
+      "Encryption key metadata blob");
+  // next ID to assign: 520

   Schema SCHEMA = new Schema(
       PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT, SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER,
       SNAPSHOT_ID, ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
       ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
-      PARTITION_SUMMARIES);
+      PARTITION_SUMMARIES, KEY_METADATA);

   static Schema schema() {
     return SCHEMA;
@@ -179,6 +181,13 @@ default boolean hasDeletedFiles() {
    */
   List<PartitionFieldSummary> partitions();

+  /**
+   * Returns metadata about how this manifest file is encrypted, or null if the file is stored in plain text.
+   */
+  default ByteBuffer keyMetadata() {
+    return null;
+  }
+
   /**
    * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
    * this method to make defensive copies.
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
index 197a730bdc05..14c889f96610 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -205,7 +205,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
           .build());
       LOG.info("Successfully dropped table {} from Glue", identifier);
       if (purge && lastMetadata != null) {
-        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata);
         LOG.info("Glue table {} data purged", identifier);
       }
       LOG.info("Dropped table: {}", identifier);
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index 13963b8e0437..5738cb567b3c 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -120,7 +120,8 @@ protected CloseableIterable<FileScanTask> planFiles(
     // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
     // all cases.
     return CloseableIterable.transform(manifests, manifest ->
-        new DataFilesTable.ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
+        new DataFilesTable.ManifestReadTask(ops.io(), ops.encryption(), manifest, fileSchema,
+            schemaString, specString, residuals));
   }
 }
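Note: the new `key_metadata` column is the only on-disk change here; everything else threads an `EncryptionManager` to wherever a manifest is opened. A minimal sketch (not part of this patch) of how the field is meant to be consumed on the read side, using the `EncryptedFiles` and `EncryptionManager` APIs this diff already relies on:

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;

public class ManifestDecryptSketch {
  static InputFile openManifest(FileIO io, EncryptionManager encryption, ManifestFile manifest) {
    // pair the raw manifest bytes with the key metadata recorded alongside the manifest;
    // keyMetadata() defaults to null, which means the file is stored in plain text
    EncryptedInputFile encrypted = EncryptedFiles.encryptedInput(
        io.newInputFile(manifest.path()), manifest.keyMetadata());
    // the EncryptionManager resolves the key and returns a decrypting InputFile
    return encryption.decrypt(encrypted);
  }
}
```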
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 66f8b4927135..da33cd23c9e3 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -105,7 +105,8 @@ protected CloseableIterable<FileScanTask> planFiles(
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

     return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
-        ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById()));
+        ops.io(), ops.encryption(), manifest, fileSchema, schemaString, specString, residuals,
+        ops.current().specsById()));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index d8e6e9338963..a961a320d19d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -170,7 +170,7 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate<Except
         .run(metadataLocation -> newMetadata.set(
-            TableMetadataParser.read(io(), metadataLocation)));
+            TableMetadataParser.read(io(), encryption(), metadataLocation)));

     String newUUID = newMetadata.get().uuid();
     if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 1b28f364cb35..b87a8c0dc303 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -30,10 +30,10 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -156,7 +156,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     TableMetadata current = ops.current();
     InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newFile = newManifestOutput();
+    EncryptedOutputFile newFile = newManifestOutput();
     return ManifestFiles.copyRewriteManifest(
         current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder);
   }
@@ -236,8 +236,8 @@ private void performRewrite(List<ManifestFile> currentManifests) {
         keptManifests.add(manifest);
       } else {
         rewrittenManifests.add(manifest);
-        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
-            .select(Arrays.asList("*"))) {
+        try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.encryption(),
+            ops.current().specsById()).select(Arrays.asList("*"))) {
           reader.liveEntries().forEach(
               entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
           );
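The read-side change is mechanical across all of these files: every `ManifestFiles.read` call gains an `EncryptionManager` argument. A hedged sketch of the new call shape (the `manifest`, `io`, `encryption`, and `specsById` values are assumed to come from a loaded table):

```java
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

public class ReadManifestSketch {
  static void printLivePaths(ManifestFile manifest, FileIO io, EncryptionManager encryption,
                             Map<Integer, PartitionSpec> specsById) throws IOException {
    // the EncryptionManager decides from manifest.keyMetadata() whether decryption is needed
    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, encryption, specsById)) {
      for (DataFile file : reader) {
        System.out.println(file.path());
      }
    }
  }
}
```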
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 95e6d621f238..ab8c843d107e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -23,6 +23,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -36,6 +37,7 @@ class BaseSnapshot implements Snapshot {
   private static final long INITIAL_SEQUENCE_NUMBER = 0;

   private final FileIO io;
+  private final EncryptionManager encryption;
   private final long snapshotId;
   private final Long parentId;
   private final long sequenceNumber;
@@ -55,14 +57,16 @@ class BaseSnapshot implements Snapshot {
    * For testing only.
    */
   BaseSnapshot(FileIO io,
+               EncryptionManager encryption,
                long snapshotId,
                String... manifestFiles) {
-    this(io, snapshotId, null, System.currentTimeMillis(), null, null,
+    this(io, encryption, snapshotId, null, System.currentTimeMillis(), null, null,
         Lists.transform(Arrays.asList(manifestFiles),
             path -> new GenericManifestFile(io.newInputFile(path), 0)));
   }

   BaseSnapshot(FileIO io,
+               EncryptionManager encryption,
                long sequenceNumber,
                long snapshotId,
                Long parentId,
@@ -71,6 +75,7 @@ class BaseSnapshot implements Snapshot {
                Map<String, String> summary,
                String manifestList) {
     this.io = io;
+    this.encryption = encryption;
     this.sequenceNumber = sequenceNumber;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
@@ -81,13 +86,14 @@ class BaseSnapshot implements Snapshot {
   }

   BaseSnapshot(FileIO io,
+               EncryptionManager encryption,
                long snapshotId,
                Long parentId,
                long timestampMillis,
                String operation,
                Map<String, String> summary,
                List<ManifestFile> dataManifests) {
-    this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
+    this(io, encryption, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
     this.allManifests = dataManifests;
   }

@@ -187,7 +193,7 @@ private void cacheChanges() {
     // read only manifests that were created by this snapshot
     Iterable<ManifestFile> changedManifests = Iterables.filter(dataManifests(),
         manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
-    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
+    try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, encryption, changedManifests)
         .ignoreExisting()
         .entries()) {
       for (ManifestEntry<DataFile> entry : entries) {
diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index cb6a0d8b092c..b9695c4a5f03 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionManagers;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
@@ -54,6 +56,14 @@ public class CatalogUtil {
   private CatalogUtil() {
   }

+  /**
+   * @deprecated please use {@link #dropTableData(FileIO, EncryptionManager, TableMetadata)}
+   */
+  @Deprecated
+  public static void dropTableData(FileIO io, TableMetadata metadata) {
+    dropTableData(io, EncryptionManagers.plainText(), metadata);
+  }
+
   /**
    * Drops all data and metadata files referenced by TableMetadata.
    * <p>
@@ -63,7 +73,7 @@ private CatalogUtil() {
    * @param io a FileIO to use for deletes
    * @param metadata the last valid TableMetadata instance for a dropped table.
    */
-  public static void dropTableData(FileIO io, TableMetadata metadata) {
+  public static void dropTableData(FileIO io, EncryptionManager encryption, TableMetadata metadata) {
     // Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
     // as much of the delete work as possible and avoid orphaned data or manifest files.

@@ -86,7 +96,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

     if (gcEnabled) {
       // delete data files only if we are sure this won't corrupt other tables
-      deleteFiles(io, manifestsToDelete);
+      deleteFiles(io, encryption, manifestsToDelete);
     }

     Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
@@ -106,7 +116,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
   }

   @SuppressWarnings("DangerousStringInternUsage")
-  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+  private static void deleteFiles(FileIO io, EncryptionManager encryption, Set<ManifestFile> allManifests) {
     // keep track of deleted files in a map that can be cleaned up when memory runs low
     Map<String, Boolean> deletedFiles = new MapMaker()
         .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
@@ -118,7 +128,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
         .executeWith(ThreadPools.getWorkerPool())
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
-          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, io, encryption)) {
            for (ManifestEntry<?> entry : reader.entries()) {
              // intern the file path because the weak key map uses identity (==) instead of equals
              String path = entry.file().path().toString().intern();
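The deprecated single-argument `dropTableData` keeps existing callers compiling by falling back to `EncryptionManagers.plainText()`. A hedged sketch of how a catalog purge would call the new overload (the `TableOperations` variable is an assumption standing in for a catalog's table ops):

```java
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

public class PurgeSketch {
  static void purge(TableOperations ops) {
    TableMetadata lastMetadata = ops.current();
    // the EncryptionManager is needed to decrypt manifests while walking the
    // metadata tree to find data files to delete
    CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata);
  }
}
```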
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index ded26c07bdbf..0f1f40c4ba9b 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,6 +19,7 @@

 package org.apache.iceberg;

+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -102,19 +103,21 @@ protected CloseableIterable<FileScanTask> planFiles(
     // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
     // all cases.
     return CloseableIterable.transform(manifests, manifest ->
-        new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
+        new ManifestReadTask(ops.io(), ops.encryption(), manifest, fileSchema, schemaString, specString, residuals));
   }

   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final FileIO io;
+    private final EncryptionManager encryption;
     private final ManifestFile manifest;
     private final Schema schema;

-    ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
+    ManifestReadTask(FileIO io, EncryptionManager encryption, ManifestFile manifest, Schema schema, String schemaString,
                      String specString, ResidualEvaluator residuals) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
       this.io = io;
+      this.encryption = encryption;
       this.manifest = manifest;
       this.schema = schema;
     }
@@ -122,7 +125,7 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     @Override
     public CloseableIterable<StructLike> rows() {
       return CloseableIterable.transform(
-          ManifestFiles.read(manifest, io).project(schema),
+          ManifestFiles.read(manifest, io, encryption).project(schema),
           file -> (GenericDataFile) file);
     }

diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fdcd5ed35645..11d821128444 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -71,7 +71,8 @@ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema sche
   public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                    Expression rowFilter, boolean ignoreResiduals,
                                                    boolean caseSensitive, boolean colStats) {
-    ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
+    ManifestGroup manifestGroup = new ManifestGroup(ops.io(), ops.encryption(),
+        snapshot.dataManifests(), snapshot.deleteManifests())
         .caseSensitive(caseSensitive)
         .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
         .filterData(rowFilter)
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 7605e1c3548b..a4caa6df2f3d 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -305,12 +306,13 @@ private static Stream<DeleteFile> limitBySequenceNumber(long sequenceNumber, lon
     return Arrays.stream(files, start, files.length);
   }

-  static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
-    return new Builder(io, Sets.newHashSet(deleteManifests));
+  static Builder builderFor(FileIO io, EncryptionManager encryption, Iterable<ManifestFile> deleteManifests) {
+    return new Builder(io, encryption, Sets.newHashSet(deleteManifests));
   }

   static class Builder {
     private final FileIO io;
+    private final EncryptionManager encryption;
     private final Set<ManifestFile> deleteManifests;
     private Map<Integer, PartitionSpec> specsById = null;
     private Expression dataFilter = Expressions.alwaysTrue();
@@ -318,8 +320,9 @@ static class Builder {
     private boolean caseSensitive = true;
     private ExecutorService executorService = null;

-    Builder(FileIO io, Set<ManifestFile> deleteManifests) {
+    Builder(FileIO io, EncryptionManager encryption, Set<ManifestFile> deleteManifests) {
       this.io = io;
+      this.encryption = encryption;
       this.deleteManifests = Sets.newHashSet(deleteManifests);
     }

@@ -445,7 +448,7 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea
       return Iterables.transform(
           matchingManifests,
           manifest ->
-              ManifestFiles.readDeleteManifest(manifest, io, specsById)
+              ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById)
                   .filterRows(dataFilter)
                   .filterPartitions(partitionFilter)
                   .caseSensitive(caseSensitive)
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index eaa4cd5ed0e3..805d695ce067 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -23,11 +23,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -117,7 +117,7 @@ public FastAppend appendManifest(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     TableMetadata current = ops.current();
     InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newManifestPath = newManifestOutput();
+    EncryptedOutputFile newManifestPath = newManifestOutput();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), summaryBuilder);
   }
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index 9fa358bb87e8..dd9c3bfbfaa4 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -197,7 +197,8 @@ public CloseableIterable<DataFile> collect() {
     }

     // when snapshot is not null
-    CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), snapshot.dataManifests())
+    CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(
+        ops.io(), ops.encryption(), snapshot.dataManifests())
         .specsById(ops.current().specsById())
         .filterData(rowFilter)
         .filterFiles(fileFilter)
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index 716f41b3938f..1a95149a8af3 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;

 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
@@ -33,6 +34,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Objects;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ByteBuffers;

 public class GenericManifestFile
     implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
@@ -58,6 +60,7 @@ public class GenericManifestFile
   private Long existingRowsCount = null;
   private Long deletedRowsCount = null;
   private PartitionFieldSummary[] partitions = null;
+  private byte[] keyMetadata = null;

   /**
    * Used by Avro reflection to instantiate this class when reading manifest files.
@@ -101,13 +104,14 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
     this.deletedRowsCount = null;
     this.partitions = null;
     this.fromProjectionPos = null;
+    this.keyMetadata = null;
   }

   public GenericManifestFile(String path, long length, int specId, ManifestContent content,
                              long sequenceNumber, long minSequenceNumber, Long snapshotId,
                              int addedFilesCount, long addedRowsCount, int existingFilesCount,
                              long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
-                             List<PartitionFieldSummary> partitions) {
+                             List<PartitionFieldSummary> partitions, ByteBuffer keyMetadata) {
     this.avroSchema = AVRO_SCHEMA;
     this.manifestPath = path;
     this.length = length;
@@ -124,6 +128,7 @@ public GenericManifestFile(String path, long length, int specId, ManifestContent
     this.deletedRowsCount = deletedRowsCount;
     this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
     this.fromProjectionPos = null;
+    this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
   }

   /**
@@ -154,6 +159,7 @@ private GenericManifestFile(GenericManifestFile toCopy) {
       this.partitions = null;
     }
     this.fromProjectionPos = toCopy.fromProjectionPos;
+    this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
   }

   /**
@@ -245,6 +251,11 @@ public List<PartitionFieldSummary> partitions() {
     return partitions == null ? null : Arrays.asList(partitions);
   }

+  @Override
+  public ByteBuffer keyMetadata() {
+    return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
+  }
+
   @Override
   public int size() {
     return ManifestFile.schema().columns().size();
@@ -291,6 +302,8 @@ public Object get(int i) {
         return deletedRowsCount;
       case 13:
         return partitions();
+      case 14:
+        return keyMetadata();
       default:
         throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
     }
@@ -349,6 +362,9 @@ public <T> void set(int i, T value) {
         this.partitions = value == null ? null :
            ((List<PartitionFieldSummary>) value).toArray(new PartitionFieldSummary[0]);
         return;
+      case 14:
+        this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
+        return;
       default:
         // ignore the object, it must be from a newer version of the format
     }
@@ -399,6 +415,7 @@ public String toString() {
         .add("deleted_data_files_count", deletedFilesCount)
         .add("deleted_rows_count", deletedRowsCount)
         .add("partitions", partitions)
+        .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
         .toString();
   }

@@ -418,7 +435,7 @@ private CopyBuilder(ManifestFile toCopy) {
           toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(),
           toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(),
           toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(),
-          copyList(toCopy.partitions(), PartitionFieldSummary::copy));
+          copyList(toCopy.partitions(), PartitionFieldSummary::copy), toCopy.keyMetadata());
     }
   }
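`GenericManifestFile` stores the key metadata as a private `byte[]` (so Java serialization keeps working), wraps it on read, and makes a real copy only in the defensive copy constructor. A small illustration of the same pattern in isolation; the class and method names here are illustrative, not from the patch:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: mirrors how GenericManifestFile handles its key metadata.
class KeyMetadataHolder {
  private byte[] keyMetadata; // plain array, serialization-friendly

  void set(ByteBuffer buffer) {
    if (buffer == null) {
      this.keyMetadata = null;
    } else {
      // copy out of the caller's buffer without disturbing its position
      this.keyMetadata = new byte[buffer.remaining()];
      buffer.duplicate().get(this.keyMetadata);
    }
  }

  ByteBuffer get() {
    // ByteBuffer.wrap shares the underlying array; a deep copy is only made
    // when the holder itself is copied (Arrays.copyOf), as in the copy constructor
    return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
  }

  KeyMetadataHolder copy() {
    KeyMetadataHolder copied = new KeyMetadataHolder();
    copied.keyMetadata = keyMetadata == null ? null : Arrays.copyOf(keyMetadata, keyMetadata.length);
    return copied;
  }
}
```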
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index a02366b6b66e..24aabd4da2d3 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -79,7 +79,7 @@ public CloseableIterable<FileScanTask> planFiles() {
         .filter(manifestFile -> snapshotIds.contains(manifestFile.snapshotId()))
         .toSet();

-    ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), manifests)
+    ManifestGroup manifestGroup = new ManifestGroup(tableOps().io(), tableOps().encryption(), manifests)
         .caseSensitive(isCaseSensitive())
         .select(colStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
         .filterData(filter())
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 5e006e0f031a..4fa7b84a5a0e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg;

 import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.ResidualEvaluator;
@@ -103,7 +104,7 @@ protected CloseableIterable<FileScanTask> planFiles(
     ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

     return CloseableIterable.transform(manifests, manifest ->
-        new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals,
+        new ManifestReadTask(ops.io(), ops.encryption(), manifest, fileSchema, schemaString, specString, residuals,
             ops.current().specsById()));
   }
 }
@@ -111,14 +112,17 @@ protected CloseableIterable<FileScanTask> planFiles(
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     private final Schema fileSchema;
     private final FileIO io;
+    private final EncryptionManager encryption;
     private final ManifestFile manifest;
     private final Map<Integer, PartitionSpec> specsById;

-    ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString,
-                     String specString, ResidualEvaluator residuals, Map<Integer, PartitionSpec> specsById) {
+    ManifestReadTask(FileIO io, EncryptionManager encryption, ManifestFile manifest, Schema fileSchema,
+                     String schemaString, String specString, ResidualEvaluator residuals,
+                     Map<Integer, PartitionSpec> specsById) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
       this.fileSchema = fileSchema;
       this.io = io;
+      this.encryption = encryption;
       this.manifest = manifest;
       this.specsById = specsById;
     }
@@ -126,10 +130,10 @@ static class ManifestReadTask extends BaseFileScanTask implements DataTask {
     @Override
     public CloseableIterable<StructLike> rows() {
       if (manifest.content() == ManifestContent.DATA) {
-        return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
+        return CloseableIterable.transform(ManifestFiles.read(manifest, io, encryption).project(fileSchema).entries(),
             file -> (GenericManifestEntry<DataFile>) file);
       } else {
-        return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
+        return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById)
             .project(fileSchema).entries(),
             file -> (GenericManifestEntry<DeleteFile>) file);
       }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 9dbf50e762c7..7d227fff1f6c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -24,6 +24,12 @@
 import org.apache.iceberg.ManifestReader.FileType;
 import org.apache.iceberg.avro.AvroEncoderUtil;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionManagers;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
@@ -43,19 +49,36 @@ private ManifestFiles() {
       ManifestFile.PARTITION_SUMMARY_TYPE, GenericPartitionFieldSummary.class.getName()
   ));

+  /**
+   * @deprecated please use {@link #readPaths(ManifestFile, FileIO, EncryptionManager)}
+   */
+  @Deprecated
+  public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io) {
+    return readPaths(manifest, io, EncryptionManagers.plainText());
+  }
+
   /**
    * Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
    *
-   * @param manifest a ManifestFile
+   * @param manifest an encrypted ManifestFile
    * @param io a FileIO
+   * @param encryption an EncryptionManager
    * @return a manifest reader
    */
-  public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io) {
+  public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io, EncryptionManager encryption) {
     return CloseableIterable.transform(
-        read(manifest, io, null).select(ImmutableList.of("file_path")).liveEntries(),
+        read(manifest, io, encryption, null).select(ImmutableList.of("file_path")).liveEntries(),
         entry -> entry.file().path().toString());
   }

+  /**
+   * @deprecated please use {@link #read(ManifestFile, FileIO, EncryptionManager)}
+   */
+  @Deprecated
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
+    return read(manifest, io, EncryptionManagers.plainText());
+  }
+
   /**
    * Returns a new {@link ManifestReader} for a {@link ManifestFile}.
    *
@@ -67,26 +90,47 @@ public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO
    * @param io a FileIO
    * @return a manifest reader
    */
-  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
-    return read(manifest, io, null);
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, EncryptionManager encryption) {
+    return read(manifest, io, encryption, null);
+  }
+
+  /**
+   * @deprecated please use {@link #read(ManifestFile, FileIO, EncryptionManager, Map)}
+   */
+  @Deprecated
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+    return read(manifest, io, EncryptionManagers.plainText(), specsById);
   }

   /**
    * Returns a new {@link ManifestReader} for a {@link ManifestFile}.
    *
-   * @param manifest a {@link ManifestFile}
+   * @param manifest an encrypted {@link ManifestFile}
    * @param io a {@link FileIO}
+   * @param encryption an {@link EncryptionManager}
    * @param specsById a Map from spec ID to partition spec
    * @return a {@link ManifestReader}
    */
-  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+  public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, EncryptionManager encryption,
+                                              Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
         "Cannot read a delete manifest with a ManifestReader: %s", manifest);
-    InputFile file = io.newInputFile(manifest.path());
+
+    EncryptedInputFile encryptedFile = EncryptedFiles.encryptedInput(
+        io.newInputFile(manifest.path()), manifest.keyMetadata());
+    InputFile file = encryption.decrypt(encryptedFile);
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
   }

+  /**
+   * @deprecated please use {@link #write(PartitionSpec, EncryptedOutputFile)}
+   */
+  @Deprecated
+  public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
+    return write(spec, EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY));
+  }
+
   /**
    * Create a new {@link ManifestWriter}.
    *
@@ -97,20 +141,30 @@ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Ma
    * @param outputFile the destination file location
    * @return a manifest writer
    */
-  public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
+  public static ManifestWriter<DataFile> write(PartitionSpec spec, EncryptedOutputFile outputFile) {
     return write(1, spec, outputFile, null);
   }

   /**
-   * Create a new {@link ManifestWriter} for the given format version.
+   * @deprecated please use {@link #write(int, PartitionSpec, EncryptedOutputFile, Long)}
+   */
+  @Deprecated
+  public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, OutputFile outputFile,
+                                               Long snapshotId) {
+    return write(formatVersion, spec, EncryptedFiles.encryptedOutput(outputFile,
+        EncryptionKeyMetadata.EMPTY), snapshotId);
+  }
+
+  /**
+   * Create a new encrypted {@link ManifestWriter} for the given format version.
    *
    * @param formatVersion a target format version
    * @param spec a {@link PartitionSpec}
-   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param outputFile an {@link EncryptedOutputFile} where the manifest will be written
    * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
    * @return a manifest writer
    */
-  public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, OutputFile outputFile,
+  public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, EncryptedOutputFile outputFile,
                                                Long snapshotId) {
     switch (formatVersion) {
       case 1:
@@ -121,34 +175,56 @@ public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec sp
     throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
   }

+  /**
+   * @deprecated please use {@link #readDeleteManifest(ManifestFile, FileIO, EncryptionManager, Map)}
+   */
+  @Deprecated
+  public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
+                                                              Map<Integer, PartitionSpec> specsById) {
+    return readDeleteManifest(manifest, io, EncryptionManagers.plainText(), specsById);
+  }
+
   /**
    * Returns a new {@link ManifestReader} for a {@link ManifestFile}.
    *
    * @param manifest a {@link ManifestFile}
    * @param io a {@link FileIO}
+   * @param encryption an {@link EncryptionManager}
    * @param specsById a Map from spec ID to partition spec
    * @return a {@link ManifestReader}
    */
   public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
+                                                              EncryptionManager encryption,
                                                               Map<Integer, PartitionSpec> specsById) {
     Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
         "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
-    InputFile file = io.newInputFile(manifest.path());
+    InputFile file = encryption.decrypt(EncryptedFiles.encryptedInput(
+        io.newInputFile(manifest.path()), manifest.keyMetadata()));
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
     return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
   }

+  /**
+   * @deprecated please use {@link #writeDeleteManifest(int, PartitionSpec, EncryptedOutputFile, Long)}
+   */
+  @Deprecated
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(int formatVersion, PartitionSpec spec,
+                                                               OutputFile outputFile, Long snapshotId) {
+    return writeDeleteManifest(formatVersion, spec,
+        EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY), snapshotId);
+  }
+
   /**
    * Create a new {@link ManifestWriter} for the given format version.
    *
    * @param formatVersion a target format version
    * @param spec a {@link PartitionSpec}
-   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param outputFile an {@link EncryptedOutputFile} where the manifest will be written
    * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
    * @return a manifest writer
    */
   public static ManifestWriter<DeleteFile> writeDeleteManifest(int formatVersion, PartitionSpec spec,
-                                                               OutputFile outputFile, Long snapshotId) {
+                                                               EncryptedOutputFile outputFile, Long snapshotId) {
     switch (formatVersion) {
       case 1:
         throw new IllegalArgumentException("Cannot write delete files in a v1 table");
@@ -181,24 +257,24 @@ public static ManifestFile decode(byte[] manifestData) throws IOException {
     return AvroEncoderUtil.decode(manifestData);
   }

-  static ManifestReader<?> open(ManifestFile manifest, FileIO io) {
-    return open(manifest, io, null);
+  static ManifestReader<?> open(ManifestFile manifest, FileIO io, EncryptionManager encryption) {
+    return open(manifest, io, encryption, null);
   }

-  static ManifestReader<?> open(ManifestFile manifest, FileIO io,
+  static ManifestReader<?> open(ManifestFile manifest, FileIO io, EncryptionManager encryption,
                                 Map<Integer, PartitionSpec> specsById) {
     switch (manifest.content()) {
       case DATA:
-        return ManifestFiles.read(manifest, io, specsById);
+        return ManifestFiles.read(manifest, io, encryption, specsById);
       case DELETES:
-        return ManifestFiles.readDeleteManifest(manifest, io, specsById);
+        return ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById);
     }
     throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
   }

   static ManifestFile copyAppendManifest(int formatVersion,
                                          InputFile toCopy, Map<Integer, PartitionSpec> specsById,
-                                         OutputFile outputFile, long snapshotId,
+                                         EncryptedOutputFile outputFile, long snapshotId,
                                          SnapshotSummary.Builder summaryBuilder) {
     // use metadata that will add the current snapshot's ID for the rewrite
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
@@ -213,7 +289,7 @@ static ManifestFile copyAppendManifest(int formatVersion,

   static ManifestFile copyRewriteManifest(int formatVersion,
                                           InputFile toCopy, Map<Integer, PartitionSpec> specsById,
-                                          OutputFile outputFile, long snapshotId,
+                                          EncryptedOutputFile outputFile, long snapshotId,
                                           SnapshotSummary.Builder summaryBuilder) {
     // for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an exception if it is not
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
@@ -227,7 +303,7 @@ static ManifestFile copyRewriteManifest(int formatVersion,
   }

   private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader<DataFile> reader,
                                                   EncryptedOutputFile outputFile, long snapshotId,
                                                    SnapshotSummary.Builder summaryBuilder,
                                                    ManifestEntry.Status allowedEntryStatus) {
     ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
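ManifestFiles is the heart of the patch: each reader/writer entry point now exists in an encrypted form, with the old signature deprecated and delegating via `EncryptionManagers.plainText()` or `EncryptionKeyMetadata.EMPTY`. A hedged sketch of a full round trip through the new API (`spec`, `io`, `encryption`, `dataFile`, and the location string are assumed to exist):

```java
import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

public class ManifestRoundTripSketch {
  static ManifestFile writeAndRead(PartitionSpec spec, FileIO io, EncryptionManager encryption,
                                   String location, DataFile dataFile, long snapshotId) throws IOException {
    // encrypt() picks the key (and key metadata) before the writer sees the file
    EncryptedOutputFile out = encryption.encrypt(io.newOutputFile(location));
    ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, out, snapshotId);
    try {
      writer.add(dataFile);
    } finally {
      writer.close();
    }

    ManifestFile manifest = writer.toManifestFile(); // now carries keyMetadata()
    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, encryption)) {
      for (DataFile file : reader) {
        System.out.println(file.path());
      }
    }
    return manifest;
  }
}
```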
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 1120bdfb36d3..eec01f192a40 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
+import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -45,6 +46,7 @@ class ManifestGroup {
   private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();

   private final FileIO io;
+  private final EncryptionManager encryption;
   private final Set<ManifestFile> dataManifests;
   private final DeleteFileIndex.Builder deleteIndexBuilder;
   private Predicate<ManifestFile> manifestPredicate;
@@ -60,16 +62,18 @@ class ManifestGroup {
   private boolean caseSensitive;
   private ExecutorService executorService;

-  ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
-    this(io,
+  ManifestGroup(FileIO io, EncryptionManager encryption, Iterable<ManifestFile> manifests) {
+    this(io, encryption,
         Iterables.filter(manifests, manifest -> manifest.content() == ManifestContent.DATA),
         Iterables.filter(manifests, manifest -> manifest.content() == ManifestContent.DELETES));
   }

-  ManifestGroup(FileIO io, Iterable<ManifestFile> dataManifests, Iterable<ManifestFile> deleteManifests) {
+  ManifestGroup(FileIO io, EncryptionManager encryption,
+                Iterable<ManifestFile> dataManifests, Iterable<ManifestFile> deleteManifests) {
     this.io = io;
+    this.encryption = encryption;
     this.dataManifests = Sets.newHashSet(dataManifests);
-    this.deleteIndexBuilder = DeleteFileIndex.builderFor(io, deleteManifests);
+    this.deleteIndexBuilder = DeleteFileIndex.builderFor(io, encryption, deleteManifests);
     this.dataFilter = Expressions.alwaysTrue();
     this.fileFilter = Expressions.alwaysTrue();
     this.partitionFilter = Expressions.alwaysTrue();
@@ -242,7 +246,7 @@ private Iterable<CloseableIterable<ManifestEntry<DataFile>>> entries(
     return Iterables.transform(
         matchingManifests,
         manifest -> {
-          ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById)
+          ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, encryption, specsById)
              .filterRows(dataFilter)
              .filterPartitions(partitionFilter)
              .caseSensitive(caseSensitive)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index b03d68ed217e..54023af655d4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -21,9 +21,9 @@

 import java.io.IOException;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

 /**
@@ -36,7 +36,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   // this is replaced when writing a manifest list by the ManifestFile wrapper
   static final long UNASSIGNED_SEQ = -1L;

-  private final OutputFile file;
+  private final EncryptedOutputFile file;
   private final int specId;
   private final FileAppender<ManifestEntry<F>> writer;
   private final Long snapshotId;
@@ -52,7 +52,7 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   private long deletedRows = 0L;
   private Long minSequenceNumber = null;

-  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+  private ManifestWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
     this.file = file;
     this.specId = spec.specId();
     this.writer = newAppender(spec, file);
@@ -63,7 +63,7 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {

   protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);

-  protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile);
+  protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, EncryptedOutputFile outputFile);

   protected ManifestContent content() {
     return ManifestContent.DATA;
@@ -154,9 +154,9 @@ public ManifestFile toManifestFile() {
     // if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
     // sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
     long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
-    return new GenericManifestFile(file.location(), writer.length(), specId, content(),
-        UNASSIGNED_SEQ, minSeqNumber, snapshotId,
-        addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
+    return new GenericManifestFile(file.encryptingOutputFile().location(), writer.length(), specId, content(),
+        UNASSIGNED_SEQ, minSeqNumber, snapshotId, addedFiles, addedRows, existingFiles, existingRows, deletedFiles,
+        deletedRows, stats.summaries(), file.keyMetadata().buffer());
   }

   @Override
@@ -168,7 +168,7 @@ public void close() throws IOException {
   static class V2Writer extends ManifestWriter<DataFile> {
     private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;

-    V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    V2Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
@@ -179,10 +179,10 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
     }

     @Override
-    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, EncryptedOutputFile file) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
       try {
-        return Avro.write(file)
+        return Avro.write(file.encryptingOutputFile())
             .schema(manifestSchema)
             .named("manifest_entry")
             .meta("schema", SchemaParser.toJson(spec.schema()))
@@ -201,7 +201,7 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec,
   static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;

-    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    V2DeleteWriter(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
@@ -212,10 +212,10 @@ protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
     }

     @Override
-    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, EncryptedOutputFile file) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
       try {
-        return Avro.write(file)
+        return Avro.write(file.encryptingOutputFile())
            .schema(manifestSchema)
            .named("manifest_entry")
            .meta("schema", SchemaParser.toJson(spec.schema()))
@@ -239,7 +239,7 @@ protected ManifestContent content() {
   static class V1Writer extends ManifestWriter<DataFile> {
     private final V1Metadata.IndexedManifestEntry entryWrapper;

-    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
+    V1Writer(PartitionSpec spec, EncryptedOutputFile file, Long snapshotId) {
       super(spec, file, snapshotId);
       this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
     }
@@ -250,10 +250,10 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
     }

     @Override
-    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, EncryptedOutputFile file) {
       Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
       try {
-        return Avro.write(file)
+        return Avro.write(file.encryptingOutputFile())
            .schema(manifestSchema)
            .named("manifest_entry")
            .meta("schema", SchemaParser.toJson(spec.schema()))
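`toManifestFile()` is where key metadata crosses from the writer to table metadata: the writer asks its `EncryptedOutputFile` for the key metadata chosen at `encrypt()` time and stores the raw buffer in the new `key_metadata` column. A small sketch of that hand-off; everything except the `EncryptedOutputFile` accessors is illustrative:

```java
import java.nio.ByteBuffer;
import org.apache.iceberg.encryption.EncryptedOutputFile;

// Illustrative: surfaces the two halves of an EncryptedOutputFile.
// encryptingOutputFile() is the stream that ciphers bytes as they are written;
// keyMetadata() is the blob a reader must later hand to the EncryptionManager.
class ManifestMetadataCapture {
  static ByteBuffer capture(EncryptedOutputFile out) {
    String location = out.encryptingOutputFile().location(); // path recorded in the manifest list
    ByteBuffer keyMetadata = out.keyMetadata().buffer();     // empty for plain-text tables
    System.out.println("writing encrypted manifest to " + location);
    return keyMetadata;
  }
}
```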
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 775ff1a74388..cf6dfbff698e 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -31,7 +32,6 @@
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Predicate;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -228,7 +228,7 @@ protected void add(ManifestFile manifest) {
   private ManifestFile copyManifest(ManifestFile manifest) {
     TableMetadata current = ops.current();
     InputFile toCopy = ops.io().newInputFile(manifest.path());
-    OutputFile newManifestPath = newManifestOutput();
+    EncryptedOutputFile newManifestPath = newManifestOutput();
     return ManifestFiles.copyAppendManifest(
         current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), appendedManifestsSummary);
   }
@@ -271,7 +271,7 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI
       currentSnapshotId = currentSnapshot.parentId();
     }

-    ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+    ManifestGroup conflictGroup = new ManifestGroup(ops.io(), ops.encryption(), manifests, ImmutableList.of())
         .caseSensitive(caseSensitive)
         .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
         .filterData(conflictDetectionFilter)
@@ -326,7 +326,7 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI
       currentSnapshotId = currentSnapshot.parentId();
     }

-    ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of())
+    ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), ops.encryption(), manifests, ImmutableList.of())
         .filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED &&
             newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path()))
         .specsById(base.specsById())
diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index 846f20a07b7c..eafa2f90badd 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionManagers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
@@ -81,8 +83,16 @@ public boolean lastIndexOfSnapshot() {
     }
   }

+  /**
+   * @deprecated please use {@link #from(Snapshot, FileIO, EncryptionManager)}
+   */
+  @Deprecated
   public static MicroBatchBuilder from(Snapshot snapshot, FileIO io) {
-    return new MicroBatchBuilder(snapshot, io);
+    return from(snapshot, io, EncryptionManagers.plainText());
+  }
+
+  public static MicroBatchBuilder from(Snapshot snapshot, FileIO io, EncryptionManager encryption) {
+    return new MicroBatchBuilder(snapshot, io, encryption);
   }

   public static class MicroBatchBuilder {
@@ -90,12 +100,14 @@ public static class MicroBatchBuilder {

     private final Snapshot snapshot;
     private final FileIO io;
+    private final EncryptionManager encryption;
     private boolean caseSensitive;
     private Map<Integer, PartitionSpec> specsById;

-    private MicroBatchBuilder(Snapshot snapshot, FileIO io) {
+    private MicroBatchBuilder(Snapshot snapshot, FileIO io, EncryptionManager encryption) {
       this.snapshot = snapshot;
       this.io = io;
+      this.encryption = encryption;
       this.caseSensitive = true;
     }

@@ -239,7 +251,7 @@ private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedM
   }

   private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean isStarting) {
-    ManifestGroup manifestGroup = new ManifestGroup(io, ImmutableList.of(manifestFile))
+    ManifestGroup manifestGroup = new ManifestGroup(io, encryption, ImmutableList.of(manifestFile))
         .specsById(specsById)
         .caseSensitive(caseSensitive);
     if (isStarting) {
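MicroBatches follows the same deprecation pattern as the rest of the patch: the old entry point survives and forwards to the new one with a plain-text manager. A sketch of both call shapes (`snapshot`, `io`, and `encryption` are assumed to be in scope):

```java
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatchBuilder;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

public class MicroBatchSketch {
  @SuppressWarnings("deprecation")
  static MicroBatchBuilder builders(Snapshot snapshot, FileIO io, EncryptionManager encryption) {
    // legacy call: still compiles, implicitly uses EncryptionManagers.plainText()
    MicroBatchBuilder legacy = MicroBatches.from(snapshot, io);

    // preferred call: threads the table's EncryptionManager into manifest reads
    return MicroBatches.from(snapshot, io, encryption);
  }
}
```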
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index bdffbebad098..dcb93557ae0a 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -399,7 +399,8 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<Ma
         .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
-          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.encryption(),
+              ops.current().specsById())) {
             for (ManifestEntry<?> entry : reader.entries()) {
               // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
               if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -419,7 +420,8 @@ private Set<String> findFilesToDelete(Set<ManifestFile> manifestsToScan, Set<Ma
         .onFailure((item, exc) -> LOG.warn("Failed to get added files: this may cause orphaned data files", exc))
         .run(manifest -> {
           // the manifest has deletes, scan it to find files to delete
-          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
+          try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.encryption(),
+              ops.current().specsById())) {
             for (ManifestEntry<?> entry : reader.entries()) {
               // delete any ADDED file from manifests that were reverted
               if (entry.status() == ManifestEntry.Status.ADDED) {
diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java
index b0d6b5b1964d..b4ffef3d551a 100644
--- a/core/src/main/java/org/apache/iceberg/ScanSummary.java
+++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java
@@ -219,7 +219,7 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
       TopN<String, PartitionMetrics> topN = new TopN<>(
           limit, throwIfLimited, Comparators.charSequences());

-      try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), manifests)
+      try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), ops.encryption(), manifests)
          .specsById(ops.current().specsById())
          .filterData(rowFilter)
          .ignoreDeleted()
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index 5b649f6eb1fa..8e156a4185e3 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -119,7 +119,7 @@ private Table lazyTable() {
           throw new UnsupportedOperationException("Cannot load metadata: metadata file location is null");
         }

-        TableOperations ops = new StaticTableOperations(metadataFileLocation, io, locationProvider);
+        TableOperations ops = new StaticTableOperations(metadataFileLocation, io, encryption, locationProvider);
         this.lazyTable = newTable(ops, name);
       }
     }
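With the `SerializableTable` change, a table rebuilt on an executor keeps the `EncryptionManager` it captured at serialization time. A hedged sketch of loading a pinned table from a metadata file through the new constructor (the metadata location string is illustrative):

```java
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

public class StaticTableSketch {
  static Table load(FileIO io, EncryptionManager encryption) {
    // hypothetical metadata location; StaticTableOperations pins the table to this file
    String metadataFileLocation = "s3://bucket/warehouse/db/t/metadata/v3.metadata.json";
    TableOperations ops = new StaticTableOperations(metadataFileLocation, io, encryption);
    return new BaseTable(ops, metadataFileLocation);
  }
}
```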
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index cf13c35b02ff..db1e4b0264a5 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -26,6 +26,8 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.EncryptionManagers;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -104,7 +106,7 @@ public static String toJson(Snapshot snapshot) {
     }
   }

-  static Snapshot fromJson(FileIO io, JsonNode node) {
+  static Snapshot fromJson(FileIO io, EncryptionManager encryption, JsonNode node) {
     Preconditions.checkArgument(node.isObject(),
         "Cannot parse table version from a non-object: %s", node);

@@ -142,20 +144,29 @@ static Snapshot fromJson(FileIO io, JsonNode node) {
     if (node.has(MANIFEST_LIST)) {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
-      return new BaseSnapshot(io, sequenceNumber, snapshotId, parentId, timestamp, operation, summary, manifestList);
+      return new BaseSnapshot(io, encryption, sequenceNumber, snapshotId, parentId, timestamp,
+          operation, summary, manifestList);
     } else {
       // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
       // loaded lazily, if it is needed
       List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
           location -> new GenericManifestFile(io.newInputFile(location), 0));
-      return new BaseSnapshot(io, snapshotId, parentId, timestamp, operation, summary, manifests);
+      return new BaseSnapshot(io, encryption, snapshotId, parentId, timestamp, operation, summary, manifests);
     }
   }

+  /**
+   * @deprecated please use {@link #fromJson(FileIO, EncryptionManager, JsonNode)}
+   */
+  @Deprecated
   public static Snapshot fromJson(FileIO io, String json) {
+    return fromJson(io, EncryptionManagers.plainText(), json);
+  }
+
+  public static Snapshot fromJson(FileIO io, EncryptionManager encryption, String json) {
     try {
-      return fromJson(io, JsonUtil.mapper().readValue(json, JsonNode.class));
+      return fromJson(io, encryption, JsonUtil.mapper().readValue(json, JsonNode.class));
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read version from json: %s", json);
     }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 3a179f99fbe4..a91e5465a912 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -186,12 +187,12 @@ public Snapshot apply() {
         throw new RuntimeIOException(e, "Failed to write manifest list file");
       }

-      return new BaseSnapshot(ops.io(),
+      return new BaseSnapshot(ops.io(), ops.encryption(),
           sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(),
           operation(), summary(base), manifestList.location());

     } else {
-      return new BaseSnapshot(ops.io(),
+      return new BaseSnapshot(ops.io(), ops.encryption(),
           snapshotId(), parentSnapshotId, System.currentTimeMillis(),
           operation(), summary(base), manifests);
     }
@@ -361,9 +362,9 @@ protected OutputFile manifestListPath() {
         String.format("snap-%d-%d-%s", snapshotId(), attempt.incrementAndGet(), commitUUID))));
   }

-  protected OutputFile newManifestOutput() {
-    return ops.io().newOutputFile(
-        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
+  protected EncryptedOutputFile newManifestOutput() {
+    return ops.encryption().encrypt(ops.io().newOutputFile(
+        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement()))));
   }

   protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
@@ -375,11 +376,11 @@ protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec spec)
   }

   protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
-    return ManifestFiles.read(manifest, ops.io(), ops.current().specsById());
+    return ManifestFiles.read(manifest, ops.io(), ops.encryption(), ops.current().specsById());
   }

   protected ManifestReader<DeleteFile> newDeleteManifestReader(ManifestFile manifest) {
-    return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById());
+    return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.encryption(), ops.current().specsById());
   }

   protected long snapshotId() {
protected long snapshotId() { } private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { - try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) { + try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.encryption(), + ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); int addedFiles = 0; long addedRows = 0L; @@ -441,7 +443,8 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(), ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); + addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), + manifest.keyMetadata()); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path()); diff --git a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java index b4eb9e33cb78..ce528e2c3f0f 100644 --- a/core/src/main/java/org/apache/iceberg/StaticTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/StaticTableOperations.java @@ -20,6 +20,8 @@ package org.apache.iceberg; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionManagers; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; @@ -32,17 +34,28 @@ public class StaticTableOperations implements TableOperations { private TableMetadata staticMetadata; private final String metadataFileLocation; private final FileIO io; + private final EncryptionManager encryption; private final LocationProvider locationProvider; /** - * Creates a StaticTableOperations tied to a specific static version of the TableMetadata + * @deprecated please use {@link #StaticTableOperations(String, FileIO, EncryptionManager)} */ + @Deprecated public StaticTableOperations(String metadataFileLocation, FileIO io) { - this(metadataFileLocation, io, null); + this(metadataFileLocation, io, EncryptionManagers.plainText()); + } + + /** + * Creates a StaticTableOperations tied to a specific static version of the TableMetadata + */ + public StaticTableOperations(String metadataFileLocation, FileIO io, EncryptionManager encryption) { + this(metadataFileLocation, io, encryption, null); } - public StaticTableOperations(String metadataFileLocation, FileIO io, LocationProvider locationProvider) { + public StaticTableOperations(String metadataFileLocation, FileIO io, EncryptionManager encryption, + LocationProvider locationProvider) { this.io = io; + this.encryption = encryption; this.metadataFileLocation = metadataFileLocation; this.locationProvider = locationProvider; } @@ -50,7 +63,7 @@ public StaticTableOperations(String metadataFileLocation, FileIO io, LocationPro @Override public TableMetadata current() { if (staticMetadata == null) { - staticMetadata = TableMetadataParser.read(io, metadataFileLocation); + staticMetadata = TableMetadataParser.read(io, encryption, metadataFileLocation); } return staticMetadata; } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java index 29ad8998b472..fe1b12d1f7e7 100644 --- 
a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java @@ -35,6 +35,8 @@ import java.util.zip.GZIPOutputStream; import org.apache.iceberg.TableMetadata.MetadataLogEntry; import org.apache.iceberg.TableMetadata.SnapshotLogEntry; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionManagers; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; @@ -248,21 +250,37 @@ public static TableMetadata read(TableOperations ops, InputFile file) { return read(ops.io(), file); } + /** + * @deprecated please use {@link #read(FileIO, EncryptionManager, String)} + */ + @Deprecated public static TableMetadata read(FileIO io, String path) { - return read(io, io.newInputFile(path)); + return read(io, EncryptionManagers.plainText(), path); + } + + public static TableMetadata read(FileIO io, EncryptionManager encryption, String path) { + return read(io, encryption, io.newInputFile(path)); } + /** + * @deprecated please use {@link #read(FileIO, EncryptionManager, InputFile)} + */ + @Deprecated public static TableMetadata read(FileIO io, InputFile file) { + return read(io, EncryptionManagers.plainText(), file); + } + + public static TableMetadata read(FileIO io, EncryptionManager encryption, InputFile file) { Codec codec = Codec.fromFileName(file.location()); try (InputStream is = codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) { - return fromJson(io, file, JsonUtil.mapper().readValue(is, JsonNode.class)); + return fromJson(io, encryption, file, JsonUtil.mapper().readValue(is, JsonNode.class)); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read file: %s", file); } } @SuppressWarnings("checkstyle:CyclomaticComplexity") - static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { + static TableMetadata fromJson(FileIO io, EncryptionManager encryption, InputFile file, JsonNode node) { Preconditions.checkArgument(node.isObject(), "Cannot parse metadata from a non-object: %s", node); @@ -380,7 +398,7 @@ static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) { List snapshots = Lists.newArrayListWithExpectedSize(snapshotArray.size()); Iterator iterator = snapshotArray.elements(); while (iterator.hasNext()) { - snapshots.add(SnapshotParser.fromJson(io, iterator.next())); + snapshots.add(SnapshotParser.fromJson(io, encryption, iterator.next())); } ImmutableList.Builder entries = ImmutableList.builder(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagers.java b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagers.java new file mode 100644 index 000000000000..42de4eead133 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/encryption/EncryptionManagers.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.encryption; + +public class EncryptionManagers { + private static final EncryptionManager PLAIN_TEXT_MANAGER = new PlaintextEncryptionManager(); + + private EncryptionManagers() { + } + + /** + * Get a {@link PlaintextEncryptionManager} that passes through for all encrypt and decrypt operations. + *
<p>
+ * This is used as the default encryption manager for old APIs that did not support encryption + * and tests that do not need encryption. + * @return plain text encryption manager + */ + public static EncryptionManager plainText() { + return PLAIN_TEXT_MANAGER; + } +} diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index 9d06c08b0044..77b5deb44929 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -263,7 +263,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { if (purge && lastMetadata != null) { // Since the data files and the metadata files may store in different locations, // so it has to call dropTableData to force delete the data file. - CatalogUtil.dropTableData(ops.io(), lastMetadata); + CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata); } fs.delete(tablePath, true /* recursive */); return true; diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java index dcbd57f1f158..e11699652f34 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java @@ -90,7 +90,7 @@ private synchronized void updateVersionAndMetadata(int newVersion, String metada // update if the current version is out of date if (version == null || version != newVersion) { this.version = newVersion; - this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), metadataFile)); + this.currentMetadata = checkUUID(currentMetadata, TableMetadataParser.read(io(), encryption(), metadataFile)); } } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java index dc1a2d256cee..3e4aad87df13 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTables.java @@ -40,6 +40,7 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.encryption.EncryptionManagers; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -179,7 +180,7 @@ public boolean dropTable(String location, boolean purge) { if (purge && lastMetadata != null) { // Since the data files and the metadata files may store in different locations, // so it has to call dropTableData to force delete the data file. 
- CatalogUtil.dropTableData(ops.io(), lastMetadata); + CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata); } Path tablePath = new Path(location); Util.getFs(tablePath, conf).delete(tablePath, true /* recursive */); @@ -192,7 +193,7 @@ public boolean dropTable(String location, boolean purge) { @VisibleForTesting TableOperations newTableOps(String location) { if (location.contains(METADATA_JSON)) { - return new StaticTableOperations(location, new HadoopFileIO(conf)); + return new StaticTableOperations(location, new HadoopFileIO(conf), EncryptionManagers.plainText()); } else { return new HadoopTableOperations(new Path(location), new HadoopFileIO(conf), conf); } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java index 0c5b97ce4558..6c145734ce4e 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java @@ -56,16 +56,17 @@ public class TestManifestListVersions { private static final int DELETED_FILES = 1; private static final long DELETED_ROWS = 22910L; private static final List PARTITION_SUMMARIES = ImmutableList.of(); + private static final ByteBuffer KEY_METADATA = null; private static final ManifestFile TEST_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES); + PARTITION_SUMMARIES, KEY_METADATA); private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES); + PARTITION_SUMMARIES, KEY_METADATA); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -223,7 +224,7 @@ public void testManifestsPartitionSummary() throws IOException { ManifestFile manifest = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - partitionFieldSummaries); + partitionFieldSummaries, KEY_METADATA); InputFile manifestList = writeManifestList(manifest, 2); diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java index 664cdc5756fd..4abcbe27180a 100644 --- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java +++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java @@ -39,7 +39,7 @@ public class TestSnapshotJson { @Test public void testJsonConversion() { - Snapshot expected = new BaseSnapshot(ops.io(), System.currentTimeMillis(), + Snapshot expected = new BaseSnapshot(ops.io(), ops.encryption(), System.currentTimeMillis(), "file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro"); String json = SnapshotParser.toJson(expected); Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json); @@ -60,7 +60,7 @@ public void testJsonConversionWithOperation() { new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0), new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0)); - Snapshot expected = new BaseSnapshot(ops.io(), id, parentId, System.currentTimeMillis(), + Snapshot expected = new BaseSnapshot(ops.io(), ops.encryption(), id, parentId, System.currentTimeMillis(), 
DataOperations.REPLACE, ImmutableMap.of("files-added", "4", "files-deleted", "100"), manifests); @@ -102,9 +102,10 @@ public void testJsonConversionWithManifestList() throws IOException { } Snapshot expected = new BaseSnapshot( - ops.io(), id, 34, parentId, System.currentTimeMillis(), null, null, localInput(manifestList).location()); + ops.io(), ops.encryption(), id, 34, parentId, System.currentTimeMillis(), null, null, + localInput(manifestList).location()); Snapshot inMemory = new BaseSnapshot( - ops.io(), id, parentId, expected.timestampMillis(), null, null, manifests); + ops.io(), ops.encryption(), id, parentId, expected.timestampMillis(), null, null, manifests); Assert.assertEquals("Files should match in memory list", inMemory.allManifests(), expected.allManifests()); diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java index c44ca63a126a..c213c3a9e750 100644 --- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java +++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java @@ -89,12 +89,12 @@ public class TestTableMetadata { public void testJsonConversion() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List snapshotLog = ImmutableList.builder() .add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId())) @@ -113,7 +113,7 @@ public void testJsonConversion() throws Exception { Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog, ImmutableList.of()); String asJson = TableMetadataParser.toJson(expected); - TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), null, + TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); Assert.assertEquals("Format version should match", @@ -168,12 +168,12 @@ public void testBackwardCompat() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, 
currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId()))); TableMetadata expected = new TableMetadata(null, 1, null, TEST_LOCATION, 0, System.currentTimeMillis(), 3, TableMetadata.INITIAL_SCHEMA_ID, @@ -183,7 +183,7 @@ public void testBackwardCompat() throws Exception { String asJson = toJsonWithoutSpecAndSchemaList(expected); TableMetadata metadata = TableMetadataParser - .fromJson(ops.io(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); + .fromJson(ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); Assert.assertEquals("Format version should match", expected.formatVersion(), metadata.formatVersion()); @@ -280,12 +280,12 @@ private static String toJsonWithoutSpecAndSchemaList(TableMetadata metadata) { public void testJsonWithPreviousMetadataLog() throws Exception { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -301,7 +301,7 @@ public void testJsonWithPreviousMetadataLog() throws Exception { ImmutableList.copyOf(previousMetadataLog)); String asJson = TableMetadataParser.toJson(base); - TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), null, + TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(asJson, JsonNode.class)); Assert.assertEquals("Metadata logs should match", previousMetadataLog, metadataFromJson.previousFiles()); @@ -311,12 +311,12 @@ public void testJsonWithPreviousMetadataLog() throws Exception { public void testAddPreviousMetadataRemoveNone() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = 
System.currentTimeMillis(); @@ -351,12 +351,12 @@ public void testAddPreviousMetadataRemoveNone() { public void testAddPreviousMetadataRemoveOne() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -403,12 +403,12 @@ public void testAddPreviousMetadataRemoveOne() { public void testAddPreviousMetadataRemoveMultiple() { long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600); Snapshot previousSnapshot = new BaseSnapshot( - ops.io(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( + ops.io(), ops.encryption(), previousSnapshotId, null, previousSnapshotId, null, null, ImmutableList.of( new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), SPEC_5.specId()))); long currentSnapshotId = System.currentTimeMillis(); Snapshot currentSnapshot = new BaseSnapshot( - ops.io(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, ImmutableList.of( - new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); + ops.io(), ops.encryption(), currentSnapshotId, previousSnapshotId, currentSnapshotId, null, null, + ImmutableList.of(new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), SPEC_5.specId()))); List reversedSnapshotLog = Lists.newArrayList(); long currentTimestamp = System.currentTimeMillis(); @@ -480,19 +480,19 @@ public void testVersionValidation() { public void testParserVersionValidation() throws Exception { String supportedVersion1 = readTableMetadataInputFile("TableMetadataV1Valid.json"); TableMetadata parsed1 = TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(supportedVersion1, JsonNode.class)); + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(supportedVersion1, JsonNode.class)); Assert.assertNotNull("Should successfully read supported metadata version", parsed1); String supportedVersion2 = readTableMetadataInputFile("TableMetadataV2Valid.json"); TableMetadata parsed2 = TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(supportedVersion2, JsonNode.class)); + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(supportedVersion2, JsonNode.class)); Assert.assertNotNull("Should successfully read supported metadata version", parsed2); String unsupportedVersion = readTableMetadataInputFile("TableMetadataUnsupportedVersion.json"); AssertHelpers.assertThrows("Should not read unsupported metadata", IllegalArgumentException.class, "Cannot read unsupported version", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, 
JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) ); } @@ -503,7 +503,7 @@ public void testParserV2PartitionSpecsValidation() throws Exception { AssertHelpers.assertThrows("Should reject v2 metadata without partition specs", IllegalArgumentException.class, "partition-specs must exist in format v2", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) ); } @@ -513,7 +513,7 @@ public void testParserV2LastAssignedFieldIdValidation() throws Exception { AssertHelpers.assertThrows("Should reject v2 metadata without last assigned partition field id", IllegalArgumentException.class, "last-partition-id must exist in format v2", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) ); } @@ -523,7 +523,7 @@ public void testParserV2SortOrderValidation() throws Exception { AssertHelpers.assertThrows("Should reject v2 metadata without sort order", IllegalArgumentException.class, "sort-orders must exist in format v2", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupportedVersion, JsonNode.class)) ); } @@ -533,7 +533,7 @@ public void testParserV2CurrentSchemaIdValidation() throws Exception { AssertHelpers.assertThrows("Should reject v2 metadata without valid schema id", IllegalArgumentException.class, "Cannot find schema with current-schema-id=2 from schemas", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupported, JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupported, JsonNode.class)) ); } @@ -543,7 +543,7 @@ public void testParserV2SchemasValidation() throws Exception { AssertHelpers.assertThrows("Should reject v2 metadata without schemas", IllegalArgumentException.class, "schemas must exist in format v2", () -> TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(unsupported, JsonNode.class)) + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(unsupported, JsonNode.class)) ); } @@ -650,7 +650,7 @@ public void testUpdateSortOrder() { public void testParseSchemaIdentifierFields() throws Exception { String data = readTableMetadataInputFile("TableMetadataV2Valid.json"); TableMetadata parsed = TableMetadataParser.fromJson( - ops.io(), null, JsonUtil.mapper().readValue(data, JsonNode.class)); + ops.io(), ops.encryption(), null, JsonUtil.mapper().readValue(data, JsonNode.class)); Assert.assertEquals(Sets.newHashSet(), parsed.schemasById().get(0).identifierFieldIds()); Assert.assertEquals(Sets.newHashSet(1, 2), parsed.schemasById().get(1).identifierFieldIds()); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index b00018b3b770..9efd806f867b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -31,9 +31,10 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import 
org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -44,7 +45,7 @@ class FlinkManifestUtil { private FlinkManifestUtil() { } - static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, List dataFiles) + static ManifestFile writeDataFiles(EncryptedOutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { ManifestWriter writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); @@ -55,8 +56,9 @@ static ManifestFile writeDataFiles(OutputFile outputFile, PartitionSpec spec, Li return writer.toManifestFile(); } - static List readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException { - try (CloseableIterable dataFiles = ManifestFiles.read(manifestFile, io)) { + static List readDataFiles(ManifestFile manifestFile, FileIO io, EncryptionManager encryption) + throws IOException { + try (CloseableIterable dataFiles = ManifestFiles.read(manifestFile, io, encryption)) { return Lists.newArrayList(dataFiles); } } @@ -64,11 +66,12 @@ static List readDataFiles(ManifestFile manifestFile, FileIO io) throws static ManifestOutputFileFactory createOutputFileFactory(Table table, String flinkJobId, int subTaskId, long attemptNumber) { TableOperations ops = ((HasTableOperations) table).operations(); - return new ManifestOutputFileFactory(ops, table.io(), table.properties(), flinkJobId, subTaskId, attemptNumber); + return new ManifestOutputFileFactory(ops, table.io(), table.encryption(), table.properties(), + flinkJobId, subTaskId, attemptNumber); } static DeltaManifests writeCompletedFiles(WriteResult result, - Supplier outputFileSupplier, + Supplier outputFileSupplier, PartitionSpec spec) throws IOException { ManifestFile dataManifest = null; @@ -81,7 +84,7 @@ static DeltaManifests writeCompletedFiles(WriteResult result, // Write the completed delete files into a newly created delete manifest file. if (result.deleteFiles() != null && result.deleteFiles().length > 0) { - OutputFile deleteManifestFile = outputFileSupplier.get(); + EncryptedOutputFile deleteManifestFile = outputFileSupplier.get(); ManifestWriter deleteManifestWriter = ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID); @@ -97,18 +100,19 @@ static DeltaManifests writeCompletedFiles(WriteResult result, return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles()); } - static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io) throws IOException { + static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io, EncryptionManager encryption) + throws IOException { WriteResult.Builder builder = WriteResult.builder(); // Read the completed data files from persisted data manifest file. if (deltaManifests.dataManifest() != null) { - builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io)); + builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, encryption)); } // Read the completed delete files from persisted delete manifests file. 
if (deltaManifests.deleteManifest() != null) { try (CloseableIterable deleteFiles = ManifestFiles - .readDeleteManifest(deltaManifests.deleteManifest(), io, null)) { + .readDeleteManifest(deltaManifests.deleteManifest(), io, encryption, null)) { builder.addDeleteFiles(deleteFiles); } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index ff9174a84399..8b457391fdbf 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -205,7 +205,8 @@ private void commitUpToCheckpoint(NavigableMap deltaManifestsMap, DeltaManifests deltaManifests = SimpleVersionedSerialization .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue()); - pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io())); + pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles( + deltaManifests, table.io(), table.encryption())); manifests.addAll(deltaManifests.manifests()); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java index fca86080b11a..a63be4282c27 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java @@ -23,8 +23,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.FileFormat; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Strings; class ManifestOutputFileFactory { @@ -33,16 +34,18 @@ class ManifestOutputFileFactory { private final TableOperations ops; private final FileIO io; + private final EncryptionManager encryption; private final Map props; private final String flinkJobId; private final int subTaskId; private final long attemptNumber; private final AtomicInteger fileCount = new AtomicInteger(0); - ManifestOutputFileFactory(TableOperations ops, FileIO io, Map props, + ManifestOutputFileFactory(TableOperations ops, FileIO io, EncryptionManager encryption, Map props, String flinkJobId, int subTaskId, long attemptNumber) { this.ops = ops; this.io = io; + this.encryption = encryption; this.props = props; this.flinkJobId = flinkJobId; this.subTaskId = subTaskId; @@ -54,7 +57,7 @@ private String generatePath(long checkpointId) { attemptNumber, checkpointId, fileCount.incrementAndGet())); } - OutputFile create(long checkpointId) { + EncryptedOutputFile create(long checkpointId) { String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION); String newManifestFullPath; @@ -65,7 +68,7 @@ OutputFile create(long checkpointId) { newManifestFullPath = String.format("%s/%s", stripTrailingSlash(flinkManifestDir), generatePath(checkpointId)); } - return io.newOutputFile(newManifestFullPath); + return encryption.encrypt(io.newOutputFile(newManifestFullPath)); } private static String stripTrailingSlash(String path) { diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java index c1538bcaff9d..eb9fd794af98 
100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java @@ -103,7 +103,7 @@ public void testIO() throws IOException { .build(), () -> factory.create(curCkpId), table.spec()); - WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.encryption()); Assert.assertEquals("Size of data file list are not equal.", 10, result.deleteFiles().length); for (int i = 0; i < dataFiles.size(); i++) { TestHelpers.assertEquals(dataFiles.get(i), result.dataFiles()[i]); @@ -125,7 +125,7 @@ public void testUserProvidedManifestLocation() throws IOException { File userProvidedFolder = tempFolder.newFolder(); Map props = ImmutableMap.of(FLINK_MANIFEST_LOCATION, userProvidedFolder.getAbsolutePath() + "///"); ManifestOutputFileFactory factory = new ManifestOutputFileFactory( - ((HasTableOperations) table).operations(), table.io(), props, + ((HasTableOperations) table).operations(), table.io(), table.encryption(), props, flinkJobId, 1, 1); List dataFiles = generateDataFiles(5); @@ -141,7 +141,7 @@ public void testUserProvidedManifestLocation() throws IOException { Assert.assertEquals("The newly created manifest file should be located under the user provided directory", userProvidedFolder.toPath(), Paths.get(deltaManifests.dataManifest().path()).getParent()); - WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()); + WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.encryption()); Assert.assertEquals(0, result.deleteFiles().length); Assert.assertEquals(5, result.dataFiles().length); @@ -198,7 +198,7 @@ public void testCompatibility() throws IOException { Assert.assertNotNull("Serialization v1 should not have null data manifest.", delta.dataManifest()); TestHelpers.assertEquals(manifest, delta.dataManifest()); - List actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io()); + List actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io(), table.encryption()); Assert.assertEquals(10, actualFiles.size()); for (int i = 0; i < 10; i++) { TestHelpers.assertEquals(dataFiles.get(i), actualFiles.get(i)); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4ada31a619ca..f3af06561af5 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -603,7 +603,8 @@ public void testFlinkManifests() throws Exception { String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); // 2. Read the data files from manifests and assert. - List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); + List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), + table.io(), table.encryption()); Assert.assertEquals(1, dataFiles.size()); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -644,7 +645,8 @@ public void testDeleteFiles() throws Exception { String.format("%s-%05d-%d-%d-%05d.avro", jobId, 0, 0, checkpoint, 1), manifestPath.getFileName().toString()); // 2. Read the data files from manifests and assert. 
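For the write side, here is a hedged sketch of producing an encrypted manifest through the factory, not part of the patch: `factory`, `spec`, `dataFiles`, and `checkpointId` are assumed in scope (in the factory's own package, since create() is package-private), and the literal format version 2 and snapshot id 0L stand in for FlinkManifestUtil's private constants.

// create() now returns an EncryptedOutputFile: the factory wraps the raw
// OutputFile via encryption.encrypt(io.newOutputFile(path)).
EncryptedOutputFile outputFile = factory.create(checkpointId);
ManifestWriter<DataFile> writer = ManifestFiles.write(2, spec, outputFile, 0L);
try (ManifestWriter<DataFile> closingWriter = writer) {
  for (DataFile dataFile : dataFiles) {
    closingWriter.add(dataFile);
  }
}
ManifestFile manifest = writer.toManifestFile();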
- List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io()); + List dataFiles = FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), + table.io(), table.encryption()); Assert.assertEquals(1, dataFiles.size()); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -815,7 +817,7 @@ private FileAppenderFactory createDeletableAppenderFactory() { private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, - ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null); + ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 68ff6255f766..ba80a4c056de 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -169,7 +169,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) { }); if (purge && lastMetadata != null) { - CatalogUtil.dropTableData(ops.io(), lastMetadata); + CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata); } LOG.info("Dropped table: {}", identifier); diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java index 438ca8768d2c..ee94aa509b34 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java @@ -36,6 +36,7 @@ import org.apache.iceberg.TableMetadataParser; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hive.HiveSchemaUtil; import org.apache.iceberg.hive.HiveTableOperations; @@ -66,6 +67,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook { private Properties catalogProperties; private boolean deleteIcebergTable; private FileIO deleteIo; + private EncryptionManager deleteEncryption; private TableMetadata deleteMetadata; public HiveIcebergMetaHook(Configuration conf) { @@ -159,8 +161,10 @@ public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) { if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties)) { // Store the metadata and the id for deleting the actual table data String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); - this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io(); - this.deleteMetadata = TableMetadataParser.read(deleteIo, metadataLocation); + Table table = Catalogs.loadTable(conf, catalogProperties); + this.deleteIo = table.io(); + this.deleteEncryption = table.encryption(); + this.deleteMetadata = TableMetadataParser.read(deleteIo, deleteEncryption, metadataLocation); } } @@ -179,7 +183,7 @@ public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, } else { // do nothing if metadata folder has been deleted already (Hive 4 behaviour for purge=TRUE) if (deleteIo.newInputFile(deleteMetadata.location()).exists()) { - CatalogUtil.dropTableData(deleteIo, deleteMetadata); + 
CatalogUtil.dropTableData(deleteIo, deleteEncryption, deleteMetadata); } } } catch (Exception e) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index c9bfd49697a1..14a18d3b5421 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -41,11 +41,12 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.TableMigrationUtil; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManagers; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mapping.NameMapping; import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.relocated.com.google.common.base.Joiner; @@ -327,7 +328,7 @@ private static Iterator buildManifest(SerializableConfiguration co String suffix = String.format("stage-%d-task-%d-manifest", ctx.stageId(), ctx.taskAttemptId()); Path location = new Path(basePath, suffix); String outputPath = FileFormat.AVRO.addExtension(location.toString()); - OutputFile outputFile = io.newOutputFile(outputPath); + EncryptedOutputFile outputFile = EncryptionManagers.plainText().encrypt(io.newOutputFile(outputPath)); ManifestWriter writer = ManifestFiles.write(spec, outputFile); try (ManifestWriter writerRef = writer) { diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java index e0f38f7e1fe6..05c0d89b01eb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java @@ -225,7 +225,7 @@ private Dataset appendTypeString(Dataset ds, String type) { } private Dataset buildValidFileDF(TableMetadata metadata) { - Table staticTable = newStaticTable(metadata, this.table.io()); + Table staticTable = newStaticTable(metadata, this.table.io(), this.table.encryption()); return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE) .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST)) .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST)); diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java index 05a845102cf5..03af9ec6c4a9 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java @@ -40,9 +40,10 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.actions.BaseRewriteManifestsActionResult; import org.apache.iceberg.actions.RewriteManifests; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -90,6 +91,7 @@ public class BaseRewriteManifestsSparkAction private final Table table; private final int formatVersion; private final FileIO fileIO; + private final EncryptionManager encryptionManager; private final long targetManifestSizeBytes; private PartitionSpec spec = null; @@ -106,6 +108,7 @@ public BaseRewriteManifestsSparkAction(SparkSession spark, Table table) { TableProperties.MANIFEST_TARGET_SIZE_BYTES, TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT); this.fileIO = SparkUtil.serializableFileIO(table); + this.encryptionManager = table.encryption(); // default the staging location to the metadata location TableOperations ops = ((HasTableOperations) table).operations(); @@ -197,6 +200,8 @@ private Dataset buildManifestEntryDF(List manifests) { private List writeManifestsForUnpartitionedTable(Dataset manifestEntryDF, int numManifests) { Broadcast io = sparkContext().broadcast(fileIO); + Broadcast encryption = sparkContext().broadcast(encryptionManager); + StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); // we rely only on the target number of manifests for unpartitioned tables @@ -206,7 +211,7 @@ private List writeManifestsForUnpartitionedTable(Dataset mani return manifestEntryDF .repartition(numManifests) .mapPartitions( - toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType), + toManifests(io, encryption, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType), manifestEncoder ) .collectAsList(); @@ -217,6 +222,7 @@ private List writeManifestsForPartitionedTable( int targetNumManifestEntries) { Broadcast io = sparkContext().broadcast(fileIO); + Broadcast encryption = sparkContext().broadcast(encryptionManager); StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType(); // we allow the actual size of manifests to be 10% higher if the estimation is not precise enough @@ -227,7 +233,7 @@ private List writeManifestsForPartitionedTable( return df.repartitionByRange(numManifests, partitionColumn) .sortWithinPartitions(partitionColumn) .mapPartitions( - toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType), + toManifests(io, encryption, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType), manifestEncoder ) .collectAsList(); @@ -311,12 +317,13 @@ private void deleteFiles(Iterable locations) { } private static ManifestFile writeManifest( - List rows, int startIndex, int endIndex, Broadcast io, + List rows, int startIndex, int endIndex, Broadcast io, Broadcast encryption, String location, int format, PartitionSpec spec, StructType sparkType) throws IOException { String manifestName = "optimized-m-" + UUID.randomUUID(); Path manifestPath = new Path(location, manifestName); - OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString())); + EncryptedOutputFile outputFile = encryption.value().encrypt( + io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()))); Types.StructType dataFileType = DataFile.getType(spec.partitionType()); SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType); @@ -339,7 +346,7 @@ private static ManifestFile writeManifest( } private static MapPartitionsFunction toManifests( - Broadcast io, long maxNumManifestEntries, String location, + Broadcast io, Broadcast 
encryption, long maxNumManifestEntries, String location, int format, PartitionSpec spec, StructType sparkType) { return rows -> { @@ -351,11 +358,14 @@ private static MapPartitionsFunction toManifests( List manifests = Lists.newArrayList(); if (rowsAsList.size() <= maxNumManifestEntries) { - manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType)); + manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), + io, encryption, location, format, spec, sparkType)); } else { int midIndex = rowsAsList.size() / 2; - manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType)); - manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType)); + manifests.add(writeManifest(rowsAsList, 0, midIndex, + io, encryption, location, format, spec, sparkType)); + manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), + io, encryption, location, format, spec, sparkType)); } return manifests.iterator(); diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index e760a6de4236..96cc22c52148 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -35,6 +35,7 @@ import org.apache.iceberg.actions.Action; import org.apache.iceberg.actions.ManifestFileBean; import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.io.ClosingIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -143,15 +144,16 @@ private List getOtherMetadataFilePaths(TableOperations ops) { return otherMetadataFiles; } - protected Table newStaticTable(TableMetadata metadata, FileIO io) { + protected Table newStaticTable(TableMetadata metadata, FileIO io, EncryptionManager encryption) { String metadataFileLocation = metadata.metadataFileLocation(); - StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io, encryption); return new BaseTable(ops, metadataFileLocation); } protected Dataset buildValidDataFileDF(Table table) { JavaSparkContext context = new JavaSparkContext(spark.sparkContext()); Broadcast ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table)); + Broadcast encryptionBroadcast = context.broadcast(table.encryption()); Dataset allManifests = loadMetadataTable(table, ALL_MANIFESTS) .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId") @@ -159,7 +161,8 @@ protected Dataset buildValidDataFileDF(Table table) { .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks .as(Encoders.bean(ManifestFileBean.class)); - return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path"); + return allManifests.flatMap(new ReadManifest(ioBroadcast, encryptionBroadcast), + Encoders.STRING()).toDF("file_path"); } protected Dataset buildManifestFileDF(Table table) { @@ -228,14 +231,16 @@ protected Dataset loadMetadataTable(Table table, MetadataTableType type) { private static class ReadManifest implements FlatMapFunction { private final Broadcast io; + private final Broadcast encryption; - ReadManifest(Broadcast io) 
{ + ReadManifest(Broadcast io, Broadcast encryption) { this.io = io; + this.encryption = encryption; } @Override public Iterator call(ManifestFileBean manifest) { - return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator()); + return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue(), encryption.getValue()).iterator()); } } }
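To close, the executor-side pattern used by BaseSparkAction.ReadManifest above, sketched as a hedged standalone snippet rather than the patch's code: both the FileIO and the EncryptionManager are Spark broadcast variables, because every executor needs them to open and decrypt manifest files. It assumes `spark`, `table`, and a ManifestFileBean `manifest` are in scope.

import java.util.Iterator;

JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
Broadcast<EncryptionManager> encryptionBroadcast = context.broadcast(table.encryption());

// Executor side: decrypt one manifest with the broadcast manager and list
// the data file paths it references, closing the reader when exhausted.
Iterator<String> paths = new ClosingIterator<>(
    ManifestFiles.readPaths(manifest, ioBroadcast.getValue(), encryptionBroadcast.getValue()).iterator());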