diff --git a/azure-pipelines.yml b/azure-pipelines.yml index 6d96ed51f242d..c02219ae52af2 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -123,7 +123,7 @@ stages: inputs: mavenPomFile: 'pom.xml' goals: 'test' - options: -Punit-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync + options: -Punit-tests -pl hudi-cli,hudi-utilities publishJUnitResults: false jdkVersionOption: '1.8' mavenOptions: '-Xmx2g $(MAVEN_OPTS)' @@ -132,11 +132,12 @@ stages: inputs: mavenPomFile: 'pom.xml' goals: 'test' - options: -Pfunctional-tests -pl hudi-client/hudi-client-common,hudi-client/hudi-flink-client,hudi-client/hudi-java-client,hudi-cli,hudi-utilities,hudi-sync/hudi-hive-sync + options: -Pfunctional-tests -pl hudi-utilities publishJUnitResults: false jdkVersionOption: '1.8' mavenOptions: '-Xmx2g $(MAVEN_OPTS)' - job: UT_FT_4 + condition: false displayName: UT FT other modules steps: - task: Cache@2 @@ -175,6 +176,7 @@ stages: jdkVersionOption: '1.8' mavenOptions: '-Xmx2g $(MAVEN_OPTS)' - job: IT + condition: false steps: - task: AzureCLI@2 displayName: Azure CLI diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientMultiWriter.java similarity index 94% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientMultiWriter.java index 60979c33521d1..d103b09007054 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientMultiWriter.java @@ -16,9 +16,12 @@ * limitations under the License. */ -package org.apache.hudi.client; +package org.apache.hudi.client.functional; import org.apache.hadoop.fs.Path; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -40,6 +43,9 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -56,6 +62,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; @@ -64,6 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +@Tag("functional") public class TestHoodieClientMultiWriter extends HoodieClientTestBase { public void setUpMORTestTable() throws IOException { @@ -82,14 +90,19 @@ public void clean() throws IOException { cleanupResources(); } - @ParameterizedTest - @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) - public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { + //@ParameterizedTest + //@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + @RepeatedTest(20) + public void testHoodieClientBasicMultiWriter() throws Exception { + LOG.warn("\n\n +++++ Starting a new test for testHoodieClientBasicMultiWriter"); + HoodieTableType tableType = HoodieTableType.MERGE_ON_READ; if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) @@ -103,6 +116,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E ExecutorService executors = Executors.newFixedThreadPool(2); SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + AtomicBoolean writer1Conflict = new AtomicBoolean(false); + AtomicBoolean writer2Conflict = new AtomicBoolean(false); Future future1 = executors.submit(() -> { String newCommitTime = "004"; int numRecords = 100; @@ -111,7 +126,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e1) { assertTrue(e1 instanceof HoodieWriteConflictException); - throw new RuntimeException(e1); + writer1Conflict.set(true); } }); Future future2 = executors.submit(() -> { @@ -122,11 +137,13 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e2) { assertTrue(e2 instanceof HoodieWriteConflictException); - throw new RuntimeException(e2); + writer2Conflict.set(true); } }); future1.get(); future2.get(); + Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get()); + Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get()); } @Test diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java index ee7427866feb8..8e426a6170e62 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/functional/SparkClientFunctionalTestSuite.java @@ -19,13 +19,16 @@ package org.apache.hudi.functional; +import org.apache.hudi.client.functional.TestHoodieClientMultiWriter; + import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.suite.api.IncludeTags; -import org.junit.platform.suite.api.SelectPackages; +import org.junit.platform.suite.api.SelectClasses; import org.junit.runner.RunWith; @RunWith(JUnitPlatform.class) -@SelectPackages({"org.apache.hudi.client.functional", "org.apache.hudi.table.functional"}) +//@SelectPackages({"org.apache.hudi.client.functional", "org.apache.hudi.table.functional"}) +@SelectClasses({TestHoodieClientMultiWriter.class}) @IncludeTags("functional") public class SparkClientFunctionalTestSuite { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 41d8cd775f3a1..8bce236e663e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -30,14 +30,14 @@ public class LockConfiguration implements Serializable { public static final String LOCK_PREFIX = "hoodie.write.lock."; public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "wait_time_ms_between_retry"; - public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L); + public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(200L); public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "max_wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "client.wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "num_retries"; - public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3); + public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(10); public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "client.num_retries"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 89f4cfc77f0ff..4ae99d4d1ac59 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -38,6 +38,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -83,17 +84,20 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona String tableBasePath; int totalRecords; - @ParameterizedTest - @EnumSource(HoodieTableType.class) - void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception { + //@ParameterizedTest + //@EnumSource(HoodieTableType.class) + @RepeatedTest(20) + void testUpsertsContinuousModeWithMultipleWritersForConflicts() throws Exception { + LOG.warn("\n\n++++ Starting a new test for testUpsertsContinuousModeWithMultipleWritersForConflicts "); + HoodieTableType tableType = HoodieTableType.MERGE_ON_READ; // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -123,17 +127,20 @@ void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType ta cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1"); } - @ParameterizedTest - @EnumSource(HoodieTableType.class) - void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception { + //@ParameterizedTest + //@EnumSource(HoodieTableType.class) + @RepeatedTest(20) + void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts() throws Exception { + LOG.warn("\n\n++++ Starting a new test for testUpsertsContinuousModeWithMultipleWritersWithoutConflicts "); + HoodieTableType tableType = HoodieTableType.MERGE_ON_READ; // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "15"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java index 98bba5b4eee6e..4bf74d0aec2b6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/UtilitiesFunctionalTestSuite.java @@ -21,11 +21,12 @@ import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.suite.api.IncludeTags; -import org.junit.platform.suite.api.SelectPackages; +import org.junit.platform.suite.api.SelectClasses; import org.junit.runner.RunWith; @RunWith(JUnitPlatform.class) -@SelectPackages("org.apache.hudi.utilities.functional") +//@SelectPackages("org.apache.hudi.utilities.functional") +@SelectClasses({TestHoodieDeltaStreamerWithMultiWriter.class}) @IncludeTags("functional") public class UtilitiesFunctionalTestSuite {