diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java
deleted file mode 100644
index 6dfe0da797f8e..0000000000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FixtureUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.testutils;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Path;
-import java.util.Objects;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-public final class FixtureUtils {
-
-  public static Path prepareFixtureTable(URL fixtureResource, Path basePath) throws IOException {
-    File zippedFixtureTable = new File(fixtureResource.getFile());
-    try (ZipInputStream zis = new ZipInputStream(new FileInputStream(zippedFixtureTable))) {
-      byte[] buffer = new byte[1024];
-      ZipEntry zipEntry = zis.getNextEntry();
-      Path tableBasePath = basePath.resolve(Objects.requireNonNull(zipEntry).getName()
-          .replaceAll(File.separator + "$", ""));
-      while (zipEntry != null) {
-        File newFile = newFile(basePath.toFile(), zipEntry);
-        if (zipEntry.isDirectory()) {
-          if (!newFile.isDirectory() && !newFile.mkdirs()) {
-            throw new IOException("Failed to create directory " + newFile);
-          }
-        } else {
-          // fix for Windows-created archives
-          File parent = newFile.getParentFile();
-          if (!parent.isDirectory() && !parent.mkdirs()) {
-            throw new IOException("Failed to create directory " + parent);
-          }
-
-          // write file content
-          try (FileOutputStream fos = new FileOutputStream(newFile)) {
-            int len;
-            while ((len = zis.read(buffer)) > 0) {
-              fos.write(buffer, 0, len);
-            }
-          }
-        }
-        zipEntry = zis.getNextEntry();
-      }
-      zis.closeEntry();
-      return tableBasePath;
-    }
-  }
-
-  public static File newFile(File destinationDir, ZipEntry zipEntry) throws IOException {
-    File destFile = new File(destinationDir, zipEntry.getName());
-
-    String destDirPath = destinationDir.getCanonicalPath();
-    String destFilePath = destFile.getCanonicalPath();
-
-    if (!destFilePath.startsWith(destDirPath + File.separator)) {
-      throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
-    }
-
-    return destFile;
-  }
-}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index 6a3831a960561..5d79e83907b5b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -38,6 +38,7 @@
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -48,15 +49,12 @@
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
-import static org.apache.hudi.common.testutils.FixtureUtils.prepareFixtureTable;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE;
 import static org.apache.hudi.config.HoodieWriteConfig.BULK_INSERT_SORT_MODE;
 import static org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE;
@@ -68,31 +66,50 @@
 import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
 import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
 import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
-import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.DEFAULT_PARTITION_NUM;
-import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.dataGeneratorMap;
-import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource.initDataGen;
 
 @Tag("functional")
 public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
 
-  private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
 
   String basePath;
   String propsFilePath;
   String tableBasePath;
-  int totalRecords;
 
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    setUpTestTable(tableType);
+    basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
+    propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
+    tableBasePath = basePath + "/testtable_" + tableType;
     prepareInitialConfigs(fs(), basePath, "foo");
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
+        propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
+    prepJobConfig.continuousMode = true;
+    prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
+
+    // Prepare base dataset with some commits
+    deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
+      } else {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
+      }
+      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      return true;
+    });
 
     HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
@@ -125,12 +142,36 @@ void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType ta
   @EnumSource(HoodieTableType.class)
   void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    setUpTestTable(tableType);
+    basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
+    propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
+    tableBasePath = basePath + "/testtable_" + tableType;
     prepareInitialConfigs(fs(), basePath, "foo");
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
+        propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
+    prepJobConfig.continuousMode = true;
+    prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
+
+    // Prepare base dataset with some commits
+    deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
+      } else {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
+      }
+      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      return true;
+    });
 
     // create new ingestion & backfill job config to generate only INSERTS to avoid conflict
     props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
@@ -164,26 +205,41 @@ void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableTyp
         cfgIngestionJob2, backfillJob2, cfgBackfillJob2, false, "batch2");
   }
 
+  @Disabled
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
-  public void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
-    testCheckpointCarryOver(tableType);
-  }
-
-  private void testCheckpointCarryOver(HoodieTableType tableType) throws Exception {
+  void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    setUpTestTable(tableType);
+    basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
+    propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
+    tableBasePath = basePath + "/testtable_" + tableType;
     prepareInitialConfigs(fs(), basePath, "foo");
     TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
 
-    HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
+    HoodieDeltaStreamer.Config prepJobConfig = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
-    cfgIngestionJob.continuousMode = true;
-    cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    prepJobConfig.continuousMode = true;
+    prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
+
+    // Prepare base dataset with some commits
+    deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs());
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs());
+      } else {
+        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs());
+      }
+      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext());
+      return true;
+    });
     // create a backfill job with checkpoint from the first instant
     HoodieDeltaStreamer.Config cfgBackfillJob =
         getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -292,26 +348,6 @@ private static HoodieDeltaStreamer.Config getDeltaStreamerConfig(String basePath
     return cfg;
   }
 
-  /**
-   * Specifically used for {@link TestHoodieDeltaStreamerWithMultiWriter}.
-   *
-   * The fixture test tables have random records generated by
-   * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator} using
-   * {@link org.apache.hudi.common.testutils.HoodieTestDataGenerator#TRIP_EXAMPLE_SCHEMA}.
-   *
-   * The COW fixture test table has 3000 unique records in 7 commits.
-   * The MOR fixture test table has 3000 unique records in 9 deltacommits and 1 compaction commit.
-   */
-  private void setUpTestTable(HoodieTableType tableType) throws IOException {
-    basePath = Paths.get(URI.create(basePath().replaceAll("/$", ""))).toString();
-    propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    String fixtureName = String.format("fixtures/testUpsertsContinuousModeWithMultipleWriters.%s.zip", tableType.name());
-    tableBasePath = prepareFixtureTable(Objects.requireNonNull(getClass()
-        .getClassLoader().getResource(fixtureName)), Paths.get(basePath)).toString();
-    initDataGen(sqlContext(), tableBasePath + "/*/*.parquet", DEFAULT_PARTITION_NUM);
-    totalRecords = dataGeneratorMap.get(DEFAULT_PARTITION_NUM).getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
-  }
-
   private void runJobsInParallel(String tableBasePath, HoodieTableType tableType, int totalRecords,
       HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config cfgIngestionJob, HoodieDeltaStreamer backfillJob,
       HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, String jobId) throws Exception {
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip
deleted file mode 100644
index 9611d27690577..0000000000000
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.COPY_ON_WRITE.zip and /dev/null differ
diff --git a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip b/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip
deleted file mode 100644
index 1e498310ff71a..0000000000000
Binary files a/hudi-utilities/src/test/resources/fixtures/testUpsertsContinuousModeWithMultipleWriters.MERGE_ON_READ.zip and /dev/null differ