Skip to content
Closed
15 changes: 11 additions & 4 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,6 @@ acceptedBreaks:
old: "method org.apache.iceberg.view.ViewBuilder org.apache.iceberg.view.ViewBuilder::withQueryColumnNames(java.util.List<java.lang.String>)"
justification: "Acceptable break due to updating View APIs and the View Spec"
org.apache.iceberg:iceberg-core:
- code: "java.method.visibilityReduced"
old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult"
justification: "Removing deprecated code"
Expand Down Expand Up @@ -833,6 +829,13 @@ acceptedBreaks:
- code: "java.class.removed"
old: "interface org.apache.iceberg.actions.RewritePositionDeleteStrategy"
justification: "Removing deprecated code"
- code: "java.method.abstractMethodAdded"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't be breaking the API. See also my comment in #6799 (comment) on how to avoid breaking the API when adding the new method.

Copy link
Contributor Author

@wypoon wypoon Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, your suggestion was followed in #6799 and I haven't reverted it there.
I think @ConeyLiu's point is that the new newAppender method doesn't break the API because it is not abstract -- it calls the old newAppender. However, the old newAppender is deprecated and when it is removed, the new newAppender then has to be changed. There is no natural implementation for it, so it should be abstract. Of course, one could do something else, such as return null or throw an UnsupportedOperationException.
If anyone out there actually writes their own ManifestWriter subclass, they would need to implement a newAppender(PartitionSpec, OutputFile, String, Integer) method. By not breaking the API now, we allow them to get away without implementing the new method; but when newAppender(PartitionSpec, OutputFile) is later removed, that will break them. Even if we keep newAppender(PartitionSpec, OutputFile, String, Integer) non-abstract and have it throw an UnsupportedOperationException, so that the API is not technically broken, the removal effectively breaks them.

new: "method org.apache.iceberg.io.FileAppender<org.apache.iceberg.ManifestEntry<F>>\
\ org.apache.iceberg.ManifestWriter<F extends org.apache.iceberg.ContentFile<F\
\ extends org.apache.iceberg.ContentFile<F>>>::newAppender(org.apache.iceberg.PartitionSpec,\
\ org.apache.iceberg.io.OutputFile, java.lang.String, java.lang.Integer)"
justification: "Allow adding a new method to the abstract class - old method\
\ is deprecated"
- code: "java.method.removed"
old: "method java.util.List<org.apache.iceberg.DataFile> org.apache.iceberg.MergingSnapshotProducer<ThisT>::addedFiles()\
\ @ org.apache.iceberg.BaseOverwriteFiles"
Expand All @@ -857,6 +860,10 @@ acceptedBreaks:
old: "method void org.apache.iceberg.MergingSnapshotProducer<ThisT>::setNewFilesSequenceNumber(long)\
\ @ org.apache.iceberg.StreamingDelete"
justification: "Removing deprecated code"
- code: "java.method.visibilityReduced"
old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ project(':iceberg-core') {
testImplementation libs.esotericsoftware.kryo
testImplementation libs.guava.testlib
testImplementation libs.awaitility
testRuntimeOnly libs.zstd.jni
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
specsById,
newFile,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
}

@Override
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.specsById(),
newManifestPath,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
}

@Override
Expand Down
80 changes: 71 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,34 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
    int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
  // This overload selects neither a compression codec nor a compression level;
  // null is forwarded for both to the six-argument overload.
  final String compressionCodec = null;
  final Integer compressionLevel = null;
  return write(formatVersion, spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file
* @param compressionLevel compression level of the compressionCodec
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V1Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
case 2:
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V2Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -198,11 +221,33 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
    int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
  // This overload selects neither a compression codec nor a compression level;
  // null is forwarded for both to the six-argument overload.
  final String compressionCodec = null;
  final Integer compressionLevel = null;
  return writeDeleteManifest(
      formatVersion, spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file
* @param compressionLevel compression level of the compressionCodec
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
return new ManifestWriter.V2DeleteWriter(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -256,7 +301,9 @@ static ManifestFile copyAppendManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
try (ManifestReader<DataFile> reader =
Expand All @@ -267,7 +314,9 @@ static ManifestFile copyAppendManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.ADDED);
ManifestEntry.Status.ADDED,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -280,7 +329,9 @@ static ManifestFile copyRewriteManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
// exception if it is not
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
Expand All @@ -292,7 +343,9 @@ static ManifestFile copyRewriteManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.EXISTING);
ManifestEntry.Status.EXISTING,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -305,8 +358,17 @@ private static ManifestFile copyManifestInternal(
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
ManifestEntry.Status allowedEntryStatus,
String compressionCodec,
Integer compressionLevel) {
ManifestWriter<DataFile> writer =
write(
formatVersion,
reader.spec(),
outputFile,
snapshotId,
compressionCodec,
compressionLevel);
boolean threw = true;
try {
for (ManifestEntry<DataFile> entry : reader.entries()) {
Expand Down
91 changes: 68 additions & 23 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
private final FileAppender<ManifestFile> writer;

private ManifestListWriter(OutputFile file, Map<String, String> meta) {
this.writer = newAppender(file, meta);
private ManifestListWriter(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
this.writer = newAppender(file, meta, compressionCodec, compressionLevel);
}

protected abstract ManifestFile prepare(ManifestFile manifest);

protected abstract FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta);
OutputFile file, Map<String, String> meta, String compressionCodec, Integer compressionLevel);

@Override
public void add(ManifestFile manifest) {
Expand Down Expand Up @@ -73,14 +77,22 @@ public long length() {
static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;

V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
V2Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
String compressionCodec,
Integer compressionLevel) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"));
"format-version", "2"),
compressionCodec,
compressionLevel);
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
}

Expand All @@ -90,15 +102,28 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
try {
return Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();

Avro.WriteBuilder builder =
Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();

if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}

if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}

return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand All @@ -108,13 +133,20 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();

V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
V1Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
String compressionCodec,
Integer compressionLevel) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"format-version", "1"));
"format-version", "1"),
compressionCodec,
compressionLevel);
}

@Override
Expand All @@ -126,15 +158,28 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
try {
return Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();

Avro.WriteBuilder builder =
Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();

if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}

if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}

return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand Down
Loading