
Conversation

@openinx
Member

@openinx openinx commented Oct 25, 2021

(The following is copied from #3307 (comment).)

I found a case that can break the flaky unit test here:

In the unit test, we write the following records into an Apache Iceberg table, shuffling by the partition field (the parallelism is 2):

(1, 'aaa'), (1, 'bbb'), (1, 'ccc')
(2, 'aaa'), (2, 'bbb'), (2, 'ccc')
(3, 'aaa'), (3, 'bbb'), (3, 'ccc')

Since the streaming job may produce multiple checkpoints while running, it's possible that the records are written in the following checkpoints:

  • checkpoint#1

    • (1, 'aaa')
    • (1, 'bbb')
    • (1, 'ccc')
  • checkpoint#2

    • (2, 'aaa')
    • (2, 'bbb')
    • (2, 'ccc')
    • (3, 'aaa')
    • (3, 'bbb')
    • (3, 'ccc')

Each checkpoint then produces a separate data file for each partition written in that checkpoint. Let's say:

  • checkpoint#1

    • produces data-file-1 for partition aaa
    • produces data-file-2 for partition bbb
    • produces data-file-3 for partition ccc
  • checkpoint#2

    • produces data-file-4 for partition aaa
    • produces data-file-5 for partition bbb
    • produces data-file-6 for partition ccc

In IcebergFilesCommitter, we use the default MergeAppend to merge the manifests. Then we produce:

  • checkpoint#1

    • manifest-1: includes data-file-1, data-file-2, data-file-3
  • checkpoint#2

    • manifest-2: includes data-file-1, data-file-2, data-file-3, data-file-4, data-file-5, data-file-6

Then finally, in this line, we encounter data-file-1 twice in the result map, and the assertion fails.

I think we only need to find the newly added data files for each commit in order to fix this unit test.
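
For illustration only (not necessarily the exact change in this PR): a minimal sketch of that idea, assuming the Snapshot#addedFiles() API available at the time. The helper name mirrors the SimpleDataUtil.snapshotToDataFiles utility referenced later in this thread.

import java.util.List;
import java.util.Map;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class SnapshotDataFiles {

  private SnapshotDataFiles() {
  }

  // Map each snapshot id to only the data files that this commit added, so a file
  // written in checkpoint#1 is never attributed to checkpoint#2's snapshot as well.
  public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) {
    Map<Long, List<DataFile>> result = Maps.newHashMap();
    for (Snapshot snapshot : table.snapshots()) {
      List<DataFile> added = Lists.newArrayList(snapshot.addedFiles());
      if (!added.isEmpty()) {
        result.put(snapshot.snapshotId(), added);
      }
    }
    return result;
  }
}

Keying by the snapshot that added each file avoids exactly the duplicate that trips the assertion: a file listed again by a later (merged) manifest never reappears under a later snapshot id.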

@openinx openinx requested a review from szehon-ho October 25, 2021 09:19
@github-actions github-actions bot added the flink label Oct 25, 2021
@openinx openinx requested a review from rdblue October 25, 2021 09:51
@rdblue
Contributor

rdblue commented Oct 25, 2021

@openinx, why would a manifest merge be triggered? It seems like this should not happen, or should happen consistently. I don't understand why this could cause a test to be flaky.

    } else {
      result.put(manifestFile.snapshotId(), dataFiles);
    }
    Snapshot current = table.currentSnapshot();
Contributor

I think it would be better to modify the old code rather than traversing snapshots and planning scans. All you need to do is to read the manifest entries and add the data files to the map using each entry's snapshot ID instead of the manifest's snapshot ID. You can use ManifestGroup.entries to get the entries.
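
To make the suggestion concrete, here is a rough sketch (a sketch only: as the replies below point out, ManifestEntry and ManifestGroup are package-private in iceberg-core, so it would only compile from inside the org.apache.iceberg package, and the exact signatures used here are assumptions):

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// Key the map by each entry's own snapshot id (the commit that added the file)
// instead of ManifestFile#snapshotId(), which changes when manifests are merged
// or rewritten.
static Map<Long, List<DataFile>> dataFilesBySnapshot(Table table) throws IOException {
  Map<Long, List<DataFile>> result = Maps.newHashMap();
  try (CloseableIterable<ManifestEntry<DataFile>> entries =
      new ManifestGroup(table.io(), table.currentSnapshot().dataManifests()).entries()) {
    for (ManifestEntry<DataFile> entry : entries) {
      // Skip DELETED entries so removed files are not reported as live data files.
      if (entry.status() != ManifestEntry.Status.DELETED) {
        result.computeIfAbsent(entry.snapshotId(), id -> Lists.newArrayList())
            .add(entry.file().copy());
      }
    }
  }
  return result;
}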

Member

@szehon-ho szehon-ho Oct 25, 2021

I think the ManifestEntry interface and its methods are package-protected here, hence they can't be used outside the package (as I mentioned over at the other thread, when I was also looking to fix the original code).

So I was wondering whether the interface could be exposed as public? I was curious about the original reason it was kept package-protected, since the "entries" metadata table is already publicly exposed to users via Spark, and it's already publicly documented in the spec.

Contributor

@rdblue rdblue Oct 25, 2021

The reason why we don't expose the manifest entry is that it's confusing. We don't want users to read a manifest and assume that all of the entries represent files in the table because we track deleted files in the same metadata. So it is more that users would need to know more about the spec and we don't think that it is likely to be used correctly.

I'm still open to the idea of making this public. But if we don't need it then I'd opt not to.

Member

I get what you mean. If you are curious, a real-life use case we designed around ManifestEntry was a bit similar to this.

We were trying to build a data latency monitoring application that measures the max data latency per partition. So we wanted to go through the snapshots, then search the reachable manifest entries for all ADDED data files matching a partition, and find the latest commit time among them.

We ended up trying to join the snapshots + all_entries metadata tables, but due to perf issues and bugs with metadata table aggregation, we started looking at the Table and ManifestReader APIs to explore a set of known snapshots/manifest files directly, since we roughly knew the latest time frame in which the partition was expected to land. But without knowing the context of each DataFile, this solution is hard to get working (a DataFile with EXISTING status resulting from a metadata rewrite throws it off).

Not sure if there was an easier way to do it :)
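
In case it helps illustrate the metadata-table attempt, a hedged sketch of that query (Spark through the Java API; db.tbl, the session setup, and the column choices are assumptions based on the documented snapshots and all_entries metadata tables):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionLatencyCheck {
  public static void main(String[] args) {
    // Assumes a SparkSession with the Iceberg catalog already configured.
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Latest commit time of an ADDED data file per partition: join all_entries
    // (status 1 = ADDED in the manifest entry spec) with snapshots on snapshot_id.
    Dataset<Row> latency = spark.sql(
        "SELECT e.data_file.partition AS partition, MAX(s.committed_at) AS last_added_at " +
        "FROM db.tbl.all_entries e " +
        "JOIN db.tbl.snapshots s ON e.snapshot_id = s.snapshot_id " +
        "WHERE e.status = 1 " +
        "GROUP BY e.data_file.partition");
    latency.show();
  }
}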

@szehon-ho
Member

@rdblue for context, the possible issue is that FlinkSink has auto-commit, so sometimes there is a commit between the two writes of the same record value and thus two files are created. The test (checking that the same records go to the same file) was asserting that only one file was created.

Thanks @openinx for giving a try to fix it.

@rdblue
Contributor

rdblue commented Oct 25, 2021

@szehon-ho, I see that there could be multiple data files here, but I don't quite see how the current code would get it wrong. Still, being more careful and making sure the snapshot ID matches the actual data file is likely to fix it if that's the problem.

Contributor

@rdblue rdblue left a comment

+1 after @szehon-ho's comments that the relevant interface is package-private.

@rdblue rdblue merged commit 8e22f36 into apache:master Oct 25, 2021
@openinx
Member Author

openinx commented Oct 26, 2021

@openinx, why would a manifest merge be triggered? It seems like this should not happen, or should happen consistently. I don't understand why this could cause a test to be flaky.

I reconsidered this question and checked the condition that triggers a real manifest file merge in MergeAppend. In the testHashDistributeMode unit test, we produce 9 data files at most, so it's unlikely to trigger a manifest merge (the default threshold is 100):

  public static final String MANIFEST_MIN_MERGE_COUNT = "commit.manifest.min-count-to-merge";
  public static final int MANIFEST_MIN_MERGE_COUNT_DEFAULT = 100;

I also wrote a small case to verify this:

@RunWith(Parameterized.class)
public class TestSimpleDataUtil extends TableTestBase {

  @Parameterized.Parameters(name = "formatVersion = {0}")
  public static Object[] parameters() {
    return new Object[] {1}; // We don't actually use the format version since everything is mock
  }

  public TestSimpleDataUtil(int formatVersion) {
    super(formatVersion);
  }

  @Test
  public void testDataFiles() throws IOException {
    table.newAppend()
        .appendFile(FILE_A)
        .commit();

    table.newAppend()
        .appendFile(FILE_B)
        .commit();

    table.newAppend()
        .appendFile(FILE_C)
        .commit();

    Map<Long, List<DataFile>> files = SimpleDataUtil.snapshotToDataFiles(table);
    Assert.assertEquals(3, files.size());
    for (Map.Entry<Long, List<DataFile>> entry : files.entrySet()) {
      Assert.assertEquals(1, entry.getValue().size());
    }
  }
}

So I think @rdblue is right: we still don't know the real root cause of why it breaks the UT. It will need more careful work to catch it.

@openinx
Member Author

openinx commented Oct 26, 2021

Thanks @szehon-ho and @stevenzwu for the explanation and the check.

@rdblue
Contributor

rdblue commented Oct 26, 2021

@openinx, thanks for looking into it. Even if merge isn't the cause, having the wrong snapshot ID in the map could cause an issue so it's good to fix this and see if it helps. Thanks for working on this!

@stevenzwu
Contributor

I am seeing the same issue in the CI for my PR: https://github.com/apache/iceberg/runs/4146901429?check_suite_focus=true

I have rebased the branch onto the latest master, so it has this change already.

@zhongyujiang
Contributor

zhongyujiang commented Jan 25, 2022

Hi @openinx, I am still seeing this error in CI sometimes. I think it may be caused by the way notifyCheckpointComplete works in IcebergFilesCommitter:

  public void notifyCheckpointComplete(long checkpointId) throws Exception {
    super.notifyCheckpointComplete(checkpointId);
    // It's possible that we have the following events:
    //   1. snapshotState(ckpId);
    //   2. snapshotState(ckpId+1);
    //   3. notifyCheckpointComplete(ckpId+1);
    //   4. notifyCheckpointComplete(ckpId);
    // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
    // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
    if (checkpointId > maxCommittedCheckpointId) {
      commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
      this.maxCommittedCheckpointId = checkpointId;
    }
  }

My understanding is that not every Flink checkpoint corresponds to a commit in Iceberg; sometimes a snapshot may contain the data files of several checkpoints. And the checkpoint interval here (400 ms) is rather small, which makes this situation very likely. What do you think?

@zhongyujiang
Contributor

zhongyujiang commented Jan 25, 2022

  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
    if (deleteFilesNum == 0) {
      // To be compatible with iceberg format V1.
      AppendFiles appendFiles = table.newAppend();
      int numFiles = 0;
      for (WriteResult result : pendingResults.values()) {
        Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
        numFiles += result.dataFiles().length;
        Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
      }
      commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
    } else {

Results of checkpoints without delete files will be merged into one transaction.
