Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ project(':iceberg-core') {
testImplementation libs.esotericsoftware.kryo
testImplementation libs.guava.testlib
testImplementation libs.awaitility
testRuntimeOnly libs.zstd.jni
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
specsById,
newFile,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
}

@Override
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ private ManifestFile copyManifest(ManifestFile manifest) {
current.specsById(),
newManifestPath,
snapshotId(),
summaryBuilder);
summaryBuilder,
current.properties().get(TableProperties.AVRO_COMPRESSION),
current.propertyAsNullableInt(TableProperties.AVRO_COMPRESSION_LEVEL));
}

@Override
Expand Down
75 changes: 66 additions & 9 deletions core/src/main/java/org/apache/iceberg/ManifestFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,30 @@ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outp
*/
public static ManifestWriter<DataFile> write(
    int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
  // No options were supplied by the caller: forward whatever ManifestWriter.options() returns
  // to the options-aware overload.
  ManifestWriter.Options writerOptions = ManifestWriter.options();
  return write(formatVersion, spec, outputFile, snapshotId, writerOptions);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param options additional options for the manifest writer
* @return a manifest writer
*/
public static ManifestWriter<DataFile> write(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I had already commented before going on vacation, but I can't seem to find the old discussion. Sorry if I am posting the same question again. Have we considered using a builder? My worry with the current approach is that we would need to add an overloaded method every time we add a new parameter.

@wypoon @nastra @rdblue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aokolnychyi thanks for reviewing. I'm interested to hear what @rdblue thinks. In the meantime, let me think about how to address your concern. However, using a builder will mean an API break, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sumeetgajjar originally used a Map parameter for this, and @rdblue commented that "we don't want to pass a map of properties around. That's exposing too much where it doesn't need to be, and people tend to misuse generic arguments like this."
What I propose to do then is to introduce a ManifestWriter.Options class and use that here (instead of a Map). I'll also introduce a ManifestListWriter.Options class and use that in ManifestLists.write. These Options classes define which additional parameters are applicable and may be set. If additional parameters are needed in the future, they can be added to these Options classes.

int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
ManifestWriter.Options options) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId, options);
case 2:
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId);
return new ManifestWriter.V2Writer(spec, outputFile, snapshotId, options);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -198,11 +217,31 @@ public static ManifestReader<DeleteFile> readDeleteManifest(
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
    int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
  // No options were supplied by the caller: forward whatever ManifestWriter.options() returns
  // to the options-aware overload.
  ManifestWriter.Options writerOptions = ManifestWriter.options();
  return writeDeleteManifest(formatVersion, spec, outputFile, snapshotId, writerOptions);
}

