diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index b3a4ba39120c..440229ce8fbc 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -802,10 +802,6 @@ acceptedBreaks: old: "method org.apache.iceberg.view.ViewBuilder org.apache.iceberg.view.ViewBuilder::withQueryColumnNames(java.util.List<java.lang.String>)" justification: "Acceptable break due to updating View APIs and the View Spec" org.apache.iceberg:iceberg-core: - - code: "java.method.visibilityReduced" - old: "method void org.apache.iceberg.encryption.Ciphers::<init>()" - new: "method void org.apache.iceberg.encryption.Ciphers::<init>()" - justification: "Static utility class - should not have public constructor" - code: "java.class.removed" old: "class org.apache.iceberg.actions.BaseDeleteOrphanFilesActionResult" justification: "Removing deprecated code" @@ -833,6 +829,13 @@ acceptedBreaks: - code: "java.class.removed" old: "interface org.apache.iceberg.actions.RewritePositionDeleteStrategy" justification: "Removing deprecated code" + - code: "java.method.abstractMethodAdded" + new: "method org.apache.iceberg.io.FileAppender<org.apache.iceberg.ManifestEntry<F>>\ \ org.apache.iceberg.ManifestWriter<F extends org.apache.iceberg.ContentFile<F>>::newAppender(org.apache.iceberg.PartitionSpec,\ \ org.apache.iceberg.io.OutputFile, java.lang.String, java.lang.Integer)" + justification: "Allow adding a new method to the abstract class - old method\ \ is deprecated" - code: "java.method.removed" old: "method java.util.List<org.apache.iceberg.DataFile> org.apache.iceberg.MergingSnapshotProducer<ThisT>::addedFiles()\ \ @ org.apache.iceberg.BaseOverwriteFiles" justification: "Removing deprecated code" @@ -857,6 +860,10 @@ old: "method void org.apache.iceberg.MergingSnapshotProducer<ThisT>::setNewFilesSequenceNumber(long)\ \ @ org.apache.iceberg.StreamingDelete" justification: "Removing deprecated code" + - code: "java.method.visibilityReduced" + old: "method void org.apache.iceberg.encryption.Ciphers::<init>()" + new: "method void org.apache.iceberg.encryption.Ciphers::<init>()" + justification: "Static utility class - should not have public constructor" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/build.gradle b/build.gradle index 08f6f8a3ad4f..6173757db0b6 100644 --- a/build.gradle +++ b/build.gradle @@ -358,6 +358,7 @@ project(':iceberg-core') { testImplementation libs.esotericsoftware.kryo testImplementation libs.guava.testlib testImplementation libs.awaitility + testRuntimeOnly libs.zstd.jni } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 54bf3c6e44c4..5ae3da9daee6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -170,7 +170,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { specsById, newFile, snapshotId(), - summaryBuilder); + summaryBuilder, + current.properties().get(TableProperties.AVRO_COMPRESSION), + current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 3079757392cd..19051068ced6 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -138,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { current.specsById(), newManifestPath, snapshotId(), - summaryBuilder); + summaryBuilder, + current.properties().get(TableProperties.AVRO_COMPRESSION), +
current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index c23ab667a41b..eaa7c9075a4e 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -157,11 +157,34 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp */ public static ManifestWriter<DataFile> write( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return write(formatVersion, spec, outputFile, snapshotId, null, null); + } + + /** + * Create a new {@link ManifestWriter} for the given format version. + * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param outputFile an {@link OutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @param compressionCodec compression codec for the manifest file + * @param compressionLevel compression level of the compressionCodec + * @return a manifest writer + */ + public static ManifestWriter<DataFile> write( + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: - return new ManifestWriter.V1Writer(spec, outputFile, snapshotId); + return new ManifestWriter.V1Writer( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); case 2: - return new ManifestWriter.V2Writer(spec, outputFile, snapshotId); + return new ManifestWriter.V2Writer( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -198,11 +221,33 @@ public static ManifestReader<DeleteFile> readDeleteManifest( */ public static ManifestWriter<DeleteFile> writeDeleteManifest( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, null, null); + } + + /** + * Create a new {@link ManifestWriter} for delete files for the given format version. 
+ * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param outputFile an {@link OutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @param compressionCodec compression codec for the manifest file + * @param compressionLevel compression level of the compressionCodec + * @return a manifest writer + */ + public static ManifestWriter writeDeleteManifest( + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: throw new IllegalArgumentException("Cannot write delete files in a v1 table"); case 2: - return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId); + return new ManifestWriter.V2DeleteWriter( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -256,7 +301,9 @@ static ManifestFile copyAppendManifest( Map specsById, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder) { + SnapshotSummary.Builder summaryBuilder, + String compressionCodec, + Integer compressionLevel) { // use metadata that will add the current snapshot's ID for the rewrite InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId); try (ManifestReader reader = @@ -267,7 +314,9 @@ static ManifestFile copyAppendManifest( outputFile, snapshotId, summaryBuilder, - ManifestEntry.Status.ADDED); + ManifestEntry.Status.ADDED, + compressionCodec, + compressionLevel); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location()); } @@ -280,7 +329,9 @@ static ManifestFile copyRewriteManifest( Map specsById, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder) { + SnapshotSummary.Builder summaryBuilder, + String compressionCodec, + Integer compressionLevel) { // for a rewritten manifest all snapshot ids should be set. 
use empty metadata to throw an // exception if it is not InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty(); @@ -292,7 +343,9 @@ static ManifestFile copyRewriteManifest( outputFile, snapshotId, summaryBuilder, - ManifestEntry.Status.EXISTING); + ManifestEntry.Status.EXISTING, + compressionCodec, + compressionLevel); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location()); } @@ -305,8 +358,17 @@ private static ManifestFile copyManifestInternal( OutputFile outputFile, long snapshotId, SnapshotSummary.Builder summaryBuilder, - ManifestEntry.Status allowedEntryStatus) { - ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId); + ManifestEntry.Status allowedEntryStatus, + String compressionCodec, + Integer compressionLevel) { + ManifestWriter writer = + write( + formatVersion, + reader.spec(), + outputFile, + snapshotId, + compressionCodec, + compressionLevel); boolean threw = true; try { for (ManifestEntry entry : reader.entries()) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index 3f7f20d4df6c..e0b98e473631 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -31,14 +31,18 @@ abstract class ManifestListWriter implements FileAppender { private final FileAppender writer; - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + private ManifestListWriter( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { + this.writer = newAppender(file, meta, compressionCodec, compressionLevel); } protected abstract ManifestFile prepare(ManifestFile manifest); protected abstract FileAppender newAppender( - OutputFile file, Map meta); + OutputFile file, Map meta, String compressionCodec, Integer compressionLevel); @Override public void add(ManifestFile manifest) { @@ -73,14 +77,22 @@ public long length() { static class V2Writer extends ManifestListWriter { private final V2Metadata.IndexedManifestFile wrapper; - V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V2Writer( + OutputFile snapshotFile, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + String compressionCodec, + Integer compressionLevel) { super( snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), - "format-version", "2")); + "format-version", "2"), + compressionCodec, + compressionLevel); this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber); } @@ -90,15 +102,28 @@ protected ManifestFile prepare(ManifestFile manifest) { } @Override - protected FileAppender newAppender(OutputFile file, Map meta) { + protected FileAppender newAppender( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { try { - return Avro.write(file) - .schema(V2Metadata.MANIFEST_LIST_SCHEMA) - .named("manifest_file") - .meta(meta) - .overwrite() - .build(); - + Avro.WriteBuilder builder = + Avro.write(file) + .schema(V2Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite(); + + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + + if (compressionLevel != null) { + 
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file); } @@ -108,13 +133,20 @@ protected FileAppender newAppender(OutputFile file, Map newAppender(OutputFile file, Map meta) { + protected FileAppender newAppender( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { try { - return Avro.write(file) - .schema(V1Metadata.MANIFEST_LIST_SCHEMA) - .named("manifest_file") - .meta(meta) - .overwrite() - .build(); - + Avro.WriteBuilder builder = + Avro.write(file) + .schema(V1Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite(); + + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index c7b3e5fee5a9..a92743d847d0 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -32,16 +34,7 @@ class ManifestLists { private ManifestLists() {} static List read(InputFile manifestList) { - try (CloseableIterable files = - Avro.read(manifestList) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) - .classLoader(GenericManifestFile.class.getClassLoader()) - .project(ManifestFile.schema()) - .reuseContainers(false) - .build()) { - + try (CloseableIterable files = manifestFileIterable(manifestList)) { return Lists.newLinkedList(files); } catch (IOException e) { @@ -50,22 +43,52 @@ static List read(InputFile manifestList) { } } + @VisibleForTesting + static AvroIterable manifestFileIterable(InputFile manifestList) { + return Avro.read(manifestList) + .rename("manifest_file", GenericManifestFile.class.getName()) + .rename("partitions", GenericPartitionFieldSummary.class.getName()) + .rename("r508", GenericPartitionFieldSummary.class.getName()) + .classLoader(GenericManifestFile.class.getClassLoader()) + .project(ManifestFile.schema()) + .reuseContainers(false) + .build(); + } + static ManifestListWriter write( int formatVersion, OutputFile manifestListFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + return write( + formatVersion, manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, null, null); + } + + static ManifestListWriter write( + int formatVersion, + OutputFile manifestListFile, + long snapshotId, + Long 
parentSnapshotId, + long sequenceNumber, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: Preconditions.checkArgument( sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, "Invalid sequence number for v1 manifest list: %s", sequenceNumber); - return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + return new ManifestListWriter.V1Writer( + manifestListFile, snapshotId, parentSnapshotId, compressionCodec, compressionLevel); case 2: return new ManifestListWriter.V2Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + manifestListFile, + snapshotId, + parentSnapshotId, + sequenceNumber, + compressionCodec, + compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 4ee51aa60c31..64150a3706aa 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -38,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.metrics.ScanMetrics; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -84,6 +85,7 @@ private String fileClass() { private final FileType content; private final PartitionSpec spec; private final Schema fileSchema; + private Map metadata; // updated by configuration methods private PartitionSet partitionSet = null; @@ -118,7 +120,7 @@ protected ManifestReader( } private > PartitionSpec readPartitionSpec(InputFile inputFile) { - Map metadata = readMetadata(inputFile); + this.metadata = readMetadata(inputFile); int specId = TableMetadata.INITIAL_SPEC_ID; String specProperty = metadata.get("partition-spec-id"); @@ -292,6 +294,11 @@ private boolean isLiveEntry(ManifestEntry entry) { return entry != null && entry.status() != ManifestEntry.Status.DELETED; } + @VisibleForTesting + Map metadata() { + return metadata; + } + /** @return an Iterator of DataFile. 
Makes defensive copies of files before returning */ @Override public CloseableIterator iterator() { diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 4865ccfc3b2d..c216b9c8b146 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -52,10 +52,15 @@ public abstract class ManifestWriter> implements FileAp private long deletedRows = 0L; private Long minDataSequenceNumber = null; - private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { + private ManifestWriter( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { this.file = file; this.specId = spec.specId(); - this.writer = newAppender(spec, file); + this.writer = newAppender(spec, file, compressionCodec, compressionLevel); this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); @@ -63,9 +68,17 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { protected abstract ManifestEntry prepare(ManifestEntry entry); + /** + * @deprecated since 1.4.0, will be removed in 1.5.0; use {@link + * ManifestWriter#newAppender(PartitionSpec, OutputFile, String, Integer)} instead. + */ + @Deprecated protected abstract FileAppender> newAppender( PartitionSpec spec, OutputFile outputFile); + protected abstract FileAppender> newAppender( + PartitionSpec spec, OutputFile outputFile, String compressionCodec, Integer compressionLevel); + protected ManifestContent content() { return ManifestContent.DATA; } @@ -216,8 +229,13 @@ public void close() throws IOException { static class V2Writer extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2Writer( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -229,18 +247,34 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { + return newAppender(spec, file, null, null); + } + + @Override + protected FileAppender> newAppender( + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .meta("format-version", "2") - .meta("content", "data") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "2") + .meta("content", "data") + .overwrite(); + + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + + if (compressionLevel != null) { 
+ builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } @@ -250,8 +284,13 @@ protected FileAppender> newAppender( static class V2DeleteWriter extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2DeleteWriter( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -263,18 +302,34 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { + return newAppender(spec, file, null, null); + } + + @Override + protected FileAppender> newAppender( + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .meta("format-version", "2") - .meta("content", "deletes") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "2") + .meta("content", "deletes") + .overwrite(); + + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } @@ -289,8 +344,13 @@ protected ManifestContent content() { static class V1Writer extends ManifestWriter { private final V1Metadata.IndexedManifestEntry entryWrapper; - V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V1Writer( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType()); } @@ -302,17 +362,33 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( PartitionSpec spec, OutputFile file) { + return newAppender(spec, file, null, null); + } + + @Override + protected FileAppender> newAppender( + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", 
String.valueOf(spec.specId())) - .meta("format-version", "1") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "1") + .overwrite(); + + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index e7b2ccf69020..2def35a000bc 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -292,7 +292,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { current.specsById(), newManifestPath, snapshotId(), - appendedManifestsSummary); + appendedManifestsSummary, + current.properties().get(TableProperties.AVRO_COMPRESSION), + current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); } /** diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 4ecdd21d9386..981a6bb0a5da 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -229,11 +229,13 @@ public Snapshot apply() { try (ManifestListWriter writer = ManifestLists.write( - ops.current().formatVersion(), + base.formatVersion(), manifestList, snapshotId(), parentSnapshotId, - sequenceNumber)) { + sequenceNumber, + base.properties().get(TableProperties.AVRO_COMPRESSION), + base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL))) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -500,12 +502,22 @@ protected OutputFile newManifestOutput() { protected ManifestWriter newManifestWriter(PartitionSpec spec) { return ManifestFiles.write( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + base.formatVersion(), + spec, + newManifestOutput(), + snapshotId(), + base.properties().get(TableProperties.AVRO_COMPRESSION), + base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); } protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) { return ManifestFiles.writeDeleteManifest( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + base.formatVersion(), + spec, + newManifestOutput(), + snapshotId(), + base.properties().get(TableProperties.AVRO_COMPRESSION), + base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL)); } protected RollingManifestWriter newRollingManifestWriter(PartitionSpec spec) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index a6f1d428f41a..5ea9f186d1b3 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -472,6 +472,10 @@ public int propertyAsInt(String property, int defaultValue) { return 
PropertyUtil.propertyAsInt(properties, property, defaultValue); } + public Integer propertyAsNullableInt(String property) { + return PropertyUtil.propertyAsNullableInt(properties, property); + } + public long propertyAsLong(String property, long defaultValue) { return PropertyUtil.propertyAsLong(properties, property, defaultValue); } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index a800214bc9a7..2f0be89e7e69 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -28,10 +28,13 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.iceberg.deletes.PositionDelete; +import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -162,6 +165,19 @@ public class TableTestBase { static final FileIO FILE_IO = new TestTables.LocalFileIO(); + // Mapping of Avro codec name used by Iceberg to name used by Avro (and appearing in Avro metadata + // under the key, avro.codec). + // In tests, we use the Iceberg name to specify the codec, and we verify the codec used by reading + // the Avro metadata and checking for the Avro name in avro.codec. + static final Map AVRO_CODEC_NAME_MAPPING = + ImmutableMap.builder() + .put("uncompressed", "null") + .put("zstd", "zstandard") + .put("gzip", "deflate") + .build(); + + static final long EXAMPLE_SNAPSHOT_ID = 987134631982734L; + @Rule public TemporaryFolder temp = new TemporaryFolder(); protected File tableDir = null; @@ -236,12 +252,22 @@ ManifestFile writeManifest(DataFile... files) throws IOException { } ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException { - File manifestFile = temp.newFile("input.m0.avro"); + return writeManifest(snapshotId, null, files); + } + + ManifestFile writeManifest(Long snapshotId, String compressionCodec, DataFile... files) + throws IOException { + File manifestFile = temp.newFile(); Assert.assertTrue(manifestFile.delete()); - OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); + OutputFile outputFile = + table + .ops() + .io() + .newOutputFile(FileFormat.AVRO.addExtension(manifestFile.getCanonicalPath())); ManifestWriter writer = - ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId); + ManifestFiles.write( + formatVersion, table.spec(), outputFile, snapshotId, compressionCodec, null); try { for (DataFile file : files) { writer.add(file); @@ -292,11 +318,18 @@ > ManifestFile writeManifest( ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFile... deleteFiles) throws IOException { + return writeDeleteManifest(newFormatVersion, snapshotId, null, deleteFiles); + } + + ManifestFile writeDeleteManifest( + int newFormatVersion, Long snapshotId, String compressionCodec, DeleteFile... 
deleteFiles) + throws IOException { OutputFile manifestFile = org.apache.iceberg.Files.localOutput( FileFormat.AVRO.addExtension(temp.newFile().toString())); ManifestWriter writer = - ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId); + ManifestFiles.writeDeleteManifest( + newFormatVersion, SPEC, manifestFile, snapshotId, compressionCodec, null); try { for (DeleteFile deleteFile : deleteFiles) { writer.add(deleteFile); @@ -307,6 +340,31 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi return writer.toManifestFile(); } + InputFile writeManifestList(String compressionCodec, ManifestFile... manifestFiles) + throws IOException { + File manifestListFile = temp.newFile(); + Assert.assertTrue(manifestListFile.delete()); + OutputFile outputFile = + org.apache.iceberg.Files.localOutput( + FileFormat.AVRO.addExtension(manifestListFile.toString())); + + try (FileAppender writer = + ManifestLists.write( + formatVersion, + outputFile, + EXAMPLE_SNAPSHOT_ID, + EXAMPLE_SNAPSHOT_ID - 1, + formatVersion > 1 ? 34L : 0, + compressionCodec, + null)) { + for (ManifestFile manifestFile : manifestFiles) { + writer.add(manifestFile); + } + } + + return outputFile.toInputFile(); + } + ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException { File manifestFile = temp.newFile(name + ".avro"); Assert.assertTrue(manifestFile.delete()); diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java new file mode 100644 index 000000000000..aea21dfedb12 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Map; +import org.apache.avro.file.DataFileConstants; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.io.InputFile; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestManifestListWriter extends TableTestBase { + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {1, 2}; + } + + public TestManifestListWriter(int formatVersion) { + super(formatVersion); + } + + @Test + public void testWriteManifestListWithCompression() throws IOException { + validateManifestListCompressionCodec(false); + } + + @Test + public void testWriteDeleteManifestListWithCompression() throws IOException { + Assumptions.assumeThat(formatVersion).isGreaterThan(1); + validateManifestListCompressionCodec(true); + } + + void validateManifestListCompressionCodec(boolean testDeletes) throws IOException { + for (Map.Entry entry : AVRO_CODEC_NAME_MAPPING.entrySet()) { + String codec = entry.getKey(); + String expectedCodecValue = entry.getValue(); + + ManifestFile manifest = writeManifest(EXAMPLE_SNAPSHOT_ID, codec, FILE_A); + ManifestFile deleteManifest = + testDeletes + ? writeDeleteManifest(formatVersion, EXAMPLE_SNAPSHOT_ID, codec, FILE_A_DELETES) + : null; + InputFile manifestList = + testDeletes + ? writeManifestList(codec, manifest, deleteManifest) + : writeManifestList(codec, manifest); + try (AvroIterable reader = ManifestLists.manifestFileIterable(manifestList)) { + Assertions.assertThat(reader.getMetadata()) + .containsEntry(DataFileConstants.CODEC, expectedCodecValue); + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java index 17a41f418a8e..92be7a83dbe3 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.UUID; +import org.apache.avro.file.DataFileConstants; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Conversions; @@ -30,7 +32,6 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.Assert; -import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -105,7 +106,7 @@ public void testManifestPartitionStats() throws IOException { @Test public void testWriteManifestWithSequenceNumber() throws IOException { - Assume.assumeTrue("sequence number is only valid for format version > 1", formatVersion > 1); + Assumptions.assumeThat(formatVersion).isGreaterThan(1); File manifestFile = temp.newFile("manifest.avro"); Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); @@ -130,7 +131,7 @@ public void testWriteManifestWithSequenceNumber() throws IOException { @Test public void testCommitManifestWithExplicitDataSequenceNumber() throws IOException { - Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1); + 
Assumptions.assumeThat(formatVersion).isGreaterThan(1); DataFile file1 = newFile(50); DataFile file2 = newFile(50); @@ -175,7 +176,7 @@ public void testCommitManifestWithExplicitDataSequenceNumber() throws IOExceptio @Test public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() throws IOException { - Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1); + Assumptions.assumeThat(formatVersion).isGreaterThan(1); DataFile file1 = newFile(50); DataFile file2 = newFile(50); @@ -384,6 +385,17 @@ private void checkManifests( } } + @Test + public void testWriteManifestWithCompression() throws IOException { + validateManifestCompressionCodec(false); + } + + @Test + public void testWriteDeleteManifestWithCompression() throws IOException { + Assumptions.assumeThat(formatVersion).isGreaterThan(1); + validateManifestCompressionCodec(true); + } + private DataFile newFile(long recordCount) { return newFile(recordCount, null); } @@ -435,4 +447,23 @@ private OutputFile newManifestFile() { throw new UncheckedIOException(e); } } + + void validateManifestCompressionCodec(boolean testDeletes) throws IOException { + for (Map.Entry entry : AVRO_CODEC_NAME_MAPPING.entrySet()) { + String codec = entry.getKey(); + String expectedCodecValue = entry.getValue(); + + ManifestFile manifest = + testDeletes + ? writeDeleteManifest(formatVersion, EXAMPLE_SNAPSHOT_ID, codec, FILE_A_DELETES) + : writeManifest(EXAMPLE_SNAPSHOT_ID, codec, FILE_A); + try (ManifestReader> reader = + testDeletes + ? ManifestFiles.readDeleteManifest(manifest, FILE_IO, null) + : ManifestFiles.read(manifest, FILE_IO)) { + Assertions.assertThat(reader.metadata()) + .containsEntry(DataFileConstants.CODEC, expectedCodecValue); + } + } + } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 00d55f937cc4..07afe292013d 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -45,8 +45,20 @@ private FlinkManifestUtil() {} static ManifestFile writeDataFiles( OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { + return writeDataFiles( + outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static ManifestFile writeDataFiles( + OutputFile outputFile, + PartitionSpec spec, + List dataFiles, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestWriter writer = - ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.write( + FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, compressionLevel); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -85,6 +97,17 @@ static ManifestOutputFileFactory createOutputFileFactory( static DeltaManifests writeCompletedFiles( WriteResult result, Supplier outputFileSupplier, PartitionSpec spec) throws IOException { + return writeCompletedFiles( + result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static DeltaManifests writeCompletedFiles( + WriteResult result, + Supplier outputFileSupplier, + PartitionSpec spec, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestFile dataManifest = null; ManifestFile deleteManifest = null; @@ -92,7 +115,12 
@@ static DeltaManifests writeCompletedFiles( // Write the completed data files into a newly created data manifest file. if (result.dataFiles() != null && result.dataFiles().length > 0) { dataManifest = - writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles())); + writeDataFiles( + outputFileSupplier.get(), + spec, + Lists.newArrayList(result.dataFiles()), + compressionCodec, + compressionLevel); } // Write the completed delete files into a newly created delete manifest file. @@ -100,7 +128,13 @@ static DeltaManifests writeCompletedFiles( OutputFile deleteManifestFile = outputFileSupplier.get(); ManifestWriter deleteManifestWriter = - ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.writeDeleteManifest( + FORMAT_V2, + spec, + deleteManifestFile, + DUMMY_SNAPSHOT_ID, + compressionCodec, + compressionLevel); try (ManifestWriter writer = deleteManifestWriter) { for (DeleteFile deleteFile : result.deleteFiles()) { writer.add(deleteFile); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 3805ab298428..b56fc8b9d849 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -47,6 +47,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -449,7 +450,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException { WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( - result, () -> manifestOutputFileFactory.create(checkpointId), spec); + result, + () -> manifestOutputFileFactory.create(checkpointId), + table.spec(), + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)); return SimpleVersionedSerialization.writeVersionAndSerialize( DeltaManifestsSerializer.INSTANCE, deltaManifests); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 00d55f937cc4..07afe292013d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -45,8 +45,20 @@ private FlinkManifestUtil() {} static ManifestFile writeDataFiles( OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { + return writeDataFiles( + outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static ManifestFile writeDataFiles( + OutputFile outputFile, + PartitionSpec spec, + List dataFiles, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestWriter writer = - ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.write( + FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, 
compressionLevel); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -85,6 +97,17 @@ static ManifestOutputFileFactory createOutputFileFactory( static DeltaManifests writeCompletedFiles( WriteResult result, Supplier outputFileSupplier, PartitionSpec spec) throws IOException { + return writeCompletedFiles( + result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static DeltaManifests writeCompletedFiles( + WriteResult result, + Supplier outputFileSupplier, + PartitionSpec spec, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestFile dataManifest = null; ManifestFile deleteManifest = null; @@ -92,7 +115,12 @@ static DeltaManifests writeCompletedFiles( // Write the completed data files into a newly created data manifest file. if (result.dataFiles() != null && result.dataFiles().length > 0) { dataManifest = - writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles())); + writeDataFiles( + outputFileSupplier.get(), + spec, + Lists.newArrayList(result.dataFiles()), + compressionCodec, + compressionLevel); } // Write the completed delete files into a newly created delete manifest file. @@ -100,7 +128,13 @@ static DeltaManifests writeCompletedFiles( OutputFile deleteManifestFile = outputFileSupplier.get(); ManifestWriter deleteManifestWriter = - ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.writeDeleteManifest( + FORMAT_V2, + spec, + deleteManifestFile, + DUMMY_SNAPSHOT_ID, + compressionCodec, + compressionLevel); try (ManifestWriter writer = deleteManifestWriter) { for (DeleteFile deleteFile : result.deleteFiles()) { writer.add(deleteFile); diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 3805ab298428..b56fc8b9d849 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -47,6 +47,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -449,7 +450,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException { WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( - result, () -> manifestOutputFileFactory.create(checkpointId), spec); + result, + () -> manifestOutputFileFactory.create(checkpointId), + table.spec(), + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)); return SimpleVersionedSerialization.writeVersionAndSerialize( DeltaManifestsSerializer.INSTANCE, deltaManifests); diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d562ff7e799b..f37e8f110ef5 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -70,6 +70,7 @@ sqlite-jdbc = "3.42.0.0" testcontainers = "1.17.6" tez010 = "0.10.2" tez08 = { strictly = "[0.8, 0.9[", prefer = "0.8.4"} # see rich version usage 
explanation above +zstd-jni = "1.5.4-1" [libraries] activation = { module = "javax.activation:activation", version.ref = "activation" } @@ -199,3 +200,4 @@ tez010-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez010" } tez010-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez010" } tez08-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez08" } tez08-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez08" } +zstd-jni = { module = "com.github.luben:zstd-jni", version.ref = "zstd-jni" } diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 37f3f0c6c48d..79144eacd1e1 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -369,7 +369,9 @@ private static Iterator buildManifest( SerializableConfiguration conf, PartitionSpec spec, String basePath, - Iterator> fileTuples) { + Iterator> fileTuples, + String compressionCodec, + Integer compressionLevel) { if (fileTuples.hasNext()) { FileIO io = new HadoopFileIO(conf.get()); TaskContext ctx = TaskContext.get(); @@ -380,7 +382,8 @@ private static Iterator buildManifest( Path location = new Path(basePath, suffix); String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); - ManifestWriter writer = ManifestFiles.write(spec, outputFile); + ManifestWriter writer = + ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); @@ -628,7 +631,15 @@ public static void importSparkPartitions( .orderBy(col("_1")) .mapPartitions( (MapPartitionsFunction, ManifestFile>) - fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple), + fileTuple -> + buildManifest( + serializableConf, + spec, + stagingDir, + fileTuple, + targetTable.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), Encoders.javaSerialization(ManifestFile.class)) .collectAsList(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..5c3f29c75d52 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -240,7 +240,10 @@ private List writeManifestsForUnpartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); } @@ -270,7 +273,10 @@ private List writeManifestsForPartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); }); @@ -369,7 +375,9 @@ private static ManifestFile writeManifest( int format, 
Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) + StructType sparkType, + String compressionCodec, + Integer compressionLevel) throws IOException { String manifestName = "optimized-m-" + UUID.randomUUID(); @@ -384,7 +392,8 @@ private static ManifestFile writeManifest( Types.StructType manifestFileType = DataFile.getType(spec.partitionType()); SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); - ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null); + ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); try { for (int index = startIndex; index < endIndex; index++) { @@ -409,7 +418,9 @@ private static MapPartitionsFunction toManifests( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) { + StructType sparkType, + String compressionCodec, + Integer compressionLevel) { return rows -> { List rowsAsList = Lists.newArrayList(rows); @@ -430,7 +441,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } else { int midIndex = rowsAsList.size() / 2; manifests.add( @@ -443,7 +456,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); manifests.add( writeManifest( rowsAsList, @@ -454,7 +469,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } return manifests.iterator(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 88b752c3c6dd..8edd55acb08f 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -332,7 +332,9 @@ private static Iterator buildManifest( SerializableConfiguration conf, PartitionSpec spec, String basePath, - Iterator> fileTuples) { + Iterator> fileTuples, + String compressionCodec, + Integer compressionLevel) { if (fileTuples.hasNext()) { FileIO io = new HadoopFileIO(conf.get()); TaskContext ctx = TaskContext.get(); @@ -343,7 +345,8 @@ private static Iterator buildManifest( Path location = new Path(basePath, suffix); String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); - ManifestWriter writer = ManifestFiles.write(spec, outputFile); + ManifestWriter writer = + ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); @@ -591,7 +594,15 @@ public static void importSparkPartitions( .orderBy(col("_1")) .mapPartitions( (MapPartitionsFunction, ManifestFile>) - fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple), + fileTuple -> + buildManifest( + serializableConf, + spec, + stagingDir, + fileTuple, + targetTable.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), Encoders.javaSerialization(ManifestFile.class)) .collectAsList(); diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..5c3f29c75d52 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -240,7 +240,10 @@ private List writeManifestsForUnpartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); } @@ -270,7 +273,10 @@ private List writeManifestsForPartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); }); @@ -369,7 +375,9 @@ private static ManifestFile writeManifest( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) + StructType sparkType, + String compressionCodec, + Integer compressionLevel) throws IOException { String manifestName = "optimized-m-" + UUID.randomUUID(); @@ -384,7 +392,8 @@ private static ManifestFile writeManifest( Types.StructType manifestFileType = DataFile.getType(spec.partitionType()); SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); - ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null); + ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); try { for (int index = startIndex; index < endIndex; index++) { @@ -409,7 +418,9 @@ private static MapPartitionsFunction toManifests( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) { + StructType sparkType, + String compressionCodec, + Integer compressionLevel) { return rows -> { List rowsAsList = Lists.newArrayList(rows); @@ -430,7 +441,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } else { int midIndex = rowsAsList.size() / 2; manifests.add( @@ -443,7 +456,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); manifests.add( writeManifest( rowsAsList, @@ -454,7 +469,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } return manifests.iterator(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 88b752c3c6dd..8edd55acb08f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -332,7 +332,9 @@ private static Iterator buildManifest( SerializableConfiguration conf, PartitionSpec spec, String basePath, - Iterator> fileTuples) { + Iterator> fileTuples, + String compressionCodec, + Integer compressionLevel) { if (fileTuples.hasNext()) { FileIO io = 
new HadoopFileIO(conf.get()); TaskContext ctx = TaskContext.get(); @@ -343,7 +345,8 @@ private static Iterator buildManifest( Path location = new Path(basePath, suffix); String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); - ManifestWriter writer = ManifestFiles.write(spec, outputFile); + ManifestWriter writer = + ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); @@ -591,7 +594,15 @@ public static void importSparkPartitions( .orderBy(col("_1")) .mapPartitions( (MapPartitionsFunction, ManifestFile>) - fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple), + fileTuple -> + buildManifest( + serializableConf, + spec, + stagingDir, + fileTuple, + targetTable.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), Encoders.javaSerialization(ManifestFile.class)) .collectAsList(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 06a5c8c5720f..5c3f29c75d52 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -240,7 +240,10 @@ private List writeManifestsForUnpartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); } @@ -270,7 +273,10 @@ private List writeManifestsForPartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + PropertyUtil.propertyAsNullableInt( + table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)), manifestEncoder) .collectAsList(); }); @@ -369,7 +375,9 @@ private static ManifestFile writeManifest( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) + StructType sparkType, + String compressionCodec, + Integer compressionLevel) throws IOException { String manifestName = "optimized-m-" + UUID.randomUUID(); @@ -384,7 +392,8 @@ private static ManifestFile writeManifest( Types.StructType manifestFileType = DataFile.getType(spec.partitionType()); SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); - ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null); + ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); try { for (int index = startIndex; index < endIndex; index++) { @@ -409,7 +418,9 @@ private static MapPartitionsFunction toManifests( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) { + StructType sparkType, + String compressionCodec, + Integer compressionLevel) { return rows -> { List rowsAsList = Lists.newArrayList(rows); @@ -430,7 +441,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + 
compressionLevel)); } else { int midIndex = rowsAsList.size() / 2; manifests.add( @@ -443,7 +456,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); manifests.add( writeManifest( rowsAsList, @@ -454,7 +469,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } return manifests.iterator();
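
Notes (reviewer sketches, not part of the patch):

The new writer arguments are populated from the table's `write.avro.compression-codec` and `write.avro.compression-level` properties (the values behind `TableProperties.AVRO_COMPRESSION` and `TableProperties.AVRO_COMPRESSION_LEVEL`), so users opt in per table rather than per call site. A minimal sketch of enabling zstd-compressed metadata, assuming an existing `Table` handle; the class and method names are illustrative only:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class EnableManifestCompression { // hypothetical helper, not in the patch
  static void enableZstd(Table table) {
    table
        .updateProperties()
        // picked up by SnapshotProducer and the copyManifest paths above
        .set(TableProperties.AVRO_COMPRESSION, "zstd")
        // optional; when unset, propertyAsNullableInt returns null and the
        // writers fall back to the codec's default level
        .set(TableProperties.AVRO_COMPRESSION_LEVEL, "3")
        .commit();
  }
}
```

The codec names accepted here are Iceberg's ("uncompressed", "gzip", "zstd"), which the tests map to the names Avro records in file metadata via AVRO_CODEC_NAME_MAPPING.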
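Every `newAppender` override in the patch repeats the same null-guarded builder configuration: compression keys are only set on the `Avro.WriteBuilder` when a value is present, so `null` still means "use the writer's defaults". A standalone sketch of that pattern; `applyCompression` is a hypothetical helper, not part of the patch:

```java
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;

final class AppenderCompression { // hypothetical helper, not in the patch
  private AppenderCompression() {}

  static Avro.WriteBuilder applyCompression(
      Avro.WriteBuilder builder, String compressionCodec, Integer compressionLevel) {
    if (compressionCodec != null) {
      builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
    }

    if (compressionLevel != null) {
      // levels travel as strings through the appender's property map
      builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
    }

    return builder;
  }
}
```

Callers that do not go through table properties can also pass the codec directly to the new overloads, e.g. `ManifestFiles.write(2, spec, outputFile, snapshotId, "zstd", 3)`.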
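The tests assert the codec by reading the Avro file metadata key `avro.codec` (`DataFileConstants.CODEC`), where Iceberg's "zstd" appears as "zstandard", "gzip" as "deflate", and "uncompressed" as "null". A quick way to spot-check a written manifest outside the test harness, using plain Avro APIs; the file path is a placeholder:

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

class CheckManifestCodec {
  public static void main(String[] args) throws IOException {
    File manifest = new File("/tmp/example-manifest.avro"); // placeholder path
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(manifest, new GenericDatumReader<>())) {
      // file-level metadata written by the Avro appender
      System.out.println("avro.codec = " + reader.getMetaString(DataFileConstants.CODEC));
    }
  }
}
```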