10 changes: 5 additions & 5 deletions docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -17,23 +17,23 @@ first_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 5
repeat_count: 1
num_records_insert: 1000
type: InsertNode
deps: none
second_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 5
repeat_count: 1
num_records_insert: 10000
deps: first_insert
type: InsertNode
third_insert:
config:
record_size: 70000
num_insert_partitions: 1
Review comment (Contributor):

@n3nash : all DAGs need to be fixed for the partition config. The right keys are
num_partitions_insert
num_partitions_upsert
whereas so far we are using
num_insert_partitions
num_upsert_partitions.

Can you fix these and re-run all your tests and DAGs, especially around upsert and rollbacks, to ensure things are looking good. (A hedged sketch of the corrected keys follows at the end of this file's diff.)

repeat_count: 2
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
@@ -46,7 +46,7 @@ first_upsert:
record_size: 70000
num_insert_partitions: 1
num_records_insert: 300
repeat_count: 5
repeat_count: 1
num_records_upsert: 100
num_upsert_partitions: 10
type: UpsertNode
@@ -75,7 +75,7 @@ second_upsert:
record_size: 70000
num_insert_partitions: 1
num_records_insert: 300
repeat_count: 5
repeat_count: 1
num_records_upsert: 100
num_upsert_partitions: 10
type: UpsertNode
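The reviewer's point above, illustrated: a minimal sketch of insert/upsert node configs using the key names the reviewer cites (num_partitions_insert / num_partitions_upsert). Node names and values here are illustrative only and are not part of this PR.

```yaml
# Hypothetical nodes showing the corrected partition keys
# (num_partitions_insert / num_partitions_upsert instead of
#  num_insert_partitions / num_upsert_partitions).
example_insert:
  config:
    record_size: 70000
    num_partitions_insert: 1
    repeat_count: 1
    num_records_insert: 1000
  type: InsertNode
  deps: none
example_upsert:
  config:
    record_size: 70000
    num_partitions_insert: 1
    num_records_insert: 300
    repeat_count: 1
    num_records_upsert: 100
    num_partitions_upsert: 10
  type: UpsertNode
  deps: example_insert
```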
22 changes: 9 additions & 13 deletions docker/demo/config/test-suite/complex-dag-mor.yaml
@@ -17,23 +17,23 @@ first_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 5
repeat_count: 1
num_records_insert: 100
type: InsertNode
deps: none
second_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 5
repeat_count: 1
num_records_insert: 100
deps: first_insert
type: InsertNode
third_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 2
repeat_count: 1
num_records_insert: 300
deps: second_insert
type: InsertNode
@@ -46,7 +46,7 @@ first_upsert:
record_size: 70000
num_insert_partitions: 1
num_records_insert: 300
repeat_count: 5
repeat_count: 1
num_records_upsert: 100
num_upsert_partitions: 10
type: UpsertNode
@@ -68,7 +68,7 @@ second_upsert:
record_size: 70000
num_insert_partitions: 1
num_records_insert: 300
repeat_count: 5
repeat_count: 1
num_records_upsert: 100
num_upsert_partitions: 10
type: UpsertNode
@@ -81,11 +81,7 @@ second_hive_query:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 3100
query3: "select count(*) from testdb.table1_rt group by `_row_key` having count(*) > 1"
result3: 0
query4: "select count(*) from testdb.table1_rt"
result4: 3100
result2: 1100
Review comment (Contributor):

While you are at it, since rollback is not effective, can we remove it from this DAG file until we have a fix? I see this DAG more as a sanity-check DAG, so everything should be succeeding.

Review comment (Contributor):

Same for the COW DAG as well.

type: HiveQueryNode
deps: second_upsert
first_schedule_compact:
@@ -97,7 +93,7 @@ third_upsert:
record_size: 70000
num_insert_partitions: 1
num_records_insert: 300
repeat_count: 5
repeat_count: 1
num_records_upsert: 100
num_upsert_partitions: 10
type: UpsertNode
@@ -114,6 +110,6 @@ third_hive_query:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 2210
result2: 1400
type: HiveQueryNode
deps: second_upsert
deps: first_compact
14 changes: 11 additions & 3 deletions hudi-integ-test/README.md
@@ -181,10 +181,15 @@ docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.6.1-S
Copy the following test properties file:
```
echo '
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100

hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector

hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
@@ -202,11 +207,16 @@ hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
hoodie.datasource.hive_sync.skip_ro_suffix=true
' > test.properties

docker cp test.properties adhoc-2:/opt
```

```
docker exec -it adhoc-2 /bin/bash
```

Clean the working directories before starting a new test:

```
@@ -217,7 +227,6 @@ hdfs dfs -rm -r /user/hive/warehouse/hudi-integ-test-suite/input/
Launch a Copy-on-Write job:

```
docker exec -it adhoc-2 /bin/bash
# COPY_ON_WRITE tables
=========================
## Run the following command to start the test suite
@@ -292,5 +301,4 @@ spark-submit \
--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
--table-type MERGE_ON_READ \
--compact-scheduling-minshare 1
```

```
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -18,12 +18,6 @@

package org.apache.hudi.integ.testsuite;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.HoodieReadClient;
@@ -38,16 +32,24 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
* A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform
* write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper}
@@ -66,6 +68,7 @@ public class HoodieTestSuiteWriter {
private transient JavaSparkContext sparkContext;
private static Set<String> VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>(
Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName()));
private static final String GENERATED_DATA_PATH = "generated.data.path";

public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws
Exception {
@@ -181,12 +184,17 @@ public Option<String> scheduleCompaction(Option<Map<String, String>> previousCom
}
}

public void commit(JavaRDD<WriteStatus> records, Option<String> instantTime) {
public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
Option<String> instantTime) {
if (!cfg.useDeltaStreamer) {
Map<String, String> extraMetadata = new HashMap<>();
/** Store the checkpoint in the commit metadata just like
* {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
if (generatedDataStats != null) {
// Just stores the path where this batch of data is generated to
extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
}
writeClient.commit(instantTime.get(), records, Option.of(extraMetadata));
}
}
@@ -218,4 +226,8 @@ public Configuration getConfiguration() {
public JavaSparkContext getSparkContext() {
return sparkContext;
}

public Option<String> getLastCheckpoint() {
return lastCheckpoint;
}
}
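As a hedged sketch (not part of this PR), the generated-data path stored in the extra commit metadata above could later be read back from the last completed commit roughly as follows; the helper class and method names are hypothetical, and the key "generated.data.path" is the constant defined in HoodieTestSuiteWriter.

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

// Hypothetical helper: reads back the "generated.data.path" entry that
// HoodieTestSuiteWriter#commit stores in the commit's extra metadata.
public class GeneratedDataPathReader {

  public static Option<String> readGeneratedDataPath(HoodieTableMetaClient metaClient) throws Exception {
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
    Option<HoodieInstant> lastInstant = timeline.lastInstant();
    if (!lastInstant.isPresent()) {
      return Option.empty();
    }
    // Deserialize the commit metadata of the last completed instant
    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
    return Option.ofNullable(commitMetadata.getExtraMetadata().get("generated.data.path"));
  }
}
```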
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java
@@ -96,6 +96,14 @@ public DeltaGenerator getDeltaGenerator() {
return deltaGenerator;
}

public HoodieTestSuiteConfig getCfg() {
return cfg;
}

public TypedProperties getProps() {
return props;
}

public String toString() {
return this.hoodieTestSuiteWriter.toString() + "\n" + this.deltaGenerator.toString() + "\n";
}
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -24,13 +24,16 @@
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.spark.api.java.JavaRDD;

/**
* An insert node in the DAG of operations for a workflow.
*/
public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {

protected JavaRDD<DeltaWriteStats> deltaWriteStatsRDD;

public InsertNode(Config config) {
this.config = config;
}
@@ -48,15 +51,16 @@ public void execute(ExecutionContext executionContext) throws Exception {
log.info("Inserting input data {}", this.getName());
Option<String> commitTime = executionContext.getHoodieTestSuiteWriter().startCommit();
JavaRDD<WriteStatus> writeStatus = ingest(executionContext.getHoodieTestSuiteWriter(), commitTime);
executionContext.getHoodieTestSuiteWriter().commit(writeStatus, commitTime);
executionContext.getHoodieTestSuiteWriter().commit(writeStatus, this.deltaWriteStatsRDD, commitTime);
this.result = writeStatus;
}
}

protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
log.info("Generating input data for node {}", this.getName());
deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).count();
this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
this.deltaWriteStatsRDD.count();
}
}

hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -18,11 +18,16 @@

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;

/**
* A rollback node in the DAG helps to perform rollback operations.
@@ -49,7 +54,12 @@ public void execute(ExecutionContext executionContext) throws Exception {
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
log.info("Rolling back last instant {}", lastInstant.get());
log.info("Cleaning up generated data for the instant being rolled back {}", lastInstant.get());
ValidationUtils.checkArgument(executionContext.getWriterContext().getProps().getOrDefault(DFSPathSelector.Config.SOURCE_INPUT_SELECTOR,
DFSPathSelector.class.getName()).toString().equalsIgnoreCase(DFSTestSuitePathSelector.class.getName()), "Test Suite only supports DFSTestSuitePathSelector");
executionContext.getHoodieTestSuiteWriter().getWriteClient(this).rollback(lastInstant.get().getTimestamp());
metaClient.getFs().delete(new Path(executionContext.getWriterContext().getCfg().inputBasePath,
executionContext.getWriterContext().getHoodieTestSuiteWriter().getLastCheckpoint().orElse("")), true);
this.result = lastInstant;
}
}
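One consequence of the validation added above: rollback now assumes the test-suite path selector, so the generated input batch for the rolled-back commit can be located and deleted. The corresponding property is the same one this PR adds to the README's test.properties:

```properties
# Required by RollbackNode's new validation: input batches must come from the
# test-suite selector so the batch for the rolled-back commit can be cleaned up.
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
```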
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -18,6 +18,15 @@

package org.apache.hudi.integ.testsuite.dag.scheduler;

import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -28,14 +37,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
@@ -113,12 +114,16 @@ private void executeNode(DagNode node) {
throw new RuntimeException("DagNode already completed! Cannot re-execute");
}
try {
log.info("executing node: " + node.getName() + " of type: " + node.getClass());
node.execute(executionContext);
int repeatCount = node.getConfig().getRepeatCount();
while (repeatCount > 0) {
log.warn("executing node: " + node.getName() + " of type: " + node.getClass());
node.execute(executionContext);
log.info("Finished executing {}", node.getName());
repeatCount--;
}
node.setCompleted(true);
log.info("Finished executing {}", node.getName());
} catch (Exception e) {
log.error("Exception executing node");
log.error("Exception executing node", e);
throw new HoodieException(e);
}
}
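With the loop added above, a node's repeat_count (taken from its DAG config) now determines how many times the scheduler re-executes it before marking it completed. A minimal sketch of such a node, reusing the config keys from the DAG files in this PR (node name and values illustrative):

```yaml
# Hypothetical node: repeat_count: 3 makes the scheduler's executeNode loop
# run this insert three times before setting the node to completed.
repeated_insert:
  config:
    record_size: 70000
    num_records_insert: 100
    repeat_count: 3
  type: InsertNode
  deps: none
```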