Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ project(':iceberg-core') {
testImplementation "org.xerial:sqlite-jdbc"
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation "com.esotericsoftware:kryo"
testImplementation "org.xerial.snappy:snappy-java"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this dependency (`org.xerial.snappy:snappy-java`) actually used anywhere in the tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not appear to be used, so it can be removed.

testImplementation "com.github.luben:zstd-jni"
}
}

Expand Down
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.NumberUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;

Expand Down Expand Up @@ -164,7 +165,14 @@ private ManifestFile copyManifest(ManifestFile manifest) {
InputFile toCopy = ops.io().newInputFile(manifest.path());
OutputFile newFile = newManifestOutput();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(), toCopy, specsById, newFile, snapshotId(), summaryBuilder);
current.formatVersion(),
toCopy,
specsById,
newFile,
snapshotId(),
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
NumberUtil.createInteger(current.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL)));
}

@Override
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.NumberUtil;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand Down Expand Up @@ -137,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.specsById(),
newManifestPath,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
NumberUtil.createInteger(current.properties().get(TableProperties.AVRO_COMPRESSION_LEVEL)));
}

@Override
Expand Down
92 changes: 83 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,40 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return write(
formatVersion,
spec,
outputFile,
snapshotId,
/* compressionCodec */ null,
/* compressionLevel */ null);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file, or null if none is specified
* @param compressionLevel compression level for the compression codec, or null if none is specified
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V1Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
case 2:
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V2Writer(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -195,11 +224,39 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
return writeDeleteManifest(
formatVersion,
spec,
outputFile,
snapshotId,
/* compressionCodec */ null,
/* compressionLevel */ null);
}

/**
* Create a new {@link ManifestWriter} for delete files for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param compressionCodec compression codec for the manifest file, or null if none is specified
* @param compressionLevel compression level for the compression codec, or null if none is specified
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
String compressionCodec,
Integer compressionLevel) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
return new ManifestWriter.V2DeleteWriter(
spec, outputFile, snapshotId, compressionCodec, compressionLevel);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -252,7 +309,9 @@ static ManifestFile copyAppendManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
try (ManifestReader<DataFile> reader =
Expand All @@ -263,7 +322,9 @@ static ManifestFile copyAppendManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.ADDED);
ManifestEntry.Status.ADDED,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -275,7 +336,9 @@ static ManifestFile copyRewriteManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
// exception if it is not
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
Expand All @@ -287,7 +350,9 @@ static ManifestFile copyRewriteManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.EXISTING);
ManifestEntry.Status.EXISTING,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -300,8 +365,17 @@ private static ManifestFile copyManifestInternal(
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
ManifestEntry.Status allowedEntryStatus,
String compressionCodec,
Integer compressionLevel) {
ManifestWriter<DataFile> writer =
write(
formatVersion,
reader.spec(),
outputFile,
snapshotId,
compressionCodec,
compressionLevel);
boolean threw = true;
try {
for (ManifestEntry<DataFile> entry : reader.entries()) {
Expand Down
85 changes: 62 additions & 23 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
private final FileAppender<ManifestFile> writer;

private ManifestListWriter(OutputFile file, Map<String, String> meta) {
this.writer = newAppender(file, meta);
private ManifestListWriter(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
this.writer = newAppender(file, meta, compressionCodec, compressionLevel);
}

protected abstract ManifestFile prepare(ManifestFile manifest);

protected abstract FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta);
OutputFile file, Map<String, String> meta, String compressionCodec, Integer compressionLevel);

@Override
public void add(ManifestFile manifest) {
Expand Down Expand Up @@ -73,14 +77,22 @@ public long length() {
static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;

V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
V2Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
String compressionCodec,
Integer compressionLevel) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"));
"format-version", "2"),
compressionCodec,
compressionLevel);
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
}

Expand All @@ -90,15 +102,25 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
try {
return Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();

Avro.WriteBuilder builder =
Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();
if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}
if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}
return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand All @@ -108,13 +130,20 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();

V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
V1Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
String compressionCodec,
Integer compressionLevel) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"format-version", "1"));
"format-version", "1"),
compressionCodec,
compressionLevel);
}

@Override
Expand All @@ -126,15 +155,25 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
protected FileAppender<ManifestFile> newAppender(
OutputFile file,
Map<String, String> meta,
String compressionCodec,
Integer compressionLevel) {
try {
return Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();

Avro.WriteBuilder builder =
Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();
if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}
if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}
Comment on lines +170 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please add a blank line after each `if` block, before the statement that follows it:

    if (compressionCodec != null) {
      builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
    }
    
    if (compressionLevel != null) {
      builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
    }
    
    return builder.build();

return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand Down
Loading