/**
* Create a new {@link ManifestWriter} for the given format version.
*
* @param formatVersion a target format version
* @param spec a {@link PartitionSpec}
* @param outputFile an {@link OutputFile} where the manifest will be written
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @param options additional options for the manifest writer
* @return a manifest writer
*/
public static ManifestWriter<DeleteFile> writeDeleteManifest(
int formatVersion,
PartitionSpec spec,
OutputFile outputFile,
Long snapshotId,
ManifestWriter.Options options) {
switch (formatVersion) {
case 1:
throw new IllegalArgumentException("Cannot write delete files in a v1 table");
case 2:
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId, options);
}
throw new UnsupportedOperationException(
"Cannot write manifest for table version: " + formatVersion);
Expand Down Expand Up @@ -256,7 +295,9 @@ static ManifestFile copyAppendManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
try (ManifestReader<DataFile> reader =
Expand All @@ -267,7 +308,9 @@ static ManifestFile copyAppendManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.ADDED);
ManifestEntry.Status.ADDED,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -280,7 +323,9 @@ static ManifestFile copyRewriteManifest(
Map<Integer, PartitionSpec> specsById,
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
SnapshotSummary.Builder summaryBuilder,
String compressionCodec,
Integer compressionLevel) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an
// exception if it is not
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
Expand All @@ -292,7 +337,9 @@ static ManifestFile copyRewriteManifest(
outputFile,
snapshotId,
summaryBuilder,
ManifestEntry.Status.EXISTING);
ManifestEntry.Status.EXISTING,
compressionCodec,
compressionLevel);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", toCopy.location());
}
Expand All @@ -305,8 +352,18 @@ private static ManifestFile copyManifestInternal(
OutputFile outputFile,
long snapshotId,
SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
ManifestEntry.Status allowedEntryStatus,
String compressionCodec,
Integer compressionLevel) {
ManifestWriter<DataFile> writer =
write(
formatVersion,
reader.spec(),
outputFile,
snapshotId,
ManifestWriter.options()
.compressionCodec(compressionCodec)
.compressionLevel(compressionLevel));
boolean threw = true;
try {
for (ManifestEntry<DataFile> entry : reader.entries()) {
Expand Down
108 changes: 85 additions & 23 deletions core/src/main/java/org/apache/iceberg/ManifestListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
abstract class ManifestListWriter implements FileAppender<ManifestFile> {
private final FileAppender<ManifestFile> writer;

private ManifestListWriter(OutputFile file, Map<String, String> meta) {
this.writer = newAppender(file, meta);
private ManifestListWriter(OutputFile file, Map<String, String> meta, Options options) {
this.writer = newAppender(file, meta, options);
}

protected abstract ManifestFile prepare(ManifestFile manifest);

protected abstract FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta);
OutputFile file, Map<String, String> meta, Options options);

@Override
public void add(ManifestFile manifest) {
Expand Down Expand Up @@ -70,17 +70,52 @@ public long length() {
return writer.length();
}

/**
 * Creates a new {@link Options} instance on which no compression codec or level has been set.
 *
 * @return a new options object that can be configured through its chained setters
 */
public static Options options() {
  Options unconfigured = new Options();
  return unconfigured;
}

/**
 * Holder for the optional settings of a manifest list writer, configured through chained setters.
 *
 * <p>Both the codec and the level are null until they are explicitly set.
 */
static class Options {
  private String codecName;
  private Integer codecLevel;

  private Options() {
    // not instantiable from outside the enclosing class; use ManifestListWriter.options()
  }

  /**
   * Sets the compression codec.
   *
   * @param codec the codec name, may be null
   * @return this object, to allow chaining
   */
  public Options compressionCodec(String codec) {
    this.codecName = codec;
    return this;
  }

  /**
   * Sets the compression level.
   *
   * @param level the compression level, may be null
   * @return this object, to allow chaining
   */
  public Options compressionLevel(Integer level) {
    this.codecLevel = level;
    return this;
  }

  /** Returns the compression codec, or null if none has been set. */
  String compressionCodec() {
    return this.codecName;
  }

  /** Returns the compression level, or null if none has been set. */
  Integer compressionLevel() {
    return this.codecLevel;
  }
}

static class V2Writer extends ManifestListWriter {
private final V2Metadata.IndexedManifestFile wrapper;

V2Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, long sequenceNumber) {
V2Writer(
OutputFile snapshotFile,
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
Options options) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"sequence-number", String.valueOf(sequenceNumber),
"format-version", "2"));
"format-version", "2"),
options);
this.wrapper = new V2Metadata.IndexedManifestFile(snapshotId, sequenceNumber);
}

Expand All @@ -90,15 +125,28 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();
protected FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta, Options options) {
String compressionCodec = options.compressionCodec();
Integer compressionLevel = options.compressionLevel();

try {
Avro.WriteBuilder builder =
Avro.write(file)
.schema(V2Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();

if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}

if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}

return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand All @@ -108,13 +156,14 @@ protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, St
static class V1Writer extends ManifestListWriter {
private final V1Metadata.IndexedManifestFile wrapper = new V1Metadata.IndexedManifestFile();

V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) {
V1Writer(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId, Options options) {
super(
snapshotFile,
ImmutableMap.of(
"snapshot-id", String.valueOf(snapshotId),
"parent-snapshot-id", String.valueOf(parentSnapshotId),
"format-version", "1"));
"format-version", "1"),
options);
}

@Override
Expand All @@ -126,15 +175,28 @@ protected ManifestFile prepare(ManifestFile manifest) {
}

@Override
protected FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) {
try {
return Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite()
.build();
protected FileAppender<ManifestFile> newAppender(
OutputFile file, Map<String, String> meta, Options options) {
String compressionCodec = options.compressionCodec();
Integer compressionLevel = options.compressionLevel();

try {
Avro.WriteBuilder builder =
Avro.write(file)
.schema(V1Metadata.MANIFEST_LIST_SCHEMA)
.named("manifest_file")
.meta(meta)
.overwrite();

if (compressionCodec != null) {
builder.set(TableProperties.AVRO_COMPRESSION, compressionCodec);
}

if (compressionLevel != null) {
builder.set(TableProperties.AVRO_COMPRESSION_LEVEL, compressionLevel.toString());
}

return builder.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: %s", file);
}
Expand Down
Loading