diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml index a10026c0b948b..5fa8596830f38 100644 --- a/docker/demo/config/test-suite/complex-dag-cow.yaml +++ b/docker/demo/config/test-suite/complex-dag-cow.yaml @@ -13,122 +13,56 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 1000 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 10000 - deps: first_insert - type: InsertNode -third_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 300 - deps: second_insert - type: InsertNode -first_rollback: - config: - deps: third_insert - type: RollbackNode -first_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_rollback -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 11300 - type: HiveQueryNode - deps: first_hive_sync -second_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_hive_query -second_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 11600 - type: HiveQueryNode - deps: second_upsert -fourth_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 1000 - deps: second_hive_query - type: InsertNode -third_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 12600 - type: HiveQueryNode - deps: fourth_insert -first_delete: - config: - record_size: 70000 - num_partitions_delete: 1 - num_records_delete: 200 - deps: third_hive_query - type: DeleteNode -fourth_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_delete -fourth_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 12400 - type: HiveQueryNode - deps: fourth_hive_sync \ No newline at end of file +dag_name: cow-long-running-example.yaml +dag_rounds: 2 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: 
InsertNode + third_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_validate: + config: + type: ValidateDatasetNode + deps: third_insert + first_upsert: + config: + record_size: 100 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 2000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml index 2652b03070a7e..505e5e2945f9b 100644 --- a/docker/demo/config/test-suite/complex-dag-mor.yaml +++ b/docker/demo/config/test-suite/complex-dag-mor.yaml @@ -13,103 +13,107 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 100 - deps: first_insert - type: InsertNode -third_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 300 - deps: second_insert - type: InsertNode -first_rollback: - config: - deps: third_insert - type: RollbackNode -first_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_rollback -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveQueryNode - deps: first_hive_sync -second_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_hive_query -second_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1100 - type: HiveQueryNode - deps: second_upsert -first_schedule_compact: - config: - type: ScheduleCompactNode - deps: second_hive_query -third_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_schedule_compact -first_compact: - config: - type: CompactNode - deps: first_schedule_compact -third_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1400 - type: HiveQueryNode - deps: first_compact +dag_name: complex-dag-mor.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 5 + num_records_insert: 100 + type: 
InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 5 + num_records_insert: 100 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_rollback: + config: + deps: third_insert + type: RollbackNode + first_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_rollback + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveQueryNode + deps: first_hive_sync + second_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_hive_query + second_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + result2: 1100 + type: HiveQueryNode + deps: second_upsert + first_schedule_compact: + config: + type: ScheduleCompactNode + deps: second_hive_query + third_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_schedule_compact + first_compact: + config: + type: CompactNode + deps: first_schedule_compact + third_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + result2: 1400 + type: HiveQueryNode + deps: first_compact diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/cow-long-running-example.yaml new file mode 100644 index 0000000000000..b7026f2dd653a --- /dev/null +++ b/docker/demo/config/test-suite/cow-long-running-example.yaml @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: cow-long-running-example.yaml +dag_rounds: 20 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_validate: + config: + type: ValidateDatasetNode + deps: third_insert + first_upsert: + config: + record_size: 100 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 2000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/test-source.properties b/docker/demo/config/test-suite/test-source.properties deleted file mode 100644 index cc18a39d57086..0000000000000 --- a/docker/demo/config/test-suite/test-source.properties +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# write configs -hoodie.datasource.write.recordkey.field=_row_key -hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator -hoodie.datasource.write.partitionpath.field=timestamp - - -# deltastreamer configs -hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd -hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-bench/input -hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/test-suite/source.avsc -hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/bench/source.avsc - -#hive sync -hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ -hoodie.datasource.hive_sync.database=testdb -hoodie.datasource.hive_sync.table=table1 -hoodie.datasource.hive_sync.use_jdbc=false -hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor -hoodie.datasource.hive_sync.assume_date_partitioning=true -hoodie.datasource.hive_sync.use_pre_apache_input_format=true diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties index a7fd3986a730b..0aa0f45c05ed5 100644 --- a/docker/demo/config/test-suite/test.properties +++ b/docker/demo/config/test-suite/test.properties @@ -1,3 +1,4 @@ + hoodie.insert.shuffle.parallelism=100 hoodie.upsert.shuffle.parallelism=100 hoodie.bulkinsert.shuffle.parallelism=100 @@ -8,6 +9,13 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000 hoodie.embed.timeline.server=false hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.insert.shuffle.parallelism=100 +hoodie.upsert.shuffle.parallelism=100 +hoodie.bulkinsert.shuffle.parallelism=100 + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator hoodie.datasource.write.partitionpath.field=timestamp diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index a6cdd08478e1a..ff64ed1242fee 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -142,7 +142,9 @@ Start the Hudi Docker demo: docker/setup_demo.sh ``` -NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x +NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x. +Execute below if you are using Hudi query node in your dag. If not, below section is not required. +Also, for longer running tests, go to next section. 
``` docker exec -it adhoc-2 bash @@ -214,7 +216,7 @@ spark-submit \ --conf spark.sql.catalogImplementation=hive \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ /opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \ ---source-ordering-field timestamp \ +--source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ @@ -253,7 +255,7 @@ spark-submit \ --conf spark.sql.catalogImplementation=hive \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ /opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \ ---source-ordering-field timestamp \ +--source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ @@ -267,3 +269,182 @@ spark-submit \ --table-type MERGE_ON_READ \ --compact-scheduling-minshare 1 ``` + +For long running test suite, validation has to be done differently. Idea is to run same dag in a repeated manner. +Hence "ValidateDatasetNode" is introduced which will read entire input data and compare it with hudi contents both via +spark datasource and hive table via spark sql engine. + +If you have "ValidateDatasetNode" in your dag, do not replace hive jars as instructed above. Spark sql engine does not +go well w/ hive2* jars. So, after running docker setup, just copy test.properties and your dag of interest and you are +good to go ahead. + +For repeated runs, two additional configs need to be set. "dag_rounds" and "dag_intermittent_delay_mins". +This means that your dag will be repeated for N times w/ a delay of Y mins between each round. + +Also, ValidateDatasetNode can be configured in two ways. Either with "delete_input_data: true" set or not set. +When "delete_input_data" is set for ValidateDatasetNode, once validation is complete, entire input data will be deleted. +So, suggestion is to use this ValidateDatasetNode as the last node in the dag with "delete_input_data". +Example dag: +``` + Insert + Upsert + ValidateDatasetNode with delete_input_data = true +``` + +If above dag is run with "dag_rounds" = 10 and "dag_intermittent_delay_mins" = 10, then this dag will run for 10 times +with 10 mins delay between every run. At the end of every run, records written as part of this round will be validated. +At the end of each validation, all contents of input are deleted. +For eg: incase of above dag, +``` +Round1: + insert => inputPath/batch1 + upsert -> inputPath/batch2 + Validate with delete_input_data = true + Validates contents from batch1 and batch2 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch1 and batch2. +Round2: + insert => inputPath/batch3 + upsert -> inputPath/batch4 + Validate with delete_input_data = true + Validates contents from batch3 and batch4 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch3 and batch4. +Round3: + insert => inputPath/batch5 + upsert -> inputPath/batch6 + Validate with delete_input_data = true + Validates contents from batch5 and batch6 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch5 and batch6. +. +. +``` +If you wish to do a cumulative validation, do not set delete_input_data in ValidateDatasetNode. 
+But remember that cumulative validation may not scale beyond a certain point, since both the input data and the Hudi
+table contents keep occupying disk space and grow with every cycle.
+
+Let's look at an example where you don't set "delete_input_data" as part of validation.
+```
+Round1:
+  insert => inputPath/batch1
+  upsert -> inputPath/batch2
+  Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
+Round2:
+  insert => inputPath/batch3
+  upsert -> inputPath/batch4
+  Validate: validates contents from batch1 to batch4 are in hudi and ensures Row equality
+Round3:
+  insert => inputPath/batch5
+  upsert -> inputPath/batch6
+  Validate: validates contents from batch1 to batch6 are in hudi and ensures Row equality
+.
+.
+```
+
+You could also have validations in the middle of your dag without setting "delete_input_data", and set it only in the
+last node of the dag.
+```
+Round1:
+  insert => inputPath/batch1
+  upsert -> inputPath/batch2
+  Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
+  insert => inputPath/batch3
+  upsert -> inputPath/batch4
+  Validate with delete_input_data = true
+          Validates contents from batch1 to batch4 are in hudi and ensures Row equality
+          Since "delete_input_data" is set to true, this node deletes contents from batch1 to batch4.
+Round2:
+  insert => inputPath/batch5
+  upsert -> inputPath/batch6
+  Validate: validates contents from batch5 and batch6 are in hudi and ensures Row equality
+  insert => inputPath/batch7
+  upsert -> inputPath/batch8
+  Validate with delete_input_data = true
+          Validates contents from batch5 to batch8 are in hudi and ensures Row equality
+          Since "delete_input_data" is set to true, this node deletes contents from batch5 to batch8.
+Round3:
+  insert => inputPath/batch9
+  upsert -> inputPath/batch10
+  Validate: validates contents from batch9 and batch10 are in hudi and ensures Row equality
+  insert => inputPath/batch11
+  upsert -> inputPath/batch12
+  Validate with delete_input_data = true
+          Validates contents from batch9 to batch12 are in hudi and ensures Row equality
+          Since "delete_input_data" is set to true, this node deletes contents from batch9 to batch12.
+.
+.
+```
+The dag above is just an example for illustration purposes, but you can make it as complex as your needs require.
+```
+    Insert
+    Upsert
+    Delete
+    Validate w/o deleting
+    Insert
+    Rollback
+    Validate w/o deleting
+    Upsert
+    Validate w/ deletion
+```
+With this dag, you can set the two additional configs "dag_rounds" and "dag_intermittent_delay_mins" and have a long
+running test suite.
+
+```
+dag_rounds: 1
+dag_intermittent_delay_mins: 10
+dag_content:
+     Insert
+     Upsert
+     Delete
+     Validate w/o deleting
+     Insert
+     Rollback
+     Validate w/o deleting
+     Upsert
+     Validate w/ deletion
+
+```
+
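+Concretely, a trimmed-down version of such a repeated-run dag could look like the sketch below. The node names and
+numbers are placeholder values; the node types and config keys mirror the bundled cow-long-running-example.yaml, and
+DeleteNode/RollbackNode entries follow the same pattern:
+```
+dag_name: repeated-run-sketch.yaml
+dag_rounds: 10
+dag_intermittent_delay_mins: 10
+dag_content:
+  first_insert:
+    config:
+      record_size: 100
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: InsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 100
+      num_partitions_insert: 1
+      num_records_insert: 300
+      repeat_count: 1
+      num_records_upsert: 100
+      num_partitions_upsert: 1
+    type: UpsertNode
+    deps: first_insert
+  first_validate:
+    config:
+    type: ValidateDatasetNode
+    deps: first_upsert
+  first_delete:
+    config:
+      num_partitions_delete: 1
+      num_records_delete: 2000
+    type: DeleteNode
+    deps: first_validate
+  second_validate:
+    config:
+      delete_input_data: true
+    type: ValidateDatasetNode
+    deps: first_delete
+```
+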
+Sample COW command with repeated runs:
+```
+spark-submit \
+--packages org.apache.spark:spark-avro_2.11:2.4.0 \
+--conf spark.task.cpus=1 \
+--conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--conf spark.driver.extraClassPath=/var/demo/jars/* \
+--conf spark.executor.extraClassPath=/var/demo/jars/* \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
+/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
+--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
+--target-table table1 \
+--props test.properties \
+--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-cow.yaml \
+--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type COPY_ON_WRITE \
+--compact-scheduling-minshare 1
+```
+
+A ready-to-use dag, cow-per-round-mixed-validate.yaml, is available under docker/demo/config/test-suite/ and can give
+you an idea of how to structure long-running dags.
+
+As of now, "ValidateDatasetNode" uses the Spark datasource and Hive tables for comparison. Hence, COW tables and the
+real-time view of MOR tables can be tested.
+
\ No newline at end of file
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 6e5027b7f2547..91230cc1f951b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -24,13 +24,13 @@
 import org.apache.hudi.utilities.deltastreamer.DeltaSync;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 /**
- * Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite.
- * This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer} 2)
- * Piggyback on the suite to test {@link HoodieDeltaStreamer}
+ * Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite.
This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer} + * 2) Piggyback on the suite to test {@link HoodieDeltaStreamer} */ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 7b3324e4b569e..b5037e9959cb3 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -18,13 +18,6 @@ package org.apache.hudi.integ.testsuite; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; @@ -35,23 +28,30 @@ import org.apache.hudi.integ.testsuite.dag.DagUtils; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator; +import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; -import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; -import org.apache.hudi.utilities.schema.SchemaProvider; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** - * This is the entry point for running a Hudi Test Suite. Although this class has similarities with - * {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency on the changes in - * DeltaStreamer. + * This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency + * on the changes in DeltaStreamer. */ public class HoodieTestSuiteJob { @@ -133,10 +133,10 @@ public static void main(String[] args) throws Exception { public WorkflowDag createWorkflowDag() throws IOException { WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? 
((WorkflowDagGenerator) ReflectionUtils - .loadClass((this.cfg).workloadDagGenerator)).build() - : DagUtils.convertYamlPathToDag( - FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), - this.cfg.workloadYamlPath); + .loadClass((this.cfg).workloadDagGenerator)).build() + : DagUtils.convertYamlPathToDag( + FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), + this.cfg.workloadYamlPath); return workflowDag; } @@ -147,7 +147,7 @@ public void runTestSuite() { long startTime = System.currentTimeMillis(); WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); writerContext.initContext(jsc); - DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext); + DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc); dagScheduler.schedule(); log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime); } catch (Exception e) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 81f406be3cc32..329ef16bd50d2 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -87,6 +87,7 @@ public static class Config { private static String HIVE_LOCAL = "hive_local"; private static String REINIT_CONTEXT = "reinitialize_context"; private static String START_PARTITION = "start_partition"; + private static String DELETE_INPUT_DATA = "delete_input_data"; private Map configsMap; @@ -154,6 +155,10 @@ public boolean getReinitContext() { return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString()); } + public boolean isDeleteInputData() { + return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString()); + } + public Map getOtherConfigs() { if (configsMap == null) { return new HashMap<>(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java index 24520a3626cf3..1e8acf580d962 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java @@ -18,12 +18,14 @@ package org.apache.hudi.integ.testsuite.converter; -import java.util.List; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator; import org.apache.hudi.integ.testsuite.generator.UpdateGeneratorIterator; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; +import java.util.List; + /** * This converter creates an update {@link GenericRecord} from an existing {@link GenericRecord}. 
*/ @@ -36,7 +38,7 @@ public class UpdateConverter implements Converter private final List recordKeyFields; private final int minPayloadSize; - public UpdateConverter(String schemaStr, int minPayloadSize, List partitionPathFields, + public UpdateConverter(String schemaStr, int minPayloadSize, List partitionPathFields, List recordKeyFields) { this.schemaStr = schemaStr; this.partitionPathFields = partitionPathFields; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java index d5358238d51a1..1211c0098d23a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java @@ -48,6 +48,15 @@ */ public class DagUtils { + public static final String DAG_NAME = "dag_name"; + public static final String DAG_ROUNDS = "dag_rounds"; + public static final String DAG_INTERMITTENT_DELAY_MINS = "dag_intermittent_delay_mins"; + public static final String DAG_CONTENT = "dag_content"; + + public static int DEFAULT_DAG_ROUNDS = 1; + public static int DEFAULT_INTERMITTENT_DELAY_MINS = 10; + public static String DEFAULT_DAG_NAME = "TestDagName"; + static final ObjectMapper MAPPER = new ObjectMapper(); /** @@ -62,15 +71,38 @@ public static WorkflowDag convertYamlPathToDag(FileSystem fs, String path) throw * Converts a YAML representation to {@link WorkflowDag}. */ public static WorkflowDag convertYamlToDag(String yaml) throws IOException { + int dagRounds = DEFAULT_DAG_ROUNDS; + int intermittentDelayMins = DEFAULT_INTERMITTENT_DELAY_MINS; + String dagName = DEFAULT_DAG_NAME; Map allNodes = new HashMap<>(); final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); final JsonNode jsonNode = yamlReader.readTree(yaml); Iterator> itr = jsonNode.fields(); while (itr.hasNext()) { Entry dagNode = itr.next(); - allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue())); + String key = dagNode.getKey(); + switch (key) { + case DAG_NAME: + dagName = dagNode.getValue().asText(); + break; + case DAG_ROUNDS: + dagRounds = dagNode.getValue().asInt(); + break; + case DAG_INTERMITTENT_DELAY_MINS: + intermittentDelayMins = dagNode.getValue().asInt(); + break; + case DAG_CONTENT: + JsonNode dagContent = dagNode.getValue(); + Iterator> contentItr = dagContent.fields(); + while(contentItr.hasNext()) { + Entry dagContentNode = contentItr.next(); + allNodes.put(dagContentNode.getKey(), convertJsonToDagNode(allNodes, dagContentNode.getKey(), dagContentNode.getValue())); + } + default: + break; + } } - return new WorkflowDag(findRootNodes(allNodes)); + return new WorkflowDag(dagName, dagRounds, intermittentDelayMins, findRootNodes(allNodes)); } /** diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java index ad6e9cb0cb1b9..1fe2294423121 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java @@ -18,8 +18,6 @@ package org.apache.hudi.integ.testsuite.dag; -import java.util.ArrayList; -import java.util.List; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import 
org.apache.hudi.integ.testsuite.dag.nodes.DagNode; @@ -27,9 +25,11 @@ import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode; import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode; +import java.util.ArrayList; +import java.util.List; + /** - * An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if - * none is provided. + * An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if none is provided. */ public class SimpleWorkflowDagGenerator implements WorkflowDagGenerator { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java index e9171fc4774d0..f622bb7a7e448 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java @@ -18,20 +18,47 @@ package org.apache.hudi.integ.testsuite.dag; -import java.util.List; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import java.util.List; + +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_NAME; +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_ROUNDS; +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_INTERMITTENT_DELAY_MINS; + /** * Workflow dag that encapsulates all execute nodes. */ public class WorkflowDag { + private String dagName; + private int rounds; + private int intermittentDelayMins; private List> nodeList; public WorkflowDag(List> nodeList) { + this(DEFAULT_DAG_NAME, DEFAULT_DAG_ROUNDS, DEFAULT_INTERMITTENT_DELAY_MINS, nodeList); + } + + public WorkflowDag(String dagName, int rounds, int intermittentDelayMins, List> nodeList) { + this.dagName = dagName; + this.rounds = rounds; + this.intermittentDelayMins = intermittentDelayMins; this.nodeList = nodeList; } + public String getDagName() { + return dagName; + } + + public int getRounds() { + return rounds; + } + + public int getIntermittentDelayMins() { + return intermittentDelayMins; + } + public List> getNodeList() { return nodeList; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java index e457f0a8daca7..650ab1eeaf4c2 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -21,15 +21,16 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; import org.apache.hudi.keygen.BuiltinKeyGenerator; -import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; -import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; + import org.apache.log4j.LogManager; 
import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -38,8 +39,7 @@ import java.util.Map; /** - * WriterContext wraps the delta writer/data generator related configuration needed - * to init/reinit. + * WriterContext wraps the delta writer/data generator related configuration needed to init/reinit. */ public class WriterContext { @@ -53,8 +53,9 @@ public class WriterContext { private BuiltinKeyGenerator keyGenerator; private transient SparkSession sparkSession; private transient JavaSparkContext jsc; + public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg, - BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { + BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { this.cfg = cfg; this.props = props; this.keyGenerator = keyGenerator; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java index df54b4c811989..05ac242a5cefc 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java @@ -41,6 +41,17 @@ public abstract class DagNode implements Comparable> { protected Config config; private boolean isCompleted; + public DagNode clone() { + List> tempChildNodes = new ArrayList<>(); + for(DagNode dagNode: childNodes) { + tempChildNodes.add(dagNode.clone()); + } + this.childNodes = tempChildNodes; + this.result = null; + this.isCompleted = false; + return this; + } + public DagNode addChildNode(DagNode childNode) { childNode.getParentNodes().add(this); getChildNodes().add(childNode); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java new file mode 100644 index 0000000000000..c0671e8abb1ef --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Delay Node to add delays between each group of test runs. 
+ */ +public class DelayNode extends DagNode { + + private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class); + private int delayMins; + + public DelayNode(int delayMins) { + this.delayMins = delayMins; + } + + @Override + public void execute(ExecutionContext context) throws Exception { + log.warn("Waiting for "+ delayMins+" mins before going for next test run"); + Thread.sleep(delayMins * 60 * 1000); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java new file mode 100644 index 0000000000000..12fc52529a0ab --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; +import org.apache.hudi.integ.testsuite.schema.SchemaUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.JavaConverters; + +/** + * This nodes validates contents from input path are in tact with Hudi. This nodes uses spark datasource for comparison purposes. By default no configs are required for this node. But there is an + * optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites. + * README has more details under docker set up for usages of this node. 
+ */ +public class ValidateDatasetNode extends DagNode { + + private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class); + + public ValidateDatasetNode(Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext context) throws Exception { + + SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate(); + + // todo: Fix partitioning schemes. For now, assumes data based partitioning. + String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*"; + String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*"; + log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " + hudiPath); + // listing batches to be validated + String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + FileSystem fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString()); + } + + String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); + String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()); + // todo: fix hard coded fields from configs. + // read input and resolve insert, updates, etc. + Dataset inputDf = session.read().format("avro").load(inputPath); + ExpressionEncoder encoder = getEncoder(inputDf.schema()); + Dataset inputSnapshotDf = inputDf.groupByKey( + (MapFunction) value -> partitionPathField + "+" + recordKeyField, Encoders.STRING()) + .reduceGroups((ReduceFunction) (v1, v2) -> { + int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); + int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); + if (ts1 > ts2) { + return v1; + } else { + return v2; + } + }) + .map((MapFunction, Row>) value -> value._2, encoder) + .filter("_hoodie_is_deleted is NULL"); + + // read from hudi and remove meta columns. + Dataset hudiDf = session.read().format("hudi").load(hudiPath); + Dataset trimmedDf = hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); + + Dataset intersectionDf = inputSnapshotDf.intersect(trimmedDf); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed. Total count in hudi " + trimmedDf.count() + ", input df count " + inputSnapshotDf.count()); + throw new AssertionError("Hudi contents does not match contents input data. "); + } + + String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY()); + String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); + log.warn("Validating hive table with db : " + database + " and table : " + tableName); + Dataset cowDf = session.sql("SELECT * FROM " + database + "." 
+ tableName); + Dataset trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); + intersectionDf = inputSnapshotDf.intersect(trimmedDf); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count()); + throw new AssertionError("Hudi hive table contents does not match contents input data. "); + } + + // if delete input data is enabled, erase input data. + if (config.isDeleteInputData()) { + // clean up input data for current group of writes. + inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); + fs.delete(fileStatus.getPath(), true); + } + } + } + + private ExpressionEncoder getEncoder(StructType schema) { + List attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream() + .map(Attribute::toAttribute).collect(Collectors.toList()); + return RowEncoder.apply(schema) + .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), + SimpleAnalyzer$.MODULE$); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 5c70ea164e873..d4074bcccdae0 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -23,8 +23,10 @@ import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode; import org.apache.hudi.metrics.Metrics; +import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +52,9 @@ public class DagScheduler { private WorkflowDag workflowDag; private ExecutionContext executionContext; - public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) { + public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) { this.workflowDag = workflowDag; - this.executionContext = new ExecutionContext(null, writerContext); + this.executionContext = new ExecutionContext(jsc, writerContext); } /** @@ -63,7 +65,7 @@ public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) { public void schedule() throws Exception { ExecutorService service = Executors.newFixedThreadPool(2); try { - execute(service, workflowDag.getNodeList()); + execute(service, workflowDag); service.shutdown(); } finally { if (!service.isShutdown()) { @@ -77,33 +79,47 @@ public void schedule() throws Exception { * Method to start executing the nodes in workflow DAGs. 
* * @param service ExecutorService - * @param nodes Nodes to be executed + * @param workflowDag instance of workflow dag that needs to be executed * @throws Exception will be thrown if ant error occurred */ - private void execute(ExecutorService service, List nodes) throws Exception { + private void execute(ExecutorService service, WorkflowDag workflowDag) throws Exception { // Nodes at the same level are executed in parallel - Queue queue = new PriorityQueue<>(nodes); log.info("Running workloads"); + List nodes = workflowDag.getNodeList(); + int curRound = 1; do { - List futures = new ArrayList<>(); - Set childNodes = new HashSet<>(); - while (queue.size() > 0) { - DagNode nodeToExecute = queue.poll(); - log.info("Node to execute in dag scheduler " + nodeToExecute.getConfig().toString()); - futures.add(service.submit(() -> executeNode(nodeToExecute))); - if (nodeToExecute.getChildNodes().size() > 0) { - childNodes.addAll(nodeToExecute.getChildNodes()); - } + log.warn("==================================================================="); + log.warn("Running workloads for round num " + curRound); + log.warn("==================================================================="); + Queue queue = new PriorityQueue<>(); + for (DagNode dagNode : nodes) { + queue.add(dagNode.clone()); } - queue.addAll(childNodes); - childNodes.clear(); - for (Future future : futures) { - future.get(1, TimeUnit.HOURS); + do { + List futures = new ArrayList<>(); + Set childNodes = new HashSet<>(); + while (queue.size() > 0) { + DagNode nodeToExecute = queue.poll(); + log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + nodeToExecute.getConfig()); + futures.add(service.submit(() -> executeNode(nodeToExecute))); + if (nodeToExecute.getChildNodes().size() > 0) { + childNodes.addAll(nodeToExecute.getChildNodes()); + } + } + queue.addAll(childNodes); + childNodes.clear(); + for (Future future : futures) { + future.get(1, TimeUnit.HOURS); + } + } while (queue.size() > 0); + log.info("Finished workloads for round num " + curRound); + if (curRound < workflowDag.getRounds()) { + new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext); } // After each level, report and flush the metrics Metrics.flush(); - } while (queue.size() > 0); + } while (curRound++ < workflowDag.getRounds()); log.info("Finished workloads"); } @@ -119,7 +135,6 @@ private void executeNode(DagNode node) { try { int repeatCount = node.getConfig().getRepeatCount(); while (repeatCount > 0) { - log.warn("executing node: \"" + node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " + node.getClass() + " :: " + node.getConfig().toString()); node.execute(executionContext); log.info("Finished executing {}", node.getName()); repeatCount--; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 53af8eb74068e..6242cbfc7b10b 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -41,6 +41,10 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.converter.Converter; import org.apache.hudi.integ.testsuite.converter.DeleteConverter; +import org.apache.hudi.common.util.Option; +import 
org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.converter.UpdateConverter; import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader; import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader; @@ -51,6 +55,7 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -58,6 +63,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.StreamSupport; import scala.Tuple2; @@ -77,7 +93,7 @@ public class DeltaGenerator implements Serializable { private int batchId; public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession, - String schemaStr, BuiltinKeyGenerator keyGenerator) { + String schemaStr, BuiltinKeyGenerator keyGenerator) { this.deltaOutputConfig = deltaOutputConfig; this.jsc = jsc; this.sparkSession = sparkSession; @@ -167,7 +183,6 @@ public JavaRDD generateUpdates(Config config) throws IOException log.info("Repartitioning records into " + numPartition + " partitions for updates"); adjustedRDD = adjustedRDD.repartition(numPartition); log.info("Repartitioning records done for updates"); - UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(), partitionPathFieldNames, recordRowKeyFieldNames); JavaRDD updates = converter.convert(adjustedRDD); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 256dfa49ebf6c..5477371a1defa 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -20,6 +20,11 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; import java.util.HashSet; import java.util.Iterator; @@ -67,7 +72,7 @@ public GenericRecord next() { lastRecord = record; return record; } else { - return this.generator.randomize(lastRecord, partitionPathFieldNames); + return this.generator.randomize(lastRecord, this.partitionPathFieldNames); } } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java index 7d5ca081444f0..49a5f312ecabe 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java +++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java @@ -134,12 +134,16 @@ public GenericRecord getNewPayload(Set partitionPathFieldNames) { protected GenericRecord create(Schema schema, Set partitionPathFieldNames) { GenericRecord result = new GenericData.Record(schema); for (Schema.Field f : schema.getFields()) { - if (isPartialLongField(f, partitionPathFieldNames)) { - // This is a long field used as partition field. Set it to seconds since epoch. - long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); - result.put(f.name(), (long) value); + if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) { + result.put(f.name(), false); } else { - result.put(f.name(), typeConvert(f)); + if (isPartialLongField(f, partitionPathFieldNames)) { + // This is a long field used as partition field. Set it to seconds since epoch. + long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); + result.put(f.name(), (long) value); + } else { + result.put(f.name(), typeConvert(f)); + } } } return result; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java index 51b1fd9ed48dc..89cda658e12cc 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java @@ -18,16 +18,16 @@ package org.apache.hudi.integ.testsuite.generator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + /** * A lazy update payload generator to generate {@link GenericRecord}s lazily. 
*/ diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java index b7d71f583777a..94ff3a3ea2327 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java @@ -18,20 +18,6 @@ package org.apache.hudi.integ.testsuite.helpers; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; @@ -40,13 +26,26 @@ import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + /** * A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline. */ public class DFSTestSuitePathSelector extends DFSPathSelector { + private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class); public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) { @@ -67,6 +66,7 @@ public Pair, String> getNextFilePathsAndMaxModificationTime( lastBatchId = 0; nextBatchId = 1; } + // obtain all eligible files for the batch List eligibleFiles = new ArrayList<>(); FileStatus[] fileStatuses = fs.globStatus( @@ -87,7 +87,8 @@ public Pair, String> getNextFilePathsAndMaxModificationTime( if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream() .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { continue; - } else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) { + } else if (Integer.parseInt(fileStatus.getPath().getName()) > lastBatchId && Integer.parseInt(fileStatus.getPath() + .getName()) <= nextBatchId) { RemoteIterator files = fs.listFiles(fileStatus.getPath(), true); while (files.hasNext()) { eligibleFiles.add(files.next()); @@ -95,7 +96,6 @@ public Pair, String> getNextFilePathsAndMaxModificationTime( } } - log.info("Reading " + eligibleFiles.size() + " files. 
"); // no data to readAvro if (eligibleFiles.size() == 0) { return new ImmutablePair<>(Option.empty(), diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index cfe7991f43ee5..48a23828cd885 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -18,26 +18,6 @@ package org.apache.hudi.integ.testsuite.reader; -import static java.util.Map.Entry.comparingByValue; -import static java.util.stream.Collectors.toMap; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -51,6 +31,12 @@ import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieMemoryConfig; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -58,11 +44,27 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + import scala.Tuple2; +import static java.util.Map.Entry.comparingByValue; +import static java.util.stream.Collectors.toMap; + /** - * This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in - * across partitions, files and records. + * This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in across partitions, files and records. 
  */
 public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
@@ -147,16 +149,22 @@ private JavaRDD fetchRecordsFromDataset(Option numPartit
     long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
     int numFilesToUpdate;
     long numRecordsToUpdatePerFile;
-    if (!numFiles.isPresent() || numFiles.get() == 0) {
+    if (!numFiles.isPresent() || numFiles.get() <= 0) {
       // If num files are not passed, find the number of files to update based on total records to update and records
       // per file
-      numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / recordsInSingleFile);
-      // recordsInSingleFile is not average so we still need to account for bias is records distribution
-      // in the files. Limit to the maximum number of files available.
-      int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
-      numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
-      log.info("Files to update {}", numFilesToUpdate);
-      numRecordsToUpdatePerFile = recordsInSingleFile;
+      numFilesToUpdate = (int) Math.floor((double) numRecordsToUpdate.get() / recordsInSingleFile);
+      if (numFilesToUpdate > 0) {
+        // recordsInSingleFile is not an average, so we still need to account for bias in records distribution
+        // in the files. Limit to the maximum number of files available.
+        int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
+        numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
+        log.info("Files to update {}, records to update per file {}", numFilesToUpdate, recordsInSingleFile);
+        numRecordsToUpdatePerFile = recordsInSingleFile;
+      } else {
+        numFilesToUpdate = 1;
+        numRecordsToUpdatePerFile = numRecordsToUpdate.get();
+        log.info("Total records passed in < records in single file. Hence setting numFilesToUpdate to 1 and numRecordsToUpdatePerFile to {}", numRecordsToUpdatePerFile);
+      }
     } else {
       // If num files is passed, find the number of records per file based on either percentage or total records to
       // update and num files passed
@@ -170,6 +178,7 @@ private JavaRDD fetchRecordsFromDataset(Option numPartit
         partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
     JavaRDD updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice,
         numFilesToUpdate, (int) numRecordsToUpdatePerFile));
+
     if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()
         != numRecordsToUpdatePerFile * numFiles.get()) {
       long remainingRecordsToAdd = (numRecordsToUpdate.get() - (numRecordsToUpdatePerFile * numFiles.get()));
@@ -214,7 +223,7 @@ private Map getFilesToReadPerPartition(JavaPairRDD
     adjustedPartitionToFileIdCountMap = new HashMap<>();
     partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
       if (e.getValue() <= numFilesPerPartition) {
@@ -272,9 +281,7 @@ private Iterator readParquetOrLogFiles(FileSlice fileSlice) throw
   }

   /**
-   * Returns the number of elements remaining in {@code iterator}. The iterator
-   * will be left exhausted: its {@code hasNext()} method will return
-   * {@code false}.
+   * Returns the number of elements remaining in {@code iterator}. The iterator will be left exhausted: its {@code hasNext()} method will return {@code false}.
*/ private static int iteratorSize(Iterator iterator) { int count = 0; @@ -286,11 +293,8 @@ private static int iteratorSize(Iterator iterator) { } /** - * Creates an iterator returning the first {@code limitSize} elements of the - * given iterator. If the original iterator does not contain that many - * elements, the returned iterator will have the same behavior as the original - * iterator. The returned iterator supports {@code remove()} if the original - * iterator does. + * Creates an iterator returning the first {@code limitSize} elements of the given iterator. If the original iterator does not contain that many elements, the returned iterator will have the same + * behavior as the original iterator. The returned iterator supports {@code remove()} if the original iterator does. * * @param iterator the iterator to limit * @param limitSize the maximum number of elements in the returned iterator diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java new file mode 100644 index 0000000000000..2de945286d910 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.schema; + +public class SchemaUtils { + + public static final String SOURCE_ORDERING_FIELD = "test_suite_source_ordering_field"; + +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java new file mode 100644 index 0000000000000..e67c5afae80dc --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.schema; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.integ.testsuite.dag.WriterContext; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Appends source ordering field to both source and target schemas. This is required to assist in validation to differentiate records written in different batches. + */ +public class TestSuiteFileBasedSchemaProvider extends FilebasedSchemaProvider { + + protected static Logger log = LogManager.getLogger(WriterContext.class); + + public TestSuiteFileBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + this.sourceSchema = addSourceOrderingFieldToSchema(sourceSchema); + this.targetSchema = addSourceOrderingFieldToSchema(targetSchema); + } + + private Schema addSourceOrderingFieldToSchema(Schema schema) { + List fields = new ArrayList<>(); + for (Schema.Field field : schema.getFields()) { + Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); + for (Map.Entry prop : field.getObjectProps().entrySet()) { + newField.addProp(prop.getKey(), prop.getValue()); + } + fields.add(newField); + } + Schema.Field sourceOrderingField = + new Schema.Field(SchemaUtils.SOURCE_ORDERING_FIELD, Schema.create(Type.INT), "", 0); + fields.add(sourceOrderingField); + Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false); + mergedSchema.setFields(fields); + return mergedSchema; + } + +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java index 65e4ee13cf55f..4bd096ae0cbde 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java @@ -18,6 +18,8 @@ package org.apache.hudi.integ.testsuite.writer; +import org.apache.hudi.integ.testsuite.schema.SchemaUtils; + import org.apache.avro.generic.GenericRecord; import java.io.IOException; @@ -30,22 +32,29 @@ */ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter { - private DeltaInputWriter deltaInputGenerator; + private DeltaInputWriter deltaInputWriter; private List metrics = new ArrayList<>(); + private int preCombineFieldVal = 0; + + public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputWriter, int preCombineFieldVal) { + this.deltaInputWriter = deltaInputWriter; + this.preCombineFieldVal = preCombineFieldVal; + } - public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputGenerator) { - this.deltaInputGenerator = deltaInputGenerator; + public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputWriter) { + this.deltaInputWriter = deltaInputWriter; } @Override public List write(Iterator input) throws IOException { while (input.hasNext()) { GenericRecord next = input.next(); - if (this.deltaInputGenerator.canWrite()) { - this.deltaInputGenerator.writeData(next); - } else if (input.hasNext()) { + next.put(SchemaUtils.SOURCE_ORDERING_FIELD, preCombineFieldVal); + if 
(this.deltaInputWriter.canWrite()) { + this.deltaInputWriter.writeData(next); + } else { rollOver(); - this.deltaInputGenerator.writeData(next); + this.deltaInputWriter.writeData(next); } } close(); @@ -54,11 +63,11 @@ public List write(Iterator input) throws IOExcep public void rollOver() throws IOException { close(); - this.deltaInputGenerator = this.deltaInputGenerator.getNewWriter(); + this.deltaInputWriter = this.deltaInputWriter.getNewWriter(); } private void close() throws IOException { - this.deltaInputGenerator.close(); - this.metrics.add(this.deltaInputGenerator.getDeltaWriteStats()); + this.deltaInputWriter.close(); + this.metrics.add(this.deltaInputWriter.getDeltaWriteStats()); } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java index b4d9b9f8956d8..a00e8e15d401a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java @@ -18,16 +18,17 @@ package org.apache.hudi.integ.testsuite.writer; -import java.io.IOException; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; + /** - * A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and - * {@link DeltaInputType}. + * A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and {@link DeltaInputType}. 
*/ public class DeltaWriterFactory { @@ -44,9 +45,9 @@ public static DeltaWriterAdapter getDeltaWriterAdapter(DeltaConfig config, Integ DeltaInputWriter fileDeltaInputGenerator = new AvroFileDeltaInputWriter( dfsDeltaConfig.getConfiguration(), StringUtils - .join(new String[]{dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()}, + .join(new String[] {dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()}, "/"), dfsDeltaConfig.getSchemaStr(), dfsDeltaConfig.getMaxFileSize()); - return new DFSDeltaWriterAdapter(fileDeltaInputGenerator); + return new DFSDeltaWriterAdapter(fileDeltaInputGenerator, batchId); default: throw new IllegalArgumentException("Invalid delta input format " + config.getDeltaInputType()); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java index 1e5ca6886c8e6..82350999ea42c 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java @@ -18,18 +18,19 @@ package org.apache.hudi.integ.testsuite.configuration; -import static junit.framework.Assert.assertTrue; -import static junit.framework.TestCase.assertEquals; - -import java.util.ArrayList; -import java.util.List; - +import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode; import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode; -import org.apache.hudi.integ.testsuite.dag.WorkflowDag; + import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.List; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertEquals; + /** * Unit test for the build process of {@link DagNode} and {@link WorkflowDag}. */ diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java index d94174471bd64..70e6da7d3c49a 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java @@ -47,6 +47,9 @@ public void testConvertDagToYaml() throws Exception { public void testConvertYamlToDag() throws Exception { WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers .readFileFromAbsolutePath((System.getProperty("user.dir") + "/.." + COW_DAG_DOCKER_DEMO_RELATIVE_PATH))); + assertEquals(dag.getDagName(), "unit-test-cow-dag"); + assertEquals(dag.getRounds(), 1); + assertEquals(dag.getIntermittentDelayMins(), 10); assertEquals(dag.getNodeList().size(), 1); Assertions.assertEquals(((DagNode) dag.getNodeList().get(0)).getParentNodes().size(), 0); assertEquals(((DagNode) dag.getNodeList().get(0)).getChildNodes().size(), 1); diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml index 96a6c825a98d0..23691659cab24 100644 --- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml +++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml @@ -13,58 +13,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 2 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_insert -first_rollback: - config: - deps: second_insert - type: RollbackNode -third_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_rollback -first_upsert: - config: - record_size: 70000 - num_partitions_upsert: 1 - repeat_count: 1 - num_records_upsert: 100 - type: UpsertNode - deps: third_insert -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" - hive_queries: - query1: "select count(*) from testdb1.table1" - result1: 300 - query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" - result2: 0 - type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file +dag_name: unit-test-cow-dag +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 100 + type: InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_insert + first_rollback: + config: + deps: second_insert + type: RollbackNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_rollback + first_upsert: + config: + record_size: 70000 + num_partitions_upsert: 1 + repeat_count: 1 + num_records_upsert: 100 + type: UpsertNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + hive_props: + prop2: "set spark.yarn.queue=" + prop3: "set hive.strict.checks.large.query=false" + prop4: "set hive.stats.autogather=false" + hive_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: HiveQueryNode + deps: first_hive_sync \ No newline at end of file diff --git a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml index 96a6c825a98d0..2ba42455d4874 100644 --- a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml +++ b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml @@ -13,58 +13,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-first_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 2 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_insert -first_rollback: - config: - deps: second_insert - type: RollbackNode -third_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_rollback -first_upsert: - config: - record_size: 70000 - num_partitions_upsert: 1 - repeat_count: 1 - num_records_upsert: 100 - type: UpsertNode - deps: third_insert -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" - hive_queries: - query1: "select count(*) from testdb1.table1" - result1: 300 - query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" - result2: 0 - type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file +dag_name: unit-test-mor-dag +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 100 + type: InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_insert + first_rollback: + config: + deps: second_insert + type: RollbackNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_rollback + first_upsert: + config: + record_size: 70000 + num_partitions_upsert: 1 + repeat_count: 1 + num_records_upsert: 100 + type: UpsertNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + hive_props: + prop2: "set spark.yarn.queue=" + prop3: "set hive.strict.checks.large.query=false" + prop4: "set hive.stats.autogather=false" + hive_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: HiveQueryNode + deps: first_hive_sync \ No newline at end of file diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java index 43f2ff27d2eff..7542755b2491e 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java @@ -46,9 +46,9 @@ public static class Config { private final FileSystem fs; - private final Schema sourceSchema; + protected Schema sourceSchema; - private Schema targetSchema; + protected Schema targetSchema; public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc);
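
Reviewer note, not part of the patch: the new top-level yaml keys in these test dags (dag_name, dag_rounds, dag_intermittent_delay_mins, dag_content) surface through WorkflowDag accessors that the DagScheduler and TestDagUtils hunks above already use. The sketch below is illustrative only; the wrapper class and method are hypothetical, and only the WorkflowDag/DagUtils accessor names that appear elsewhere in this change set are assumed to exist.

// Illustrative sketch only -- not part of the patch.
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;

class DagYamlWiringSketch {
  // Summarizes how the new yaml header maps onto the dag object consumed by DagScheduler#execute.
  static String describe(WorkflowDag dag) {
    // dag would typically come from DagUtils.convertYamlToDag(yaml), as exercised in TestDagUtils above.
    return "name=" + dag.getDagName()                       // <- dag_name
        + ", rounds=" + dag.getRounds()                     // <- dag_rounds: outer loop count in DagScheduler
        + ", delayMins=" + dag.getIntermittentDelayMins()   // <- dag_intermittent_delay_mins: DelayNode wait between rounds
        + ", nodes=" + dag.getNodeList().size();            // <- dag_content entries (first_insert, second_insert, ...)
  }
}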