diff --git a/build.gradle b/build.gradle
index 679ffc6f2fc8..e2491c396fb8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -364,6 +364,7 @@ project(':iceberg-core') {
     testImplementation libs.esotericsoftware.kryo
     testImplementation libs.guava.testlib
     testImplementation libs.awaitility
+    testRuntimeOnly libs.zstd.jni
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 87768e34894a..d38276696d92 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -163,7 +163,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
         specsById,
         newFile,
         snapshotId(),
-        summaryBuilder);
+        summaryBuilder,
+        current.properties().get(TableProperties.AVRO_COMPRESSION),
+        current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java
index 9619ec087ad6..c82cc06ef608 100644
--- a/core/src/main/java/org/apache/iceberg/FastAppend.java
+++ b/core/src/main/java/org/apache/iceberg/FastAppend.java
@@ -138,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
         current.specsById(),
         newManifestPath,
         snapshotId(),
-        summaryBuilder);
+        summaryBuilder,
+        current.properties().get(TableProperties.AVRO_COMPRESSION),
+        current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index c23ab667a41b..ee08261edbce 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -157,11 +157,30 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
    */
   public static ManifestWriter<DataFile> write(
       int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    return write(formatVersion, spec, outputFile, snapshotId, ManifestWriter.options());
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for the given format version.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param options additional options for the manifest writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DataFile> write(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      ManifestWriter.Options options) {
     switch (formatVersion) {
       case 1:
-        return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
+        return new ManifestWriter.V1Writer(spec, outputFile, snapshotId, options);
       case 2:
-        return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
+        return new ManifestWriter.V2Writer(spec, outputFile, snapshotId, options);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
@@ -198,11 +217,31 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
    */
   public static ManifestWriter<DeleteFile> writeDeleteManifest(
       int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+    return writeDeleteManifest(
+        formatVersion, spec, outputFile, snapshotId, ManifestWriter.options());
+  }
+
+  /**
+   * Create a new {@link ManifestWriter} for delete files for the given format version.
+   *
+   * @param formatVersion a target format version
+   * @param spec a {@link PartitionSpec}
+   * @param outputFile an {@link OutputFile} where the manifest will be written
+   * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+   * @param options additional options for the manifest writer
+   * @return a manifest writer
+   */
+  public static ManifestWriter<DeleteFile> writeDeleteManifest(
+      int formatVersion,
+      PartitionSpec spec,
+      OutputFile outputFile,
+      Long snapshotId,
+      ManifestWriter.Options options) {
     switch (formatVersion) {
       case 1:
         throw new IllegalArgumentException("Cannot write delete files in a v1 table");
       case 2:
-        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+        return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, options);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest for table version: " + formatVersion);
@@ -256,7 +295,9 @@ static ManifestFile copyAppendManifest(
       Map<Integer, PartitionSpec> specsById,
       OutputFile outputFile,
       long snapshotId,
-      SnapshotSummary.Builder summaryBuilder) {
+      SnapshotSummary.Builder summaryBuilder,
+      String compressionCodec,
+      Integer compressionLevel) {
     // use metadata that will add the current snapshot's ID for the rewrite
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
     try (ManifestReader<DataFile> reader =
@@ -267,7 +308,9 @@ static ManifestFile copyAppendManifest(
           outputFile,
           snapshotId,
           summaryBuilder,
-          ManifestEntry.Status.ADDED);
+          ManifestEntry.Status.ADDED,
+          compressionCodec,
+          compressionLevel);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
     }
@@ -280,7 +323,9 @@ static ManifestFile copyRewriteManifest(
       Map<Integer, PartitionSpec> specsById,
       OutputFile outputFile,
       long snapshotId,
-      SnapshotSummary.Builder summaryBuilder) {
+      SnapshotSummary.Builder summaryBuilder,
+      String compressionCodec,
+      Integer compressionLevel) {
     // for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
     // exception if it is not
     InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
@@ -292,7 +337,9 @@ static ManifestFile copyRewriteManifest(
           outputFile,
           snapshotId,
           summaryBuilder,
-          ManifestEntry.Status.EXISTING);
+          ManifestEntry.Status.EXISTING,
+          compressionCodec,
+          compressionLevel);
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
     }
@@ -305,8 +352,18 @@ private static ManifestFile copyManifestInternal(
       OutputFile outputFile,
       long snapshotId,
       SnapshotSummary.Builder summaryBuilder,
-      ManifestEntry.Status allowedEntryStatus) {
-    ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
+      ManifestEntry.Status allowedEntryStatus,
+      String compressionCodec,
+      Integer compressionLevel) {
+    ManifestWriter<DataFile> writer =
+        write(
+            formatVersion,
+            reader.spec(),
+            outputFile,
+            snapshotId,
+            ManifestWriter.options()
+                .compressionCodec(compressionCodec)
+                .compressionLevel(compressionLevel));
     boolean threw = true;
     try {
       for (ManifestEntry<DataFile> entry : reader.entries()) {
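Editorial note on the new API above: a minimal usage sketch of the ManifestFiles.write overload with explicit compression options. The output path, SPEC, and the dataFile variable are illustrative stand-ins; options that are left unset stay null and the writer keeps its existing Avro defaults.

// Sketch: write a v2 manifest compressed with zstd at level 3 (names are illustrative).
OutputFile outputFile = table.io().newOutputFile("/tmp/manifests/manifest-00000.avro");
ManifestWriter<DataFile> writer =
    ManifestFiles.write(
        2, // format version
        SPEC,
        outputFile,
        null, // snapshot ID to be assigned at commit time
        ManifestWriter.options()
            .compressionCodec("zstd") // Iceberg codec name; null keeps the default codec
            .compressionLevel(3)); // null keeps the codec's default level
try {
  writer.add(dataFile); // a previously built DataFile
} finally {
  writer.close();
}
ManifestFile manifest = writer.toManifestFile();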
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 3f7f20d4df6c..7e8bcd9d6ee3 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -31,14 +31,14 @@ abstract class ManifestListWriter implements FileAppender<ManifestFile> {
   private final FileAppender<ManifestFile> writer;
 
-  private ManifestListWriter(OutputFile file, Map<String, String> meta) {
-    this.writer = newAppender(file, meta);
+  private ManifestListWriter(OutputFile file, Map<String, String> meta, Options options) {
+    this.writer = newAppender(file, meta, options);
   }
 
   protected abstract ManifestFile prepare(ManifestFile manifest);
 
   protected abstract FileAppender<ManifestFile> newAppender(
-      OutputFile file, Map<String, String> meta);
+      OutputFile file, Map<String, String> meta, Options options);
 
   @Override
   public void add(ManifestFile manifest) {
@@ -70,17 +70,52 @@ public long length() {
     return writer.length();
   }
 
+  public static Options options() {
+    return new Options();
+  }
+
+  static class Options {
+    private String compCodec;
+    private Integer compLevel;
+
+    private Options() {}
+
+    public Options compressionCodec(String codec) {
+      compCodec = codec;
+      return this;
+    }
+
+    public Options compressionLevel(Integer level) {
+      compLevel = level;
+      return this;
+    }
+
+    String compressionCodec() {
+      return compCodec;
+    }
+
+    Integer compressionLevel() {
+      return compLevel;
+    }
+  }
+
   static class V2Writer extends ManifestListWriter {
     private final V2Metadata.IndexedManifestFile wrapper;
 
-    V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
+    V2Writer(
+        OutputFile snapshotFile,
+        long snapshotId,
+        Long parentSnapshotId,
+        long sequenceNumber,
+        Options options) {
       super(
           snapshotFile,
           ImmutableMap.of(
               "snapshot-id", String.valueOf(snapshotId),
               "parent-snapshot-id", String.valueOf(parentSnapshotId),
               "sequence-number", String.valueOf(sequenceNumber),
-              "format-version", "2"));
+              "format-version", "2"),
+          options);
       this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
     }
 
@@ -90,15 +125,28 @@ protected ManifestFile prepare(ManifestFile manifest) {
     }
 
     @Override
-    protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
-      try {
-        return Avro.write(file)
-            .schema(V2Metadata.MANIFEST_LIST_SCHEMA)
-            .named("manifest_file")
-            .meta(meta)
-            .overwrite()
-            .build();
+    protected FileAppender<ManifestFile> newAppender(
+        OutputFile file, Map<String, String> meta, Options options) {
+      String compressionCodec = options.compressionCodec();
+      Integer compressionLevel = options.compressionLevel();
+      try {
+        Avro.WriteBuilder builder =
+            Avro.write(file)
+                .schema(V2Metadata.MANIFEST_LIST_SCHEMA)
+                .named("manifest_file")
+                .meta(meta)
+                .overwrite();
+
+        if (compressionCodec != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
+        }
+
+        if (compressionLevel != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
+        }
+
+        return builder.build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
       }
@@ -108,13 +156,14 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
   static class V1Writer extends ManifestListWriter {
-    V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
+    V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Options options) {
       super(
           snapshotFile,
           ImmutableMap.of(
               "snapshot-id", String.valueOf(snapshotId),
               "parent-snapshot-id", String.valueOf(parentSnapshotId),
-              "format-version", "1"));
+              "format-version", "1"),
+          options);
     }
 
     @Override
-    protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
-      try {
-        return Avro.write(file)
-            .schema(V1Metadata.MANIFEST_LIST_SCHEMA)
-            .named("manifest_file")
-            .meta(meta)
-            .overwrite()
-            .build();
+    protected FileAppender<ManifestFile> newAppender(
+        OutputFile file, Map<String, String> meta, Options options) {
+      String compressionCodec = options.compressionCodec();
+      Integer compressionLevel = options.compressionLevel();
+      try {
+        Avro.WriteBuilder builder =
+            Avro.write(file)
+                .schema(V1Metadata.MANIFEST_LIST_SCHEMA)
+                .named("manifest_file")
+                .meta(meta)
+                .overwrite();
+
+        if (compressionCodec != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
+        }
+
+        if (compressionLevel != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
+        }
+
+        return builder.build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
       }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java
index c7b3e5fee5a9..7d39dc68ea11 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestLists.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java
@@ -21,10 +21,12 @@
 import java.io.IOException;
 import java.util.List;
 import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
@@ -32,16 +34,7 @@ class ManifestLists {
   private ManifestLists() {}
 
   static List<ManifestFile> read(InputFile manifestList) {
-    try (CloseableIterable<ManifestFile> files =
-        Avro.read(manifestList)
-            .rename("manifest_file", GenericManifestFile.class.getName())
-            .rename("partitions", GenericPartitionFieldSummary.class.getName())
-            .rename("r508", GenericPartitionFieldSummary.class.getName())
-            .classLoader(GenericManifestFile.class.getClassLoader())
-            .project(ManifestFile.schema())
-            .reuseContainers(false)
-            .build()) {
-
+    try (CloseableIterable<ManifestFile> files = manifestFileIterable(manifestList)) {
       return Lists.newLinkedList(files);
 
     } catch (IOException e) {
@@ -50,22 +43,51 @@ static List<ManifestFile> read(InputFile manifestList) {
     }
   }
 
+  @VisibleForTesting
+  static AvroIterable<ManifestFile> manifestFileIterable(InputFile manifestList) {
+    return Avro.read(manifestList)
+        .rename("manifest_file", GenericManifestFile.class.getName())
+        .rename("partitions", GenericPartitionFieldSummary.class.getName())
+        .rename("r508", GenericPartitionFieldSummary.class.getName())
+        .classLoader(GenericManifestFile.class.getClassLoader())
+        .project(ManifestFile.schema())
+        .reuseContainers(false)
+        .build();
+  }
+
   static ManifestListWriter write(
       int formatVersion,
       OutputFile manifestListFile,
       long snapshotId,
       Long parentSnapshotId,
       long sequenceNumber) {
+    return write(
+        formatVersion,
+        manifestListFile,
+        snapshotId,
+        parentSnapshotId,
+        sequenceNumber,
+        ManifestListWriter.options());
+  }
+
+  static ManifestListWriter write(
+      int formatVersion,
+      OutputFile manifestListFile,
+      long snapshotId,
+      Long parentSnapshotId,
+      long sequenceNumber,
+      ManifestListWriter.Options options) {
     switch (formatVersion) {
       case 1:
         Preconditions.checkArgument(
             sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER,
             "Invalid sequence number for v1 manifest list: %s",
             sequenceNumber);
-        return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId);
+        return new ManifestListWriter.V1Writer(
+            manifestListFile, snapshotId, parentSnapshotId, options);
       case 2:
         return new ManifestListWriter.V2Writer(
-            manifestListFile, snapshotId, parentSnapshotId, sequenceNumber);
+            manifestListFile, snapshotId, parentSnapshotId, sequenceNumber, options);
     }
     throw new UnsupportedOperationException(
         "Cannot write manifest list for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 4ee51aa60c31..64150a3706aa 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -84,6 +85,7 @@ private String fileClass() {
   private final FileType content;
   private final PartitionSpec spec;
   private final Schema fileSchema;
+  private Map<String, String> metadata;
 
   // updated by configuration methods
   private PartitionSet partitionSet = null;
@@ -118,7 +120,7 @@ protected ManifestReader(
   }
 
   private <T extends ContentFile<T>> PartitionSpec readPartitionSpec(InputFile inputFile) {
-    Map<String, String> metadata = readMetadata(inputFile);
+    this.metadata = readMetadata(inputFile);
 
     int specId = TableMetadata.INITIAL_SPEC_ID;
     String specProperty = metadata.get("partition-spec-id");
@@ -292,6 +294,11 @@ private boolean isLiveEntry(ManifestEntry<F> entry) {
     return entry != null && entry.status() != ManifestEntry.Status.DELETED;
   }
 
+  @VisibleForTesting
+  Map<String, String> metadata() {
+    return metadata;
+  }
+
   /** @return an Iterator of DataFile. Makes defensive copies of files before returning */
   @Override
   public CloseableIterator<F> iterator() {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 4865ccfc3b2d..1fd87c78179c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -52,10 +52,10 @@ public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAp
   private long deletedRows = 0L;
   private Long minDataSequenceNumber = null;
 
-  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+  private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) {
     this.file = file;
     this.specId = spec.specId();
-    this.writer = newAppender(spec, file);
+    this.writer = newAppender(spec, file, options);
     this.snapshotId = snapshotId;
     this.reused = new GenericManifestEntry<>(spec.partitionType());
     this.stats = new PartitionSummary(spec);
@@ -63,9 +63,19 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
 
   protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
 
+  /**
+   * @deprecated since 1.5.0, will be removed in 1.6.0; use {@link
+   *     ManifestWriter#newAppender(PartitionSpec, OutputFile, Options)} instead.
+   */
+  @Deprecated
   protected abstract FileAppender<ManifestEntry<F>> newAppender(
       PartitionSpec spec, OutputFile outputFile);
 
+  protected FileAppender<ManifestEntry<F>> newAppender(
+      PartitionSpec spec, OutputFile outputFile, Options options) {
+    return newAppender(spec, outputFile);
+  }
+
   protected ManifestContent content() {
     return ManifestContent.DATA;
   }
@@ -213,11 +223,40 @@ public void close() throws IOException {
     writer.close();
   }
 
+  public static Options options() {
+    return new Options();
+  }
+
+  public static class Options {
+    private String compCodec;
+    private Integer compLevel;
+
+    private Options() {}
+
+    public Options compressionCodec(String codec) {
+      compCodec = codec;
+      return this;
+    }
+
+    public Options compressionLevel(Integer level) {
+      compLevel = level;
+      return this;
+    }
+
+    String compressionCodec() {
+      return compCodec;
+    }
+
+    Integer compressionLevel() {
+      return compLevel;
+    }
+  }
+
   static class V2Writer extends ManifestWriter<DataFile> {
     private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
 
-    V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
-      super(spec, file, snapshotId);
+    V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) {
+      super(spec, file, snapshotId, options);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
 
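A note on the Options carrier introduced above: both fields start as null, and the writers only call builder.set(...) for non-null values, so an empty Options from ManifestWriter.options() reproduces the previous appender behavior exactly. A small sketch of the intended call pattern:

// Sketch: only non-null values are pushed down to the Avro write builder.
ManifestWriter.Options options =
    ManifestWriter.options()
        .compressionCodec("gzip") // Iceberg name; stored by Avro as "deflate"
        .compressionLevel(9); // forwarded as a string table property
// ManifestWriter.options() with no setters behaves like the pre-change writers.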
@@ -229,18 +268,37 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
     @Override
     protected FileAppender<ManifestEntry<DataFile>> newAppender(
         PartitionSpec spec, OutputFile file) {
+      return newAppender(spec, file, options());
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(
+        PartitionSpec spec, OutputFile file, Options options) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+      String compressionCodec = options.compressionCodec();
+      Integer compressionLevel = options.compressionLevel();
+
       try {
-        return Avro.write(file)
-            .schema(manifestSchema)
-            .named("manifest_entry")
-            .meta("schema", SchemaParser.toJson(spec.schema()))
-            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
-            .meta("partition-spec-id", String.valueOf(spec.specId()))
-            .meta("format-version", "2")
-            .meta("content", "data")
-            .overwrite()
-            .build();
+        Avro.WriteBuilder builder =
+            Avro.write(file)
+                .schema(manifestSchema)
+                .named("manifest_entry")
+                .meta("schema", SchemaParser.toJson(spec.schema()))
+                .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+                .meta("partition-spec-id", String.valueOf(spec.specId()))
+                .meta("format-version", "2")
+                .meta("content", "data")
+                .overwrite();
+
+        if (compressionCodec != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
+        }
+
+        if (compressionLevel != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
+        }
+
+        return builder.build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
       }
@@ -250,8 +308,8 @@ protected FileAppender<ManifestEntry<DataFile>> newAppender(
   static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
     private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
 
-    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
-      super(spec, file, snapshotId);
+    V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) {
+      super(spec, file, snapshotId, options);
       this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
     }
 
@@ -263,18 +321,37 @@ protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
     @Override
     protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
         PartitionSpec spec, OutputFile file) {
+      return newAppender(spec, file, options());
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DeleteFile>> newAppender(
+        PartitionSpec spec, OutputFile file, Options options) {
       Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+      String compressionCodec = options.compressionCodec();
+      Integer compressionLevel = options.compressionLevel();
+
       try {
-        return Avro.write(file)
-            .schema(manifestSchema)
-            .named("manifest_entry")
-            .meta("schema", SchemaParser.toJson(spec.schema()))
-            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
-            .meta("partition-spec-id", String.valueOf(spec.specId()))
-            .meta("format-version", "2")
-            .meta("content", "deletes")
-            .overwrite()
-            .build();
+        Avro.WriteBuilder builder =
+            Avro.write(file)
+                .schema(manifestSchema)
+                .named("manifest_entry")
+                .meta("schema", SchemaParser.toJson(spec.schema()))
+                .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+                .meta("partition-spec-id", String.valueOf(spec.specId()))
+                .meta("format-version", "2")
+                .meta("content", "deletes")
+                .overwrite();
+
+        if (compressionCodec != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
+        }
+
+        if (compressionLevel != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
+        }
+
+        return builder.build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
       }
@@ -289,8 +366,8 @@ protected ManifestContent content() {
   static class V1Writer extends ManifestWriter<DataFile> {
     private final V1Metadata.IndexedManifestEntry entryWrapper;
 
-    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
-      super(spec, file, snapshotId);
+    V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId, Options options) {
+      super(spec, file, snapshotId, options);
       this.entryWrapper = new V1Metadata.IndexedManifestEntry(spec.partitionType());
     }
 
@@ -302,17 +379,36 @@ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
     @Override
     protected FileAppender<ManifestEntry<DataFile>> newAppender(
         PartitionSpec spec, OutputFile file) {
+      return newAppender(spec, file, options());
+    }
+
+    @Override
+    protected FileAppender<ManifestEntry<DataFile>> newAppender(
+        PartitionSpec spec, OutputFile file, Options options) {
       Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
+      String compressionCodec = options.compressionCodec();
+      Integer compressionLevel = options.compressionLevel();
+
       try {
-        return Avro.write(file)
-            .schema(manifestSchema)
-            .named("manifest_entry")
-            .meta("schema", SchemaParser.toJson(spec.schema()))
-            .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
-            .meta("partition-spec-id", String.valueOf(spec.specId()))
-            .meta("format-version", "1")
-            .overwrite()
-            .build();
+        Avro.WriteBuilder builder =
+            Avro.write(file)
+                .schema(manifestSchema)
+                .named("manifest_entry")
+                .meta("schema", SchemaParser.toJson(spec.schema()))
+                .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+                .meta("partition-spec-id", String.valueOf(spec.specId()))
+                .meta("format-version", "1")
+                .overwrite();
+
+        if (compressionCodec != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
+        }
+
+        if (compressionLevel != null) {
+          builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
+        }
+
+        return builder.build();
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file);
       }
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 1dcfa6d3d41d..90421975974c 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -292,7 +292,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
         current.specsById(),
         newManifestPath,
         snapshotId(),
-        appendedManifestsSummary);
+        appendedManifestsSummary,
+        current.properties().get(TableProperties.AVRO_COMPRESSION),
+        current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 757d0b78bca7..aa41b26b473b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -235,13 +235,18 @@ public Snapshot apply() {
 
     OutputFile manifestList = manifestListPath();
 
+    ManifestListWriter.Options options =
+        ManifestListWriter.options()
+            .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION))
+            .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
     try (ManifestListWriter writer =
         ManifestLists.write(
-            ops.current().formatVersion(),
+            base.formatVersion(),
             manifestList,
             snapshotId(),
             parentSnapshotId,
-            sequenceNumber)) {
+            sequenceNumber,
+            options)) {
 
       // keep track of the manifest lists created
       manifestLists.add(manifestList.location());
@@ -507,13 +512,21 @@ protected OutputFile newManifestOutput() {
   }
 
   protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
+    ManifestWriter.Options options =
+        ManifestWriter.options()
+            .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION))
+            .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
     return ManifestFiles.write(
-        ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
+        base.formatVersion(), spec, newManifestOutput(), snapshotId(), options);
   }
 
   protected ManifestWriter<DeleteFile> newDeleteManifestWriter(PartitionSpec spec) {
+    ManifestWriter.Options options =
+        ManifestWriter.options()
+            .compressionCodec(base.properties().get(TableProperties.AVRO_COMPRESSION))
+            .compressionLevel(base.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
     return ManifestFiles.writeDeleteManifest(
-        ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
+        base.formatVersion(), spec, newManifestOutput(), snapshotId(), options);
   }
 
   protected RollingManifestWriter<DataFile> newRollingManifestWriter(PartitionSpec spec) {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 56ff1f772461..d30bd542175d 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -487,6 +487,10 @@ public int propertyAsInt(String property, int defaultValue) {
     return PropertyUtil.propertyAsInt(properties, property, defaultValue);
   }
 
+  public Integer propertyAsNullableInt(String property) {
+    return PropertyUtil.propertyAsNullableInt(properties, property);
+  }
+
   public long propertyAsLong(String property, long defaultValue) {
     return PropertyUtil.propertyAsLong(properties, property, defaultValue);
   }
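TableMetadata.propertyAsNullableInt delegates to PropertyUtil.propertyAsNullableInt, which, by analogy with the existing propertyAsInt helpers, should return null rather than a sentinel when the property is unset. The helper body below is an assumption sketched for illustration; it is not part of this diff:

// Assumed behavior of PropertyUtil.propertyAsNullableInt (illustrative only):
// parse and return the value when present, return null when the property is missing.
public static Integer propertyAsNullableInt(Map<String, String> properties, String property) {
  String value = properties.get(property);
  if (value != null) {
    return Integer.parseInt(value);
  }
  return null;
}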
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 68ce05528964..1b2ef0ff77eb 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -28,10 +28,13 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -165,6 +168,19 @@ public class TableTestBase {
 
   static final FileIO FILE_IO = new TestTables.LocalFileIO();
 
+  // Maps each Avro codec name used by Iceberg to the name used by Avro, which appears in the
+  // Avro file metadata under the key avro.codec.
+  // Tests specify a codec by its Iceberg name and then verify the codec that was actually used
+  // by reading the Avro metadata and checking avro.codec for the corresponding Avro name.
+  static final Map<String, String> AVRO_CODEC_NAME_MAPPING =
+      ImmutableMap.<String, String>builder()
+          .put("uncompressed", "null")
+          .put("zstd", "zstandard")
+          .put("gzip", "deflate")
+          .build();
+
+  static final long EXAMPLE_SNAPSHOT_ID = 987134631982734L;
+
   @Rule public TemporaryFolder temp = new TemporaryFolder();
 
   protected File tableDir = null;
@@ -239,12 +255,26 @@ ManifestFile writeManifest(DataFile... files) throws IOException {
   }
 
   ManifestFile writeManifest(Long snapshotId, DataFile... files) throws IOException {
-    File manifestFile = temp.newFile("input.m0.avro");
+    return writeManifest(snapshotId, null, files);
+  }
+
+  ManifestFile writeManifest(Long snapshotId, String compressionCodec, DataFile... files)
+      throws IOException {
+    File manifestFile = temp.newFile();
     Assert.assertTrue(manifestFile.delete());
-    OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
+    OutputFile outputFile =
+        table
+            .ops()
+            .io()
+            .newOutputFile(FileFormat.AVRO.addExtension(manifestFile.getCanonicalPath()));
 
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+        ManifestFiles.write(
+            formatVersion,
+            table.spec(),
+            outputFile,
+            snapshotId,
+            ManifestWriter.options().compressionCodec(compressionCodec));
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -295,11 +325,22 @@ <F extends ContentFile<F>> ManifestFile writeManifest(
 
   ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFile... deleteFiles)
       throws IOException {
+    return writeDeleteManifest(newFormatVersion, snapshotId, null, deleteFiles);
+  }
+
+  ManifestFile writeDeleteManifest(
+      int newFormatVersion, Long snapshotId, String compressionCodec, DeleteFile... deleteFiles)
+      throws IOException {
     OutputFile manifestFile =
         org.apache.iceberg.Files.localOutput(
             FileFormat.AVRO.addExtension(temp.newFile().toString()));
     ManifestWriter<DeleteFile> writer =
-        ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId);
+        ManifestFiles.writeDeleteManifest(
+            newFormatVersion,
+            SPEC,
+            manifestFile,
+            snapshotId,
+            ManifestWriter.options().compressionCodec(compressionCodec));
     try {
       for (DeleteFile deleteFile : deleteFiles) {
         writer.add(deleteFile);
@@ -310,6 +351,30 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi
     return writer.toManifestFile();
   }
 
+  InputFile writeManifestList(String compressionCodec, ManifestFile... manifestFiles)
+      throws IOException {
+    File manifestListFile = temp.newFile();
+    Assert.assertTrue(manifestListFile.delete());
+    OutputFile outputFile =
+        org.apache.iceberg.Files.localOutput(
+            FileFormat.AVRO.addExtension(manifestListFile.toString()));
+
+    try (FileAppender<ManifestFile> writer =
+        ManifestLists.write(
+            formatVersion,
+            outputFile,
+            EXAMPLE_SNAPSHOT_ID,
+            EXAMPLE_SNAPSHOT_ID - 1,
+            formatVersion > 1 ? 34L : 0,
+            ManifestListWriter.options().compressionCodec(compressionCodec))) {
+      for (ManifestFile manifestFile : manifestFiles) {
+        writer.add(manifestFile);
+      }
+    }
+
+    return outputFile.toInputFile();
+  }
+
   ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException {
     File manifestFile = temp.newFile(name + ".avro");
     Assert.assertTrue(manifestFile.delete());
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java
new file mode 100644
index 000000000000..aea21dfedb12
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.io.InputFile;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.Assumptions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestManifestListWriter extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public TestManifestListWriter(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Test
+  public void testWriteManifestListWithCompression() throws IOException {
+    validateManifestListCompressionCodec(false);
+  }
+
+  @Test
+  public void testWriteDeleteManifestListWithCompression() throws IOException {
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+    validateManifestListCompressionCodec(true);
+  }
+
+  void validateManifestListCompressionCodec(boolean testDeletes) throws IOException {
+    for (Map.Entry<String, String> entry : AVRO_CODEC_NAME_MAPPING.entrySet()) {
+      String codec = entry.getKey();
+      String expectedCodecValue = entry.getValue();
+
+      ManifestFile manifest = writeManifest(EXAMPLE_SNAPSHOT_ID, codec, FILE_A);
+      ManifestFile deleteManifest =
+          testDeletes
+              ? writeDeleteManifest(formatVersion, EXAMPLE_SNAPSHOT_ID, codec, FILE_A_DELETES)
+              : null;
+      InputFile manifestList =
+          testDeletes
+              ? writeManifestList(codec, manifest, deleteManifest)
+              : writeManifestList(codec, manifest);
+      try (AvroIterable<ManifestFile> reader = ManifestLists.manifestFileIterable(manifestList)) {
+        Assertions.assertThat(reader.getMetadata())
+            .containsEntry(DataFileConstants.CODEC, expectedCodecValue);
+      }
+    }
+  }
+}
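The assertion above works because Avro records the effective codec in the container file header under avro.codec (DataFileConstants.CODEC). The same check can be made without Iceberg's readers, using Avro's own file reader; a sketch with an illustrative path:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Sketch: read the codec name straight out of the Avro container header.
try (DataFileReader<GenericRecord> reader =
    new DataFileReader<>(new File("/tmp/manifest-00000.avro"), new GenericDatumReader<>())) {
  // For Iceberg's "zstd" this prints "zstandard"; for "gzip" it prints "deflate".
  System.out.println("avro.codec = " + reader.getMetaString("avro.codec"));
}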
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 17a41f418a8e..92be7a83dbe3 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -22,7 +22,9 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import org.apache.avro.file.DataFileConstants;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.types.Conversions;
@@ -30,7 +32,6 @@
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -105,7 +106,7 @@ public void testManifestPartitionStats() throws IOException {
 
   @Test
   public void testWriteManifestWithSequenceNumber() throws IOException {
-    Assume.assumeTrue("sequence number is only valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
     File manifestFile = temp.newFile("manifest.avro");
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
@@ -130,7 +131,7 @@ public void testWriteManifestWithSequenceNumber() throws IOException {
 
   @Test
   public void testCommitManifestWithExplicitDataSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -175,7 +176,7 @@ public void testCommitManifestWithExplicitDataSequenceNumber() throws IOExceptio
 
   @Test
   public void testCommitManifestWithExistingEntriesWithoutFileSequenceNumber() throws IOException {
-    Assume.assumeTrue("Sequence numbers are valid for format version > 1", formatVersion > 1);
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
 
     DataFile file1 = newFile(50);
     DataFile file2 = newFile(50);
@@ -384,6 +385,17 @@ private void checkManifests(
     }
   }
 
+  @Test
+  public void testWriteManifestWithCompression() throws IOException {
+    validateManifestCompressionCodec(false);
+  }
+
+  @Test
+  public void testWriteDeleteManifestWithCompression() throws IOException {
+    Assumptions.assumeThat(formatVersion).isGreaterThan(1);
+    validateManifestCompressionCodec(true);
+  }
+
   private DataFile newFile(long recordCount) {
     return newFile(recordCount, null);
   }
@@ -435,4 +447,23 @@ private OutputFile newManifestFile() {
       throw new UncheckedIOException(e);
     }
   }
+
+  void validateManifestCompressionCodec(boolean testDeletes) throws IOException {
+    for (Map.Entry<String, String> entry : AVRO_CODEC_NAME_MAPPING.entrySet()) {
+      String codec = entry.getKey();
+      String expectedCodecValue = entry.getValue();
+
+      ManifestFile manifest =
+          testDeletes
+              ? writeDeleteManifest(formatVersion, EXAMPLE_SNAPSHOT_ID, codec, FILE_A_DELETES)
+              : writeManifest(EXAMPLE_SNAPSHOT_ID, codec, FILE_A);
+      try (ManifestReader<? extends ContentFile<?>> reader =
+          testDeletes
+              ? ManifestFiles.readDeleteManifest(manifest, FILE_IO, null)
+              : ManifestFiles.read(manifest, FILE_IO)) {
+        Assertions.assertThat(reader.metadata())
+            .containsEntry(DataFileConstants.CODEC, expectedCodecValue);
+      }
+    }
+  }
 }
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index c7e8a2dea7cb..e1917644ec77 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -43,8 +43,26 @@ private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
       OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
+    return writeDataFiles(
+        outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static ManifestFile writeDataFiles(
+      OutputFile outputFile,
+      PartitionSpec spec,
+      List<DataFile> dataFiles,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(
+            FORMAT_V2,
+            spec,
+            outputFile,
+            DUMMY_SNAPSHOT_ID,
+            ManifestWriter.options()
+                .compressionCodec(compressionCodec)
+                .compressionLevel(compressionLevel));
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -81,6 +99,17 @@ static ManifestOutputFileFactory createOutputFileFactory(
   static DeltaManifests writeCompletedFiles(
       WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
       throws IOException {
+    return writeCompletedFiles(
+        result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static DeltaManifests writeCompletedFiles(
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
 
@@ -88,7 +117,12 @@ static DeltaManifests writeCompletedFiles(
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              compressionCodec,
+              compressionLevel);
     }
 
     // Write the completed delete files into a newly created delete manifest file.
@@ -96,7 +130,14 @@ static DeltaManifests writeCompletedFiles(
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              FORMAT_V2,
+              spec,
+              deleteManifestFile,
+              DUMMY_SNAPSHOT_ID,
+              ManifestWriter.options()
+                  .compressionCodec(compressionCodec)
+                  .compressionLevel(compressionLevel));
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b9bceaa9311d..47a9c38ff30d 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -452,7 +453,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            table.properties().get(TableProperties.AVRO_COMPRESSION),
+            PropertyUtil.propertyAsNullableInt(
+                table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index c7e8a2dea7cb..e1917644ec77 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -43,8 +43,26 @@ private FlinkManifestUtil() {}
 
   static ManifestFile writeDataFiles(
       OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
+    return writeDataFiles(
+        outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static ManifestFile writeDataFiles(
+      OutputFile outputFile,
+      PartitionSpec spec,
+      List<DataFile> dataFiles,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
     ManifestWriter<DataFile> writer =
-        ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+        ManifestFiles.write(
+            FORMAT_V2,
+            spec,
+            outputFile,
+            DUMMY_SNAPSHOT_ID,
+            ManifestWriter.options()
+                .compressionCodec(compressionCodec)
+                .compressionLevel(compressionLevel));
 
     try (ManifestWriter<DataFile> closeableWriter = writer) {
       closeableWriter.addAll(dataFiles);
@@ -81,6 +99,17 @@ static ManifestOutputFileFactory createOutputFileFactory(
   static DeltaManifests writeCompletedFiles(
       WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
       throws IOException {
+    return writeCompletedFiles(
+        result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null);
+  }
+
+  static DeltaManifests writeCompletedFiles(
+      WriteResult result,
+      Supplier<OutputFile> outputFileSupplier,
+      PartitionSpec spec,
+      String compressionCodec,
+      Integer compressionLevel)
+      throws IOException {
     ManifestFile dataManifest = null;
     ManifestFile deleteManifest = null;
 
@@ -88,7 +117,12 @@ static DeltaManifests writeCompletedFiles(
     // Write the completed data files into a newly created data manifest file.
     if (result.dataFiles() != null && result.dataFiles().length > 0) {
       dataManifest =
-          writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
+          writeDataFiles(
+              outputFileSupplier.get(),
+              spec,
+              Lists.newArrayList(result.dataFiles()),
+              compressionCodec,
+              compressionLevel);
     }
 
     // Write the completed delete files into a newly created delete manifest file.
@@ -96,7 +130,14 @@ static DeltaManifests writeCompletedFiles(
       OutputFile deleteManifestFile = outputFileSupplier.get();
 
       ManifestWriter<DeleteFile> deleteManifestWriter =
-          ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
+          ManifestFiles.writeDeleteManifest(
+              FORMAT_V2,
+              spec,
+              deleteManifestFile,
+              DUMMY_SNAPSHOT_ID,
+              ManifestWriter.options()
+                  .compressionCodec(compressionCodec)
+                  .compressionLevel(compressionLevel));
       try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
         for (DeleteFile deleteFile : result.deleteFiles()) {
           writer.add(deleteFile);
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b9bceaa9311d..47a9c38ff30d 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -452,7 +453,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException {
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), spec);
+            result,
+            () -> manifestOutputFileFactory.create(checkpointId),
+            table.spec(),
+            table.properties().get(TableProperties.AVRO_COMPRESSION),
+            PropertyUtil.propertyAsNullableInt(
+                table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
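With both Flink committers now reading the codec and level from table metadata, compression is driven entirely by the two table properties; nothing is hard-coded in the sink. One way to enable it, sketched with example values (note that AVRO_COMPRESSION_LEVEL is stored as a string property):

// Sketch: configure manifest compression once on the table; the Flink committer and the
// core snapshot producers both read these values from table.properties().
table
    .updateProperties()
    .set(TableProperties.AVRO_COMPRESSION, "zstd")
    .set(TableProperties.AVRO_COMPRESSION_LEVEL, "3")
    .commit();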
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 46d7a6cfa426..c40c7a581294 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -86,6 +86,7 @@ sqlite-jdbc = "3.44.0.0"
 testcontainers = "1.19.3"
 tez010 = "0.10.2"
 tez08 = { strictly = "[0.8, 0.9[", prefer = "0.8.4"} # see rich version usage explanation above
+zstd-jni = "1.5.5-10"
 
 [libraries]
 activation = { module = "javax.activation:activation", version.ref = "activation" }
@@ -215,3 +216,4 @@ tez010-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez010" }
 tez010-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez010" }
 tez08-dag = { module = "org.apache.tez:tez-dag", version.ref = "tez08" }
 tez08-mapreduce = { module = "org.apache.tez:tez-mapreduce", version.ref = "tez08" }
+zstd-jni = { module = "com.github.luben:zstd-jni", version.ref = "zstd-jni" }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 37f3f0c6c48d..2177cfd3e958 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -369,7 +369,9 @@ private static Iterator<ManifestFile> buildManifest(
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
-      Iterator<Tuple2<String, DataFile>> fileTuples) {
+      Iterator<Tuple2<String, DataFile>> fileTuples,
+      String compressionCodec,
+      Integer compressionLevel) {
     if (fileTuples.hasNext()) {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
@@ -380,7 +382,15 @@ private static Iterator<ManifestFile> buildManifest(
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(
+              1,
+              spec,
+              outputFile,
+              null,
+              ManifestWriter.options()
+                  .compressionCodec(compressionCodec)
+                  .compressionLevel(compressionLevel));
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -628,7 +638,15 @@ public static void importSparkPartitions(
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple,
+                            targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
+                            PropertyUtil.propertyAsNullableInt(
+                                targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index bc2ef2306790..f941a78030d1 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -252,7 +252,10 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
                     formatVersion,
                     combinedPartitionType,
                     spec,
-                    sparkType),
+                    sparkType,
+                    table.properties().get(TableProperties.AVRO_COMPRESSION),
+                    PropertyUtil.propertyAsNullableInt(
+                        table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 manifestEncoder)
             .collectAsList();
   }
@@ -283,7 +286,10 @@ private List<ManifestFile> writeManifestsForPartitionedTable(
                         formatVersion,
                         combinedPartitionType,
                         spec,
-                        sparkType),
+                        sparkType,
+                        table.properties().get(TableProperties.AVRO_COMPRESSION),
+                        PropertyUtil.propertyAsNullableInt(
+                            table.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                     manifestEncoder)
                 .collectAsList();
           });
@@ -369,7 +375,9 @@ private static ManifestFile writeManifest(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType)
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel)
       throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
@@ -384,7 +392,15 @@ private static ManifestFile writeManifest(
     Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
     SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(
+            format,
+            spec,
+            outputFile,
+            null,
+            ManifestWriter.options()
+                .compressionCodec(compressionCodec)
+                .compressionLevel(compressionLevel));
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
@@ -409,7 +425,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
       int format,
       Types.StructType combinedPartitionType,
       PartitionSpec spec,
-      StructType sparkType) {
+      StructType sparkType,
+      String compressionCodec,
+      Integer compressionLevel) {
 
     return rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -430,7 +448,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
@@ -443,7 +463,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
         manifests.add(
             writeManifest(
                 rowsAsList,
@@ -454,7 +476,9 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
                 format,
                 combinedPartitionType,
                 spec,
-                sparkType));
+                sparkType,
+                compressionCodec,
+                compressionLevel));
       }
 
       return manifests.iterator();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 88b752c3c6dd..7ce4ee0c45cc 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -332,7 +332,9 @@ private static Iterator<ManifestFile> buildManifest(
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
-      Iterator<Tuple2<String, DataFile>> fileTuples) {
+      Iterator<Tuple2<String, DataFile>> fileTuples,
+      String compressionCodec,
+      Integer compressionLevel) {
     if (fileTuples.hasNext()) {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
@@ -343,7 +345,15 @@ private static Iterator<ManifestFile> buildManifest(
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(
+              1,
+              spec,
+              outputFile,
+              null,
+              ManifestWriter.options()
+                  .compressionCodec(compressionCodec)
+                  .compressionLevel(compressionLevel));
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -591,7 +601,15 @@ public static void importSparkPartitions(
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple,
+                            targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
+                            PropertyUtil.propertyAsNullableInt(
+                                targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index af442ec300bc..833464dc3c11 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -394,7 +394,14 @@ public RollingManifestWriter<DataFile> newRollingManifestWriter() {
   }
 
   private ManifestWriter<DataFile> newManifestWriter() {
-    return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
+    return ManifestFiles.write(
+        formatVersion,
+        spec(),
+        newOutputFile(),
+        null,
+        ManifestWriter.options()
+            .compressionCodec(compressionCodec())
+            .compressionLevel(compressionLevel()));
   }
 
   private PartitionSpec spec() {
@@ -405,6 +412,15 @@ private OutputFile newOutputFile() {
     return table().io().newOutputFile(newManifestLocation());
   }
 
+  private String compressionCodec() {
+    return table().properties().get(TableProperties.AVRO_COMPRESSION);
+  }
+
+  private Integer compressionLevel() {
+    return PropertyUtil.propertyAsNullableInt(
+        table().properties(), TableProperties.AVRO_COMPRESSION_LEVEL);
+  }
+
   private String newManifestLocation() {
     String fileName = FileFormat.AVRO.addExtension("optimized-m-" + UUID.randomUUID());
     Path filePath = new Path(outputLocation, fileName);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 88b752c3c6dd..7ce4ee0c45cc 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -332,7 +332,9 @@ private static Iterator<ManifestFile> buildManifest(
       SerializableConfiguration conf,
       PartitionSpec spec,
       String basePath,
-      Iterator<Tuple2<String, DataFile>> fileTuples) {
+      Iterator<Tuple2<String, DataFile>> fileTuples,
+      String compressionCodec,
+      Integer compressionLevel) {
     if (fileTuples.hasNext()) {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
@@ -343,7 +345,15 @@ private static Iterator<ManifestFile> buildManifest(
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
-      ManifestWriter<DataFile> writer = ManifestFiles.write(spec, outputFile);
+      ManifestWriter<DataFile> writer =
+          ManifestFiles.write(
+              1,
+              spec,
+              outputFile,
+              null,
+              ManifestWriter.options()
+                  .compressionCodec(compressionCodec)
+                  .compressionLevel(compressionLevel));
 
       try (ManifestWriter<DataFile> writerRef = writer) {
         fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2));
@@ -591,7 +601,15 @@ public static void importSparkPartitions(
             .orderBy(col("_1"))
             .mapPartitions(
                 (MapPartitionsFunction<Tuple2<String, DataFile>, ManifestFile>)
-                    fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple),
+                    fileTuple ->
+                        buildManifest(
+                            serializableConf,
+                            spec,
+                            stagingDir,
+                            fileTuple,
+                            targetTable.properties().get(TableProperties.AVRO_COMPRESSION),
+                            PropertyUtil.propertyAsNullableInt(
+                                targetTable.properties(), TableProperties.AVRO_COMPRESSION_LEVEL)),
                 Encoders.javaSerialization(ManifestFile.class))
             .collectAsList();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 5b1d6165695b..1c2ca7d7812f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -515,7 +515,14 @@ public RollingManifestWriter<DataFile> newRollingManifestWriter() {
   }
 
   private ManifestWriter<DataFile> newManifestWriter() {
-    return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
+    return ManifestFiles.write(
+        formatVersion,
+        spec(),
+        newOutputFile(),
+        null,
+        ManifestWriter.options()
+            .compressionCodec(compressionCodec())
+            .compressionLevel(compressionLevel()));
   }
 
   public RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter() {
@@ -523,7 +530,14 @@ public RollingManifestWriter<DeleteFile> newRollingDeleteManifestWriter() {
   }
 
   private ManifestWriter<DeleteFile> newDeleteManifestWriter() {
-    return ManifestFiles.writeDeleteManifest(formatVersion, spec(), newOutputFile(), null);
+    return ManifestFiles.writeDeleteManifest(
+        formatVersion,
+        spec(),
+        newOutputFile(),
+        null,
+        ManifestWriter.options()
+            .compressionCodec(compressionCodec())
+            .compressionLevel(compressionLevel()));
   }
 
   private PartitionSpec spec() {
@@ -534,6 +548,15 @@ private OutputFile newOutputFile() {
     return table().io().newOutputFile(newManifestLocation());
   }
 
+  private String compressionCodec() {
+    return table().properties().get(TableProperties.AVRO_COMPRESSION);
+  }
+
+  private Integer compressionLevel() {
+    return PropertyUtil.propertyAsNullableInt(
+        table().properties(), TableProperties.AVRO_COMPRESSION_LEVEL);
+  }
+
   private String newManifestLocation() {
     String fileName = FileFormat.AVRO.addExtension("optimized-m-" + UUID.randomUUID());
     Path filePath = new Path(outputLocation, fileName);
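Taken together, the Spark utilities and actions above resolve the same two table properties, so imported and rewritten manifests follow the table configuration as well. For completeness, the properties can also be set from Spark SQL; the table name and values below are examples, and the keys are the string constants behind TableProperties.AVRO_COMPRESSION and TableProperties.AVRO_COMPRESSION_LEVEL:

// Sketch: same configuration applied through Spark SQL (table name is illustrative).
spark.sql(
    "ALTER TABLE db.sample SET TBLPROPERTIES ("
        + "'write.avro.compression-codec'='zstd',"
        + "'write.avro.compression-level'='3')");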