diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java index 95c492385c80..7eb89f49b5ed 100644 --- a/api/src/main/java/org/apache/iceberg/ManifestFile.java +++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java @@ -62,14 +62,16 @@ public interface ManifestFile { Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions", Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE), "Summary for each partition"); - // next ID to assign: 519 + Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(), + "Encryption key metadata blob"); + // next ID to assign: 520 Schema SCHEMA = new Schema( PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT, SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID, ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT, ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT, - PARTITION_SUMMARIES); + PARTITION_SUMMARIES, KEY_METADATA); static Schema schema() { return SCHEMA; @@ -179,6 +181,13 @@ default boolean hasDeletedFiles() { */ List partitions(); + /** + * Returns metadata about how this manifest file is encrypted, or null if the file is stored in plain text. + */ + default ByteBuffer keyMetadata() { + return null; + } + /** * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use * this method to make defensive copies. diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index fd4ea08036c4..69b5338485d5 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.util.ByteBuffers; import org.junit.Assert; public class TestHelpers { @@ -240,10 +241,11 @@ public static class TestManifestFile implements ManifestFile { private final Integer deletedFiles; private final Long deletedRows; private final List partitions; + private final byte[] keyMetadata; public TestManifestFile(String path, long length, int specId, Long snapshotId, Integer addedFiles, Integer existingFiles, Integer deletedFiles, - List partitions) { + List partitions, ByteBuffer keyMetadata) { this.path = path; this.length = length; this.specId = specId; @@ -256,12 +258,13 @@ public TestManifestFile(String path, long length, int specId, Long snapshotId, this.deletedFiles = deletedFiles; this.deletedRows = null; this.partitions = partitions; + this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } public TestManifestFile(String path, long length, int specId, ManifestContent content, Long snapshotId, Integer addedFiles, Long addedRows, Integer existingFiles, Long existingRows, Integer deletedFiles, Long deletedRows, - List partitions) { + List partitions, ByteBuffer keyMetadata) { this.path = path; this.length = length; this.specId = specId; @@ -274,6 +277,7 @@ public TestManifestFile(String path, long length, int specId, ManifestContent co this.deletedFiles = deletedFiles; this.deletedRows = deletedRows; this.partitions = partitions; + this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @Override @@ -346,6 +350,11 @@ public List partitions() { return partitions; } + @Override + public ByteBuffer keyMetadata() { + return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata); + } + @Override public ManifestFile copy() { return this; diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java index 972fc8ed5871..f41b8dcd7b76 100644 --- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java +++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveManifestEvaluator.java @@ -91,7 +91,7 @@ public class TestInclusiveManifestEvaluator { private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z"); private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile( - "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null); + "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null, null); private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro", 1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of( @@ -110,7 +110,7 @@ public class TestInclusiveManifestEvaluator { toByteBuffer(Types.FloatType.get(), 0F), toByteBuffer(Types.FloatType.get(), 20F)), new TestHelpers.TestFieldSummary(true, null, null) - )); + ), null); @Test public void testAllNulls() { diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 716f41b3938f..1a95149a8af3 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -20,6 +20,7 @@ package org.apache.iceberg; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; import java.util.function.Function; @@ -33,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; public class GenericManifestFile implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable { @@ -58,6 +60,7 @@ public class GenericManifestFile private Long existingRowsCount = null; private Long deletedRowsCount = null; private PartitionFieldSummary[] partitions = null; + private byte[] keyMetadata = null; /** * Used by Avro reflection to instantiate this class when reading manifest files. @@ -101,13 +104,14 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) { this.deletedRowsCount = null; this.partitions = null; this.fromProjectionPos = null; + this.keyMetadata = null; } public GenericManifestFile(String path, long length, int specId, ManifestContent content, long sequenceNumber, long minSequenceNumber, Long snapshotId, int addedFilesCount, long addedRowsCount, int existingFilesCount, long existingRowsCount, int deletedFilesCount, long deletedRowsCount, - List partitions) { + List partitions, ByteBuffer keyMetadata) { this.avroSchema = AVRO_SCHEMA; this.manifestPath = path; this.length = length; @@ -124,6 +128,7 @@ public GenericManifestFile(String path, long length, int specId, ManifestContent this.deletedRowsCount = deletedRowsCount; this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); this.fromProjectionPos = null; + this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } /** @@ -154,6 +159,7 @@ private GenericManifestFile(GenericManifestFile toCopy) { this.partitions = null; } this.fromProjectionPos = toCopy.fromProjectionPos; + this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length); } /** @@ -245,6 +251,11 @@ public List partitions() { return partitions == null ? null : Arrays.asList(partitions); } + @Override + public ByteBuffer keyMetadata() { + return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata); + } + @Override public int size() { return ManifestFile.schema().columns().size(); @@ -291,6 +302,8 @@ public Object get(int i) { return deletedRowsCount; case 13: return partitions(); + case 14: + return keyMetadata(); default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -349,6 +362,9 @@ public void set(int i, T value) { this.partitions = value == null ? null : ((List) value).toArray(new PartitionFieldSummary[0]); return; + case 14: + this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value); + return; default: // ignore the object, it must be from a newer version of the format } @@ -399,6 +415,7 @@ public String toString() { .add("deleted_data_files_count", deletedFilesCount) .add("deleted_rows_count", deletedRowsCount) .add("partitions", partitions) + .add("key_metadata", keyMetadata == null ? "null" : "(redacted)") .toString(); } @@ -418,7 +435,7 @@ private CopyBuilder(ManifestFile toCopy) { toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(), toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), - copyList(toCopy.partitions(), PartitionFieldSummary::copy)); + copyList(toCopy.partitions(), PartitionFieldSummary::copy), toCopy.keyMetadata()); } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index b03d68ed217e..a03d18a8a1a0 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -156,7 +156,7 @@ public ManifestFile toManifestFile() { long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ; return new GenericManifestFile(file.location(), writer.length(), specId, content(), UNASSIGNED_SEQ, minSeqNumber, snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); + addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null); } @Override diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 3a179f99fbe4..c9048f7e96c8 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -441,7 +441,7 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(), ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, - addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries()); + addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path()); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 4b064ffeebcb..40b8624baa3b 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -132,6 +132,8 @@ public Object get(int pos) { return wrapped.deletedRowsCount(); case 13: return wrapped.partitions(); + case 14: + return wrapped.keyMetadata(); default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -222,6 +224,11 @@ public List partitions() { return wrapped.partitions(); } + @Override + public ByteBuffer keyMetadata() { + return wrapped.keyMetadata(); + } + @Override public ManifestFile copy() { return wrapped.copy(); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java index 0c5b97ce4558..6c145734ce4e 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java @@ -56,16 +56,17 @@ public class TestManifestListVersions { private static final int DELETED_FILES = 1; private static final long DELETED_ROWS = 22910L; private static final List PARTITION_SUMMARIES = ImmutableList.of(); + private static final ByteBuffer KEY_METADATA = null; private static final ManifestFile TEST_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES); + PARTITION_SUMMARIES, KEY_METADATA); private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - PARTITION_SUMMARIES); + PARTITION_SUMMARIES, KEY_METADATA); @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -223,7 +224,7 @@ public void testManifestsPartitionSummary() throws IOException { ManifestFile manifest = new GenericManifestFile( PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID, ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS, - partitionFieldSummaries); + partitionFieldSummaries, KEY_METADATA); InputFile manifestList = writeManifestList(manifest, 2); diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 4ada31a619ca..34785cfb6a34 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -815,7 +815,7 @@ private FileAppenderFactory createDeletableAppenderFactory() { private ManifestFile createTestingManifestFile(Path manifestPath) { return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0, - ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null); + ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java b/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java index f51499dcf1aa..4beb12c5e945 100644 --- a/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java +++ b/spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java @@ -19,6 +19,7 @@ package org.apache.iceberg.actions; +import java.nio.ByteBuffer; import java.util.List; import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; @@ -131,6 +132,11 @@ public List partitions() { return null; } + @Override + public ByteBuffer keyMetadata() { + return null; + } + @Override public ManifestFile copy() { throw new UnsupportedOperationException("Cannot copy");