From d5a0c16e7dea51a1d1d3f220ad6484f03836ae6e Mon Sep 17 00:00:00 2001 From: sivabalan Date: Mon, 29 Aug 2022 14:57:04 -0700 Subject: [PATCH] Fixing flaky deltastreamer test --- .../hudi/utilities/functional/TestHoodieDeltaStreamer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 88948b03850ac..69d6dd7d3b298 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -902,6 +902,7 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "")); cfg.configs.add(String.format("%s=%s", HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0")); cfg.configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1")); + cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs); @@ -947,13 +948,14 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws assertFalse(replacedFilePaths.isEmpty()); // Step 4 : Insert 1 record and trigger sync/async cleaner and archive. - List configs = getAsyncServicesConfigs(1, "true", "true", "2", "", ""); + List configs = getAsyncServicesConfigs(1, "true", "true", "6", "", ""); configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS")); configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1")); configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2")); configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3")); configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean)); configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1")); + cfg.configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT")); if (asyncClean) { configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));