Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ public interface ManifestFile {
Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
"Summary for each partition");
// next ID to assign: 519
Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(),
"Encryption key metadata blob");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to the spec in #2654.

// next ID to assign: 520

Schema SCHEMA = new Schema(
PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
PARTITION_SUMMARIES);
PARTITION_SUMMARIES, KEY_METADATA);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -179,6 +181,13 @@ default boolean hasDeletedFiles() {
*/
List<PartitionFieldSummary> partitions();

/**
* Returns metadata about how this manifest file is encrypted, or null if the file is stored in plain text.
*/
default ByteBuffer keyMetadata() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does keyMetadata have substructure, or is it a pure binary buffer? From the description, it looks like it will have substructure. Are we going to define it later or in this patch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about this too. Should we make this a struct with a name like encryptionContext, so that, if we plan to add new things in the future (e.g. a KEK id for double wrapping?), we can collect them in a single struct? To work around the problem of having to unwrap two layers to reach this buffer from ManifestFile, we could return EncryptionKeyMetadata here, and potentially extend EncryptionKeyMetadata with more fields when needed in the future. Or will this binary buffer be free-form, able to contain whatever information is needed as long as the right encryption manager is used?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent here is that an encryption manager can decide what the key metadata holds. It could be an encrypted key or it could be a key reference. There are lots of possibilities and we did it this way to not constrain what the encryption manager can choose to do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to a byte array, serialized by an encryption manager from its structs. Btw, besides encryption keys, we have the AAD prefixes. We can keep them inside the key metadata (because it is convenient and flexible) - or we can add a separate manifest field/column for them (because technically AADs are not used for key retrieval). In both cases, the decision can be made later, when we get to handle end-to-end table integrity.

return null;
}

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
.build());
LOG.info("Successfully dropped table {} from Glue", identifier);
if (purge && lastMetadata != null) {
CatalogUtil.dropTableData(ops.io(), lastMetadata);
CatalogUtil.dropTableData(ops.io(), ops.encryption(), lastMetadata);
LOG.info("Glue table {} data purged", identifier);
}
LOG.info("Dropped table: {}", identifier);
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ protected CloseableIterable<FileScanTask> planFiles(
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(manifests, manifest ->
new DataFilesTable.ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
new DataFilesTable.ManifestReadTask(ops.io(), ops.encryption(), manifest, fileSchema,
schemaString, specString, residuals));
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ protected CloseableIterable<FileScanTask> planFiles(
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById()));
ops.io(), ops.encryption(), manifest, fileSchema, schemaString, specString, residuals,
ops.current().specsById()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ protected void refreshFromMetadataLocation(String newLocation, Predicate<Excepti
.throwFailureWhenFinished()
.shouldRetryTest(shouldRetry)
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(io(), metadataLocation)));
TableMetadataParser.read(io(), encryption(), metadataLocation)));

