@hankfanchiu

Summary

Partially revert e4df91e (from #2960) and allow a no-op partition replacement operation to be committed.

Motivation

#2895 reported an exception when running an insert overwrite with an empty dataset from Spark.

#2960 addressed the issue above by skipping the commit operation entirely (in both Spark 2 and Spark 3).

However, we need to be able to distinguish a no-op commit from no commit attempt at all.

Concretely, we have scheduled Spark pipelines that use Iceberg metadata to track commits and read targeted Iceberg snapshots. We additionally set some snapshot-property.<custom key> to externally "name" each snapshot.

With #2960, an upstream Spark application skipping a commit would cause the downstream Spark application to fail to find and read the expected Iceberg snapshot by the custom snapshot property.
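
To illustrate the failure mode, here is a minimal sketch of the downstream lookup. It does not use the Iceberg API: `NamedSnapshot`, `SnapshotLookupSketch`, `findByProperty`, and the key `pipeline.run-id` are hypothetical stand-ins for a snapshot summary and our custom key.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a table snapshot: an id plus its summary properties.
// This is not the Iceberg Snapshot API; it only models the lookup described above.
final class NamedSnapshot {
  final long id;
  final Map<String, String> summary;

  NamedSnapshot(long id, Map<String, String> summary) {
    this.id = id;
    this.summary = summary;
  }
}

final class SnapshotLookupSketch {
  // Returns the most recent snapshot whose summary maps `key` to `value`, if any.
  // Snapshots are assumed to be ordered oldest first.
  static Optional<NamedSnapshot> findByProperty(
      List<NamedSnapshot> snapshots, String key, String value) {
    for (int i = snapshots.size() - 1; i >= 0; i--) {
      NamedSnapshot snapshot = snapshots.get(i);
      if (value.equals(snapshot.summary.get(key))) {
        return Optional.of(snapshot);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    String key = "pipeline.run-id"; // hypothetical custom key

    // Upstream skipped the empty commit for run-2: only run-1 exists.
    List<NamedSnapshot> skipped = Collections.singletonList(
        new NamedSnapshot(1L, Collections.singletonMap(key, "run-1")));

    // Upstream committed a no-op snapshot for run-2.
    List<NamedSnapshot> committed = Arrays.asList(
        new NamedSnapshot(1L, Collections.singletonMap(key, "run-1")),
        new NamedSnapshot(2L, Collections.singletonMap(key, "run-2")));

    System.out.println(findByProperty(skipped, key, "run-2").isPresent());
    System.out.println(findByProperty(committed, key, "run-2").isPresent());
  }
}
```

In the first history the downstream job cannot tell "run-2 produced no data" apart from "run-2 never ran", which is the ambiguity this PR is trying to remove.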

Testing

The test case introduced by #2960 still passes:

  @Test
  public void testEmptyOverwrite() throws IOException {
    File parent = temp.newFolder(format.toString());
    File location = new File(parent, "test");

    HadoopTables tables = new HadoopTables(CONF);
    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build();
    Table table = tables.create(SCHEMA, spec, location.toString());

    List<SimpleRecord> records = Lists.newArrayList(
        new SimpleRecord(1, "a"),
        new SimpleRecord(2, "b"),
        new SimpleRecord(3, "c")
    );

    List<SimpleRecord> expected = records;
    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

    df.select("id", "data").write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
        .mode(SaveMode.Append)
        .save(location.toString());

    Dataset<Row> empty = spark.createDataFrame(ImmutableList.of(), SimpleRecord.class);

    empty.select("id", "data").write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
        .mode(SaveMode.Overwrite)
        .option("overwrite-mode", "dynamic")
        .save(location.toString());

    table.refresh();

    Dataset<Row> result = spark.read()
        .format("iceberg")
        .load(location.toString());

    List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
    Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
    Assert.assertEquals("Result rows should match", expected, actual);
  }

On Spark 2, I've also run an application that saves an empty Dataset in overwrite mode, resulting in a new but no-op snapshot:

  "snapshots" : [ {
    "snapshot-id" : 1680973636538102330,
    "timestamp-ms" : 1630102232337,
    "summary" : {
      "operation" : "overwrite",
      "spark.app.id" : "<omitted>",
      "replace-partitions" : "true",
      "<custom key>" : "<omitted>",
      "changed-partition-count" : "0",
      "total-records" : "0",
      "total-files-size" : "0",
      "total-data-files" : "0",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "<omitted>.avro",
    "schema-id" : 0
  } ],
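
For a consumer that wants to recognize such a snapshot, a rough sketch follows. It works on the summary as a plain string map and assumes that a `replace-partitions` overwrite with a `changed-partition-count` of `0` can be treated as a no-op; `NoOpSummarySketch` and `isNoOpReplace` are hypothetical names, not Iceberg API.

```java
import java.util.HashMap;
import java.util.Map;

final class NoOpSummarySketch {
  // Assumption: a replace-partitions overwrite that changed zero partitions is a no-op.
  // A missing key is treated as "not known to be a no-op".
  static boolean isNoOpReplace(Map<String, String> summary) {
    return "overwrite".equals(summary.get("operation"))
        && "true".equals(summary.get("replace-partitions"))
        && "0".equals(summary.get("changed-partition-count"));
  }

  public static void main(String[] args) {
    // Subset of the summary shown above.
    Map<String, String> summary = new HashMap<>();
    summary.put("operation", "overwrite");
    summary.put("replace-partitions", "true");
    summary.put("changed-partition-count", "0");
    System.out.println(isNoOpReplace(summary));
  }
}
```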

Member

@RussellSpitzer RussellSpitzer left a comment

I think the consensus in Slack was that we don't want this to be the default behavior. We only want to allow empty commits if a flag is set; otherwise we want it to be a no-op.

So we need to add a new table option and check it in the MergingSnapshotProducer apply method to decide whether or not to make the commit.

@hankfanchiu
Author

> we need to add a new table option, and check it on whether or not to make the commit in the MergingSnapshotProducer apply method.

What would you suggest as the name of this configuration option? How about one of the following?

  1. commit.allow-empty.enabled, default: false
  2. commit.skip-empty.enabled, default: true
  3. commit.omit-empty.enabled, default: true

Putting the verb first differs from an existing option, which names the affected entity first and the action second, i.e. noun and then verb:

public static final String MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled";
public static final boolean MANIFEST_MERGE_ENABLED_DEFAULT = true;

Some other options:

  • empty-allow or empty-skip or empty-omit seems unintuitive?
  • empty-allowed.enabled or empty-skipped.enabled or empty-omitted.enabled stutters a bit?
  • empty.enabled might be ambiguous?

@rdblue
Contributor

rdblue commented Sep 12, 2021

Allow isn't a good term because it implies failure if something is not allowed, not skipping. Skip and omit are okay, but I think that we want the default to be false so that adding the setting is positive: keep empty commits vs skip empty commits. That helps the default seem less surprising.

So what I'm leaning toward is commit.keep-empty.enabled. Does that sound alright to everyone?
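
A minimal sketch of how such a flag could gate the commit, assuming the name proposed here and a default of false. `KeepEmptyCommitSketch`, `propertyAsBoolean`, and `shouldCommit` are hypothetical helpers written against a plain property map; this is not the MergingSnapshotProducer code.

```java
import java.util.Collections;
import java.util.Map;

final class KeepEmptyCommitSketch {
  // Name and default as proposed in this thread; treat both as assumptions.
  static final String KEEP_EMPTY_ENABLED = "commit.keep-empty.enabled";
  static final boolean KEEP_EMPTY_ENABLED_DEFAULT = false;

  // Reads a boolean table property, falling back to the default when it is unset.
  static boolean propertyAsBoolean(
      Map<String, String> properties, String key, boolean defaultValue) {
    String value = properties.get(key);
    return value == null ? defaultValue : Boolean.parseBoolean(value);
  }

  // A commit with changes always goes through; an empty one only when the flag is set.
  static boolean shouldCommit(Map<String, String> tableProperties, int changedFileCount) {
    if (changedFileCount > 0) {
      return true;
    }
    return propertyAsBoolean(tableProperties, KEEP_EMPTY_ENABLED, KEEP_EMPTY_ENABLED_DEFAULT);
  }

  public static void main(String[] args) {
    Map<String, String> defaults = Collections.emptyMap();
    Map<String, String> keepEmpty = Collections.singletonMap(KEEP_EMPTY_ENABLED, "true");

    System.out.println(shouldCommit(defaults, 0));  // empty commit, flag unset
    System.out.println(shouldCommit(keepEmpty, 0)); // empty commit, flag set
  }
}
```

With the default of false, existing tables keep the skip behavior from #2960, and only tables that opt in get a snapshot for an empty write.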

@RussellSpitzer
Member

RussellSpitzer commented Sep 12, 2021 via email

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jul 19, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jul 28, 2024