Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ public interface ManifestFile {
Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
"Summary for each partition");
// next ID to assign: 519
Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(),
"Encryption key metadata blob");
// next ID to assign: 520

Schema SCHEMA = new Schema(
PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
PARTITION_SUMMARIES);
PARTITION_SUMMARIES, KEY_METADATA);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -179,6 +181,13 @@ default boolean hasDeletedFiles() {
*/
List<PartitionFieldSummary> partitions();

/**
* Returns metadata about how this manifest file is encrypted, or null if the file is stored in plain text.
*/
default ByteBuffer keyMetadata() {
return null;
}

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
13 changes: 11 additions & 2 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.util.ByteBuffers;
import org.junit.Assert;

public class TestHelpers {
Expand Down Expand Up @@ -240,10 +241,11 @@ public static class TestManifestFile implements ManifestFile {
private final Integer deletedFiles;
private final Long deletedRows;
private final List<PartitionFieldSummary> partitions;
private final byte[] keyMetadata;

public TestManifestFile(String path, long length, int specId, Long snapshotId,
Integer addedFiles, Integer existingFiles, Integer deletedFiles,
List<PartitionFieldSummary> partitions) {
List<PartitionFieldSummary> partitions, ByteBuffer keyMetadata) {
this.path = path;
this.length = length;
this.specId = specId;
Expand All @@ -256,12 +258,13 @@ public TestManifestFile(String path, long length, int specId, Long snapshotId,
this.deletedFiles = deletedFiles;
this.deletedRows = null;
this.partitions = partitions;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}

public TestManifestFile(String path, long length, int specId, ManifestContent content, Long snapshotId,
Integer addedFiles, Long addedRows, Integer existingFiles,
Long existingRows, Integer deletedFiles, Long deletedRows,
List<PartitionFieldSummary> partitions) {
List<PartitionFieldSummary> partitions, ByteBuffer keyMetadata) {
this.path = path;
this.length = length;
this.specId = specId;
Expand All @@ -274,6 +277,7 @@ public TestManifestFile(String path, long length, int specId, ManifestContent co
this.deletedFiles = deletedFiles;
this.deletedRows = deletedRows;
this.partitions = partitions;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}

@Override
Expand Down Expand Up @@ -346,6 +350,11 @@ public List<PartitionFieldSummary> partitions() {
return partitions;
}

@Override
public ByteBuffer keyMetadata() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used in tests? We could just allow this to use the default implementation, unless we added tests that exercise it.

return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
public ManifestFile copy() {
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public class TestInclusiveManifestEvaluator {
private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z");

private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile(
"manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null);
"manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null, null);

private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro",
1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of(
Expand All @@ -110,7 +110,7 @@ public class TestInclusiveManifestEvaluator {
toByteBuffer(Types.FloatType.get(), 0F),
toByteBuffer(Types.FloatType.get(), 20F)),
new TestHelpers.TestFieldSummary(true, null, null)
));
), null);

@Test
public void testAllNulls() {
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/org/apache/iceberg/GenericManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
Expand All @@ -33,6 +34,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;

public class GenericManifestFile
implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
Expand All @@ -58,6 +60,7 @@ public class GenericManifestFile
private Long existingRowsCount = null;
private Long deletedRowsCount = null;
private PartitionFieldSummary[] partitions = null;
private byte[] keyMetadata = null;

/**
* Used by Avro reflection to instantiate this class when reading manifest files.
Expand Down Expand Up @@ -101,13 +104,14 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) {
this.deletedRowsCount = null;
this.partitions = null;
this.fromProjectionPos = null;
this.keyMetadata = null;
}

public GenericManifestFile(String path, long length, int specId, ManifestContent content,
long sequenceNumber, long minSequenceNumber, Long snapshotId,
int addedFilesCount, long addedRowsCount, int existingFilesCount,
long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
List<PartitionFieldSummary> partitions) {
List<PartitionFieldSummary> partitions, ByteBuffer keyMetadata) {
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
this.length = length;
Expand All @@ -124,6 +128,7 @@ public GenericManifestFile(String path, long length, int specId, ManifestContent
this.deletedRowsCount = deletedRowsCount;
this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
this.fromProjectionPos = null;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}

/**
Expand Down Expand Up @@ -154,6 +159,7 @@ private GenericManifestFile(GenericManifestFile toCopy) {
this.partitions = null;
}
this.fromProjectionPos = toCopy.fromProjectionPos;
this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
}

/**
Expand Down Expand Up @@ -245,6 +251,11 @@ public List<PartitionFieldSummary> partitions() {
return partitions == null ? null : Arrays.asList(partitions);
}

@Override
public ByteBuffer keyMetadata() {
return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
public int size() {
return ManifestFile.schema().columns().size();
Expand Down Expand Up @@ -291,6 +302,8 @@ public Object get(int i) {
return deletedRowsCount;
case 13:
return partitions();
case 14:
return keyMetadata();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand Down Expand Up @@ -349,6 +362,9 @@ public <T> void set(int i, T value) {
this.partitions = value == null ? null :
((List<PartitionFieldSummary>) value).toArray(new PartitionFieldSummary[0]);
return;
case 14:
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand Down Expand Up @@ -399,6 +415,7 @@ public String toString() {
.add("deleted_data_files_count", deletedFilesCount)
.add("deleted_rows_count", deletedRowsCount)
.add("partitions", partitions)
.add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would actually be more against using this approach to support redact, for 2 reasons:

  1. this is only for toString. If you really want to see the details of the key metadata you can read through the manifest table, and that should show the information fully if you can decrypt an encrypted manifest list. I think that provides better access control. Changing toString feels to me like opening a backdoor.
  2. if we can encrypt a manifest, we should be able to support redact configuration natively through encryption manager as a part of data masking. When we can do that, you don't need a forked version to use a custom static method for this purpose.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me. 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe we should just drop this from toString? I'm fine with that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also fine dropping this from toString. I think the main concern is that we don't want to dump binary key metadata as a string. I'm fine if this shows nothing at all.

.toString();
}

Expand All @@ -418,7 +435,7 @@ private CopyBuilder(ManifestFile toCopy) {
toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(),
toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(),
toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(),
copyList(toCopy.partitions(), PartitionFieldSummary::copy));
copyList(toCopy.partitions(), PartitionFieldSummary::copy), toCopy.keyMetadata());
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/ManifestWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public ManifestFile toManifestFile() {
long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
return new GenericManifestFile(file.location(), writer.length(), specId, content(),
UNASSIGNED_SEQ, minSeqNumber, snapshotId,
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife

return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId,
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries(), null);

} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public Object get(int pos) {
return wrapped.deletedRowsCount();
case 13:
return wrapped.partitions();
case 14:
return wrapped.keyMetadata();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
Expand Down Expand Up @@ -222,6 +224,11 @@ public List<PartitionFieldSummary> partitions() {
return wrapped.partitions();
}

@Override
public ByteBuffer keyMetadata() {
return wrapped.keyMetadata();
}

@Override
public ManifestFile copy() {
return wrapped.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,17 @@ public class TestManifestListVersions {
private static final int DELETED_FILES = 1;
private static final long DELETED_ROWS = 22910L;
private static final List<ManifestFile.PartitionFieldSummary> PARTITION_SUMMARIES = ImmutableList.of();
private static final ByteBuffer KEY_METADATA = null;

private static final ManifestFile TEST_MANIFEST = new GenericManifestFile(
PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
PARTITION_SUMMARIES);
PARTITION_SUMMARIES, KEY_METADATA);

private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile(
PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
PARTITION_SUMMARIES);
PARTITION_SUMMARIES, KEY_METADATA);

@Rule
public TemporaryFolder temp = new TemporaryFolder();
Expand Down Expand Up @@ -223,7 +224,7 @@ public void testManifestsPartitionSummary() throws IOException {
ManifestFile manifest = new GenericManifestFile(
PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
partitionFieldSummaries);
partitionFieldSummaries, KEY_METADATA);

InputFile manifestList = writeManifestList(manifest, 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ private FileAppenderFactory<RowData> createDeletableAppenderFactory() {

private ManifestFile createTestingManifestFile(Path manifestPath) {
return new GenericManifestFile(manifestPath.toAbsolutePath().toString(), manifestPath.toFile().length(), 0,
ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null);
ManifestContent.DATA, 0, 0, 0L, 0, 0, 0, 0, 0, 0, null, null);
}

private List<Path> assertFlinkManifests(int expectedCount) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.actions;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
Expand Down Expand Up @@ -131,6 +132,11 @@ public List<PartitionFieldSummary> partitions() {
return null;
}

@Override
public ByteBuffer keyMetadata() {
return null;
}

@Override
public ManifestFile copy() {
throw new UnsupportedOperationException("Cannot copy");
Expand Down