diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml
index a54edabd2ca67..45f538bb09e8c 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -17,7 +17,7 @@ first_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 5
+    repeat_count: 1
     num_records_insert: 1000
   type: InsertNode
   deps: none
@@ -25,7 +25,7 @@ second_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 5
+    repeat_count: 1
     num_records_insert: 10000
   deps: first_insert
   type: InsertNode
@@ -33,7 +33,7 @@ third_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 2
+    repeat_count: 1
     num_records_insert: 300
   deps: second_insert
   type: InsertNode
@@ -46,7 +46,7 @@ first_upsert:
     record_size: 70000
     num_insert_partitions: 1
     num_records_insert: 300
-    repeat_count: 5
+    repeat_count: 1
     num_records_upsert: 100
     num_upsert_partitions: 10
   type: UpsertNode
@@ -75,7 +75,7 @@ second_upsert:
     record_size: 70000
     num_insert_partitions: 1
     num_records_insert: 300
-    repeat_count: 5
+    repeat_count: 1
     num_records_upsert: 100
     num_upsert_partitions: 10
   type: UpsertNode
diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml
index 3981603e6277a..2652b03070a7e 100644
--- a/docker/demo/config/test-suite/complex-dag-mor.yaml
+++ b/docker/demo/config/test-suite/complex-dag-mor.yaml
@@ -17,7 +17,7 @@ first_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 5
+    repeat_count: 1
     num_records_insert: 100
   type: InsertNode
   deps: none
@@ -25,7 +25,7 @@ second_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 5
+    repeat_count: 1
     num_records_insert: 100
   deps: first_insert
   type: InsertNode
@@ -33,7 +33,7 @@ third_insert:
   config:
     record_size: 70000
     num_insert_partitions: 1
-    repeat_count: 2
+    repeat_count: 1
     num_records_insert: 300
   deps: second_insert
   type: InsertNode
@@ -46,7 +46,7 @@ first_upsert:
     record_size: 70000
     num_insert_partitions: 1
     num_records_insert: 300
-    repeat_count: 5
+    repeat_count: 1
     num_records_upsert: 100
     num_upsert_partitions: 10
   type: UpsertNode
@@ -68,7 +68,7 @@ second_upsert:
     record_size: 70000
     num_insert_partitions: 1
     num_records_insert: 300
-    repeat_count: 5
+    repeat_count: 1
     num_records_upsert: 100
     num_upsert_partitions: 10
   type: UpsertNode
@@ -81,11 +81,7 @@ second_hive_query:
       query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
       result1: 0
       query2: "select count(*) from testdb.table1"
-      result2: 3100
-      query3: "select count(*) from testdb.table1_rt group by `_row_key` having count(*) > 1"
-      result3: 0
-      query4: "select count(*) from testdb.table1_rt"
-      result4: 3100
+      result2: 1100
   type: HiveQueryNode
   deps: second_upsert
 first_schedule_compact:
@@ -97,7 +93,7 @@ third_upsert:
     record_size: 70000
     num_insert_partitions: 1
     num_records_insert: 300
-    repeat_count: 5
+    repeat_count: 1
     num_records_upsert: 100
     num_upsert_partitions: 10
   type: UpsertNode
@@ -114,6 +110,6 @@ third_hive_query:
       query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
      result1: 0
       query2: "select count(*) from testdb.table1"
-      result2: 2210
+      result2: 1400
   type: HiveQueryNode
-  deps: second_upsert
\ No newline at end of file
+  deps: first_compact
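The lowered `result2` expectations in complex-dag-mor.yaml follow from the reduced `repeat_count` values: with each write node running once, only the `num_records_insert` counts accumulate, while the 100-record upserts rewrite existing keys and add no rows. A quick sanity check of that arithmetic, as standalone Java rather than anything in the patch:

```
// Not part of the patch: just re-deriving the expected hive counts for complex-dag-mor.yaml.
public class ExpectedMorCountCheck {
  public static void main(String[] args) {
    // first_insert + second_insert + third_insert + first_upsert + second_upsert inserts
    int afterSecondUpsert = 100 + 100 + 300 + 300 + 300;
    // third_upsert inserts another 300; its 100 upserts touch existing keys only
    int afterThirdUpsert = afterSecondUpsert + 300;
    System.out.println(afterSecondUpsert + ", " + afterThirdUpsert); // 1100, 1400
  }
}
```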
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index b5e76e8c68745..766db2e982c00 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -181,10 +181,15 @@ docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.6.1-S
 Copy the following test properties file:
 ```
 echo '
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
 hoodie.deltastreamer.source.test.num_partitions=100
 hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
 hoodie.deltastreamer.source.test.max_unique_records=100000000
 hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
 
 hoodie.datasource.write.recordkey.field=_row_key
 hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
@@ -202,11 +207,16 @@ hoodie.datasource.hive_sync.table=table1
 hoodie.datasource.hive_sync.assume_date_partitioning=false
 hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
 hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+hoodie.datasource.hive_sync.skip_ro_suffix=true
 ' > test.properties
 docker cp test.properties adhoc-2:/opt
 ```
 
+```
+docker exec -it adhoc-2 /bin/bash
+```
+
 Clean the working directories before starting a new test:
 
 ```
@@ -217,7 +227,6 @@ hdfs dfs -rm -r /user/hive/warehouse/hudi-integ-test-suite/input/
 
 Launch a Copy-on-Write job:
 
 ```
-docker exec -it adhoc-2 /bin/bash
 # COPY_ON_WRITE tables =========================
 ## Run the following command to start the test suite
@@ -292,5 +301,4 @@ spark-submit \
 --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
 --table-type MERGE_ON_READ \
 --compact-scheduling-minshare 1
-```
-
\ No newline at end of file
+```
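Note the new `hoodie.deltastreamer.source.input.selector` entry above: the rollback support added later in this patch refuses to delete generated input unless the test-suite selector is configured. A minimal standalone sketch of that guard, assuming only the property key and class names shown in this diff (the surrounding `main` method is illustrative):

```
import java.util.Properties;

// Illustrative guard mirroring what RollbackNode checks before cleaning up input data.
public class SelectorGuardSketch {
  static final String SELECTOR_KEY = "hoodie.deltastreamer.source.input.selector";
  static final String TEST_SUITE_SELECTOR =
      "org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(SELECTOR_KEY, TEST_SUITE_SELECTOR); // as set in test.properties above
    String selector = props.getProperty(SELECTOR_KEY,
        "org.apache.hudi.utilities.sources.helpers.DFSPathSelector");
    if (!selector.equalsIgnoreCase(TEST_SUITE_SELECTOR)) {
      throw new IllegalArgumentException("Test Suite only supports DFSTestSuitePathSelector");
    }
    System.out.println("selector ok: " + selector);
  }
}
```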
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index ff4f0a6738dff..a5c1b0ef21c31 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.integ.testsuite;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.client.HoodieReadClient;
@@ -38,16 +32,24 @@
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
 import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
 import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
-import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
 /**
  * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
  * write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
@@ -66,6 +68,7 @@ public class HoodieTestSuiteWriter {
   private transient JavaSparkContext sparkContext;
   private static Set VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
       Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
+  private static final String GENERATED_DATA_PATH = "generated.data.path";
 
   public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema)
       throws Exception {
@@ -181,12 +184,17 @@ public Option scheduleCompaction(Option> previousCom
     }
   }
 
-  public void commit(JavaRDD records, Option instantTime) {
+  public void commit(JavaRDD records, JavaRDD generatedDataStats,
+      Option instantTime) {
     if (!cfg.useDeltaStreamer) {
       Map extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
       extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      if (generatedDataStats != null) {
+        // Just stores the path where this batch of data is generated to
+        extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
+      }
       writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
     }
   }
@@ -218,4 +226,8 @@ public Configuration getConfiguration() {
   public JavaSparkContext getSparkContext() {
     return sparkContext;
   }
+
+  public Option getLastCheckpoint() {
+    return lastCheckpoint;
+  }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
index 320c986323b32..21a84db4a3430 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -96,6 +96,14 @@ public DeltaGenerator getDeltaGenerator() {
     return deltaGenerator;
   }
 
+  public HoodieTestSuiteConfig getCfg() {
+    return cfg;
+  }
+
+  public TypedProperties getProps() {
+    return props;
+  }
+
   public String toString() {
     return this.hoodieTestSuiteWriter.toString() + "\n" + this.deltaGenerator.toString() + "\n";
   }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index fdbcc1b127a73..1571349f834ba 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -24,6 +24,7 @@
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.spark.api.java.JavaRDD;
 
 /**
@@ -31,6 +32,8 @@
  */
 public class InsertNode extends DagNode> {
 
+  protected JavaRDD deltaWriteStatsRDD;
+
   public InsertNode(Config config) {
     this.config = config;
   }
@@ -48,7 +51,7 @@ public void execute(ExecutionContext executionContext) throws Exception {
       log.info("Inserting input data {}", this.getName());
       Option commitTime = executionContext.getHoodieTestSuiteWriter().startCommit();
       JavaRDD writeStatus = ingest(executionContext.getHoodieTestSuiteWriter(), commitTime);
-      executionContext.getHoodieTestSuiteWriter().commit(writeStatus, commitTime);
+      executionContext.getHoodieTestSuiteWriter().commit(writeStatus, this.deltaWriteStatsRDD, commitTime);
       this.result = writeStatus;
     }
   }
@@ -56,7 +59,8 @@ public void execute(ExecutionContext executionContext) throws Exception {
 
   protected void generate(DeltaGenerator deltaGenerator) throws Exception {
     if (!config.isDisableGenerate()) {
       log.info("Generating input data for node {}", this.getName());
-      deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).count();
+      this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
+      this.deltaWriteStatsRDD.count();
     }
   }
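Together, the InsertNode and HoodieTestSuiteWriter changes record where each generated batch lives: `generate()` keeps the `DeltaWriteStats` RDD, and `commit()` stashes the batch's file path in the commit's extra metadata under `generated.data.path`. A self-contained sketch of that metadata handshake (plain maps stand in for the Hudi client; the sample path is hypothetical):

```
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch of the extra-metadata bookkeeping added above; no Hudi calls here.
public class GeneratedDataPathSketch {
  static final String GENERATED_DATA_PATH = "generated.data.path"; // same key as the diff

  public static void main(String[] args) {
    Map<String, String> extraMetadata = new HashMap<>();
    // What commit() now does when generatedDataStats is present: remember the input batch location.
    extraMetadata.put(GENERATED_DATA_PATH, "/user/hive/warehouse/hudi-integ-test-suite/input/2");

    // Later steps (or a human debugging a failed run) can recover the batch directory from the commit.
    Optional.ofNullable(extraMetadata.get(GENERATED_DATA_PATH))
        .ifPresent(dir -> System.out.println("batch generated under " + dir));
  }
}
```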
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index cf96961a4f337..12588ac036cbd 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -18,11 +18,16 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
 
 /**
  * A rollback node in the DAG helps to perform rollback operations.
@@ -49,7 +54,12 @@ public void execute(ExecutionContext executionContext) throws Exception {
     Option lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
       log.info("Rolling back last instant {}", lastInstant.get());
+      log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
+      ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
+          DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector");
       executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp());
+      metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
+          executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true);
       this.result = lastInstant;
     }
   }
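RollbackNode now pairs the timeline rollback with input cleanup: it deletes `<inputBasePath>/<lastCheckpoint>`, i.e. the batch directory whose commit was just undone, so the source will not re-ingest rows the table no longer has. A small illustration of the path it targets (local `java.nio` paths and the batch id `3` are stand-ins; the real code deletes a Hadoop `Path` on DFS):

```
import java.nio.file.Paths;
import java.util.Optional;

// Illustration only: which directory the rollback cleanup removes.
public class RollbackCleanupSketch {
  public static void main(String[] args) {
    String inputBasePath = "/user/hive/warehouse/hudi-integ-test-suite/input"; // cfg.inputBasePath
    Optional<String> lastCheckpoint = Optional.of("3"); // batch id stored as the checkpoint
    System.out.println("recursively delete " + Paths.get(inputBasePath, lastCheckpoint.orElse("")));
  }
}
```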
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index bf6f5da53e0a5..87686d42ca446 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -18,6 +18,15 @@
 
 package org.apache.hudi.integ.testsuite.dag.scheduler;
 
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
+import org.apache.hudi.integ.testsuite.dag.WriterContext;
+import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
+import org.apache.hudi.metrics.Metrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -28,14 +37,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
-import org.apache.hudi.metrics.Metrics;
-import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
-import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
-import org.apache.hudi.integ.testsuite.dag.WriterContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
@@ -113,12 +114,16 @@ private void executeNode(DagNode node) {
       throw new RuntimeException("DagNode already completed! Cannot re-execute");
     }
     try {
-      log.info("executing node: " + node.getName() + " of type: " + node.getClass());
-      node.execute(executionContext);
+      int repeatCount = node.getConfig().getRepeatCount();
+      while (repeatCount > 0) {
+        log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
+        node.execute(executionContext);
+        log.info("Finished executing {}", node.getName());
+        repeatCount--;
+      }
       node.setCompleted(true);
-      log.info("Finished executing {}", node.getName());
     } catch (Exception e) {
-      log.error("Exception executing node");
+      log.error("Exception executing node", e);
       throw new HoodieException(e);
     }
   }
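`repeat_count` is now honored by the scheduler itself: a node runs that many times back to back and is only marked completed after the loop finishes. A self-contained sketch of the semantics (the `Node` interface here is a stand-in, not a Hudi type):

```
// Stand-in types; only the loop shape mirrors executeNode() above.
public class RepeatCountSketch {
  interface Node {
    void execute() throws Exception;
  }

  static void executeNode(Node node, int repeatCount) throws Exception {
    while (repeatCount > 0) {
      node.execute();
      repeatCount--;
    }
    // node.setCompleted(true) would happen here, after every repetition has run
  }

  public static void main(String[] args) throws Exception {
    executeNode(() -> System.out.println("insert batch"), 2); // repeat_count: 2 runs the node twice
  }
}
```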
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index bfc8368d56e73..b7d71f583777a 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -18,12 +18,17 @@
 
 package org.apache.hudi.integ.testsuite.helpers;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -62,19 +67,27 @@ public Pair, String> getNextFilePathsAndMaxModificationTime(
       lastBatchId = 0;
       nextBatchId = 1;
     }
-
-    log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
-        + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
     // obtain all eligible files for the batch
     List eligibleFiles = new ArrayList<>();
     FileStatus[] fileStatuses = fs.globStatus(
         new Path(props.getString(Config.ROOT_INPUT_PATH_PROP), "*"));
+    // Say input data is as follow input/1, input/2, input/5 since 3,4 was rolled back and 5 is new generated data
+    // checkpoint from the latest commit metadata will be 2 since 3,4 has been rolled back. We need to set the
+    // next batch id correctly as 5 instead of 3
+    Optional correctBatchIdDueToRollback = Arrays.stream(fileStatuses)
+        .map(f -> f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
+        .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
+        .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), Integer.parseInt(bid2)));
+    if (correctBatchIdDueToRollback.isPresent() && Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
+      nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
+    }
+    log.info("Using DFSTestSuitePathSelector, checkpoint: " + lastCheckpointStr + " sourceLimit: " + sourceLimit
+        + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
     for (FileStatus fileStatus : fileStatuses) {
       if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
           .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
         continue;
-      } else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0 && fileStatus.getPath()
-          .getName().compareTo(nextBatchId.toString()) <= 0) {
+      } else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) {
         RemoteIterator files = fs.listFiles(fileStatus.getPath(), true);
         while (files.hasNext()) {
           eligibleFiles.add(files.next());
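The new selection logic above skips over batch ids whose input was deleted by a rollback: with `input/1`, `input/2`, `input/5` on disk and a checkpoint of 2, the next batch to read must be 5, not 3. A standalone restatement of that rule (directory names are hypothetical; only the selection mirrors the comment in the diff):

```
import java.util.Arrays;
import java.util.Optional;

// Standalone restatement of the "skip rolled-back batch ids" rule.
public class NextBatchIdSketch {
  public static void main(String[] args) {
    int lastBatchId = 2;                  // checkpoint read from the latest commit metadata
    String[] batchDirs = {"1", "2", "5"}; // surviving input/<batchId> directories
    Optional<Integer> nextBatchId = Arrays.stream(batchDirs)
        .map(Integer::parseInt)
        .filter(id -> id > lastBatchId)
        .min(Integer::compareTo);         // smallest id strictly greater than the checkpoint
    System.out.println(nextBatchId.orElse(lastBatchId + 1)); // prints 5
  }
}
```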
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index 5568a19a8651e..b14ef1808d00a 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -28,12 +28,14 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
 import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
 import org.apache.hudi.integ.testsuite.dag.ComplexDagGenerator;
 import org.apache.hudi.integ.testsuite.dag.HiveSyncDagGenerator;
 import org.apache.hudi.integ.testsuite.dag.HiveSyncDagGeneratorMOR;
 import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
+import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
 import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
 import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
@@ -105,6 +107,15 @@ public static void initClass() throws Exception {
     props.setProperty("hoodie.compact.inline.max.delta.commits", "3");
     props.setProperty("hoodie.parquet.max.file.size", "1024000");
     props.setProperty("hoodie.compact.inline.max.delta.commits", "0");
+    props.setProperty("hoodie.index.type", HoodieIndex.IndexType.GLOBAL_SIMPLE.name());
+    props.setProperty("hoodie.global.simple.index.parallelism", "2");
+    // Reduce shuffle parallelism, spark hangs when numPartitions >> numRecords to process
+    props.setProperty("hoodie.insert.shuffle.parallelism", "10");
+    props.setProperty("hoodie.upsert.shuffle.parallelism", "10");
+    props.setProperty("hoodie.bulkinsert.shuffle.parallelism", "10");
+    props.setProperty("hoodie.compact.inline.max.delta.commits", "0");
+    // Make path selection test suite specific
+    props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
     // Hive Configs
     props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://127.0.0.1:9999/");
     props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "testdb1");
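One detail of the test setup worth noting: `hoodie.compact.inline.max.delta.commits` is assigned three times in `initClass` ("3", then "0", then "0" again). Since `java.util.Properties` keeps only the last value per key, the effective setting is "0". A trivial check of that behaviour, not Hudi code:

```
import java.util.Properties;

// Last write wins for a repeated java.util.Properties key.
public class LastWriteWinsCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.compact.inline.max.delta.commits", "3");
    props.setProperty("hoodie.compact.inline.max.delta.commits", "0");
    props.setProperty("hoodie.compact.inline.max.delta.commits", "0");
    System.out.println(props.getProperty("hoodie.compact.inline.max.delta.commits")); // 0
  }
}
```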
diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
index 0592a903a105a..96a6c825a98d0 100644
--- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml
@@ -16,17 +16,45 @@
 first_insert:
   config:
     record_size: 70000
-    num_insert_partitions: 1
-    repeat_count: 1
+    num_partitions_insert: 1
+    repeat_count: 2
     num_records_insert: 100
   type: InsertNode
   deps: none
+second_insert:
+  config:
+    record_size: 70000
+    num_partitions_insert: 1
+    repeat_count: 1
+    num_records_insert: 100
+  type: InsertNode
+  deps: first_insert
+first_rollback:
+  config:
+  deps: second_insert
+  type: RollbackNode
+third_insert:
+  config:
+    record_size: 70000
+    num_partitions_insert: 1
+    repeat_count: 1
+    num_records_insert: 100
+  type: InsertNode
+  deps: first_rollback
+first_upsert:
+  config:
+    record_size: 70000
+    num_partitions_upsert: 1
+    repeat_count: 1
+    num_records_upsert: 100
+  type: UpsertNode
+  deps: third_insert
 first_hive_sync:
   config:
     queue_name: "adhoc"
     engine: "mr"
   type: HiveSyncNode
-  deps: first_insert
+  deps: first_upsert
 first_hive_query:
   config:
     hive_props:
@@ -34,6 +62,8 @@ first_hive_query:
       prop3: "set hive.strict.checks.large.query=false"
       prop4: "set hive.stats.autogather=false"
     hive_queries:
+      query1: "select count(*) from testdb1.table1"
+      result1: 300
       query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
       result2: 0
   type: HiveQueryNode
diff --git a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
index 0592a903a105a..96a6c825a98d0 100644
--- a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
+++ b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml
@@ -16,17 +16,45 @@
 first_insert:
   config:
     record_size: 70000
-    num_insert_partitions: 1
-    repeat_count: 1
+    num_partitions_insert: 1
+    repeat_count: 2
     num_records_insert: 100
   type: InsertNode
   deps: none
+second_insert:
+  config:
+    record_size: 70000
+    num_partitions_insert: 1
+    repeat_count: 1
+    num_records_insert: 100
+  type: InsertNode
+  deps: first_insert
+first_rollback:
+  config:
+  deps: second_insert
+  type: RollbackNode
+third_insert:
+  config:
+    record_size: 70000
+    num_partitions_insert: 1
+    repeat_count: 1
+    num_records_insert: 100
+  type: InsertNode
+  deps: first_rollback
+first_upsert:
+  config:
+    record_size: 70000
+    num_partitions_upsert: 1
+    repeat_count: 1
+    num_records_upsert: 100
+  type: UpsertNode
+  deps: third_insert
 first_hive_sync:
   config:
     queue_name: "adhoc"
     engine: "mr"
   type: HiveSyncNode
-  deps: first_insert
+  deps: first_upsert
 first_hive_query:
   config:
     hive_props:
@@ -34,6 +62,8 @@ first_hive_query:
      prop3: "set hive.strict.checks.large.query=false"
       prop4: "set hive.stats.autogather=false"
     hive_queries:
+      query1: "select count(*) from testdb1.table1"
+      result1: 300
       query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
       result2: 0
   type: HiveQueryNode
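The expanded unit-test DAGs exercise insert → insert → rollback → insert → upsert before syncing to Hive, and `result1: 300` follows from those steps. A final sanity check of that count (plain arithmetic, not part of the patch):

```
// Re-deriving result1 for unit-test-cow-dag.yaml / unit-test-mor-dag.yaml.
public class UnitTestDagCountCheck {
  public static void main(String[] args) {
    int rows = 0;
    rows += 2 * 100; // first_insert, repeat_count: 2
    rows += 100;     // second_insert
    rows -= 100;     // first_rollback undoes second_insert and deletes its input batch
    rows += 100;     // third_insert
    rows += 0;       // first_upsert rewrites 100 existing keys, adds no new rows
    System.out.println(rows); // 300, matching result1 in first_hive_query
  }
}
```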