diff --git a/build.gradle b/build.gradle index 78bcea88804d..ee8e1b930cab 100644 --- a/build.gradle +++ b/build.gradle @@ -286,6 +286,8 @@ project(':iceberg-core') { testImplementation "org.xerial:sqlite-jdbc" testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') testImplementation "com.esotericsoftware:kryo" + testImplementation "org.xerial.snappy:snappy-java" + testImplementation "com.github.luben:zstd-jni" } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 816bc0c8a7ec..d5c1bb566f30 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -44,6 +44,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.NumberUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; @@ -164,7 +165,14 @@ private ManifestFile copyManifest(ManifestFile manifest) { InputFile toCopy = ops.io().newInputFile(manifest.path()); OutputFile newFile = newManifestOutput(); return ManifestFiles.copyRewriteManifest( - current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder); + current.formatVersion(), + toCopy, + specsById, + newFile, + snapshotId(), + summaryBuilder, + current.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger(current.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); } @Override diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index f3955e15f6ce..f6befcb81f49 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -33,6 +33,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.NumberUtil; /** * {@link AppendFiles Append} implementation that adds a new manifest file for the write. @@ -137,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { current.specsById(), newManifestPath, snapshotId(), - summaryBuilder); + summaryBuilder, + current.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger(current.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); } @Override diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java index 85e268d43378..298dd34788cf 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java @@ -155,11 +155,40 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outp */ public static ManifestWriter write( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return write( + formatVersion, + spec, + outputFile, + snapshotId, + /* compressionCodec */ null, + /* compressionLevel */ null); + } + + /** + * Create a new {@link ManifestWriter} for the given format version. 
+ * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param outputFile an {@link OutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @param compressionCodec compression codec for the manifest file + * @param compressionLevel compression level of the compressionCodec + * @return a manifest writer + */ + public static ManifestWriter write( + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: - return new ManifestWriter.V1Writer(spec, outputFile, snapshotId); + return new ManifestWriter.V1Writer( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); case 2: - return new ManifestWriter.V2Writer(spec, outputFile, snapshotId); + return new ManifestWriter.V2Writer( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -195,11 +224,39 @@ public static ManifestReader readDeleteManifest( */ public static ManifestWriter writeDeleteManifest( int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) { + return writeDeleteManifest( + formatVersion, + spec, + outputFile, + snapshotId, + /* compressionCodec */ null, + /* compressionLevel */ null); + } + + /** + * Create a new {@link ManifestWriter} for the given format version. 
+ * + * @param formatVersion a target format version + * @param spec a {@link PartitionSpec} + * @param outputFile an {@link OutputFile} where the manifest will be written + * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID + * @param compressionCodec compression codec for the manifest file + * @param compressionLevel compression level of the compressionCodec + * @return a manifest writer + */ + public static ManifestWriter writeDeleteManifest( + int formatVersion, + PartitionSpec spec, + OutputFile outputFile, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: throw new IllegalArgumentException("Cannot write delete files in a v1 table"); case 2: - return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId); + return new ManifestWriter.V2DeleteWriter( + spec, outputFile, snapshotId, compressionCodec, compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest for table version: " + formatVersion); @@ -252,7 +309,9 @@ static ManifestFile copyAppendManifest( Map specsById, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder) { + SnapshotSummary.Builder summaryBuilder, + String compressionCodec, + Integer compressionLevel) { // use metadata that will add the current snapshot's ID for the rewrite InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId); try (ManifestReader reader = @@ -263,7 +322,9 @@ static ManifestFile copyAppendManifest( outputFile, snapshotId, summaryBuilder, - ManifestEntry.Status.ADDED); + ManifestEntry.Status.ADDED, + compressionCodec, + compressionLevel); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location()); } @@ -275,7 +336,9 @@ static ManifestFile copyRewriteManifest( Map specsById, OutputFile outputFile, long snapshotId, - SnapshotSummary.Builder summaryBuilder) { + SnapshotSummary.Builder 
summaryBuilder, + String compressionCodec, + Integer compressionLevel) { // for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an // exception if it is not InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty(); @@ -287,7 +350,9 @@ static ManifestFile copyRewriteManifest( outputFile, snapshotId, summaryBuilder, - ManifestEntry.Status.EXISTING); + ManifestEntry.Status.EXISTING, + compressionCodec, + compressionLevel); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location()); } @@ -300,8 +365,17 @@ private static ManifestFile copyManifestInternal( OutputFile outputFile, long snapshotId, SnapshotSummary.Builder summaryBuilder, - ManifestEntry.Status allowedEntryStatus) { - ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId); + ManifestEntry.Status allowedEntryStatus, + String compressionCodec, + Integer compressionLevel) { + ManifestWriter writer = + write( + formatVersion, + reader.spec(), + outputFile, + snapshotId, + compressionCodec, + compressionLevel); boolean threw = true; try { for (ManifestEntry entry : reader.entries()) { diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java index 3f7f20d4df6c..9bf2e41c7af1 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java @@ -31,14 +31,18 @@ abstract class ManifestListWriter implements FileAppender { private final FileAppender writer; - private ManifestListWriter(OutputFile file, Map meta) { - this.writer = newAppender(file, meta); + private ManifestListWriter( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { + this.writer = newAppender(file, meta, compressionCodec, compressionLevel); } protected abstract ManifestFile prepare(ManifestFile manifest); protected abstract 
FileAppender newAppender( - OutputFile file, Map meta); + OutputFile file, Map meta, String compressionCodec, Integer compressionLevel); @Override public void add(ManifestFile manifest) { @@ -73,14 +77,22 @@ public long length() { static class V2Writer extends ManifestListWriter { private final V2Metadata.IndexedManifestFile wrapper; - V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + V2Writer( + OutputFile snapshotFile, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + String compressionCodec, + Integer compressionLevel) { super( snapshotFile, ImmutableMap.of( "snapshot-id", String.valueOf(snapshotId), "parent-snapshot-id", String.valueOf(parentSnapshotId), "sequence-number", String.valueOf(sequenceNumber), - "format-version", "2")); + "format-version", "2"), + compressionCodec, + compressionLevel); this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber); } @@ -90,15 +102,25 @@ protected ManifestFile prepare(ManifestFile manifest) { } @Override - protected FileAppender newAppender(OutputFile file, Map meta) { + protected FileAppender newAppender( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { try { - return Avro.write(file) - .schema(V2Metadata.MANIFEST_LIST_SCHEMA) - .named("manifest_file") - .meta(meta) - .overwrite() - .build(); - + Avro.WriteBuilder builder = + Avro.write(file) + .schema(V2Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite(); + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file); } @@ -108,13 +130,20 @@ protected FileAppender newAppender(OutputFile file, Map 
newAppender(OutputFile file, Map meta) { + protected FileAppender newAppender( + OutputFile file, + Map meta, + String compressionCodec, + Integer compressionLevel) { try { - return Avro.write(file) - .schema(V1Metadata.MANIFEST_LIST_SCHEMA) - .named("manifest_file") - .meta(meta) - .overwrite() - .build(); - + Avro.WriteBuilder builder = + Avro.write(file) + .schema(V1Metadata.MANIFEST_LIST_SCHEMA) + .named("manifest_file") + .meta(meta) + .overwrite(); + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java index c7b3e5fee5a9..4ebb46884e21 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestLists.java +++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java @@ -21,10 +21,12 @@ import java.io.IOException; import java.util.List; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroIterable; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -32,40 +34,66 @@ class ManifestLists { private ManifestLists() {} static List read(InputFile manifestList) { - try (CloseableIterable files = - Avro.read(manifestList) - .rename("manifest_file", GenericManifestFile.class.getName()) - .rename("partitions", 
GenericPartitionFieldSummary.class.getName()) - .rename("r508", GenericPartitionFieldSummary.class.getName()) - .classLoader(GenericManifestFile.class.getClassLoader()) - .project(ManifestFile.schema()) - .reuseContainers(false) - .build()) { - + try (CloseableIterable files = manifestFileIterable(manifestList)) { return Lists.newLinkedList(files); - } catch (IOException e) { throw new RuntimeIOException( e, "Cannot read manifest list file: %s", manifestList.location()); } } + @VisibleForTesting + static AvroIterable manifestFileIterable(InputFile manifestList) { + return Avro.read(manifestList) + .rename("manifest_file", GenericManifestFile.class.getName()) + .rename("partitions", GenericPartitionFieldSummary.class.getName()) + .rename("r508", GenericPartitionFieldSummary.class.getName()) + .classLoader(GenericManifestFile.class.getClassLoader()) + .project(ManifestFile.schema()) + .reuseContainers(false) + .build(); + } + static ManifestListWriter write( int formatVersion, OutputFile manifestListFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) { + return write( + formatVersion, + manifestListFile, + snapshotId, + parentSnapshotId, + sequenceNumber, + /* compressionCodec */ null, + /* compressionLevel */ null); + } + + static ManifestListWriter write( + int formatVersion, + OutputFile manifestListFile, + long snapshotId, + Long parentSnapshotId, + long sequenceNumber, + String compressionCodec, + Integer compressionLevel) { switch (formatVersion) { case 1: Preconditions.checkArgument( sequenceNumber == TableMetadata.INITIAL_SEQUENCE_NUMBER, "Invalid sequence number for v1 manifest list: %s", sequenceNumber); - return new ManifestListWriter.V1Writer(manifestListFile, snapshotId, parentSnapshotId); + return new ManifestListWriter.V1Writer( + manifestListFile, snapshotId, parentSnapshotId, compressionCodec, compressionLevel); case 2: return new ManifestListWriter.V2Writer( - manifestListFile, snapshotId, parentSnapshotId, sequenceNumber); + 
manifestListFile, + snapshotId, + parentSnapshotId, + sequenceNumber, + compressionCodec, + compressionLevel); } throw new UnsupportedOperationException( "Cannot write manifest list for table version: " + formatVersion); diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 5f47de534562..3fc2ce4db6a8 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -38,6 +38,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -265,6 +266,11 @@ CloseableIterable> liveEntries() { entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED); } + @VisibleForTesting + Map metadata() { + return metadata; + } + /** @return an Iterator of DataFile. 
Makes defensive copies of files before returning */ @Override public CloseableIterator iterator() { diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 7de0cb7be561..4dd85365344b 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -52,10 +52,15 @@ public abstract class ManifestWriter> implements FileAp private long deletedRows = 0L; private Long minSequenceNumber = null; - private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { + private ManifestWriter( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { this.file = file; this.specId = spec.specId(); - this.writer = newAppender(spec, file); + this.writer = newAppender(spec, file, compressionCodec, compressionLevel); this.snapshotId = snapshotId; this.reused = new GenericManifestEntry<>(spec.partitionType()); this.stats = new PartitionSummary(spec); @@ -64,7 +69,7 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { protected abstract ManifestEntry prepare(ManifestEntry entry); protected abstract FileAppender> newAppender( - PartitionSpec spec, OutputFile outputFile); + PartitionSpec spec, OutputFile outputFile, String compressionCodec, Integer compressionLevel); protected ManifestContent content() { return ManifestContent.DATA; @@ -202,8 +207,13 @@ public void close() throws IOException { static class V2Writer extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2Writer( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new 
V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -214,19 +224,26 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file) { + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .meta("format-version", "2") - .meta("content", "data") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "2") + .meta("content", "data") + .overwrite(); + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } @@ -236,8 +253,13 @@ protected FileAppender> newAppender( static class V2DeleteWriter extends ManifestWriter { private final V2Metadata.IndexedManifestEntry entryWrapper; - V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V2DeleteWriter( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new 
V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType()); } @@ -248,19 +270,26 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file) { + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .meta("format-version", "2") - .meta("content", "deletes") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "2") + .meta("content", "deletes") + .overwrite(); + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } @@ -275,8 +304,13 @@ protected ManifestContent content() { static class V1Writer extends ManifestWriter { private final V1Metadata.IndexedManifestEntry entryWrapper; - V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) { - super(spec, file, snapshotId); + V1Writer( + PartitionSpec spec, + OutputFile file, + Long snapshotId, + String compressionCodec, + Integer compressionLevel) { + super(spec, file, snapshotId, compressionCodec, compressionLevel); this.entryWrapper = new 
V1Metadata.IndexedManifestEntry(spec.partitionType()); } @@ -287,18 +321,25 @@ protected ManifestEntry prepare(ManifestEntry entry) { @Override protected FileAppender> newAppender( - PartitionSpec spec, OutputFile file) { + PartitionSpec spec, OutputFile file, String compressionCodec, Integer compressionLevel) { Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType()); try { - return Avro.write(file) - .schema(manifestSchema) - .named("manifest_entry") - .meta("schema", SchemaParser.toJson(spec.schema())) - .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) - .meta("partition-spec-id", String.valueOf(spec.specId())) - .meta("format-version", "1") - .overwrite() - .build(); + Avro.WriteBuilder builder = + Avro.write(file) + .schema(manifestSchema) + .named("manifest_entry") + .meta("schema", SchemaParser.toJson(spec.schema())) + .meta("partition-spec", PartitionSpecParser.toJsonFields(spec)) + .meta("partition-spec-id", String.valueOf(spec.specId())) + .meta("format-version", "1") + .overwrite(); + if (compressionCodec != null) { + builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec); + } + if (compressionLevel != null) { + builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString()); + } + return builder.build(); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to create manifest writer for path: %s", file); } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index b82244f0714f..eebe98b63758 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.NumberUtil; import 
org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PartitionSet; import org.apache.iceberg.util.SnapshotUtil; @@ -264,7 +265,9 @@ private ManifestFile copyManifest(ManifestFile manifest) { current.specsById(), newManifestPath, snapshotId(), - appendedManifestsSummary); + appendedManifestsSummary, + current.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger(current.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); } /** diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index a561abf93518..d550245b9260 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -51,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.Exceptions; +import org.apache.iceberg.util.NumberUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; @@ -207,7 +208,10 @@ public Snapshot apply() { manifestList, snapshotId(), parentSnapshotId, - sequenceNumber)) { + sequenceNumber, + ops.current().properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + ops.current().properties().get(TableProperties.AVRO_COMPRESSION_LEVEL)))) { // keep track of the manifest lists created manifestLists.add(manifestList.location()); @@ -460,12 +464,24 @@ protected OutputFile newManifestOutput() { protected ManifestWriter newManifestWriter(PartitionSpec spec) { return ManifestFiles.write( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + ops.current().formatVersion(), + spec, + newManifestOutput(), + snapshotId(), + ops.current().properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + 
ops.current().properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); } protected ManifestWriter newDeleteManifestWriter(PartitionSpec spec) { return ManifestFiles.writeDeleteManifest( - ops.current().formatVersion(), spec, newManifestOutput(), snapshotId()); + ops.current().formatVersion(), + spec, + newManifestOutput(), + snapshotId(), + ops.current().properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + ops.current().properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); } protected ManifestReader newManifestReader(ManifestFile manifest) { diff --git a/core/src/main/java/org/apache/iceberg/util/NumberUtil.java b/core/src/main/java/org/apache/iceberg/util/NumberUtil.java new file mode 100644 index 000000000000..9c5edd3ec224 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/NumberUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Static helpers for converting textual values into numbers.
 *
 * <p>This class only holds static methods and cannot be instantiated.
 */
public final class NumberUtil {

  private NumberUtil() {}

  /**
   * Converts a string into an {@link Integer}, passing {@code null} through unchanged.
   *
   * <p>This is convenient for optional settings looked up from a map, where a missing key yields
   * {@code null} and the caller wants "not set" to stay {@code null} instead of failing.
   *
   * @param value the string to convert, can be null
   * @return the parsed integer, or null if {@code value} is null
   * @throws NumberFormatException if {@code value} is non-null and is not a valid signed decimal
   *     integer (this includes the empty string and strings with surrounding whitespace)
   */
  public static Integer createInteger(String value) {
    if (value == null) {
      return null;
    }

    // Integer.valueOf(String) returns the boxed value directly; it accepts exactly the same
    // inputs as Integer.parseInt(String), so results and failures are unchanged.
    return Integer.valueOf(value);
  }
}
files) throws IOException { - File manifestFile = temp.newFile("input.m0.avro"); + return writeManifest(snapshotId, /* compressionCodec */ null, files); + } + + ManifestFile writeManifest(Long snapshotId, String compressionCodec, DataFile... files) + throws IOException { + File manifestFile = temp.newFile(); Assert.assertTrue(manifestFile.delete()); OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath()); ManifestWriter writer = - ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId); + ManifestFiles.write( + formatVersion, + table.spec(), + outputFile, + snapshotId, + compressionCodec, + /* compressionLevel */ null); try { for (DataFile file : files) { writer.add(file); @@ -293,11 +320,24 @@ > ManifestFile writeManifest( ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFile... deleteFiles) throws IOException { + return writeDeleteManifest( + newFormatVersion, snapshotId, /* compressionCodec */ null, deleteFiles); + } + + ManifestFile writeDeleteManifest( + int newFormatVersion, Long snapshotId, String compressionCodec, DeleteFile... deleteFiles) + throws IOException { OutputFile manifestFile = org.apache.iceberg.Files.localOutput( FileFormat.AVRO.addExtension(temp.newFile().toString())); ManifestWriter writer = - ManifestFiles.writeDeleteManifest(newFormatVersion, SPEC, manifestFile, snapshotId); + ManifestFiles.writeDeleteManifest( + newFormatVersion, + SPEC, + manifestFile, + snapshotId, + compressionCodec, + /* compressionLevel */ null); try { for (DeleteFile deleteFile : deleteFiles) { writer.add(deleteFile); @@ -308,6 +348,30 @@ ManifestFile writeDeleteManifest(int newFormatVersion, Long snapshotId, DeleteFi return writer.toManifestFile(); } + InputFile writeManifestList(String compressionCodec, ManifestFile... 
manifestFiles) + throws IOException { + File manifestListFile = temp.newFile(); + Assert.assertTrue(manifestListFile.delete()); + OutputFile outputFile = + org.apache.iceberg.Files.localOutput( + FileFormat.AVRO.addExtension(manifestListFile.toString())); + + try (FileAppender writer = + ManifestLists.write( + formatVersion, + outputFile, + SNAPSHOT_ID, + SNAPSHOT_ID - 1, + formatVersion > 1 ? SEQUENCE_NUMBER : 0, + compressionCodec, + /* compressionLevel */ null)) { + for (ManifestFile manifestFile : manifestFiles) { + writer.add(manifestFile); + } + } + return outputFile.toInputFile(); + } + ManifestFile writeManifestWithName(String name, DataFile... files) throws IOException { File manifestFile = temp.newFile(name + ".avro"); Assert.assertTrue(manifestFile.delete()); @@ -619,4 +683,9 @@ void assertEquals(String context, Object expected, Object actual) { protected interface Action { void invoke(); } + + @FunctionalInterface + interface CheckedFunction { + R apply(T args) throws IOException; + } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java new file mode 100644 index 000000000000..3f0e3eb93e71 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestListWriter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Map; +import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.io.InputFile; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestManifestListWriter extends TableTestBase { + + @Parameterized.Parameters(name = "formatVersion = {0}") + public static Object[] parameters() { + return new Object[] {1, 2}; + } + + public TestManifestListWriter(int formatVersion) { + super(formatVersion); + } + + @Test + public void testWriteManifestListWithCompression() throws IOException { + validateManifestListCompressionCodec( + compressionCodec -> { + ManifestFile manifest = writeManifest(SNAPSHOT_ID, compressionCodec, FILE_A); + return writeManifestList(compressionCodec, manifest); + }); + } + + @Test + public void testWriteDeleteManifestListWithCompression() throws IOException { + Assume.assumeThat(formatVersion, Matchers.is(2)); + validateManifestListCompressionCodec( + compressionCodec -> { + ManifestFile manifest = writeManifest(SNAPSHOT_ID, compressionCodec, FILE_A); + ManifestFile deleteManifest = + writeDeleteManifest(formatVersion, SNAPSHOT_ID, compressionCodec, FILE_A_DELETES); + return writeManifestList(compressionCodec, manifest, deleteManifest); + }); + } + + void validateManifestListCompressionCodec( + CheckedFunction createManifestListFunc) throws IOException { + for (Map.Entry 
entry : CODEC_METADATA_MAPPING.entrySet()) { + String codec = entry.getKey(); + String expectedCodecValue = entry.getValue(); + + InputFile manifestList = createManifestListFunc.apply(codec); + try (AvroIterable reader = ManifestLists.manifestFileIterable(manifestList)) { + Map metadata = reader.getMetadata(); + Assert.assertEquals( + "Manifest list codec value must match", + expectedCodecValue, + metadata.get(AVRO_CODEC_KEY)); + } + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java index 4b7f4d3b8f92..f698542f0f98 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java @@ -21,11 +21,13 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Types; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -118,6 +120,22 @@ public void testWriteManifestWithSequenceNumber() throws IOException { } } + @Test + public void testWriteManifestWithCompression() throws IOException { + validateManifestCompressionCodec( + compressionCodec -> writeManifest(SNAPSHOT_ID, compressionCodec, FILE_A), + manifest -> ManifestFiles.read(manifest, FILE_IO)); + } + + @Test + public void testWriteDeleteManifestWithCompression() throws IOException { + Assume.assumeThat(formatVersion, Matchers.is(2)); + validateManifestCompressionCodec( + compressionCodec -> + writeDeleteManifest(formatVersion, SNAPSHOT_ID, compressionCodec, FILE_A_DELETES), + manifest -> ManifestFiles.readDeleteManifest(manifest, FILE_IO, null)); + } + private DataFile newFile(long recordCount) { return newFile(recordCount, null); } @@ -134,4 +152,24 @@ private 
DataFile newFile(long recordCount, StructLike partition) { } return builder.build(); } + + > void validateManifestCompressionCodec( + CheckedFunction createManifestFunc, + CheckedFunction> manifestReaderFunc) + throws IOException { + for (Map.Entry entry : CODEC_METADATA_MAPPING.entrySet()) { + String codec = entry.getKey(); + String expectedCodecValue = entry.getValue(); + + ManifestFile manifest = createManifestFunc.apply(codec); + + try (ManifestReader reader = manifestReaderFunc.apply(manifest)) { + Map metadata = reader.metadata(); + Assert.assertEquals( + "Manifest file codec value must match", + expectedCodecValue, + metadata.get(AVRO_CODEC_KEY)); + } + } + } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index 25badc372abf..1df113c66434 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -44,8 +44,20 @@ private FlinkManifestUtil() {} static ManifestFile writeDataFiles( OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { + return writeDataFiles( + outputFile, spec, dataFiles, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static ManifestFile writeDataFiles( + OutputFile outputFile, + PartitionSpec spec, + List dataFiles, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestWriter writer = - ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.write( + FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID, compressionCodec, compressionLevel); try (ManifestWriter closeableWriter = writer) { closeableWriter.addAll(dataFiles); @@ -76,6 +88,17 @@ static ManifestOutputFileFactory createOutputFileFactory( static DeltaManifests writeCompletedFiles( WriteResult result, Supplier 
outputFileSupplier, PartitionSpec spec) throws IOException { + return writeCompletedFiles( + result, outputFileSupplier, spec, /* compressionCodec */ null, /* compressionLevel */ null); + } + + static DeltaManifests writeCompletedFiles( + WriteResult result, + Supplier outputFileSupplier, + PartitionSpec spec, + String compressionCodec, + Integer compressionLevel) + throws IOException { ManifestFile dataManifest = null; ManifestFile deleteManifest = null; @@ -83,7 +106,12 @@ static DeltaManifests writeCompletedFiles( // Write the completed data files into a newly created data manifest file. if (result.dataFiles() != null && result.dataFiles().length > 0) { dataManifest = - writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles())); + writeDataFiles( + outputFileSupplier.get(), + spec, + Lists.newArrayList(result.dataFiles()), + compressionCodec, + compressionLevel); } // Write the completed delete files into a newly created delete manifest file. @@ -91,7 +119,13 @@ static DeltaManifests writeCompletedFiles( OutputFile deleteManifestFile = outputFileSupplier.get(); ManifestWriter deleteManifestWriter = - ManifestFiles.writeDeleteManifest(FORMAT_V2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID); + ManifestFiles.writeDeleteManifest( + FORMAT_V2, + spec, + deleteManifestFile, + DUMMY_SNAPSHOT_ID, + compressionCodec, + compressionLevel); try (ManifestWriter writer = deleteManifestWriter) { for (DeleteFile deleteFile : result.deleteFiles()) { writer.add(deleteFile); diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java index 8aa2c0304eb0..be274d404b2b 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java @@ -46,6 +46,7 @@ import org.apache.iceberg.Snapshot; import 
org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -55,6 +56,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.NumberUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.ThreadPools; import org.slf4j.Logger; @@ -407,7 +409,12 @@ private byte[] writeToManifest(long checkpointId) throws IOException { WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build(); DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles( - result, () -> manifestOutputFileFactory.create(checkpointId), table.spec()); + result, + () -> manifestOutputFileFactory.create(checkpointId), + table.spec(), + table.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + table.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))); return SimpleVersionedSerialization.writeVersionAndSerialize( DeltaManifestsSerializer.INSTANCE, deltaManifests); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index c38a394c5a25..2c16d5a1d2d9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -59,6 +59,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.util.NumberUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import 
org.apache.iceberg.util.ThreadPools; @@ -367,7 +368,9 @@ private static Iterator buildManifest( SerializableConfiguration conf, PartitionSpec spec, String basePath, - Iterator> fileTuples) { + Iterator> fileTuples, + String compressionCodec, + Integer compressionLevel) { if (fileTuples.hasNext()) { FileIO io = new HadoopFileIO(conf.get()); TaskContext ctx = TaskContext.get(); @@ -376,7 +379,8 @@ private static Iterator buildManifest( Path location = new Path(basePath, suffix); String outputPath = FileFormat.AVRO.addExtension(location.toString()); OutputFile outputFile = io.newOutputFile(outputPath); - ManifestWriter writer = ManifestFiles.write(spec, outputFile); + ManifestWriter writer = + ManifestFiles.write(1, spec, outputFile, null, compressionCodec, compressionLevel); try (ManifestWriter writerRef = writer) { fileTuples.forEachRemaining(fileTuple -> writerRef.add(fileTuple._2)); @@ -623,7 +627,17 @@ public static void importSparkPartitions( .orderBy(col("_1")) .mapPartitions( (MapPartitionsFunction, ManifestFile>) - fileTuple -> buildManifest(serializableConf, spec, stagingDir, fileTuple), + fileTuple -> + buildManifest( + serializableConf, + spec, + stagingDir, + fileTuple, + targetTable.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + targetTable + .properties() + .get(TableProperties.AVRO_COMPRESSION_LEVEL))), Encoders.javaSerialization(ManifestFile.class)) .collectAsList(); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java index 43476d21daed..cb3067be5cce 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java @@ -54,6 +54,7 @@ import org.apache.iceberg.spark.SparkDataFile; import 
org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.NumberUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -226,7 +227,10 @@ private List writeManifestsForUnpartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + table.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))), manifestEncoder) .collectAsList(); } @@ -256,7 +260,10 @@ private List writeManifestsForPartitionedTable( formatVersion, combinedPartitionType, spec, - sparkType), + sparkType, + table.properties().get(TableProperties.AVRO_COMPRESSION), + NumberUtil.createInteger( + table.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL))), manifestEncoder) .collectAsList(); }); @@ -355,7 +362,9 @@ private static ManifestFile writeManifest( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) + StructType sparkType, + String compressionCodec, + Integer compressionLevel) throws IOException { String manifestName = "optimized-m-" + UUID.randomUUID(); @@ -367,7 +376,8 @@ private static ManifestFile writeManifest( Types.StructType manifestFileType = DataFile.getType(spec.partitionType()); SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType); - ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null); + ManifestWriter writer = + ManifestFiles.write(format, spec, outputFile, null, compressionCodec, compressionLevel); try { for (int index = startIndex; index < endIndex; index++) { @@ -391,7 +401,9 @@ private static MapPartitionsFunction toManifests( int format, Types.StructType combinedPartitionType, PartitionSpec spec, - StructType sparkType) { + StructType sparkType, + String compressionCodec, + Integer compressionLevel) { return rows -> { List 
rowsAsList = Lists.newArrayList(rows); @@ -412,7 +424,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } else { int midIndex = rowsAsList.size() / 2; manifests.add( @@ -425,7 +439,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); manifests.add( writeManifest( rowsAsList, @@ -436,7 +452,9 @@ private static MapPartitionsFunction toManifests( format, combinedPartitionType, spec, - sparkType)); + sparkType, + compressionCodec, + compressionLevel)); } return manifests.iterator(); diff --git a/versions.props b/versions.props index c24e43a49a85..93f0fe864dc5 100644 --- a/versions.props +++ b/versions.props @@ -44,3 +44,5 @@ org.mock-server:mockserver-netty = 5.13.2 org.mock-server:mockserver-client-java = 5.13.2 com.esotericsoftware:kryo = 4.0.2 org.eclipse.jetty:* = 9.4.43.v20210629 +org.xerial.snappy:snappy-java = 1.1.8.4 +com.github.luben:zstd-jni = 1.5.2-3 \ No newline at end of file