Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected TableEnvironment getTableEnv() {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(400);
env.enableCheckpointing(1000);
env.setMaxParallelism(2);
env.setParallelism(2);
tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
Expand Down Expand Up @@ -277,15 +277,15 @@ public void testHashDistributeMode() throws Exception {
));

// Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval,
// thus producing multiple snapshots. Here we assert that each snapshot has only 1 file per partition.
// thus producing multiple snapshots. Here we assert that each snapshot has no more than 1 file per partition.
Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
Assert.assertEquals("There should be 1 data file in partition 'aaa'", 1,
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size());
Assert.assertEquals("There should be 1 data file in partition 'bbb'", 1,
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb")).size());
Assert.assertEquals("There should be 1 data file in partition 'ccc'", 1,
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size());
Assert.assertTrue("There should be no more than 1 data file in partition 'aaa'",
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "aaa")).size() < 2);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the assert condition, because if there are multiple checkpoints, data may arrive in this way:
ck1: (1, "aaa")
ck2: (1, "bbb")
...
so I think we should assert each snapshot has no more than 1 file per partition, since it could be 0 file as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unclear to me how this change of assertion is related the potential cause you described where 2 checkpoint cycles can be committed in one shot. Then we can get 2 files for one partition. why would we get 0 file for a partition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't figure out a way to validate hash distribution when there have merged results of multiple ckpts, originally I simply disabled this test running in streaming mode.
This is an update for blue's comment,

I think I would prefer a fix that avoids the root cause but still runs the test in streaming mode. I understand your concern about not being able to necessarily guarantee we won't have a flaky test, but we can probably set that high enough (1s?) that we don't see it in practice.

I improved ck interval to 1000ms to reduce merge results possibility. And in streaming mode, I think the original assert is not right given the checkpoint scenario mentioned in my last comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error you encountered is value is 2 (not 1). Hence I said this change from == 1 to < 2 won't even work around the error. anyway, it seems that other discussions in the PR already led us to the right root cause and solution.

Assert.assertTrue("There should be no more than 1 data file in partition 'bbb'",
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "bbb")).size() < 2);
Assert.assertTrue("There should be no more than 1 data file in partition 'ccc'",
SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", "ccc")).size() < 2);
}
} finally {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
Expand Down