diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml index 76172203866b0..8b82415982f90 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml @@ -74,10 +74,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: true delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: second_hive_sync last_validate: config: execute_itr_count: 50 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml index dc1e99a431209..031664cd15c99 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml @@ -62,10 +62,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 30 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml index eca4eac1c710a..c23775b2ce546 100644 --- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml @@ -62,10 +62,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 50 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml index 81c21a7be67c8..2fc68596d84a4 100644 --- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml +++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml @@ -64,10 +64,12 @@ dag_content: config: validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 20 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml index a2d85a7a4d0f5..db7edb8f8f28c 100644 --- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml +++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml @@ -65,10 +65,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: false delete_input_data: false + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete 
last_validate: config: execute_itr_count: 20 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml index 1c2f44b060036..102807ec435be 100644 --- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml +++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml @@ -62,10 +62,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 50 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml index 3c47729e66470..de43fd339deb5 100644 --- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml +++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml @@ -46,10 +46,12 @@ dag_content: validate_once_every_itr : 3 validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 6 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/docker/demo/config/test-suite/spark-long-running.yaml b/docker/demo/config/test-suite/spark-long-running.yaml index 00fea43f4578e..2ffef557815c7 100644 --- a/docker/demo/config/test-suite/spark-long-running.yaml +++ b/docker/demo/config/test-suite/spark-long-running.yaml @@ -46,10 +46,12 @@ dag_content: validate_once_every_itr : 5 validate_hive: false delete_input_data: true + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateDatasetNode deps: first_delete last_validate: config: execute_itr_count: 30 + max_wait_time_for_deltastreamer_catch_up_ms: 600000 type: ValidateAsyncOperations deps: second_validate diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index 5d26d03a20a89..687ad9a2a90d2 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -593,6 +593,56 @@ Sample spark-submit command to test one delta streamer and a spark data source w --use-hudi-data-to-generate-updates ``` +### Testing async table services +We can test async table services with deltastreamer using the command below. Three additional arguments are required to test async +table services compared to the previous command.
+ +```shell +--continuous \ +--test-continuous-mode \ +--min-sync-interval-seconds 20 +``` + +Here is the full command: +```shell +./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \ + --conf spark.task.cpus=1 --conf spark.executor.cores=1 \ +--conf spark.task.maxFailures=100 \ +--conf spark.memory.fraction=0.4 \ +--conf spark.rdd.compress=true \ +--conf spark.kryoserializer.buffer.max=2000m \ +--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ +--conf spark.memory.storageFraction=0.1 \ +--conf spark.shuffle.service.enabled=true \ +--conf spark.sql.hive.convertMetastoreParquet=false \ +--conf spark.driver.maxResultSize=12g \ +--conf spark.executor.heartbeatInterval=120s \ +--conf spark.network.timeout=600s \ +--conf spark.yarn.max.executor.failures=10 \ +--conf spark.sql.catalogImplementation=hive \ +--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob /hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \ +--source-ordering-field test_suite_source_ordering_field \ +--use-deltastreamer \ +--target-base-path /tmp/hudi/output \ +--input-base-path /tmp/hudi/input \ +--target-table table1 \ +-props file:/tmp/test.properties \ +--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \ +--source-class org.apache.hudi.utilities.sources.AvroDFSSource \ +--input-file-size 125829120 \ +--workload-yaml-path file:/tmp/simple-deltastreamer.yaml \ +--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ +--table-type COPY_ON_WRITE \ +--compact-scheduling-minshare 1 \ +--clean-input \ +--clean-output \ +--continuous \ +--test-continuous-mode \ +--min-sync-interval-seconds 20 +``` + +We can use any yaml and properties file with the above spark-submit command to test deltastreamer with async table services. ## Automated tests for N no of yamls in Local Docker environment diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java new file mode 100644 index 0000000000000..1bf69aaf836cc --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieContinousTestSuiteWriter.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.integ.testsuite; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; +import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; +import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +/** + * Test suite Writer that assists in testing async table operations with Deltastreamer continuous mode. + * + * Sample command + * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 \ + * --conf spark.task.cpus=1 --conf spark.executor.cores=1 \ + * --conf spark.task.maxFailures=100 \ + * --conf spark.memory.fraction=0.4 \ + * --conf spark.rdd.compress=true \ + * --conf spark.kryoserializer.buffer.max=2000m \ + * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + * --conf spark.memory.storageFraction=0.1 \ + * --conf spark.shuffle.service.enabled=true \ + * --conf spark.sql.hive.convertMetastoreParquet=false \ + * --conf spark.driver.maxResultSize=12g \ + * --conf spark.executor.heartbeatInterval=120s \ + * --conf spark.network.timeout=600s \ + * --conf spark.yarn.max.executor.failures=10 \ + * --conf spark.sql.catalogImplementation=hive \ + * --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob /hudi-integ-test-bundle-0.12.0-SNAPSHOT.jar \ + * --source-ordering-field test_suite_source_ordering_field \ + * --use-deltastreamer \ + * --target-base-path /tmp/hudi/output \ + * --input-base-path /tmp/hudi/input \ + * --target-table table1 \ + * -props file:/tmp/test.properties \ + * --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \ + * --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ + * --input-file-size 125829120 \ + * --workload-yaml-path file:/tmp/simple-deltastreamer.yaml \ + * --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ + * --table-type COPY_ON_WRITE \ + * --compact-scheduling-minshare 1 \ + * --clean-input \ + * --clean-output \ + * --continuous \ + * --test-continuous-mode \ + * --min-sync-interval-seconds 20 + */ +public class HoodieContinousTestSuiteWriter extends HoodieTestSuiteWriter { + + private static Logger log = LoggerFactory.getLogger(HoodieContinousTestSuiteWriter.class); + + public HoodieContinousTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception { + super(jsc, props, cfg, schema); + } + + @Override + public void shutdownResources() { + log.info("Shutting down deltastreamer gracefully "); + this.deltaStreamerWrapper.shutdownGracefully(); + } + + @Override + public RDD getNextBatch() throws Exception { + return null; + } + + @Override + public Pair>> fetchSource() throws Exception { + return null; + } + + @Override + public Option startCommit() { + return null; + } + + public JavaRDD upsert(Option instantTime) throws Exception { + return null; + } + + @Override + public JavaRDD insert(Option instantTime) throws Exception { + return null; + } + + @Override 
+ public JavaRDD insertOverwrite(Option instantTime) throws Exception { + return null; + } + + @Override + public JavaRDD insertOverwriteTable(Option instantTime) throws Exception { + return null; + } + + @Override + public JavaRDD bulkInsert(Option instantTime) throws Exception { + return null; + } + + @Override + public JavaRDD compact(Option instantTime) throws Exception { + return null; + } + + @Override + public void inlineClustering() { + } + + @Override + public Option scheduleCompaction(Option> previousCommitExtraMetadata) throws + Exception { + return Option.empty(); + } + + @Override + public void commit(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) { + } + + @Override + public void commitCompaction(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) throws IOException { + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java new file mode 100644 index 0000000000000..63805e71a5645 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite; + +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.HoodieReadClient; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodiePayloadConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; +import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.CompactHelpers; +import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.rdd.RDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are + * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}. + */ +public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter { + + private static Logger log = LoggerFactory.getLogger(HoodieInlineTestSuiteWriter.class); + + private static final String GENERATED_DATA_PATH = "generated.data.path"; + + public HoodieInlineTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception { + super(jsc, props, cfg, schema); + } + + public void shutdownResources() { + // no-op for non continuous mode test suite writer. 
+ } + + public RDD getNextBatch() throws Exception { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + JavaRDD inputRDD = nextBatch.getRight().getRight(); + return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData() + .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd(); + } + + public Pair>> fetchSource() throws Exception { + return this.deltaStreamerWrapper.fetchSource(); + } + + public Option startCommit() { + if (cfg.useDeltaStreamer) { + return Option.of(HoodieActiveTimeline.createNewInstantTime()); + } else { + return Option.of(writeClient.startCommit()); + } + } + + public JavaRDD upsert(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get()); + } + } + + public JavaRDD insert(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.insert(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get()); + } + } + + public JavaRDD insertOverwrite(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.insertOverwrite(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); + } + } + + public JavaRDD insertOverwriteTable(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.insertOverwriteTable(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); + } + } + + public JavaRDD bulkInsert(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.bulkInsert(); + } else { + Pair>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get()); + } + } + + public JavaRDD compact(Option instantTime) throws Exception { + if (cfg.useDeltaStreamer) { + return deltaStreamerWrapper.compact(); + } else { + if (!instantTime.isPresent()) { + Option> compactionPlanPair = Option + .fromJavaOptional(hoodieReadClient.getPendingCompactions() + .stream().findFirst()); + if (compactionPlanPair.isPresent()) { + instantTime = Option.of(compactionPlanPair.get().getLeft()); + } + } + if (instantTime.isPresent()) { + HoodieWriteMetadata> compactionMetadata = writeClient.compact(instantTime.get()); + return compactionMetadata.getWriteStatuses(); + } else { + return null; + } + } + } + + public void inlineClustering() { + if (!cfg.useDeltaStreamer) { + Option clusteringInstantOpt = writeClient.scheduleClustering(Option.empty()); + clusteringInstantOpt.ifPresent(clusteringInstant -> { + // inline cluster should auto commit as the user is never given control + log.warn("Clustering instant :: " + clusteringInstant); + writeClient.cluster(clusteringInstant, true); + }); + } else { + // TODO: fix clustering to be done async 
https://issues.apache.org/jira/browse/HUDI-1590 + throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer"); + } + } + + public Option scheduleCompaction(Option> previousCommitExtraMetadata) throws + Exception { + if (cfg.useDeltaStreamer) { + deltaStreamerWrapper.scheduleCompact(); + return Option.empty(); + } else { + return writeClient.scheduleCompaction(previousCommitExtraMetadata); + } + } + + public void commit(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) { + if (!cfg.useDeltaStreamer) { + Map extraMetadata = new HashMap<>(); + /** Store the checkpoint in the commit metadata just like + * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ + extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); + if (generatedDataStats != null && generatedDataStats.count() > 1) { + // Just stores the path where this batch of data is generated to + extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); + } + writeClient.commit(instantTime.get(), records, Option.of(extraMetadata)); + } + } + + public void commitCompaction(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) throws IOException { + if (!cfg.useDeltaStreamer) { + Map extraMetadata = new HashMap<>(); + /** Store the checkpoint in the commit metadata just like + * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ + extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); + if (generatedDataStats != null && generatedDataStats.count() > 1) { + // Just stores the path where this batch of data is generated to + extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); + } + HoodieSparkTable table = HoodieSparkTable.create(writeClient.getConfig(), writeClient.getEngineContext()); + HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema()); + writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata)); + } + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 2d9f841ae351c..5e2f9812ba529 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -190,11 +190,12 @@ public WorkflowDag createWorkflowDag() throws IOException { } public void runTestSuite() { + WriterContext writerContext = null; try { WorkflowDag workflowDag = createWorkflowDag(); log.info("Workflow Dag => " + DagUtils.convertDagToYaml(workflowDag)); long startTime = System.currentTimeMillis(); - WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); + writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); writerContext.initContext(jsc); startOtherServicesIfNeeded(writerContext); if (this.cfg.saferSchemaEvolution) { @@ -217,6 +218,9 @@ public void runTestSuite() { log.error("Failed to run Test Suite ", e); throw new HoodieException("Failed to run Test Suite ", e); } finally { + if (writerContext != null) { + writerContext.shutdownResources(); + } if (stopJsc) { stopQuietly(); } @@ -310,5 +314,8 @@ public static class HoodieTestSuiteConfig 
extends HoodieDeltaStreamer.Config { @Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ") public Boolean useHudiToGenerateUpdates = false; + + @Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.") + public Boolean testContinousMode = false; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index a98c7f2aec3f0..7a9e122e86c15 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -18,37 +18,25 @@ package org.apache.hudi.integ.testsuite; -import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieAvroRecord; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.index.HoodieIndex; -import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.integ.testsuite.dag.nodes.CleanNode; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode; import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode; import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; @@ -57,38 +45,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; import java.util.Set; -/** - * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are - * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}. 
- */ -public class HoodieTestSuiteWriter implements Serializable { +public abstract class HoodieTestSuiteWriter implements Serializable { private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class); - private HoodieDeltaStreamerWrapper deltaStreamerWrapper; - private HoodieWriteConfig writeConfig; - private SparkRDDWriteClient writeClient; - protected HoodieTestSuiteConfig cfg; - private Option lastCheckpoint; - private HoodieReadClient hoodieReadClient; - private Properties props; - private String schema; - private transient Configuration configuration; - private transient JavaSparkContext sparkContext; - private static Set VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>( + protected HoodieDeltaStreamerWrapper deltaStreamerWrapper; + protected HoodieWriteConfig writeConfig; + protected SparkRDDWriteClient writeClient; + protected HoodieTestSuiteJob.HoodieTestSuiteConfig cfg; + protected Option lastCheckpoint; + protected HoodieReadClient hoodieReadClient; + protected Properties props; + protected String schema; + protected transient Configuration configuration; + protected transient JavaSparkContext sparkContext; + protected static Set VALID_DAG_NODES_TO_ALLOW_WRITE_CLIENT_IN_DELTASTREAMER_MODE = new HashSet<>( Arrays.asList(RollbackNode.class.getName(), CleanNode.class.getName(), ScheduleCompactNode.class.getName())); - private static final String GENERATED_DATA_PATH = "generated.data.path"; - public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteConfig cfg, String schema) throws Exception { + public HoodieTestSuiteWriter(JavaSparkContext jsc, Properties props, HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, String schema) throws Exception { // We ensure that only 1 instance of HoodieWriteClient is instantiated for a HoodieTestSuiteWriter // This does not instantiate a HoodieWriteClient until a // {@link HoodieDeltaStreamer#commit(HoodieWriteClient, JavaRDD, Option)} is invoked. 
@@ -110,7 +91,7 @@ public HoodieWriteConfig getWriteConfig() { return this.writeConfig; } - private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) { + private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath) .withAutoCommit(false) @@ -131,159 +112,35 @@ private boolean allowWriteClientAccess(DagNode dagNode) { return false; } - public RDD getNextBatch() throws Exception { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - JavaRDD inputRDD = nextBatch.getRight().getRight(); - return inputRDD.map(r -> (GenericRecord) ((HoodieAvroRecord) r).getData() - .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd(); - } + public abstract void shutdownResources(); - public void getNextBatchForDeletes() throws Exception { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - JavaRDD inputRDD = nextBatch.getRight().getRight(); - inputRDD.collect(); - } + public abstract RDD getNextBatch() throws Exception; - public Pair>> fetchSource() throws Exception { - return this.deltaStreamerWrapper.fetchSource(); - } + public abstract Pair>> fetchSource() throws Exception ; - public Option startCommit() { - if (cfg.useDeltaStreamer) { - return Option.of(HoodieActiveTimeline.createNewInstantTime()); - } else { - return Option.of(writeClient.startCommit()); - } - } + public abstract Option startCommit(); - public JavaRDD upsert(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.upsert(WriteOperationType.UPSERT); - } else { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - return writeClient.upsert(nextBatch.getRight().getRight(), instantTime.get()); - } - } + public abstract JavaRDD upsert(Option instantTime) throws Exception; - public JavaRDD insert(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.insert(); - } else { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - return writeClient.insert(nextBatch.getRight().getRight(), instantTime.get()); - } - } + public abstract JavaRDD insert(Option instantTime) throws Exception; - public JavaRDD insertOverwrite(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.insertOverwrite(); - } else { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - return writeClient.insertOverwrite(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); - } - } + public abstract JavaRDD insertOverwrite(Option instantTime) throws Exception; - public JavaRDD insertOverwriteTable(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.insertOverwriteTable(); - } else { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - return writeClient.insertOverwriteTable(nextBatch.getRight().getRight(), instantTime.get()).getWriteStatuses(); - } - } + public abstract JavaRDD insertOverwriteTable(Option instantTime) throws Exception; - public JavaRDD bulkInsert(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.bulkInsert(); - 
} else { - Pair>> nextBatch = fetchSource(); - lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); - return writeClient.bulkInsert(nextBatch.getRight().getRight(), instantTime.get()); - } - } + public abstract JavaRDD bulkInsert(Option instantTime) throws Exception; - public JavaRDD compact(Option instantTime) throws Exception { - if (cfg.useDeltaStreamer) { - return deltaStreamerWrapper.compact(); - } else { - if (!instantTime.isPresent()) { - Option> compactionPlanPair = Option - .fromJavaOptional(hoodieReadClient.getPendingCompactions() - .stream().findFirst()); - if (compactionPlanPair.isPresent()) { - instantTime = Option.of(compactionPlanPair.get().getLeft()); - } - } - if (instantTime.isPresent()) { - HoodieWriteMetadata> compactionMetadata = writeClient.compact(instantTime.get()); - return compactionMetadata.getWriteStatuses(); - } else { - return null; - } - } - } + public abstract JavaRDD compact(Option instantTime) throws Exception; - public void inlineClustering() { - if (!cfg.useDeltaStreamer) { - Option clusteringInstantOpt = writeClient.scheduleClustering(Option.empty()); - clusteringInstantOpt.ifPresent(clusteringInstant -> { - // inline cluster should auto commit as the user is never given control - log.warn("Clustering instant :: " + clusteringInstant); - writeClient.cluster(clusteringInstant, true); - }); - } else { - // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590 - throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer"); - } - } + public abstract void inlineClustering() throws Exception ; - public Option scheduleCompaction(Option> previousCommitExtraMetadata) throws - Exception { - if (cfg.useDeltaStreamer) { - deltaStreamerWrapper.scheduleCompact(); - return Option.empty(); - } else { - return writeClient.scheduleCompaction(previousCommitExtraMetadata); - } - } + public abstract Option scheduleCompaction(Option> previousCommitExtraMetadata) throws Exception; - public void commit(JavaRDD records, JavaRDD generatedDataStats, - Option instantTime) { - if (!cfg.useDeltaStreamer) { - Map extraMetadata = new HashMap<>(); - /** Store the checkpoint in the commit metadata just like - * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ - extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); - if (generatedDataStats != null && generatedDataStats.count() > 1) { - // Just stores the path where this batch of data is generated to - extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); - } - writeClient.commit(instantTime.get(), records, Option.of(extraMetadata)); - } - } + public abstract void commit(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime); - public void commitCompaction(JavaRDD records, JavaRDD generatedDataStats, - Option instantTime) throws IOException { - if (!cfg.useDeltaStreamer) { - Map extraMetadata = new HashMap<>(); - /** Store the checkpoint in the commit metadata just like - * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ - extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); - if (generatedDataStats != null && generatedDataStats.count() > 1) { - // Just stores the path where this batch of data is generated to - extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); - } - HoodieSparkTable table = HoodieSparkTable.create(writeClient.getConfig(), 
writeClient.getEngineContext()); - HoodieCommitMetadata metadata = CompactHelpers.getInstance().createCompactionMetadata(table, instantTime.get(), HoodieJavaRDD.of(records), writeClient.getConfig().getSchema()); - writeClient.commitCompaction(instantTime.get(), metadata, Option.of(extraMetadata)); - } - } + public abstract void commitCompaction(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) throws Exception; public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException { if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) { @@ -301,7 +158,7 @@ public HoodieDeltaStreamerWrapper getDeltaStreamerWrapper() { return deltaStreamerWrapper; } - public HoodieTestSuiteConfig getCfg() { + public HoodieTestSuiteJob.HoodieTestSuiteConfig getCfg() { return cfg; } @@ -325,3 +182,4 @@ public String getSchema() { return schema; } } + diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 1578e86be47b6..a781d19cb78c5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -103,6 +103,7 @@ public static class Config { private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest"; private static String PARTITIONS_TO_DELETE = "partitions_to_delete"; private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate"; + private static String MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS = "max_wait_time_for_deltastreamer_catch_up_ms"; // Spark SQL Create Table private static String TABLE_TYPE = "table_type"; @@ -253,6 +254,10 @@ public boolean enableRowWriting() { return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString()); } + public long maxWaitTimeForDeltastreamerToCatchupMs() { + return Long.valueOf(configsMap.getOrDefault(MAX_WAIT_TIME_FOR_DELTASTREAMER_TO_CATCH_UP_MS, 5 * 60 * 1000).toString()); + } + public Option getTableType() { return !configsMap.containsKey(TABLE_TYPE) ? 
Option.empty() : Option.of(configsMap.get(TABLE_TYPE).toString()); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java index d31ef195ecdd5..83b5751c8646b 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -21,7 +21,9 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.integ.testsuite.HoodieContinousTestSuiteWriter; import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; +import org.apache.hudi.integ.testsuite.HoodieInlineTestSuiteWriter; import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; @@ -37,6 +39,8 @@ import org.apache.spark.sql.SparkSession; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * WriterContext wraps the delta writer/data generator related configuration needed to init/reinit. @@ -53,6 +57,7 @@ public class WriterContext { private BuiltinKeyGenerator keyGenerator; private transient SparkSession sparkSession; private transient JavaSparkContext jsc; + private ExecutorService executorService; public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg, BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { @@ -67,7 +72,8 @@ public void initContext(JavaSparkContext jsc) throws HoodieException { try { this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jsc); String schemaStr = schemaProvider.getSourceSchema().toString(); - this.hoodieTestSuiteWriter = new HoodieTestSuiteWriter(jsc, props, cfg, schemaStr); + this.hoodieTestSuiteWriter = (cfg.testContinousMode && cfg.useDeltaStreamer) ? new HoodieContinousTestSuiteWriter(jsc, props, cfg, schemaStr) + : new HoodieInlineTestSuiteWriter(jsc, props, cfg, schemaStr); int inputParallelism = cfg.inputParallelism > 0 ? cfg.inputParallelism : jsc.defaultParallelism(); this.deltaGenerator = new DeltaGenerator( new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName), @@ -75,6 +81,10 @@ public void initContext(JavaSparkContext jsc) throws HoodieException { schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates), jsc, sparkSession, schemaStr, keyGenerator); log.info(String.format("Initialized writerContext with: %s", schemaStr)); + if (cfg.testContinousMode) { + executorService = Executors.newFixedThreadPool(1); + executorService.execute(new TestSuiteWriterRunnable(hoodieTestSuiteWriter)); + } } catch (Exception e) { throw new HoodieException("Failed to reinitialize writerContext", e); } @@ -113,4 +123,35 @@ public String toString() { public SparkSession getSparkSession() { return sparkSession; } + + public void shutdownResources() { + this.hoodieTestSuiteWriter.shutdownResources(); + if (executorService != null) { + executorService.shutdownNow(); + } + } + + /** + * TestSuiteWriterRunnable to spin up a thread to execute deltastreamer with async table services. 
+ */ + class TestSuiteWriterRunnable implements Runnable { + private HoodieTestSuiteWriter hoodieTestSuiteWriter; + + TestSuiteWriterRunnable(HoodieTestSuiteWriter hoodieTestSuiteWriter) { + this.hoodieTestSuiteWriter = hoodieTestSuiteWriter; + } + + @Override + public void run() { + try { + Thread.sleep(20000); + log.info("Starting continuous sync with deltastreamer "); + hoodieTestSuiteWriter.getDeltaStreamerWrapper().sync(); + log.info("Completed continuous sync with deltastreamer "); + } catch (Exception e) { + log.error("Deltastreamer failed in continuous mode " + e.getMessage()); + throw new HoodieException("Shutting down deltastreamer in continuous mode failed ", e); + } + } + } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java index a0ebdc5754716..15c209e4752b8 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java @@ -20,10 +20,17 @@ package org.apache.hudi.integ.testsuite.dag.nodes; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.schema.SchemaUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -40,6 +47,9 @@ import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -47,6 +57,8 @@ import scala.collection.JavaConversions; import scala.collection.JavaConverters; +import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; + /** * This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an * optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites. 
@@ -78,6 +90,12 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception int itrCountToExecute = config.getIterationCountToExecute(); if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) || (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) { + FileSystem fs = new Path(context.getHoodieTestSuiteWriter().getCfg().inputBasePath) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + if (context.getHoodieTestSuiteWriter().getCfg().testContinousMode) { + awaitUntilDeltaStreamerCaughtUp(context, context.getHoodieTestSuiteWriter().getCfg().targetBasePath, fs, + context.getHoodieTestSuiteWriter().getCfg().inputBasePath); + } SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate(); // todo: Fix partitioning schemes. For now, assumes data based partitioning. String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*"; @@ -85,8 +103,6 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception // listing batches to be validated String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; if (log.isDebugEnabled()) { - FileSystem fs = new Path(inputPathStr) - .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); log.info("fileStatuses length: " + fileStatuses.length); for (FileStatus fileStatus : fileStatuses) { @@ -145,8 +161,6 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception if (config.isDeleteInputData()) { // clean up input data for current group of writes. inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; - FileSystem fs = new Path(inputPathStr) - .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); for (FileStatus fileStatus : fileStatuses) { log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); @@ -157,6 +171,50 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception } } + private void awaitUntilDeltaStreamerCaughtUp(ExecutionContext context, String hudiTablePath, FileSystem fs, String inputPath) throws IOException, InterruptedException { + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(hudiTablePath).build(); + HoodieTimeline commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + Option latestCheckpoint = getLatestCheckpoint(commitTimeline); + FileStatus[] subDirs = fs.listStatus(new Path(inputPath)); + List subDirList = Arrays.asList(subDirs); + subDirList.sort(Comparator.comparingLong(entry -> Long.parseLong(entry.getPath().getName()))); + String latestSubDir = subDirList.get(subDirList.size() -1).getPath().getName(); + log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " + + (latestCheckpoint.isPresent() ? 
latestCheckpoint.get() : "none")); + long maxWaitTime = config.maxWaitTimeForDeltastreamerToCatchupMs(); + long waitedSoFar = 0; + while (!(latestCheckpoint.isPresent() && latestCheckpoint.get().equals(latestSubDir))) { + log.warn("Sleeping for 20 secs while waiting for deltastreamer to catch up with ingested data"); + Thread.sleep(20000); + meta.reloadActiveTimeline(); + commitTimeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + latestCheckpoint = getLatestCheckpoint(commitTimeline); + waitedSoFar += 20000; + if (waitedSoFar >= maxWaitTime) { + throw new AssertionError("DeltaStreamer has not caught up within the max wait time of " + maxWaitTime + " ms. Last known checkpoint " + + (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none") + ", expected checkpoint to have caught up to " + latestSubDir); + } + log.info("Latest sub directory in input path " + latestSubDir + ", latest checkpoint from deltastreamer " + + (latestCheckpoint.isPresent() ? latestCheckpoint.get() : "none")); + } + } + + private Option getLatestCheckpoint(HoodieTimeline timeline) { + return (Option) timeline.getReverseOrderedInstants().map(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_KEY))) { + return Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY)); + } else { + return Option.empty(); + } + } catch (IOException e) { + throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e); + } + }).filter(Option::isPresent).findFirst().orElse(Option.empty()); + } + private Dataset getInputDf(ExecutionContext context, SparkSession session, String inputPath) { String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()); String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key()); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index c30be2a2a5d2c..20e12e9030854 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -216,15 +216,22 @@ public JavaRDD generateDeletes(Config config) throws IOException adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete()); adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete()); } else { - deltaInputReader = - new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), - schemaStr); - if (config.getFractionUpsertPerFile() > 0) { - adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), - config.getFractionUpsertPerFile()); + if (((DFSDeltaConfig) deltaOutputConfig).shouldUseHudiToGenerateUpdates()) { + deltaInputReader = + new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), + schemaStr); + if (config.getFractionUpsertPerFile() > 0) { + adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), + config.getFractionUpsertPerFile()); + } else { + adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(),
config.getNumUpsertFiles(), config + .getNumRecordsDelete()); + } } else { - adjustedRDD = deltaInputReader.read(config.getNumDeletePartitions(), config.getNumUpsertFiles(), config - .getNumRecordsDelete()); + deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr, + ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty()); + adjustedRDD = deltaInputReader.read(config.getNumRecordsDelete()); + adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsDelete()); } }
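For dag authors adopting this change: the `max_wait_time_for_deltastreamer_catch_up_ms` key added to the yaml files above only matters when the suite runs with `--test-continuous-mode`, where the validation node polls the commit timeline until the deltastreamer checkpoint reaches the latest ingested input batch. A minimal sketch of a validate node using the new key, following the conventions of the yamls touched in this PR (node and dependency names here are illustrative, not taken from any shipped yaml):

```yaml
# Hypothetical fragment of a test-suite dag (nodes live under dag_content).
first_validate:
  config:
    validate_once_every_itr: 5
    validate_hive: false
    delete_input_data: true
    # Wait up to 10 minutes for the deltastreamer checkpoint to catch up to the
    # latest input batch; DeltaConfig defaults this to 5 minutes when the key is omitted.
    max_wait_time_for_deltastreamer_catch_up_ms: 600000
  type: ValidateDatasetNode
  deps: first_insert
```

ValidateAsyncOperations nodes take the same key, as the yaml diffs above show; if the deltastreamer has not caught up within the configured window, ValidateDatasetNode fails with an AssertionError instead of validating stale data.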