String newUUID = newMetadata.get().uuid();
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -156,7 +156,7 @@ public RewriteManifests addManifest(ManifestFile manifest) {
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
EncryptedOutputFile newFile = newManifestOutput();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder);
}
Expand Down Expand Up @@ -236,8 +236,8 @@ private void performRewrite(List<ManifestFile> currentManifests) {
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
.select(Arrays.asList("*"))) {
try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.encryption(),
ops.current().specsById()).select(Arrays.asList("*"))) {
reader.liveEntries().forEach(
entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
);
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
Expand All @@ -36,6 +37,7 @@ class BaseSnapshot implements Snapshot {
private static final long INITIAL_SEQUENCE_NUMBER = 0;

private final FileIO io;
private final EncryptionManager encryption;
private final long snapshotId;
private final Long parentId;
private final long sequenceNumber;
Expand All @@ -55,14 +57,16 @@ class BaseSnapshot implements Snapshot {
* For testing only.
*/
BaseSnapshot(FileIO io,
EncryptionManager encryption,
long snapshotId,
String... manifestFiles) {
this(io, snapshotId, null, System.currentTimeMillis(), null, null,
this(io, encryption, snapshotId, null, System.currentTimeMillis(), null, null,
Lists.transform(Arrays.asList(manifestFiles),
path -> new GenericManifestFile(io.newInputFile(path), 0)));
}

BaseSnapshot(FileIO io,
EncryptionManager encryption,
long sequenceNumber,
long snapshotId,
Long parentId,
Expand All @@ -71,6 +75,7 @@ class BaseSnapshot implements Snapshot {
Map<String, String> summary,
String manifestList) {
this.io = io;
this.encryption = encryption;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
Expand All @@ -81,13 +86,14 @@ class BaseSnapshot implements Snapshot {
}

BaseSnapshot(FileIO io,
EncryptionManager encryption,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
List<ManifestFile> dataManifests) {
this(io, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
this(io, encryption, INITIAL_SEQUENCE_NUMBER, snapshotId, parentId, timestampMillis, operation, summary, null);
this.allManifests = dataManifests;
}

Expand Down Expand Up @@ -187,7 +193,7 @@ private void cacheChanges() {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(dataManifests(),
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, encryption, changedManifests)
.ignoreExisting()
.entries()) {
for (ManifestEntry<DataFile> entry : entries) {
Expand Down
18 changes: 14 additions & 4 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.encryption.EncryptionManagers;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
Expand Down Expand Up @@ -54,6 +56,14 @@ public class CatalogUtil {
private CatalogUtil() {
}

/**
* Backward-compatible overload that takes no {@link EncryptionManager}.
* <p>
* Delegates to {@link #dropTableData(FileIO, EncryptionManager, TableMetadata)}, passing
* {@link EncryptionManagers#plainText()} as the encryption manager.
*
* @param io a FileIO forwarded unchanged to the three-argument overload
* @param metadata the TableMetadata forwarded unchanged to the three-argument overload
* @deprecated please use {@link #dropTableData(FileIO, EncryptionManager, TableMetadata)}
*/
@Deprecated
public static void dropTableData(FileIO io, TableMetadata metadata) {
// No encryption manager is available from this signature, so the plain-text manager is supplied.
dropTableData(io, EncryptionManagers.plainText(), metadata);
}

/**
* Drops all data and metadata files referenced by TableMetadata.
* <p>
Expand All @@ -63,7 +73,7 @@ private CatalogUtil() {
* @param io a FileIO to use for deletes
* @param metadata the last valid TableMetadata instance for a dropped table.
*/
public static void dropTableData(FileIO io, TableMetadata metadata) {
public static void dropTableData(FileIO io, EncryptionManager encryption, TableMetadata metadata) {
// Reads and deletes are done using Tasks.foreach(...).suppressFailureWhenFinished to complete
// as much of the delete work as possible and avoid orphaned data or manifest files.

Expand All @@ -86,7 +96,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

if (gcEnabled) {
// delete data files only if we are sure this won't corrupt other tables
deleteFiles(io, manifestsToDelete);
deleteFiles(io, encryption, manifestsToDelete);
}

Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
Expand All @@ -106,7 +116,7 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
}

@SuppressWarnings("DangerousStringInternUsage")
private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
private static void deleteFiles(FileIO io, EncryptionManager encryption, Set<ManifestFile> allManifests) {
// keep track of deleted files in a map that can be cleaned up when memory runs low
Map<String, Boolean> deletedFiles = new MapMaker()
.concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
Expand All @@ -118,7 +128,7 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
.executeWith(ThreadPools.getWorkerPool())
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
try (ManifestReader<?> reader = ManifestFiles.open(manifest, io, encryption)) {
for (ManifestEntry<?> entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of equals
String path = entry.file().path().toString().intern();
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg;

import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ResidualEvaluator;
Expand Down Expand Up @@ -102,27 +103,29 @@ protected CloseableIterable<FileScanTask> planFiles(
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
new ManifestReadTask(ops.io(), ops.encryption(), manifest, fileSchema, schemaString, specString, residuals));
}
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final FileIO io;
private final EncryptionManager encryption;
private final ManifestFile manifest;
private final Schema schema;

ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
ManifestReadTask(FileIO io, EncryptionManager encryption, ManifestFile manifest, Schema schema, String schemaString,
String specString, ResidualEvaluator residuals) {
super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
this.io = io;
this.encryption = encryption;
this.manifest = manifest;
this.schema = schema;
}

@Override
public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(
ManifestFiles.read(manifest, io).project(schema),
ManifestFiles.read(manifest, io, encryption).project(schema),
file -> (GenericDataFile) file);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ protected TableScan newRefinedScan(TableOperations ops, Table table, Schema sche
public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
Expression rowFilter, boolean ignoreResiduals,
boolean caseSensitive, boolean colStats) {
ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
ManifestGroup manifestGroup = new ManifestGroup(ops.io(), ops.encryption(),
snapshot.dataManifests(), snapshot.deleteManifests())
.caseSensitive(caseSensitive)
.select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(rowFilter)
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -305,21 +306,23 @@ private static Stream<DeleteFile> limitBySequenceNumber(long sequenceNumber, lon
return Arrays.stream(files, start, files.length);
}

static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
return new Builder(io, Sets.newHashSet(deleteManifests));
static Builder builderFor(FileIO io, EncryptionManager encryption, Iterable<ManifestFile> deleteManifests) {
return new Builder(io, encryption, Sets.newHashSet(deleteManifests));
}

static class Builder {
private final FileIO io;
private final EncryptionManager encryption;
private final Set<ManifestFile> deleteManifests;
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
private Expression partitionFilter = Expressions.alwaysTrue();
private boolean caseSensitive = true;
private ExecutorService executorService = null;

Builder(FileIO io, Set<ManifestFile> deleteManifests) {
Builder(FileIO io, EncryptionManager encryption, Set<ManifestFile> deleteManifests) {
this.io = io;
this.encryption = encryption;
this.deleteManifests = Sets.newHashSet(deleteManifests);
}

Expand Down Expand Up @@ -445,7 +448,7 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea
return Iterables.transform(
matchingManifests,
manifest ->
ManifestFiles.readDeleteManifest(manifest, io, specsById)
ManifestFiles.readDeleteManifest(manifest, io, encryption, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -117,7 +117,7 @@ public FastAppend appendManifest(ManifestFile manifest) {
private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newManifestPath = newManifestOutput();
EncryptedOutputFile newManifestPath = newManifestOutput();
return ManifestFiles.copyAppendManifest(
current.formatVersion(), toCopy, current.specsById(), newManifestPath, snapshotId(), summaryBuilder);
}
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/FindFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public CloseableIterable<DataFile> collect() {
}

// when snapshot is not null
CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), snapshot.dataManifests())
CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(
ops.io(), ops.encryption(), snapshot.dataManifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
Expand Down
Loading