Conversation

@zhongyujiang
Contributor

This PR fixes the unit test TestFlinkSink#testHashDistributeMode, which fails occasionally in Flink CI and has been discussed at length in #2989 and #3365.
I think the root cause is the way notifyCheckpointComplete works in IcebergFilesCommitter:

public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  // It's possible that we have the following events:
  //   1. snapshotState(ckpId);
  //   2. snapshotState(ckpId+1);
  //   3. notifyCheckpointComplete(ckpId+1);
  //   4. notifyCheckpointComplete(ckpId);
  // For step#4, we don't need to commit iceberg table again because in step#3 we've committed all the files,
  // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
  if (checkpointId > maxCommittedCheckpointId) {
    commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
    this.maxCommittedCheckpointId = checkpointId;
  }
}

private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
  int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
  if (deleteFilesNum == 0) {
    // To be compatible with iceberg format V1.
    AppendFiles appendFiles = table.newAppend();
    int numFiles = 0;
    for (WriteResult result : pendingResults.values()) {
      Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
      numFiles += result.dataFiles().length;
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }
    commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
  } else {

As shown above, the results of multiple checkpoints may be merged into one commit in streaming mode, and the checkpoint interval here (400 ms) is rather small, which makes this situation very likely.

Increasing the checkpoint interval would reduce such failures, but in theory they cannot be completely eliminated. So I simply made this unit test apply only to batch mode, which in my opinion is enough to validate hash distribute mode.
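For illustration, here is a minimal sketch of what gating the test to batch mode could look like, assuming a JUnit 4 test with a parameterized isStreamingJob flag (the flag and class names are placeholders, not the actual fields in the test):

import org.junit.Assume;
import org.junit.Test;

public class TestHashDistributeModeSketch {
  // Hypothetical parameterized flag; the real test wires this through its constructor.
  private final boolean isStreamingJob = false;

  @Test
  public void testHashDistributeMode() {
    // Skip the streaming variant: merged checkpoint results make the
    // one-file-per-partition assertion unreliable there.
    Assume.assumeFalse("Hash distribution is only validated in batch mode", isStreamingJob);
    // ... write the rows and assert one data file per partition ...
  }
}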

@openinx @szehon-ho could you help review this? Thanks!

@github-actions github-actions bot added the flink label Feb 14, 2022
@szehon-ho
Member

@stevenzwu what do you think?

@rdblue
Contributor

rdblue commented Feb 16, 2022

@zhongyujiang, I think I would prefer a fix that avoids the root cause but still runs the test in streaming mode. I understand your concern about not being able to necessarily guarantee we won't have a flaky test, but we can probably set that high enough (1s?) that we don't see it in practice.

@zhongyujiang
Contributor Author

@rdblue I have updated the PR. I'm not sure 1s is high enough, but let's give it a try first.

Assert.assertEquals("There should be 1 data file in partition 'ccc'", 1,
    SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size());
Assert.assertTrue("There should be no more than 1 data file in partition 'aaa'",
    SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size() < 2);
Contributor Author

I changed the assert condition because, if there are multiple checkpoints, data may arrive like this:
ck1: (1, "aaa")
ck2: (1, "bbb")
...
So I think we should assert that each snapshot has no more than 1 file per partition, since it could be 0 files as well.

Contributor

It is unclear to me how this change of assertion is related to the potential cause you described, where 2 checkpoint cycles can be committed in one shot. In that case we can get 2 files for one partition; why would we get 0 files for a partition?

Contributor Author

I didn't figure out a way to validate hash distribution when the results of multiple checkpoints are merged, so originally I simply disabled this test in streaming mode.
This update is for rdblue's comment:

I think I would prefer a fix that avoids the root cause but still runs the test in streaming mode. I understand your concern about not being able to necessarily guarantee we won't have a flaky test, but we can probably set that high enough (1s?) that we don't see it in practice.

I increased the checkpoint interval to 1000 ms to reduce the possibility of merged results. And in streaming mode, I think the original assertion is not right given the checkpoint scenario mentioned in my last comment.
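As a hedged sketch of that change (assuming the test drives checkpointing through StreamExecutionEnvironment; the exact configuration point in the real test may differ):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Previously 400 ms, which makes checkpoints frequent enough that notifications
// regularly subsume more than one checkpoint into a single commit.
env.enableCheckpointing(1000);  // a longer interval makes merged commits unlikely, not impossible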

Contributor

The error you encountered is that the value is 2 (not 1). Hence I said this change from == 1 to < 2 won't even work around the error. Anyway, it seems that the other discussions in the PR have already led us to the right root cause and solution.

@stevenzwu
Contributor

@zhongyujiang Flink by default only allows 1 concurrent checkpoint. Could the scenario you described happen in that case?

@openinx
Member

openinx commented Feb 17, 2022

@zhongyujiang, what's the current failure stacktrace you encountered? I'd like to take a careful look at this problem, and I hope we can fix it in this round of work.

@zhongyujiang
Contributor Author

@zhongyujiang Flink by default only allows 1 concurrent checkpoint. Could the scenario you described happen in that case?

I think it's not related to Flink's concurrent checkpoint limit but to the way notifyCheckpointComplete works. Quoting from the method docs of notifyCheckpointComplete:

Notifies the listener that the checkpoint with the given checkpointId completed and was committed.
These notifications are "best effort", meaning they can sometimes be skipped. To behave properly, implementers need to follow the "Checkpoint Subsuming Contract". Please see the class-level JavaDocs for details.
Please note that checkpoints may generally overlap, so you cannot assume that the notifyCheckpointComplete() call is always for the latest prior checkpoint (or snapshot) that was taken on the function/operator implementing this interface. It might be for a checkpoint that was triggered earlier. Implementing the "Checkpoint Subsuming Contract" (see above) properly handles this situation correctly as well.
Please note that throwing exceptions from this method will not cause the completed checkpoint to be revoked. Throwing exceptions will typically cause task/job failure and trigger recovery.

IcebergFilesCommitter commits to Iceberg once it gets notified, and such notifications do not have the same ordering guarantees as Flink checkpoints.

@zhongyujiang
Contributor Author

@zhongyujiang, what's the current failure stacktrace you encountered? I'd like to take a careful look at this problem, and I hope we can fix it in this round of work.

Like this:

java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>

I haven't encountered it locally yet.

@zhongyujiang
Contributor Author

@openinx Found one in CI:

org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhadoop, baseNamespace=, format=ORC, isStreaming=true] FAILED
java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:283)

@openinx
Member

openinx commented Feb 17, 2022

Reconsidering this test case, I think @zhongyujiang is heading in the right direction on the root cause. Let me explain the cause here:

In the unit test case, we are trying to write the following records into the Apache Iceberg table, shuffled by the partition field data (the parallelism is 2):

(1, 'aaa'), (1, 'bbb'), (1, 'ccc')
(2, 'aaa'), (2, 'bbb'), (2, 'ccc')
(3, 'aaa'), (3, 'bbb'), (3, 'ccc')

Since multiple checkpoints may be produced while the streaming job is running, it's possible that the records are written across the following checkpoints:

  • checkpoint#1

    • (1, 'aaa')
    • (1, 'bbb')
    • (1, 'ccc')
  • checkpoint#2

    • (2, 'aaa')
    • (2, 'bbb')
    • (2, 'ccc')
    • (3, 'aaa')
    • (3, 'bbb')
    • (3, 'ccc')

Then a separate data file will be produced for each partition within each checkpoint. Let's say:

  • checkpoint#1

    • produces data-file-1 for partition aaa
    • produces data-file-2 for partition bbb
    • produces data-file-3 for partition ccc
  • checkpoint#2

    • produces data-file-4 for partition aaa
    • produces data-file-5 for partition bbb
    • produces data-file-6 for partition ccc

Assume the snapshotState & notifyCheckpointComplete calls arrive in the following order:

  1. snapshotState(ckpt1);
  2. snapshotState(ckpt2);
  3. notifyCheckpointComplete(ckpt2); (this is possible, just as the Flink javadoc says)
  4. notifyCheckpointComplete(ckpt1);

Then in step#3, it will commit one transaction with all the data files from checkpoint#1 & checkpoint#2 (according to this IcebergFilesCommitter implementation), so the latest snapshot will include all the data files from data-file-1 to data-file-6. That is why we encounter the assertion failure:

java.lang.AssertionError: There should be 1 data file in partition 'aaa' expected:<1> but was:<2>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:647)
at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:283)
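To make the merging step concrete, here is a condensed, hypothetical illustration of the subsuming behavior (the map name mirrors the snippet at the top of the PR, but the value type and body are a sketch, not the actual Iceberg implementation):

import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Pending data files keyed by checkpoint id, with values simplified to file names.
NavigableMap<Long, List<String>> dataFilesPerCheckpoint = new TreeMap<>();
dataFilesPerCheckpoint.put(1L, Arrays.asList("data-file-1", "data-file-2", "data-file-3"));
dataFilesPerCheckpoint.put(2L, Arrays.asList("data-file-4", "data-file-5", "data-file-6"));

// notifyCheckpointComplete(ckpt2) arrives before notifyCheckpointComplete(ckpt1):
// committing "up to checkpoint 2" subsumes checkpoint 1 as well.
NavigableMap<Long, List<String>> pending = dataFilesPerCheckpoint.headMap(2L, true);
// pending spans both checkpoints, so the single resulting snapshot contains two data
// files per partition -- which is exactly the "expected:<1> but was:<2>" failure above.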

@openinx
Member

openinx commented Feb 17, 2022

But I generally don't think the current fix is in the correct direction; here are my points:

  • Indeed, increasing the checkpoint interval from 400 ms to 1000 ms reduces the probability of hitting this assertion failure, but it does not resolve the real underlying problem. So I don't think increasing the checkpoint interval is the right fix.
  • Asserting that the snapshot's data file count is less than 2 does not change anything, in my view.

I think the real intention behind this unit test is: we want to ensure that there is only one generated data file in each given partition when we commit those rows in one single deterministic Iceberg transaction, once the switch write.distribution-mode=hash is enabled in both Flink streaming & batch jobs.

The current root cause is that we cannot force only one checkpoint for the given 9 rows in the Flink streaming SQL job. So I think the correct direction is: make a single checkpoint write those 9 rows, and then we can still assert that there is only one data file in each given partition. To accomplish this, I think we can use the BoundedTestSource to reimplement this unit test. As for BoundedTestSource, here is a good example of how to produce multiple rows into a single checkpoint.
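Here is a rough sketch of that idea, assuming BoundedTestSource takes its input as per-checkpoint batches like in the example linked above (the import path, constructor shape, and surrounding wiring are assumptions, not the final test code):

import java.util.Arrays;
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.flink.source.BoundedTestSource;

// Put all nine rows into ONE inner list so the source emits them within a single
// checkpoint, which yields exactly one Iceberg commit to assert against.
List<List<Row>> elementsPerCheckpoint = Arrays.asList(
    Arrays.asList(
        Row.of(1, "aaa"), Row.of(1, "bbb"), Row.of(1, "ccc"),
        Row.of(2, "aaa"), Row.of(2, "bbb"), Row.of(2, "ccc"),
        Row.of(3, "aaa"), Row.of(3, "bbb"), Row.of(3, "ccc")));

BoundedTestSource<Row> source = new BoundedTestSource<>(elementsPerCheckpoint);
// env.addSource(source, rowTypeInfo) -> FlinkSink with write.distribution-mode=hash,
// then assert exactly one data file per partition in the single resulting snapshot.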

@zhongyujiang
Contributor Author

Asserting that the snapshot's data file count is less than 2 does not change anything, in my view.

Changing the assertion condition is not actually intended to solve the validation problem when there are merged results. Like you said, there could be more than one checkpoint in streaming mode, but there is no guarantee that each checkpoint contains data for every partition. The situation could be like this:

  • checkpoint#1
    • (1, 'aaa')
  • checkpoint#2
    • (1, 'bbb')
      ...

When the results of ck1 and ck2 are not merged, the snapshot of ck1 would have only 1 data file for partition aaa and 0 files for the other partitions, and the snapshot of ck2 is similar; that's why I changed the assertion condition.

To accomplish this, I think we can use the BoundedTestSource to reimplement this unit test. As for BoundedTestSource, here is a good example of how to produce multiple rows into a single checkpoint.

I also wanted to solve the problem by controlling the checkpointing in the beginning, but I didn't figure out a convenient way to do so. Using BoundedTestSource seems like a feasible way; I'll try it. @openinx Thanks for your advice.

@stevenzwu
Contributor

@openinx I read the javadoc that you linked. It seems that notifyCheckpointComplete can be skipped because it is best effort. It also says that the notification can't be assumed to be for the latest snapshot, but it doesn't say whether they can come out of order. So the scenario could be:

snapshotState(ckpt1);
// notifyCheckpointComplete(ckpt1); (missed)
snapshotState(ckpt2);
notifyCheckpointComplete(ckpt2); 

Agreed that precise control over the source could be the right solution here.

@yittg
Contributor

yittg commented Feb 21, 2022

@openinx @rdblue I added some logging here about this PR.
