diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index c2d5b510de2a3..397ff9800c4b3 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -57,7 +57,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Punit-tests -pl hudi-common,hudi-flink,hudi-client/hudi-spark-client
+ options: -Punit-tests -pl hudi-common,hudi-flink-datasource/hudi-flink,hudi-client/hudi-spark-client
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -66,7 +66,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Pfunctional-tests -pl hudi-common,hudi-flink
+ options: -Pfunctional-tests -pl hudi-common,hudi-flink-datasource/hudi-flink
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -165,7 +165,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Punit-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Punit-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
@@ -174,7 +174,7 @@ stages:
inputs:
mavenPomFile: 'pom.xml'
goals: 'test'
- options: -Pfunctional-tests -pl !hudi-common,!hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
+ options: -Pfunctional-tests -pl !hudi-common,!hudi-flink-datasource/hudi-flink,!hudi-client/hudi-spark-client,!hudi-client/hudi-client-common,!hudi-client/hudi-flink-client,!hudi-client/hudi-java-client,!hudi-cli,!hudi-utilities,!hudi-sync/hudi-hive-sync
publishJUnitResults: false
jdkVersionOption: '1.8'
mavenOptions: '-Xmx2g $(MAVEN_OPTS)'
diff --git a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
index 3c1acbdfe7714..086004f121e97 100644
--- a/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
+++ b/docker/compose/docker-compose_hadoop284_hive233_spark244.yml
@@ -184,7 +184,7 @@ services:
presto-coordinator-1:
container_name: presto-coordinator-1
hostname: presto-coordinator-1
- image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.268:latest
+ image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
ports:
- '8090:8090'
environment:
@@ -201,25 +201,25 @@ services:
command: coordinator
presto-worker-1:
- container_name: presto-worker-1
- hostname: presto-worker-1
- image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.268:latest
- depends_on: ["presto-coordinator-1"]
- environment:
- - PRESTO_JVM_MAX_HEAP=512M
- - PRESTO_QUERY_MAX_MEMORY=1GB
- - PRESTO_QUERY_MAX_MEMORY_PER_NODE=256MB
- - PRESTO_QUERY_MAX_TOTAL_MEMORY_PER_NODE=384MB
- - PRESTO_MEMORY_HEAP_HEADROOM_PER_NODE=100MB
- - TERM=xterm
- links:
- - "hivemetastore"
- - "hiveserver"
- - "hive-metastore-postgresql"
- - "namenode"
- volumes:
- - ${HUDI_WS}:/var/hoodie/ws
- command: worker
+ container_name: presto-worker-1
+ hostname: presto-worker-1
+ image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
+ depends_on: [ "presto-coordinator-1" ]
+ environment:
+ - PRESTO_JVM_MAX_HEAP=512M
+ - PRESTO_QUERY_MAX_MEMORY=1GB
+ - PRESTO_QUERY_MAX_MEMORY_PER_NODE=256MB
+ - PRESTO_QUERY_MAX_TOTAL_MEMORY_PER_NODE=384MB
+ - PRESTO_MEMORY_HEAP_HEADROOM_PER_NODE=100MB
+ - TERM=xterm
+ links:
+ - "hivemetastore"
+ - "hiveserver"
+ - "hive-metastore-postgresql"
+ - "namenode"
+ volumes:
+ - ${HUDI_WS}:/var/hoodie/ws
+ command: worker
trino-coordinator-1:
container_name: trino-coordinator-1
diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml
index 8a1e58f840a37..795a4a5f60709 100644
--- a/docker/demo/config/test-suite/cow-spark-long-running.yaml
+++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -25,17 +25,6 @@ dag_content:
num_records_insert: 10000
type: SparkInsertNode
deps: none
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_insert
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: first_hive_sync
first_upsert:
config:
record_size: 200
@@ -45,29 +34,22 @@ dag_content:
num_records_upsert: 3000
num_partitions_upsert: 50
type: SparkUpsertNode
- deps: first_validate
+ deps: first_insert
first_delete:
config:
num_partitions_delete: 50
num_records_delete: 8000
type: SparkDeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
+ validate_once_every_itr: 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_delete
last_validate:
config:
execute_itr_count: 30
- validate_clean: true
- validate_archival: true
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/cow-spark-simple.yaml b/docker/demo/config/test-suite/cow-spark-simple.yaml
index 0859c63200203..192adcf377dc0 100644
--- a/docker/demo/config/test-suite/cow-spark-simple.yaml
+++ b/docker/demo/config/test-suite/cow-spark-simple.yaml
@@ -25,17 +25,11 @@ dag_content:
num_records_insert: 100
type: SparkInsertNode
deps: none
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_insert
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
- deps: first_hive_sync
+ deps: first_insert
first_upsert:
config:
record_size: 1000
@@ -52,15 +46,9 @@ dag_content:
num_records_delete: 30
type: SparkDeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: second_hive_sync
\ No newline at end of file
+ deps: first_delete
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 324a4b4a6d0d5..09dd6168c985e 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -47,11 +47,6 @@ dag_content:
engine: "mr"
type: HiveSyncNode
deps: third_insert
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: first_hive_sync
first_upsert:
config:
record_size: 1000
@@ -61,7 +56,7 @@ dag_content:
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
- deps: first_validate
+ deps: first_hive_sync
first_delete:
config:
num_partitions_delete: 50
@@ -76,6 +71,7 @@ dag_content:
deps: first_delete
second_validate:
config:
+ validate_once_every_itr: 5
validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
@@ -83,7 +79,5 @@ dag_content:
last_validate:
config:
execute_itr_count: 50
- validate_clean: true
- validate_archival: true
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
new file mode 100644
index 0000000000000..b2ab525b1af65
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-long-running-multi-partitions-metadata.yaml
+dag_rounds: 30
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 50
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: third_insert
+ first_delete:
+ config:
+ num_partitions_delete: 50
+ num_records_delete: 8000
+ type: DeleteNode
+ deps: first_upsert
+ second_validate:
+ config:
+ validate_once_every_itr: 5
+ validate_hive: false
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: first_delete
+ last_validate:
+ config:
+ execute_itr_count: 30
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index 9d2766f1a5a7e..b8f2b686066c3 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -57,22 +57,15 @@ dag_content:
num_records_delete: 8000
type: DeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
+ validate_once_every_itr: 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_delete
last_validate:
config:
execute_itr_count: 50
- validate_clean: true
- validate_archival: true
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
new file mode 100644
index 0000000000000..0cd4108cb6334
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# to be used with test-aggressive-clean-archival.properties
+
+dag_name: deltastreamer-medium-clustering.yaml
+dag_rounds: 20
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 50
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: third_insert
+ first_delete:
+ config:
+ num_partitions_delete: 50
+ num_records_delete: 8000
+ type: DeleteNode
+ deps: first_upsert
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: first_delete
+ last_validate:
+ config:
+ execute_itr_count: 20
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index 2fc4961e15c07..a20870f262d8b 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -60,22 +60,15 @@ dag_content:
num_records_delete: 8000
type: DeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
+ validate_once_every_itr: 5
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_delete
last_validate:
config:
execute_itr_count: 20
- validate_clean: true
- validate_archival: true
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 28578eb9b687e..1c2f44b060036 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -41,17 +41,6 @@ dag_content:
num_records_insert: 300
deps: second_insert
type: InsertNode
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: third_insert
- first_validate:
- config:
- validate_hive: false
- type: ValidateDatasetNode
- deps: first_hive_sync
first_upsert:
config:
record_size: 1000
@@ -61,29 +50,22 @@ dag_content:
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
- deps: first_validate
+ deps: third_insert
first_delete:
config:
num_partitions_delete: 1
num_records_delete: 8000
type: DeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
+ validate_once_every_itr: 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_delete
last_validate:
config:
execute_itr_count: 50
- validate_clean: true
- validate_archival: true
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/insert-overwrite-table.yaml b/docker/demo/config/test-suite/insert-overwrite-table.yaml
index 8b5a26e4683b7..1a58abdcc4789 100644
--- a/docker/demo/config/test-suite/insert-overwrite-table.yaml
+++ b/docker/demo/config/test-suite/insert-overwrite-table.yaml
@@ -53,19 +53,13 @@ dag_content:
num_partitions_upsert: 10
type: SparkUpsertNode
deps: second_insert
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: second_upsert
first_insert_overwrite_table:
config:
record_size: 1000
repeat_count: 10
num_records_insert: 10
type: SparkInsertOverwriteTableNode
- deps: first_hive_sync
+ deps: second_upsert
delete_all_input_except_last:
config:
delete_input_data_except_latest: true
@@ -89,16 +83,10 @@ dag_content:
num_partitions_upsert: 10
type: SparkUpsertNode
deps: third_insert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: third_upsert
second_validate:
config:
validate_full_data : true
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: third_upsert
diff --git a/docker/demo/config/test-suite/insert-overwrite.yaml b/docker/demo/config/test-suite/insert-overwrite.yaml
index f2299c50c08f3..dc185d5938f6d 100644
--- a/docker/demo/config/test-suite/insert-overwrite.yaml
+++ b/docker/demo/config/test-suite/insert-overwrite.yaml
@@ -54,12 +54,6 @@ dag_content:
num_partitions_upsert: 10
type: SparkUpsertNode
deps: second_insert
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: second_upsert
first_insert_overwrite:
config:
record_size: 1000
@@ -67,7 +61,7 @@ dag_content:
repeat_count: 1
num_records_insert: 10
type: SparkInsertOverwriteNode
- deps: first_hive_sync
+ deps: second_upsert
delete_all_input_except_last:
config:
delete_input_data_except_latest: true
@@ -91,16 +85,10 @@ dag_content:
num_partitions_upsert: 10
type: SparkUpsertNode
deps: third_insert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: third_upsert
second_validate:
config:
validate_full_data : true
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: third_upsert
diff --git a/docker/demo/config/test-suite/simple-clustering.yaml b/docker/demo/config/test-suite/simple-clustering.yaml
index 7389ee3ebc34b..01849bb6436ea 100644
--- a/docker/demo/config/test-suite/simple-clustering.yaml
+++ b/docker/demo/config/test-suite/simple-clustering.yaml
@@ -47,30 +47,18 @@ dag_content:
num_records_delete: 9000
type: DeleteNode
deps: third_insert
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
- deps: first_hive_sync
+ deps: first_delete
first_cluster:
config:
execute_itr_count: 25
type: ClusteringNode
deps: first_validate
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_cluster
second_validate:
config:
validate_hive: false
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_cluster
diff --git a/docker/demo/config/test-suite/simple-deltastreamer.yaml b/docker/demo/config/test-suite/simple-deltastreamer.yaml
index f49a41baf8541..11b7f17d34a4d 100644
--- a/docker/demo/config/test-suite/simple-deltastreamer.yaml
+++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml
@@ -41,17 +41,11 @@ dag_content:
num_records_insert: 300
deps: second_insert
type: InsertNode
- first_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: third_insert
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
- deps: first_hive_sync
+ deps: third_insert
first_upsert:
config:
record_size: 1000
@@ -68,15 +62,9 @@ dag_content:
num_records_delete: 2000
type: DeleteNode
deps: first_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
- deps: second_hive_sync
+ deps: first_delete
diff --git a/docker/demo/config/test-suite/spark-clustering.yaml b/docker/demo/config/test-suite/spark-clustering.yaml
index e8e722ca77c7c..8da4f953983b8 100644
--- a/docker/demo/config/test-suite/spark-clustering.yaml
+++ b/docker/demo/config/test-suite/spark-clustering.yaml
@@ -59,15 +59,9 @@ dag_content:
num_records_delete: 16000
type: SparkDeleteNode
deps: second_upsert
- second_hive_sync:
- config:
- queue_name: "adhoc"
- engine: "mr"
- type: HiveSyncNode
- deps: first_delete
second_validate:
config:
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: second_hive_sync
\ No newline at end of file
+ deps: first_delete
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/spark-medium-clustering.yaml b/docker/demo/config/test-suite/spark-medium-clustering.yaml
new file mode 100644
index 0000000000000..09537a23d553e
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-medium-clustering.yaml
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: spark-medium-clustering.yaml
+dag_rounds: 20
+dag_intermittent_delay_mins: 0
+dag_content:
+ first_insert:
+ config:
+ record_size: 200
+ num_partitions_insert: 50
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_validate:
+ config:
+ validate_hive: false
+ type: ValidateDatasetNode
+ deps: first_insert
+ first_upsert:
+ config:
+ record_size: 200
+ num_partitions_insert: 50
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 3000
+ num_partitions_upsert: 50
+ type: SparkUpsertNode
+ deps: first_validate
+ first_delete:
+ config:
+ num_partitions_delete: 50
+ num_records_delete: 8000
+ type: SparkDeleteNode
+ deps: first_upsert
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: first_delete
+ last_validate:
+ config:
+ execute_itr_count: 20
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/sparksql-incremental.commands b/docker/demo/sparksql-incremental.commands
index da61347ec275b..5ea4729b932f2 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -21,6 +21,8 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;
@@ -43,14 +45,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
- option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor").
- option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
- option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
- option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
- option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
- option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
- option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
- option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
+ option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
+ option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+ option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+ option(HiveSyncConfig.HIVE_USER.key(), "hive").
+ option(HiveSyncConfig.HIVE_PASS.key(), "hive").
+ option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+ option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
+ option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");
@@ -75,14 +77,14 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts").
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
- option(DataSourceWriteOptions.HIVE_TABLE.key(), "stock_ticks_derived_mor_bs").
- option(DataSourceWriteOptions.HIVE_DATABASE.key(), "default").
- option(DataSourceWriteOptions.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
- option(DataSourceWriteOptions.HIVE_USER.key(), "hive").
- option(DataSourceWriteOptions.HIVE_PASS.key(), "hive").
- option(DataSourceWriteOptions.HIVE_SYNC_ENABLED.key(), "true").
- option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), "datestr").
- option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
+ option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
+ option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
+ option(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
+ option(HiveSyncConfig.HIVE_USER.key(), "hive").
+ option(HiveSyncConfig.HIVE_PASS.key(), "hive").
+ option(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "true").
+ option(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr").
+ option(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key(), "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
diff --git a/docker/hoodie/hadoop/pom.xml b/docker/hoodie/hadoop/pom.xml
index de3bd3d57832d..b029abafa83e5 100644
--- a/docker/hoodie/hadoop/pom.xml
+++ b/docker/hoodie/hadoop/pom.xml
@@ -57,7 +57,7 @@
<spark.version>2.4.4</spark.version>
<hive.version>2.3.3</hive.version>
<hadoop.version>2.8.4</hadoop.version>
- <presto.version>0.268</presto.version>
+ <presto.version>0.271</presto.version>
<trino.version>368</trino.version>
<dockerfile.maven.version>1.4.13</dockerfile.maven.version>
<docker.build.skip>true</docker.build.skip>
diff --git a/docker/hoodie/hadoop/prestobase/Dockerfile b/docker/hoodie/hadoop/prestobase/Dockerfile
index 12b644aa06314..accedb94db3dc 100644
--- a/docker/hoodie/hadoop/prestobase/Dockerfile
+++ b/docker/hoodie/hadoop/prestobase/Dockerfile
@@ -22,7 +22,7 @@ ARG HADOOP_VERSION=2.8.4
ARG HIVE_VERSION=2.3.3
FROM apachehudi/hudi-hadoop_${HADOOP_VERSION}-base:latest as hadoop-base
-ARG PRESTO_VERSION=0.268
+ARG PRESTO_VERSION=0.271
ENV PRESTO_VERSION ${PRESTO_VERSION}
ENV PRESTO_HOME /opt/presto-server-${PRESTO_VERSION}
@@ -79,6 +79,15 @@ RUN chmod +x /usr/local/bin/entrypoint.sh
ADD target/ /var/hoodie/ws/docker/hoodie/hadoop/prestobase/target/
ENV HUDI_PRESTO_BUNDLE /var/hoodie/ws/docker/hoodie/hadoop/prestobase/target/hudi-presto-bundle.jar
RUN cp ${HUDI_PRESTO_BUNDLE} ${PRESTO_HOME}/plugin/hive-hadoop2/
+# TODO: the latest Presto master relies on hudi-presto-bundle, while current Presto releases
+# rely on hudi-common and hudi-hadoop-mr 0.9.0, which are pulled into plugin/hive-hadoop2/ in the
+# docker setup, making it hard to test the latest Hudi changes due to class conflicts.
+# To get around these conflicts, the older Hudi jars are removed below for integration tests,
+# so that only the hudi-presto-bundle build is used for testing. This temporary logic must be
+# removed once Presto cuts a new release that depends on hudi-presto-bundle and the docker
+# setup is upgraded to that release version.
+RUN rm ${PRESTO_HOME}/plugin/hive-hadoop2/hudi-common-*
+RUN rm ${PRESTO_HOME}/plugin/hive-hadoop2/hudi-hadoop-mr-*
VOLUME ["${PRESTO_LOG_DIR}"]
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9bad2e3486e7f..a6a7e18b1f6ab 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,8 +18,6 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -78,6 +76,9 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -242,11 +243,11 @@ protected void commit(HoodieTable table, String commitActionType, String instant
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
- protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}
- protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+ protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -397,7 +398,7 @@ protected void rollbackFailedBootstrap() {
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsert(I records, final String instantTime,
- Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
@@ -417,7 +418,7 @@ public abstract O bulkInsert(I records, final String instantTime,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
- Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+ Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -458,7 +459,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
* @param hoodieTable Hoodie Table
* @return Write Status
*/
- protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
+ protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable);
/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
@@ -468,7 +469,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
- protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option