diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index ebf3caccd9c62..20515f7c750ed 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -18,12 +18,16 @@ jobs:
include:
- scala: "scala-2.11"
spark: "spark2"
+ skipModules: ""
- scala: "scala-2.11"
spark: "spark2,spark-shade-unbundle-avro"
+ skipModules: ""
- scala: "scala-2.12"
spark: "spark3.1.x"
+ skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3.1.x,spark-shade-unbundle-avro"
+ skipModules: "!hudi-spark-datasource/hudi-spark3"
- scala: "scala-2.12"
spark: "spark3"
- scala: "scala-2.12"
@@ -40,4 +44,5 @@ jobs:
env:
SCALA_PROFILE: ${{ matrix.scala }}
SPARK_PROFILE: ${{ matrix.spark }}
- run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
+ SKIP_MODULES: ${{ matrix.skipModules }}
+ run: mvn install -P "$SCALA_PROFILE,$SPARK_PROFILE" -pl "$SKIP_MODULES" -DskipTests=true -Dmaven.javadoc.skip=true -B -V
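The SKIP_MODULES wiring above relies on Maven's reactor selection: -pl restricts the build to the listed projects, and a leading "!" (supported since Maven 3.2.1) excludes a module instead. A minimal local equivalent of the spark3.1.x matrix rows, assuming a checkout at the repository root:

    # build every module except hudi-spark-datasource/hudi-spark3 against the Spark 3.1.x profile
    mvn install -P "scala-2.12,spark3.1.x" -pl '!hudi-spark-datasource/hudi-spark3' -DskipTests=true -Dmaven.javadoc.skip=true -B -V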
diff --git a/README.md b/README.md
index af11e6a14d5df..6d3475755ff87 100644
--- a/README.md
+++ b/README.md
@@ -83,7 +83,7 @@ mvn clean package -DskipTests -Dscala-2.12
The default Spark version supported is 2.4.4. To build for different Spark 3 versions, use the corresponding profile
```
-# Build against Spark 3.2.0 (the default build shipped with the public Spark 3 bundle)
+# Build against Spark 3.2.1 (the default build shipped with the public Spark 3 bundle)
mvn clean package -DskipTests -Dspark3
# Build against Spark 3.1.2
diff --git a/doap_HUDI.rdf b/doap_HUDI.rdf
index 33f64ecf82ecb..a3b958a5cd7d0 100644
--- a/doap_HUDI.rdf
+++ b/doap_HUDI.rdf
@@ -81,6 +81,11 @@
     <created>2021-12-08</created>
     <revision>0.10.0</revision>
   </Version>
+  <Version>
+    <name>Apache Hudi 0.10.1</name>
+    <created>2022-01-26</created>
+    <revision>0.10.1</revision>
+  </Version>
diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml
index 493ad7a5578f6..8a1e58f840a37 100644
--- a/docker/demo/config/test-suite/cow-spark-long-running.yaml
+++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -13,13 +13,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-spark-long-running-multi-partitions.yaml
-dag_rounds: 50
-dag_intermittent_delay_mins: 1
+dag_name: cow-spark-deltastreamer-long-running-multi-partitions.yaml
+dag_rounds: 30
+dag_intermittent_delay_mins: 0
dag_content:
first_insert:
config:
- record_size: 1000
+ record_size: 200
num_partitions_insert: 50
repeat_count: 1
num_records_insert: 10000
@@ -33,12 +33,12 @@ dag_content:
deps: first_insert
first_validate:
config:
- validate_hive: true
+ validate_hive: false
type: ValidateDatasetNode
deps: first_hive_sync
first_upsert:
config:
- record_size: 1000
+ record_size: 200
num_partitions_insert: 50
num_records_insert: 300
repeat_count: 1
@@ -60,13 +60,13 @@ dag_content:
deps: first_delete
second_validate:
config:
- validate_hive: true
+ validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
deps: second_hive_sync
last_validate:
config:
- execute_itr_count: 50
+ execute_itr_count: 30
validate_clean: true
validate_archival: true
type: ValidateAsyncOperations
diff --git a/docker/demo/config/test-suite/cow-spark-simple.yaml b/docker/demo/config/test-suite/cow-spark-simple.yaml
index 21e7e6bbe39bc..0859c63200203 100644
--- a/docker/demo/config/test-suite/cow-spark-simple.yaml
+++ b/docker/demo/config/test-suite/cow-spark-simple.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-spark-simple.yaml
-dag_rounds: 2
+dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -33,7 +33,7 @@ dag_content:
deps: first_insert
first_validate:
config:
- validate_hive: true
+ validate_hive: false
type: ValidateDatasetNode
deps: first_hive_sync
first_upsert:
@@ -60,7 +60,7 @@ dag_content:
deps: first_delete
second_validate:
config:
- validate_hive: true
+ validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
deps: second_hive_sync
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
similarity index 96%
rename from docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
rename to docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 0ce529805567b..324a4b4a6d0d5 100644
--- a/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-long-running-multi-partitions.yaml
+dag_name: deltastreamer-long-running-multi-partitions.yaml
dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
@@ -76,7 +76,7 @@ dag_content:
deps: first_delete
second_validate:
config:
- validate_hive: false
+ validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
new file mode 100644
index 0000000000000..9d2766f1a5a7e
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-long-running-multi-partitions.yaml
+dag_rounds: 50
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 50
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: third_insert
+ first_delete:
+ config:
+ num_partitions_delete: 50
+ num_records_delete: 8000
+ type: DeleteNode
+ deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: second_hive_sync
+ last_validate:
+ config:
+ execute_itr_count: 50
+ validate_clean: true
+ validate_archival: true
+ type: ValidateAsyncOperations
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
new file mode 100644
index 0000000000000..2fc4961e15c07
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# to be used with test-aggressive-clean-archival.properties
+
+dag_name: deltastreamer-long-running-multi-partitions.yaml
+dag_rounds: 20
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 50
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 2
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: third_insert
+ first_delete:
+ config:
+ num_partitions_delete: 50
+ num_records_delete: 8000
+ type: DeleteNode
+ deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
+ last_validate:
+ config:
+ execute_itr_count: 20
+ validate_clean: true
+ validate_archival: true
+ type: ValidateAsyncOperations
+ deps: second_validate
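The "to be used with test-aggressive-clean-archival.properties" note above reflects how a DAG yaml and a properties file are paired when the integration test suite is launched. A hypothetical invocation, assuming the HoodieTestSuiteJob entry point and flag names from the Hudi integ-test docs (paths and any remaining required options are illustrative):

    # pair the medium full-dataset-validation DAG with the aggressive clean/archival configs
    spark-submit \
      --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
      packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-*.jar \
      --props file:/var/hoodie/ws/docker/demo/config/test-suite/test-aggressive-clean-archival.properties \
      --workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml \
      --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
      --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
      --table-type COPY_ON_WRITE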
diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
similarity index 97%
rename from docker/demo/config/test-suite/cow-long-running-example.yaml
rename to docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 29b6858bf0506..28578eb9b687e 100644
--- a/docker/demo/config/test-suite/cow-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-long-running-example.yaml
+dag_name: detlastreamer-long-running-example.yaml
dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
diff --git a/docker/demo/config/test-suite/insert-overwrite-table.yaml b/docker/demo/config/test-suite/insert-overwrite-table.yaml
new file mode 100644
index 0000000000000..8b5a26e4683b7
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite-table.yaml
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: second_upsert
+ first_insert_overwrite_table:
+ config:
+ record_size: 1000
+ repeat_count: 10
+ num_records_insert: 10
+ type: SparkInsertOverwriteTableNode
+ deps: first_hive_sync
+ delete_all_input_except_last:
+ config:
+ delete_input_data_except_latest: true
+ type: DeleteInputDatasetNode
+ deps: first_insert_overwrite_table
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: delete_all_input_except_last
+ third_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: third_insert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_upsert
+ second_validate:
+ config:
+ validate_full_data : true
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/insert-overwrite.yaml b/docker/demo/config/test-suite/insert-overwrite.yaml
new file mode 100644
index 0000000000000..f2299c50c08f3
--- /dev/null
+++ b/docker/demo/config/test-suite/insert-overwrite.yaml
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: second_upsert
+ first_insert_overwrite:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10
+ type: SparkInsertOverwriteNode
+ deps: first_hive_sync
+ delete_all_input_except_last:
+ config:
+ delete_input_data_except_latest: true
+ type: DeleteInputDatasetNode
+ deps: first_insert_overwrite
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: delete_all_input_except_last
+ third_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: third_insert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_upsert
+ second_validate:
+ config:
+ validate_full_data : true
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/simple-clustering-hive.yaml b/docker/demo/config/test-suite/simple-clustering-hive.yaml
new file mode 100644
index 0000000000000..e1f79bfe93c0f
--- /dev/null
+++ b/docker/demo/config/test-suite/simple-clustering-hive.yaml
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-clustering-hive.yaml
+dag_rounds: 30
+dag_intermittent_delay_mins: 0
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_delete:
+ config:
+ num_partitions_delete: 1
+ num_records_delete: 9000
+ type: DeleteNode
+ deps: third_insert
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ first_validate:
+ config:
+ validate_hive: false
+ type: ValidateDatasetNode
+ deps: first_hive_sync
+ first_cluster:
+ config:
+ execute_itr_count: 20
+ type: ClusteringNode
+ deps: first_validate
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_cluster
+ second_validate:
+ config:
+ validate_hive: true
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/cow-clustering-example.yaml b/docker/demo/config/test-suite/simple-clustering.yaml
similarity index 96%
rename from docker/demo/config/test-suite/cow-clustering-example.yaml
rename to docker/demo/config/test-suite/simple-clustering.yaml
index 95932317c04fd..7389ee3ebc34b 100644
--- a/docker/demo/config/test-suite/cow-clustering-example.yaml
+++ b/docker/demo/config/test-suite/simple-clustering.yaml
@@ -13,8 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: cow-clustering-example.yaml
-dag_rounds: 3
+dag_name: simple-clustering.yaml
+dag_rounds: 30
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
@@ -60,7 +60,7 @@ dag_content:
deps: first_hive_sync
first_cluster:
config:
- execute_itr_count: 2
+ execute_itr_count: 25
type: ClusteringNode
deps: first_validate
second_hive_sync:
diff --git a/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml b/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml
new file mode 100644
index 0000000000000..e6738b6942b35
--- /dev/null
+++ b/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 1000
+ type: InsertNode
+ deps: none
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 10000
+ deps: first_insert
+ type: InsertNode
+ third_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ repeat_count: 1
+ num_records_insert: 300
+ deps: second_insert
+ type: InsertNode
+ first_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: third_insert
+ first_validate:
+ config:
+ validate_hive: false
+ type: ValidateDatasetNode
+ deps: first_hive_sync
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 1
+ num_records_insert: 300
+ repeat_count: 1
+ num_records_upsert: 100
+ num_partitions_upsert: 1
+ type: UpsertNode
+ deps: first_validate
+ first_delete:
+ config:
+ num_partitions_delete: 1
+ num_records_delete: 2000
+ type: DeleteNode
+ deps: first_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ validate_hive: true
+ delete_input_data: true
+ type: ValidateDatasetNode
+ deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/simple-deltastreamer.yaml
similarity index 98%
rename from docker/demo/config/test-suite/complex-dag-cow.yaml
rename to docker/demo/config/test-suite/simple-deltastreamer.yaml
index 3a84b0a0acecd..f49a41baf8541 100644
--- a/docker/demo/config/test-suite/complex-dag-cow.yaml
+++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml
@@ -13,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-dag_name: complex-dag-cow.yaml
+dag_name: simple-deltastreamer.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
diff --git a/docker/demo/config/test-suite/spark-clustering.yaml b/docker/demo/config/test-suite/spark-clustering.yaml
new file mode 100644
index 0000000000000..e8e722ca77c7c
--- /dev/null
+++ b/docker/demo/config/test-suite/spark-clustering.yaml
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: cow-spark-simple.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: first_insert
+ second_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ repeat_count: 1
+ num_records_insert: 10000
+ type: SparkInsertNode
+ deps: first_upsert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 10
+ num_records_insert: 1000
+ repeat_count: 1
+ num_records_upsert: 8000
+ num_partitions_upsert: 10
+ type: SparkUpsertNode
+ deps: second_insert
+ first_delete:
+ config:
+ num_partitions_delete: 10
+ num_records_delete: 16000
+ type: SparkDeleteNode
+ deps: second_upsert
+ second_hive_sync:
+ config:
+ queue_name: "adhoc"
+ engine: "mr"
+ type: HiveSyncNode
+ deps: first_delete
+ second_validate:
+ config:
+ validate_hive: false
+ delete_input_data: false
+ type: ValidateDatasetNode
+ deps: second_hive_sync
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/test-aggressive-clean-archival.properties b/docker/demo/config/test-suite/test-aggressive-clean-archival.properties
new file mode 100644
index 0000000000000..dcbbfb31c9936
--- /dev/null
+++ b/docker/demo/config/test-suite/test-aggressive-clean-archival.properties
@@ -0,0 +1,54 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.cleaner.commits.retained=5
+hoodie.keep.min.commits=9
+hoodie.keep.max.commits=10
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test-clustering-aggressive-clean-archival.properties b/docker/demo/config/test-suite/test-clustering-aggressive-clean-archival.properties
new file mode 100644
index 0000000000000..abddd77ba327a
--- /dev/null
+++ b/docker/demo/config/test-suite/test-clustering-aggressive-clean-archival.properties
@@ -0,0 +1,61 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.cleaner.commits.retained=5
+hoodie.keep.min.commits=9
+hoodie.keep.max.commits=10
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.clustering.inline=true
+hoodie.clustering.inline.max.commits=4
+hoodie.clustering.plan.strategy.sort.columns=_hoodie_partition_path,_row_key
+hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
+hoodie.clustering.plan.strategy.small.file.limit=629145600
+hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test-clustering-metadata-aggressive-clean-archival.properties b/docker/demo/config/test-suite/test-clustering-metadata-aggressive-clean-archival.properties
new file mode 100644
index 0000000000000..931b1e3a09668
--- /dev/null
+++ b/docker/demo/config/test-suite/test-clustering-metadata-aggressive-clean-archival.properties
@@ -0,0 +1,63 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.cleaner.commits.retained=5
+hoodie.keep.min.commits=9
+hoodie.keep.max.commits=10
+
+hoodie.metadata.enable=true
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.clustering.inline=true
+hoodie.clustering.inline.max.commits=4
+hoodie.clustering.plan.strategy.sort.columns=_hoodie_partition_path,_row_key
+hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
+hoodie.clustering.plan.strategy.small.file.limit=629145600
+hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test-clustering.properties b/docker/demo/config/test-suite/test-clustering.properties
new file mode 100644
index 0000000000000..9aa4843b2746e
--- /dev/null
+++ b/docker/demo/config/test-suite/test-clustering.properties
@@ -0,0 +1,57 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.clustering.inline=true
+hoodie.clustering.inline.max.commits=4
+hoodie.clustering.plan.strategy.sort.columns=_hoodie_partition_path,_row_key
+hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
+hoodie.clustering.plan.strategy.small.file.limit=629145600
+hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test-metadata-aggressive-clean-archival.properties b/docker/demo/config/test-suite/test-metadata-aggressive-clean-archival.properties
new file mode 100644
index 0000000000000..8935ffb4264be
--- /dev/null
+++ b/docker/demo/config/test-suite/test-metadata-aggressive-clean-archival.properties
@@ -0,0 +1,56 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.cleaner.commits.retained=5
+hoodie.keep.min.commits=9
+hoodie.keep.max.commits=10
+
+hoodie.metadata.enable=true
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test-metadata.properties b/docker/demo/config/test-suite/test-metadata.properties
new file mode 100644
index 0000000000000..48da77c511e93
--- /dev/null
+++ b/docker/demo/config/test-suite/test-metadata.properties
@@ -0,0 +1,56 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.metadata.enable=true
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.clustering.plan.strategy.sort.columns=_row_key
+hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
+hoodie.clustering.inline.max.commits=1
+
+hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties
index 30cd1c1f02f09..509b9f4ba628e 100644
--- a/docker/demo/config/test-suite/test.properties
+++ b/docker/demo/config/test-suite/test.properties
@@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
+hoodie.metadata.enable=false
+
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
@@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
-hoodie.clustering.plan.strategy.sort.columns=_row_key
-hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
-hoodie.clustering.inline.max.commits=1
-
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 102fcc2ae7a63..1747a59f4f366 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -30,6 +30,7 @@
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.avro.generic.GenericRecord;
@@ -80,8 +81,7 @@ public String showArchivedCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- List records = blk.getRecords();
- readRecords.addAll(records);
+ blk.getRecordItr().forEachRemaining(readRecords::add);
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.filter(r -> r.get("actionType").toString().equals(HoodieTimeline.COMMIT_ACTION)
@@ -155,8 +155,9 @@ public String showCommits(
// read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- List records = blk.getRecords();
- readRecords.addAll(records);
+ try (ClosableIterator recordItr = blk.getRecordItr()) {
+ recordItr.forEachRemaining(readRecords::add);
+ }
}
List readCommits = readRecords.stream().map(r -> (GenericRecord) r)
.map(r -> readCommit(r, skipMetadata)).collect(Collectors.toList());
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
index 9adae1daa5336..4163f0cb5a6a4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ClusteringCommand.java
@@ -116,4 +116,40 @@ public String runClustering(
}
return "Succeeded to run clustering for " + clusteringInstantTime;
}
+
+ /**
+ * Run clustering table service.
+ *
+ * Example:
+ * > connect --path {path to hudi table}
+ * > clustering scheduleAndExecute --sparkMaster local --sparkMemory 2g
+ */
+ @CliCommand(value = "clustering scheduleAndExecute", help = "Run Clustering. Make a cluster plan first and execute that plan immediately")
+ public String runClustering(
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = SparkUtil.DEFAULT_SPARK_MASTER, help = "Spark master") final String master,
+ @CliOption(key = "sparkMemory", help = "Spark executor memory", unspecifiedDefaultValue = "4g") final String sparkMemory,
+ @CliOption(key = "parallelism", help = "Parallelism for hoodie clustering", unspecifiedDefaultValue = "1") final String parallelism,
+ @CliOption(key = "retry", help = "Number of retries", unspecifiedDefaultValue = "1") final String retry,
+ @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for "
+ + "hoodie client for compacting", unspecifiedDefaultValue = "") final String propsFilePath,
+ @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be "
+ + "passed here in the form of an array", unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+ HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
+ boolean initialized = HoodieCLI.initConf();
+ HoodieCLI.initFS(initialized);
+
+ String sparkPropertiesPath =
+ Utils.getDefaultPropertiesFile(JavaConverters.mapAsScalaMapConverter(System.getenv()).asScala());
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+ sparkLauncher.addAppArgs(SparkCommand.CLUSTERING_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory,
+ client.getBasePath(), client.getTableConfig().getTableName(), parallelism, retry, propsFilePath);
+ UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ return "Failed to run clustering for scheduleAndExecute.";
+ }
+ return "Succeeded to run clustering for scheduleAndExecute";
+ }
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 9517234a0bb60..db1cd207df5a1 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -232,7 +232,9 @@ public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to roll
@CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
@CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
- help = "Spark executor memory") final String sparkMemory)
+ help = "Spark executor memory") final String sparkMemory,
+ @CliOption(key = "rollbackUsingMarkers", unspecifiedDefaultValue = "true",
+ help = "Enabling marker based rollback") final String rollbackUsingMarkers)
throws Exception {
HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline completedTimeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
@@ -243,7 +245,7 @@ public String rollbackCommit(@CliOption(key = {"commit"}, help = "Commit to roll
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK.toString(), master, sparkMemory, instantTime,
- HoodieCLI.getTableMetaClient().getBasePath());
+ HoodieCLI.getTableMetaClient().getBasePath(), rollbackUsingMarkers);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
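The new rollbackUsingMarkers option defaults to "true", so marker-based rollback remains the default and the flag is simply forwarded to SparkMain's ROLLBACK command. A sketch of a CLI session that opts back into listing-based rollback, assuming the existing command verb commit rollback (only the new flag comes from this change; the instant time is a placeholder):

    connect --path /tmp/hudi_trips_cow
    commit rollback --commit 20220126183000000 --rollbackUsingMarkers false --sparkMaster local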
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index b6a366bbb75ef..097c68a542c47 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -264,6 +264,41 @@ public String compact(
return "Compaction successfully completed for " + compactionInstantTime;
}
+ @CliCommand(value = "compaction scheduleAndExecute", help = "Schedule compaction plan and execute this plan")
+ public String compact(
+ @CliOption(key = {"parallelism"}, mandatory = true,
+ help = "Parallelism for hoodie compaction") final String parallelism,
+ @CliOption(key = "schemaFilePath", mandatory = true,
+ help = "Path for Avro schema file") final String schemaFilePath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
+ help = "Spark Master") String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+ help = "Spark executor memory") final String sparkMemory,
+ @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number of retries") final String retry,
+ @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
+ unspecifiedDefaultValue = "") final String propsFilePath,
+ @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+ unspecifiedDefaultValue = "") final String[] configs)
+ throws Exception {
+ HoodieTableMetaClient client = checkAndGetMetaClient();
+ boolean initialized = HoodieCLI.initConf();
+ HoodieCLI.initFS(initialized);
+ String sparkPropertiesPath =
+ Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+ sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE_AND_EXECUTE.toString(), master, sparkMemory, client.getBasePath(),
+ client.getTableConfig().getTableName(), parallelism, schemaFilePath,
+ retry, propsFilePath);
+ UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ return "Failed to schedule and execute compaction ";
+ }
+ return "Schedule and execute compaction successfully completed";
+ }
+
/**
* Prints all compaction details.
*/
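Unlike clustering scheduleAndExecute, this compaction variant marks parallelism and schemaFilePath as mandatory. A hypothetical hudi-cli session built only from the @CliOption keys declared above (paths are placeholders):

    connect --path /tmp/hudi_trips_cow
    compaction scheduleAndExecute --parallelism 2 --schemaFilePath file:///tmp/source.avsc --sparkMaster local --sparkMemory 2g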
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index 119ccb0dcf039..1d8d6dcd6ae93 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -34,14 +34,16 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.exception.HoodieException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.springframework.shell.core.CommandMarker;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
@@ -69,8 +71,8 @@ public class ExportCommand implements CommandMarker {
@CliCommand(value = "export instants", help = "Export Instants and their metadata from the Timeline")
public String exportInstants(
@CliOption(key = {"limit"}, help = "Limit Instants", unspecifiedDefaultValue = "-1") final Integer limit,
- @CliOption(key = {"actions"}, help = "Comma seperated list of Instant actions to export",
- unspecifiedDefaultValue = "clean,commit,deltacommit,rollback,savepoint,restore") final String filter,
+ @CliOption(key = {"actions"}, help = "Comma separated list of Instant actions to export",
+ unspecifiedDefaultValue = "clean,commit,deltacommit,rollback,savepoint,restore") final String filter,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"localFolder"}, help = "Local Folder to export to", mandatory = true) String localFolder)
throws Exception {
@@ -122,44 +124,46 @@ private int copyArchivedInstants(List statuses, Set actionSe
// read the avro blocks
while (reader.hasNext() && copyCount < limit) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
- for (IndexedRecord ir : blk.getRecords()) {
- // Archived instants are saved as arvo encoded HoodieArchivedMetaEntry records. We need to get the
- // metadata record from the entry and convert it to json.
- HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
- .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
-
- final String action = archiveEntryRecord.get("actionType").toString();
- if (!actionSet.contains(action)) {
- continue;
- }
-
- GenericRecord metadata = null;
- switch (action) {
- case HoodieTimeline.CLEAN_ACTION:
- metadata = archiveEntryRecord.getHoodieCleanMetadata();
+        try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+ while (recordItr.hasNext()) {
+ IndexedRecord ir = recordItr.next();
+            // Archived instants are saved as avro encoded HoodieArchivedMetaEntry records. We need to get the
+ // metadata record from the entry and convert it to json.
+ HoodieArchivedMetaEntry archiveEntryRecord = (HoodieArchivedMetaEntry) SpecificData.get()
+ .deepCopy(HoodieArchivedMetaEntry.SCHEMA$, ir);
+ final String action = archiveEntryRecord.get("actionType").toString();
+ if (!actionSet.contains(action)) {
+ continue;
+ }
+
+ GenericRecord metadata = null;
+ switch (action) {
+ case HoodieTimeline.CLEAN_ACTION:
+ metadata = archiveEntryRecord.getHoodieCleanMetadata();
+ break;
+ case HoodieTimeline.COMMIT_ACTION:
+ case HoodieTimeline.DELTA_COMMIT_ACTION:
+ metadata = archiveEntryRecord.getHoodieCommitMetadata();
+ break;
+ case HoodieTimeline.ROLLBACK_ACTION:
+ metadata = archiveEntryRecord.getHoodieRollbackMetadata();
+ break;
+ case HoodieTimeline.SAVEPOINT_ACTION:
+ metadata = archiveEntryRecord.getHoodieSavePointMetadata();
+ break;
+ case HoodieTimeline.COMPACTION_ACTION:
+ metadata = archiveEntryRecord.getHoodieCompactionMetadata();
+ break;
+ default:
+ throw new HoodieException("Unknown type of action " + action);
+ }
+
+ final String instantTime = archiveEntryRecord.get("commitTime").toString();
+ final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
+ writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
+ if (++copyCount == limit) {
break;
- case HoodieTimeline.COMMIT_ACTION:
- case HoodieTimeline.DELTA_COMMIT_ACTION:
- metadata = archiveEntryRecord.getHoodieCommitMetadata();
- break;
- case HoodieTimeline.ROLLBACK_ACTION:
- metadata = archiveEntryRecord.getHoodieRollbackMetadata();
- break;
- case HoodieTimeline.SAVEPOINT_ACTION:
- metadata = archiveEntryRecord.getHoodieSavePointMetadata();
- break;
- case HoodieTimeline.COMPACTION_ACTION:
- metadata = archiveEntryRecord.getHoodieCompactionMetadata();
- break;
- default:
- throw new HoodieException("Unknown type of action " + action);
- }
-
- final String instantTime = archiveEntryRecord.get("commitTime").toString();
- final String outPath = localFolder + Path.SEPARATOR + instantTime + "." + action;
- writeToFile(outPath, HoodieAvroUtils.avroToJson(metadata, true));
- if (++copyCount == limit) {
- break;
+ }
}
}
}
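
The ExportCommand change above swaps the eagerly materialized `getRecords()` list for a `ClosableIterator` obtained from `getRecordItr()`, iterated inside try-with-resources so the block's resources are released. A minimal sketch of that pattern (not part of the patch; the helper class and method names are illustrative):

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.util.ClosableIterator;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class DataBlockReadSketch {
  // Streams up to `limit` records out of a data block; the iterator is closed
  // automatically by try-with-resources instead of loading all records at once.
  static List<IndexedRecord> readUpTo(HoodieDataBlock block, int limit) throws IOException {
    List<IndexedRecord> out = new ArrayList<>();
    try (ClosableIterator<IndexedRecord> it = block.getRecordItr()) {
      while (it.hasNext() && out.size() < limit) {
        out.add(it.next());
      }
    }
    return out;
  }
}
```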
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 27bcd81faefec..4a56858f3926a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -37,6 +37,7 @@
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
@@ -60,6 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -100,7 +102,7 @@ public String showLogFileCommits(
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
String instantTime;
- int recordCount = 0;
+ AtomicInteger recordCount = new AtomicInteger(0);
if (n instanceof HoodieCorruptBlock) {
try {
instantTime = n.getLogBlockHeader().get(HeaderMetadataType.INSTANT_TIME);
@@ -120,17 +122,19 @@ public String showLogFileCommits(
instantTime = "dummy_instant_time_" + dummyInstantTimeCount;
}
if (n instanceof HoodieDataBlock) {
- recordCount = ((HoodieDataBlock) n).getRecords().size();
+          try (ClosableIterator<IndexedRecord> recordItr = ((HoodieDataBlock) n).getRecordItr()) {
+ recordItr.forEachRemaining(r -> recordCount.incrementAndGet());
+ }
}
}
if (commitCountAndMetadata.containsKey(instantTime)) {
commitCountAndMetadata.get(instantTime).add(
- new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
+ new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
} else {
        List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>> list =
            new ArrayList<>();
list.add(
- new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount));
+ new Tuple3<>(n.getBlockType(), new Tuple2<>(n.getLogBlockHeader(), n.getLogBlockFooter()), recordCount.get()));
commitCountAndMetadata.put(instantTime, list);
}
}
@@ -232,11 +236,12 @@ public String showLogFileRecords(
HoodieLogBlock n = reader.next();
if (n instanceof HoodieDataBlock) {
HoodieDataBlock blk = (HoodieDataBlock) n;
-          List<IndexedRecord> records = blk.getRecords();
- for (IndexedRecord record : records) {
- if (allRecords.size() < limit) {
- allRecords.add(record);
- }
+          try (ClosableIterator<IndexedRecord> recordItr = blk.getRecordItr()) {
+ recordItr.forEachRemaining(record -> {
+ if (allRecords.size() < limit) {
+ allRecords.add(record);
+ }
+ });
}
}
}
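
In `showLogFileCommits` the record count is now accumulated in an `AtomicInteger` because a plain local `int` captured by the `forEachRemaining` lambda would have to be effectively final. A small self-contained sketch of that counting pattern (hypothetical class, not from the patch):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

class IteratorCountSketch {
  // Counts elements from inside a lambda; the mutable state lives in an
  // AtomicInteger since captured local variables must be effectively final.
  static int count(Iterator<?> it) {
    AtomicInteger n = new AtomicInteger(0);
    it.forEachRemaining(x -> n.incrementAndGet());
    return n.get();
  }

  public static void main(String[] args) {
    System.out.println(count(Arrays.asList("a", "b", "c").iterator())); // prints 3
  }
}
```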
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java
new file mode 100644
index 0000000000000..57a4ee1879855
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MarkersCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.spark.launcher.SparkLauncher;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+/**
+ * CLI command for marker operations.
+ */
+@Component
+public class MarkersCommand implements CommandMarker {
+
+ @CliCommand(value = "marker delete", help = "Delete the marker")
+ public String deleteMarker(@CliOption(key = {"commit"}, help = "Delete a marker") final String instantTime,
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File Path") final String sparkPropertiesPath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = "Spark Master") String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
+ help = "Spark executor memory") final String sparkMemory)
+ throws Exception {
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+ sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_MARKER.toString(), master, sparkMemory, instantTime,
+ metaClient.getBasePath());
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+    // Refresh the current table metadata
+ HoodieCLI.refreshTableMetadata();
+ if (exitCode != 0) {
+ return String.format("Failed: Could not delete marker \"%s\".", instantTime);
+ }
+ return String.format("Marker \"%s\" deleted.", instantTime);
+ }
+}
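
The command above only launches a Spark application; the actual cleanup happens in `SparkMain.deleteMarker` (further down in this patch) via `WriteMarkersFactory`. A hedged sketch of that core call, assuming the write config, engine context, and table are already available (the wrapper class and method names are illustrative):

```java
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;

class MarkerDeleteSketch {
  // Quietly removes the marker directory of the given instant, mirroring the
  // SparkMain.deleteMarker() logic introduced by this patch.
  static void deleteMarkers(HoodieWriteConfig config, HoodieEngineContext context,
                            HoodieSparkTable table, String instantTime) {
    WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
        .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
  }
}
```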
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index 2533562d8206e..6c068c898b9be 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -45,7 +45,6 @@
import org.springframework.stereotype.Component;
import scala.collection.JavaConverters;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
@@ -153,10 +152,12 @@ public String overwriteHoodieProperties(
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
Properties newProps = new Properties();
- newProps.load(new FileInputStream(new File(overwriteFilePath)));
+ newProps.load(new FileInputStream(overwriteFilePath));
Map oldProps = client.getTableConfig().propsMap();
Path metaPathDir = new Path(client.getBasePath(), METAFOLDER_NAME);
HoodieTableConfig.create(client.getFs(), metaPathDir, newProps);
+ // reload new props as checksum would have been added
+ newProps = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().getProps();
TreeSet allPropKeys = new TreeSet<>();
allPropKeys.addAll(newProps.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index d1ee109f59042..0de1a1adfe0be 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -18,6 +18,7 @@
package org.apache.hudi.cli.commands;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.DedupeSparkJob;
@@ -25,6 +26,7 @@
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -38,7 +40,9 @@
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.utilities.HDFSParquetImporter;
@@ -51,8 +55,6 @@
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
-
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
@@ -74,9 +76,9 @@ public class SparkMain {
* Commands.
*/
enum SparkCommand {
- BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN,
+ BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLUSTERING_SCHEDULE,
- CLUSTERING_RUN, CLEAN, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
+ CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, DELETE_SAVEPOINT, UPGRADE, DOWNGRADE
}
public static void main(String[] args) throws Exception {
@@ -92,8 +94,8 @@ public static void main(String[] args) throws Exception {
try {
switch (cmd) {
case ROLLBACK:
- assert (args.length == 5);
- returnCode = rollback(jsc, args[3], args[4]);
+ assert (args.length == 6);
+ returnCode = rollback(jsc, args[3], args[4], Boolean.parseBoolean(args[5]));
break;
case DEDUPLICATE:
assert (args.length == 8);
@@ -128,7 +130,21 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
- Integer.parseInt(args[8]), false, propsFilePath, configs);
+ Integer.parseInt(args[8]), HoodieCompactor.EXECUTE, propsFilePath, configs);
+ break;
+ case COMPACT_SCHEDULE_AND_EXECUTE:
+ assert (args.length >= 9);
+ propsFilePath = null;
+ if (!StringUtils.isNullOrEmpty(args[8])) {
+ propsFilePath = args[8];
+ }
+ configs = new ArrayList<>();
+ if (args.length > 9) {
+          configs.addAll(Arrays.asList(args).subList(9, args.length));
+ }
+
+ returnCode = compact(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[6],
+ Integer.parseInt(args[7]), HoodieCompactor.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case COMPACT_SCHEDULE:
assert (args.length >= 7);
@@ -140,7 +156,7 @@ public static void main(String[] args) throws Exception {
if (args.length > 7) {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
- returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
+ returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, HoodieCompactor.SCHEDULE, propsFilePath, configs);
break;
case COMPACT_VALIDATE:
assert (args.length == 7);
@@ -176,7 +192,20 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[2],
- Integer.parseInt(args[7]), false, propsFilePath, configs);
+ Integer.parseInt(args[7]), HoodieClusteringJob.EXECUTE, propsFilePath, configs);
+ break;
+ case CLUSTERING_SCHEDULE_AND_EXECUTE:
+ assert (args.length >= 8);
+ propsFilePath = null;
+ if (!StringUtils.isNullOrEmpty(args[7])) {
+ propsFilePath = args[7];
+ }
+ configs = new ArrayList<>();
+ if (args.length > 8) {
+ configs.addAll(Arrays.asList(args).subList(8, args.length));
+ }
+ returnCode = cluster(jsc, args[3], args[4], null, Integer.parseInt(args[5]), args[2],
+ Integer.parseInt(args[6]), HoodieClusteringJob.SCHEDULE_AND_EXECUTE, propsFilePath, configs);
break;
case CLUSTERING_SCHEDULE:
assert (args.length >= 7);
@@ -189,7 +218,7 @@ public static void main(String[] args) throws Exception {
configs.addAll(Arrays.asList(args).subList(7, args.length));
}
returnCode = cluster(jsc, args[3], args[4], args[5], 1, args[2],
- 0, true, propsFilePath, configs);
+ 0, HoodieClusteringJob.SCHEDULE, propsFilePath, configs);
break;
case CLEAN:
assert (args.length >= 5);
@@ -207,6 +236,10 @@ public static void main(String[] args) throws Exception {
assert (args.length == 7);
returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
break;
+ case DELETE_MARKER:
+ assert (args.length == 5);
+ returnCode = deleteMarker(jsc, args[3], args[4]);
+ break;
case DELETE_SAVEPOINT:
assert (args.length == 5);
returnCode = deleteSavepoint(jsc, args[3], args[4]);
@@ -250,6 +283,21 @@ protected static void clean(JavaSparkContext jsc, String basePath, String propsF
new HoodieCleaner(cfg, jsc).run();
}
+ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, String basePath) {
+ try {
+ SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+ HoodieWriteConfig config = client.getConfig();
+ HoodieEngineContext context = client.getEngineContext();
+ HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+ WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
+ .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
+ return 0;
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed: Could not clean marker instantTime: \"%s\".", instantTime), e);
+ return -1;
+ }
+ }
+
private static int dataLoad(JavaSparkContext jsc, String command, String srcPath, String targetPath, String tableName,
String tableType, String rowKey, String partitionKey, int parallelism, String schemaFile,
int retry, String propsFilePath, List configs) {
@@ -320,7 +368,7 @@ private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePat
}
private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
- int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
+ int parallelism, String schemaFile, int retry, String mode, String propsFilePath,
List configs) {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath;
@@ -330,20 +378,20 @@ private static int compact(JavaSparkContext jsc, String basePath, String tableNa
cfg.strategyClassName = UnBoundedCompactionStrategy.class.getCanonicalName();
cfg.parallelism = parallelism;
cfg.schemaFile = schemaFile;
- cfg.runSchedule = schedule;
+ cfg.runningMode = mode;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
return new HoodieCompactor(jsc, cfg).compact(retry);
}
private static int cluster(JavaSparkContext jsc, String basePath, String tableName, String clusteringInstant,
- int parallelism, String sparkMemory, int retry, boolean schedule, String propsFilePath, List configs) {
+ int parallelism, String sparkMemory, int retry, String runningMode, String propsFilePath, List configs) {
HoodieClusteringJob.Config cfg = new HoodieClusteringJob.Config();
cfg.basePath = basePath;
cfg.tableName = tableName;
cfg.clusteringInstantTime = clusteringInstant;
cfg.parallelism = parallelism;
- cfg.runSchedule = schedule;
+ cfg.runningMode = runningMode;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
jsc.getConf().set("spark.executor.memory", sparkMemory);
@@ -394,8 +442,8 @@ private static int doBootstrap(JavaSparkContext jsc, String tableName, String ta
return 0;
}
- private static int rollback(JavaSparkContext jsc, String instantTime, String basePath) throws Exception {
- SparkRDDWriteClient client = createHoodieClient(jsc, basePath);
+ private static int rollback(JavaSparkContext jsc, String instantTime, String basePath, Boolean rollbackUsingMarkers) throws Exception {
+ SparkRDDWriteClient client = createHoodieClient(jsc, basePath, rollbackUsingMarkers);
if (client.rollback(instantTime)) {
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
return 0;
@@ -425,7 +473,7 @@ private static int rollbackToSavepoint(JavaSparkContext jsc, String savepointTim
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
return 0;
} catch (Exception e) {
- LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime));
+ LOG.warn(String.format("The commit \"%s\" failed to roll back.", savepointTime), e);
return -1;
}
}
@@ -437,7 +485,7 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S
LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
return 0;
} catch (Exception e) {
- LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime));
+ LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".", savepointTime), e);
return -1;
}
}
@@ -452,11 +500,12 @@ private static int deleteSavepoint(JavaSparkContext jsc, String savepointTime, S
* @throws Exception
*/
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
- HoodieWriteConfig config = getWriteConfig(basePath);
+ HoodieWriteConfig config = getWriteConfig(basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
try {
new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.valueOf(toVersion), null);
@@ -468,13 +517,18 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa
}
}
- private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
- HoodieWriteConfig config = getWriteConfig(basePath);
+ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath, Boolean rollbackUsingMarkers) throws Exception {
+ HoodieWriteConfig config = getWriteConfig(basePath, rollbackUsingMarkers);
return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
}
- private static HoodieWriteConfig getWriteConfig(String basePath) {
+ private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
+ return createHoodieClient(jsc, basePath, Boolean.parseBoolean(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.defaultValue()));
+ }
+
+ private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withRollbackUsingMarkers(rollbackUsingMarkers)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
}
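
With the boolean `schedule` flag replaced by a running-mode string, a single Spark job can now schedule a compaction plan, execute a pending one, or do both. A sketch of how the compactor config is wired up for the combined mode (values are placeholders; schema file, strategy, and props are elided):

```java
import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.spark.api.java.JavaSparkContext;

class CompactorRunningModeSketch {
  // Schedules a new compaction plan and executes it in the same run, instead of
  // the former two-step schedule/execute controlled by a boolean flag.
  static int scheduleAndExecute(JavaSparkContext jsc, String basePath, String tableName, int retry) {
    HoodieCompactor.Config cfg = new HoodieCompactor.Config();
    cfg.basePath = basePath;
    cfg.tableName = tableName;
    cfg.parallelism = 2;                                     // placeholder
    cfg.runningMode = HoodieCompactor.SCHEDULE_AND_EXECUTE;  // other modes: SCHEDULE, EXECUTE
    return new HoodieCompactor(jsc, cfg).compact(retry);
  }
}
```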
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index a95cc53df329c..5d58aa9d2e498 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -25,9 +25,6 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import java.io.IOException;
-import java.text.ParseException;
-import java.time.Instant;
-import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.List;
@@ -53,16 +50,4 @@ public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.formatDate(date);
}
-
- /**
- * Add hours to specified time. If hours <0, this acts as remove hours.
- * example, say compactionCommitTime: "20200202020000"
- * a) hours: +1, returns 20200202030000
- * b) hours: -1, returns 20200202010000
- */
- public static String addHours(String compactionCommitTime, int hours) throws ParseException {
- Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
- ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
- return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
- }
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
index bbd8440448fd6..6f5a11ad6657f 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
@@ -20,8 +20,6 @@
import org.apache.hudi.exception.HoodieException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -31,27 +29,34 @@
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
+import org.springframework.shell.support.logging.HandlerUtils;
import java.util.List;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
public class SparkTempViewProvider implements TempViewProvider {
- private static final Logger LOG = LogManager.getLogger(SparkTempViewProvider.class);
+ private static final Logger LOG = HandlerUtils.getLogger(SparkTempViewProvider.class);
private JavaSparkContext jsc;
private SQLContext sqlContext;
public SparkTempViewProvider(String appName) {
try {
+ Handler handler = LOG.getParent().getHandlers()[0];
SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
jsc = new JavaSparkContext(sparkConf);
- jsc.setLogLevel("ERROR");
-
sqlContext = new SQLContext(jsc);
+ if (handler != null) {
+ LOG.getParent().removeHandler(LOG.getParent().getHandlers()[0]);
+ LOG.getParent().addHandler(handler);
+ }
} catch (Throwable ex) {
// log full stack trace and rethrow. Without this its difficult to debug failures, if any
- LOG.error("unable to initialize spark context ", ex);
+ LOG.log(Level.WARNING, "unable to initialize spark context ", ex);
throw new HoodieException(ex);
}
}
@@ -90,7 +95,7 @@ public void createOrReplace(String tableName, List headers, List entry : data.entrySet()) {
String key = entry.getKey();
Integer[] value = entry.getValue();
@@ -279,8 +295,8 @@ public void testShowArchivedCommitsWithMultiCommitsFile() throws Exception {
HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
// need to create multi archive files
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
- archiveLog.archiveIfRequired(context());
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+ archiver.archiveIfRequired(context());
}
CommandResult cr = shell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "160", "174"));
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
index 21841a5769450..17c1002f6b0dd 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -24,7 +24,9 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -35,12 +37,13 @@
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.apache.hudi.client.HoodieTimelineArchiver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
@@ -152,7 +155,11 @@ private void generateCompactionInstances() throws IOException {
activeTimeline.transitionCompactionInflightToComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
});
-
+    // Simulate a compaction commit in the metadata table timeline
+    // so that archival in the data table can happen
+ HoodieTestUtils.createCompactionCommitInMetadataTable(hadoopConf(),
+ new HoodieWrapperFileSystem(
+ FSUtils.getFs(tablePath, hadoopConf()), new NoOpConsistencyGuard()), tablePath, "007");
}
private void generateArchive() throws IOException {
@@ -162,13 +169,12 @@ private void generateArchive() throws IOException {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// archive
HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieSparkTable table = HoodieSparkTable.create(cfg, context(), metaClient);
- HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
- archiveLog.archiveIfRequired(context());
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+ archiver.archiveIfRequired(context());
}
/**
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index f2571ce3598d6..ee7fbda11b783 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -108,7 +108,7 @@ public void init() throws IOException, InterruptedException, URISyntaxException
Map header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header);
+ dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
writer.appendBlock(dataBlock);
}
}
@@ -188,7 +188,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
Map header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
writer.appendBlock(dataBlock);
} finally {
if (writer != null) {
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
index 048b2a20e6b2c..27cc31ccea2cf 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java
@@ -39,7 +39,6 @@
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
@@ -51,6 +50,14 @@
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
+import static org.apache.hudi.common.table.HoodieTableConfig.NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
+import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_LAYOUT_VERSION;
+import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
+import static org.apache.hudi.common.table.HoodieTableConfig.VERSION;
+import static org.apache.hudi.common.table.HoodieTableConfig.generateChecksum;
+import static org.apache.hudi.common.table.HoodieTableConfig.validateChecksum;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -104,7 +111,7 @@ public void testAddPartitionMetaWithDryRun() throws IOException {
// expected all 'No'.
String[][] rows = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath)
.stream()
- .map(partition -> new String[]{partition, "No", "None"})
+ .map(partition -> new String[] {partition, "No", "None"})
.toArray(String[][]::new);
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
@@ -135,7 +142,7 @@ public void testAddPartitionMetaWithRealRun() throws IOException {
List paths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, tablePath);
// after dry run, the action will be 'Repaired'
String[][] rows = paths.stream()
- .map(partition -> new String[]{partition, "No", "Repaired"})
+ .map(partition -> new String[] {partition, "No", "Repaired"})
.toArray(String[][]::new);
String expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
@@ -147,7 +154,7 @@ public void testAddPartitionMetaWithRealRun() throws IOException {
// after real run, Metadata is present now.
rows = paths.stream()
- .map(partition -> new String[]{partition, "Yes", "None"})
+ .map(partition -> new String[] {partition, "Yes", "None"})
.toArray(String[][]::new);
expected = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_PARTITION_PATH,
HoodieTableHeaderFields.HEADER_METADATA_PRESENT, HoodieTableHeaderFields.HEADER_ACTION}, rows);
@@ -170,19 +177,24 @@ public void testOverwriteHoodieProperties() throws IOException {
Map oldProps = HoodieCLI.getTableMetaClient().getTableConfig().propsMap();
// after overwrite, the stored value in .hoodie is equals to which read from properties.
- Map result = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig().propsMap();
+ HoodieTableConfig tableConfig = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()).getTableConfig();
+ Map result = tableConfig.propsMap();
+ // validate table checksum
+ assertTrue(result.containsKey(TABLE_CHECKSUM.key()));
+ assertTrue(validateChecksum(tableConfig.getProps()));
Properties expectProps = new Properties();
- expectProps.load(new FileInputStream(new File(newProps.getPath())));
+ expectProps.load(new FileInputStream(newProps.getPath()));
Map expected = expectProps.entrySet().stream()
.collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())));
+ expected.putIfAbsent(TABLE_CHECKSUM.key(), String.valueOf(generateChecksum(tableConfig.getProps())));
assertEquals(expected, result);
// check result
- List allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", "hoodie.table.version",
- "hoodie.archivelog.folder", "hoodie.timeline.layout.version");
- String[][] rows = allPropsStr.stream().sorted().map(key -> new String[]{key,
- oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")})
+ List allPropsStr = Arrays.asList(NAME.key(), TYPE.key(), VERSION.key(),
+ ARCHIVELOG_FOLDER.key(), TIMELINE_LAYOUT_VERSION.key(), TABLE_CHECKSUM.key());
+ String[][] rows = allPropsStr.stream().sorted().map(key -> new String[] {key,
+ oldProps.getOrDefault(key, "null"), result.getOrDefault(key, "null")})
.toArray(String[][]::new);
String expect = HoodiePrintHelper.print(new String[] {HoodieTableHeaderFields.HEADER_HOODIE_PROPERTY,
HoodieTableHeaderFields.HEADER_OLD_VALUE, HoodieTableHeaderFields.HEADER_NEW_VALUE}, rows);
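
Since `HoodieTableConfig.create` now persists a `hoodie.table.checksum` entry, the test regenerates and validates the checksum after the overwrite. A small sketch of that round trip, assuming `generateChecksum`/`validateChecksum` behave as the assertions above exercise them (the checksum is computed over the other properties and compared with the stored value):

```java
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;

class TableChecksumSketch {
  // Stores a freshly generated checksum under hoodie.table.checksum and verifies
  // that validation then passes for the same property set.
  static boolean checksumRoundTrip(TypedProperties props) {
    String checksum = String.valueOf(HoodieTableConfig.generateChecksum(props));
    props.setProperty(HoodieTableConfig.TABLE_CHECKSUM.key(), checksum);
    return HoodieTableConfig.validateChecksum(props);
  }
}
```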
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
index 17bc48f66f0c4..9a10893b35e89 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRollbacksCommand.java
@@ -24,7 +24,7 @@
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -93,7 +93,7 @@ public void init() throws Exception {
.withRollbackUsingMarkers(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
- try (AbstractHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
+ try (BaseHoodieWriteClient client = new SparkRDDWriteClient(context(), config)) {
// Rollback inflight commit3 and commit2
client.rollback("102");
client.rollback("101");
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
index cba6d901b956d..b3650fa027626 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java
@@ -106,7 +106,7 @@ public void testDowngradeCommand() throws Exception {
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode());
assertTableVersionFromPropertyFile();
- // verify marker files are non existant
+ // verify marker files are non existent
for (String partitionPath : DEFAULT_PARTITION_PATHS) {
assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE));
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java
new file mode 100644
index 0000000000000..17075f9d3dfb6
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestClusteringCommand.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.ClusteringCommand}.
+ *
+ * A command using SparkLauncher needs to load the jars under lib, which are generated during mvn package,
+ * so this is covered by an integration test instead of a unit test.
+ */
+public class ITTestClusteringCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+ private String tableName;
+
+ @BeforeEach
+ public void init() throws IOException {
+ tableName = "test_table_" + ITTestClusteringCommand.class.getName();
+ tablePath = Paths.get(basePath, tableName).toString();
+
+ HoodieCLI.conf = jsc.hadoopConfiguration();
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+ metaClient.setBasePath(tablePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ }
+
+ /**
+ * Test case for command 'clustering schedule'.
+ */
+ @Test
+ public void testScheduleClustering() throws IOException {
+ // generate commits
+ generateCommits();
+
+ CommandResult cr = scheduleClustering();
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertTrue(
+ cr.getResult().toString().startsWith("Succeeded to schedule clustering for")));
+
+ // there is 1 requested clustering
+ HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+ assertEquals(1, timeline.filterPendingReplaceTimeline().countInstants());
+ }
+
+ /**
+ * Test case for command 'clustering run'.
+ */
+ @Test
+ public void testClustering() throws IOException {
+ // generate commits
+ generateCommits();
+
+ CommandResult cr1 = scheduleClustering();
+ assertTrue(cr1.isSuccess());
+
+ // get clustering instance
+ HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    Option<String> instance =
+ timeline.filterPendingReplaceTimeline().firstInstant().map(HoodieInstant::getTimestamp);
+ assertTrue(instance.isPresent(), "Must have pending clustering.");
+
+ CommandResult cr2 = getShell().executeCommand(
+ String.format("clustering run --parallelism %s --clusteringInstant %s --sparkMaster %s",
+ 2, instance, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr2.isSuccess()),
+ () -> assertTrue(
+ cr2.getResult().toString().startsWith("Succeeded to run clustering for ")));
+
+ // assert clustering complete
+ assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+ .filterCompletedInstants().getInstants()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+ "Pending clustering must be completed");
+
+ assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+ .getCompletedReplaceTimeline().getInstants()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+ "Pending clustering must be completed");
+ }
+
+ /**
+ * Test case for command 'clustering scheduleAndExecute'.
+ */
+ @Test
+ public void testClusteringScheduleAndExecute() throws IOException {
+ // generate commits
+ generateCommits();
+
+ CommandResult cr2 = getShell().executeCommand(
+ String.format("clustering scheduleAndExecute --parallelism %s --sparkMaster %s", 2, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr2.isSuccess()),
+ () -> assertTrue(
+ cr2.getResult().toString().startsWith("Succeeded to run clustering for scheduleAndExecute")));
+
+ // assert clustering complete
+ assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+ .getCompletedReplaceTimeline().getInstants()
+ .map(HoodieInstant::getTimestamp).count() > 0,
+ "Completed clustering couldn't be 0");
+ }
+
+ private CommandResult scheduleClustering() {
+ // generate requested clustering
+ return getShell().executeCommand(
+ String.format("clustering schedule --hoodieConfigs hoodie.clustering.inline.max.commits=1 --sparkMaster %s", "local"));
+ }
+
+ private void generateCommits() throws IOException {
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ // Create the write client to write some records in
+ HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+ .withDeleteParallelism(2).forTable(tableName)
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+    SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
+
+ insert(jsc, client, dataGen, "001");
+ insert(jsc, client, dataGen, "002");
+ }
+
+  private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient client,
+ HoodieTestDataGenerator dataGen, String newCommitTime) throws IOException {
+ // inserts
+ client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+ operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+ return records;
+ }
+
+  private JavaRDD<WriteStatus> operateFunc(
+      HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      SparkRDDWriteClient client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
+ throws IOException {
+ return writeFn.apply(client, writeRecords, commitTime);
+ }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
index b3c5c06be9a29..18f4a387d474e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCommitsCommand.java
@@ -100,5 +100,18 @@ public void testRollbackCommit() throws Exception {
HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
assertEquals(2, timeline.getCommitsTimeline().countInstants(), "There should have 2 instants.");
+
+ // rollback complete commit
+ CommandResult cr2 = getShell().executeCommand(String.format("commit rollback --commit %s --sparkMaster %s --sparkMemory %s",
+ "101", "local", "4G"));
+ assertTrue(cr2.isSuccess());
+
+ metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+
+ HoodieActiveTimeline rollbackTimeline2 = new RollbacksCommand.RollbackTimeline(metaClient);
+    assertEquals(1, rollbackTimeline2.getRollbackTimeline().countInstants(), "There should be 1 rollback instant.");
+
+ HoodieActiveTimeline timeline2 = metaClient.reloadActiveTimeline();
+    assertEquals(2, timeline2.getCommitsTimeline().countInstants(), "There should be 2 instants.");
}
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
index 37a2098d0cd18..4734f45e7074b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
@@ -140,6 +140,33 @@ public void testCompact() throws IOException {
"Pending compaction must be completed");
}
+ /**
+ * Test case for command 'compaction scheduleAndExecute'.
+ */
+ @Test
+ public void testCompactScheduleAndExecute() throws IOException {
+ // generate commits
+ generateCommits();
+
+ String schemaPath = Paths.get(basePath, "compaction.schema").toString();
+ writeSchemaToTmpFile(schemaPath);
+
+ CommandResult cr2 = getShell().executeCommand(
+ String.format("compaction scheduleAndExecute --parallelism %s --schemaFilePath %s --sparkMaster %s",
+ 2, schemaPath, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr2.isSuccess()),
+ () -> assertTrue(
+ cr2.getResult().toString().startsWith("Schedule and execute compaction successfully completed")));
+
+ // assert compaction complete
+ assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+ .filterCompletedInstants().getInstants()
+ .map(HoodieInstant::getTimestamp).count() > 0,
+ "Completed compaction couldn't be 0");
+ }
+
/**
* Test case for command 'compaction validate'.
*/
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
new file mode 100644
index 0000000000000..221a29f5250d2
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestMarkersCommand.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.MarkersCommand}.
+ *
+ * A command using SparkLauncher needs to load the jars under lib, which are generated during mvn package,
+ * so this is covered by an integration test instead of a unit test.
+ */
+public class ITTestMarkersCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+
+ @BeforeEach
+ public void init() throws IOException {
+ String tableName = "test_table";
+ tablePath = basePath + Path.SEPARATOR + tableName;
+
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+ }
+
+ /**
+   * Test case for command 'marker delete'.
+ */
+ @Test
+ public void testDeleteMarker() throws IOException {
+ // generate markers
+ String instantTime1 = "101";
+
+ FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f0", IOType.APPEND);
+ FileCreateUtils.createMarkerFile(tablePath, "partA", instantTime1, "f1", IOType.APPEND);
+
+ assertEquals(2, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("marker delete --commit %s --sparkMaster %s", instantTime1, "local"));
+ assertTrue(cr.isSuccess());
+
+ assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, "partA", instantTime1, IOType.APPEND));
+ }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index 105a9f639c792..f59dca4e1ea9f 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -73,9 +73,9 @@ public static void createCommitFileWithMetadata(String basePath, String commitTi
}
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
- Option writes, Option updates, Map extraMetdata) throws Exception {
+ Option writes, Option updates, Map extraMetadata) throws Exception {
createCommitFileWithMetadata(basePath, commitTime, configuration, UUID.randomUUID().toString(),
- UUID.randomUUID().toString(), writes, updates, extraMetdata);
+ UUID.randomUUID().toString(), writes, updates, extraMetadata);
}
public static void createCommitFileWithMetadata(String basePath, String commitTime, Configuration configuration,
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/SparkUtilTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/SparkUtilTest.java
new file mode 100644
index 0000000000000..4966438292949
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/SparkUtilTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.testutils;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.spark.SparkConf;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SparkUtilTest {
+ @Test
+ public void testGetDefaultSparkConf() {
+ SparkConf sparkConf = SparkUtil.getDefaultConf("test-spark-app", Option.of(""));
+ assertEquals(SparkUtil.DEFAULT_SPARK_MASTER, sparkConf.get("spark.master"));
+ }
+}
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index a9209f5534df8..a55a136652728 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -163,7 +163,6 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
-      <version>3.1.2</version>
       <scope>test</scope>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java
new file mode 100644
index 0000000000000..3fdc21dd21683
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncArchiveService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Async archive service to run concurrently with write operation.
+ */
+public class AsyncArchiveService extends HoodieAsyncTableService {
+
+ private static final Logger LOG = LogManager.getLogger(AsyncArchiveService.class);
+
+ private final BaseHoodieWriteClient writeClient;
+ private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ protected AsyncArchiveService(BaseHoodieWriteClient writeClient) {
+ super(writeClient.getConfig());
+ this.writeClient = writeClient;
+ }
+
+ @Override
+ protected Pair startService() {
+ LOG.info("Starting async archive service...");
+ return Pair.of(CompletableFuture.supplyAsync(() -> {
+ writeClient.archive();
+ return true;
+ }, executor), executor);
+ }
+
+ public static AsyncArchiveService startAsyncArchiveIfEnabled(BaseHoodieWriteClient writeClient) {
+ HoodieWriteConfig config = writeClient.getConfig();
+ if (!config.isAutoArchive() || !config.isAsyncArchive()) {
+ LOG.info("The HoodieWriteClient is not configured to auto & async archive. Async archive service will not start.");
+ return null;
+ }
+ AsyncArchiveService asyncArchiveService = new AsyncArchiveService(writeClient);
+ asyncArchiveService.start(null);
+ return asyncArchiveService;
+ }
+
+ public static void waitForCompletion(AsyncArchiveService asyncArchiveService) {
+ if (asyncArchiveService != null) {
+ LOG.info("Waiting for async archive service to finish");
+ try {
+ asyncArchiveService.waitForShutdown();
+ } catch (Exception e) {
+ throw new HoodieException("Error waiting for async archive service to finish", e);
+ }
+ }
+ }
+
+ public static void forceShutdown(AsyncArchiveService asyncArchiveService) {
+ if (asyncArchiveService != null) {
+ LOG.info("Shutting down async archive service...");
+ asyncArchiveService.shutdown(true);
+ }
+ }
+}
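A minimal caller-side sketch of the new archive service lifecycle, using only the static helpers introduced above. The driver class, method, and variable names are illustrative and not part of this change; it assumes a fully configured BaseHoodieWriteClient is available.

import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.client.BaseHoodieWriteClient;

public class AsyncArchiveExample {
  // Illustrative driver: start archiving in the background, do the write, then wait.
  public static void writeWithAsyncArchive(BaseHoodieWriteClient writeClient) {
    // Returns null when the write config does not enable both auto and async archiving.
    AsyncArchiveService archiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
    try {
      // ... perform the write operation with writeClient here ...
      // Blocks until the background archive round finishes; tolerates a null service.
      AsyncArchiveService.waitForCompletion(archiveService);
    } finally {
      // Best-effort shutdown in case the caller bails out early; also null-safe.
      AsyncArchiveService.forceShutdown(archiveService);
    }
  }
}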
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java
similarity index 56%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java
index a5a38f2cc5949..72907e6d3fbcd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AsyncCleanerService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCleanerService.java
@@ -7,21 +7,24 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.hudi.client;
+package org.apache.hudi.async;
-import org.apache.hudi.async.HoodieAsyncService;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -30,54 +33,55 @@
import java.util.concurrent.Executors;
/**
- * Clean service running concurrently with write operation.
+ * Async clean service to run concurrently with write operation.
*/
-class AsyncCleanerService extends HoodieAsyncService {
+public class AsyncCleanerService extends HoodieAsyncTableService {
private static final Logger LOG = LogManager.getLogger(AsyncCleanerService.class);
- private final AbstractHoodieWriteClient writeClient;
+ private final BaseHoodieWriteClient writeClient;
private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
- protected AsyncCleanerService(AbstractHoodieWriteClient writeClient) {
+ protected AsyncCleanerService(BaseHoodieWriteClient writeClient) {
+ super(writeClient.getConfig());
this.writeClient = writeClient;
}
@Override
protected Pair startService() {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
- LOG.info("Auto cleaning is enabled. Running cleaner async to write operation at instant time " + instantTime);
+ LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));
return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(instantTime);
return true;
}, executor), executor);
}
- public static AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient writeClient) {
- AsyncCleanerService asyncCleanerService = null;
- if (writeClient.getConfig().isAutoClean() && writeClient.getConfig().isAsyncClean()) {
- asyncCleanerService = new AsyncCleanerService(writeClient);
- asyncCleanerService.start(null);
- } else {
- LOG.info("Async auto cleaning is not enabled. Not running cleaner now");
+ public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {
+ HoodieWriteConfig config = writeClient.getConfig();
+ if (!config.isAutoClean() || !config.isAsyncClean()) {
+ LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");
+ return null;
}
+ AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);
+ asyncCleanerService.start(null);
return asyncCleanerService;
}
public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
if (asyncCleanerService != null) {
- LOG.info("Waiting for async cleaner to finish");
+ LOG.info("Waiting for async clean service to finish");
try {
asyncCleanerService.waitForShutdown();
} catch (Exception e) {
- throw new HoodieException("Error waiting for async cleaning to finish", e);
+ throw new HoodieException("Error waiting for async clean service to finish", e);
}
}
}
public static void forceShutdown(AsyncCleanerService asyncCleanerService) {
if (asyncCleanerService != null) {
- LOG.info("Shutting down async cleaner");
+ LOG.info("Shutting down async clean service...");
asyncCleanerService.shutdown(true);
}
}
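After the move to org.apache.hudi.async the clean service keeps the same contract as the archive service above. A sketch of the start-or-restart pattern the write client applies to both services in preWrite later in this patch; the field and method names below are illustrative.

import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.BaseHoodieWriteClient;

class BackgroundTableServices {
  private AsyncCleanerService asyncCleanerService;
  private AsyncArchiveService asyncArchiveService;

  // Start each service once; on later writes just restart the existing instance.
  void startIfEnabled(BaseHoodieWriteClient writeClient) {
    if (asyncCleanerService == null) {
      // Null when isAutoClean() or isAsyncClean() is false in the write config.
      asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient);
    } else {
      asyncCleanerService.start(null);
    }
    if (asyncArchiveService == null) {
      asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(writeClient);
    } else {
      asyncArchiveService.start(null);
    }
  }

  // Both waitForCompletion helpers wrap failures in HoodieException and ignore null.
  void awaitBoth() {
    AsyncCleanerService.waitForCompletion(asyncCleanerService);
    AsyncArchiveService.waitForCompletion(asyncArchiveService);
  }
}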
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
index b9707bb6d82a7..1c1cf2bb9f74b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncClusteringService.java
@@ -19,8 +19,8 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractClusteringClient;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseClusterer;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
@@ -38,25 +38,25 @@
* Async clustering service that runs in a separate thread.
* Currently, only one clustering thread is allowed to run at any time.
*/
-public abstract class AsyncClusteringService extends HoodieAsyncService {
+public abstract class AsyncClusteringService extends HoodieAsyncTableService {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);
private final int maxConcurrentClustering;
- private transient AbstractClusteringClient clusteringClient;
+ private transient BaseClusterer clusteringClient;
- public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
+ public AsyncClusteringService(BaseHoodieWriteClient writeClient) {
this(writeClient, false);
}
- public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
- super(runInDaemonMode);
+ public AsyncClusteringService(BaseHoodieWriteClient writeClient, boolean runInDaemonMode) {
+ super(writeClient.getConfig(), runInDaemonMode);
this.clusteringClient = createClusteringClient(writeClient);
this.maxConcurrentClustering = 1;
}
- protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);
+ protected abstract BaseClusterer createClusteringClient(BaseHoodieWriteClient client);
/**
* Start clustering service.
@@ -82,10 +82,16 @@ protected Pair startService() {
}
LOG.info("Clustering executor shutting down properly");
} catch (InterruptedException ie) {
+ hasError = true;
LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
} catch (IOException e) {
- LOG.error("Clustering executor failed", e);
+ hasError = true;
+ LOG.error("Clustering executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e);
+ } catch (Exception e) {
+ hasError = true;
+ LOG.error("Clustering executor failed", e);
+ throw e;
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
@@ -94,7 +100,7 @@ protected Pair startService() {
/**
* Update the write client to be used for clustering.
*/
- public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+ public synchronized void updateWriteClient(BaseHoodieWriteClient writeClient) {
this.clusteringClient.updateWriteClient(writeClient);
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
index 2f63297210e14..f1f7f416e466c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -17,8 +17,8 @@
package org.apache.hudi.async;
-import org.apache.hudi.client.AbstractCompactor;
-import org.apache.hudi.client.AbstractHoodieWriteClient;
+import org.apache.hudi.client.BaseCompactor;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,7 +37,7 @@
/**
* Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
*/
-public abstract class AsyncCompactService extends HoodieAsyncService {
+public abstract class AsyncCompactService extends HoodieAsyncTableService {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
@@ -48,21 +48,21 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
public static final String COMPACT_POOL_NAME = "hoodiecompact";
private final int maxConcurrentCompaction;
- private transient AbstractCompactor compactor;
+ private transient BaseCompactor compactor;
protected transient HoodieEngineContext context;
- public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
+ public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client) {
this(context, client, false);
}
- public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client, boolean runInDaemonMode) {
- super(runInDaemonMode);
+ public AsyncCompactService(HoodieEngineContext context, BaseHoodieWriteClient client, boolean runInDaemonMode) {
+ super(client.getConfig(), runInDaemonMode);
this.context = context;
this.compactor = createCompactor(client);
this.maxConcurrentCompaction = 1;
}
- protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client);
+ protected abstract BaseCompactor createCompactor(BaseHoodieWriteClient client);
/**
* Start Compaction Service.
@@ -92,10 +92,16 @@ protected Pair startService() {
}
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
+ hasError = true;
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
- LOG.error("Compactor executor failed", e);
+ hasError = true;
+ LOG.error("Compactor executor failed due to IOException", e);
throw new HoodieIOException(e.getMessage(), e);
+ } catch (Exception e) {
+ hasError = true;
+ LOG.error("Compactor executor failed", e);
+ throw e;
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
@@ -110,7 +116,7 @@ protected boolean shouldStopCompactor() {
return false;
}
- public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+ public synchronized void updateWriteClient(BaseHoodieWriteClient writeClient) {
this.compactor.updateWriteClient(writeClient);
}
}
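The hasError flag set in the catch blocks above (and in the clustering service) matters mostly for producers that block on the pending-instant queue: with the base-class change further below, the wait re-checks the flag on a timed poll instead of hanging forever when the compactor thread dies. A caller-side sketch with assumed names:

import org.apache.hudi.async.AsyncCompactService;
import org.apache.hudi.common.table.timeline.HoodieInstant;

class CompactionBackpressureExample {
  // Enqueue a compaction instant and apply backpressure until the backlog drains.
  void submitAndThrottle(AsyncCompactService compactService, HoodieInstant instant) throws InterruptedException {
    compactService.enqueuePendingAsyncServiceInstant(instant);
    // Wakes up every POLLING_SECONDS and also exits if the compactor recorded an error.
    compactService.waitTillPendingAsyncServiceInstantsReducesTo(1);
    if (compactService.hasError()) {
      throw new IllegalStateException("Async compaction failed; check the compactor logs");
    }
  }
}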
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
index f57484d886c9b..1ce6dfb288d62 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncService.java
@@ -36,12 +36,15 @@
import java.util.function.Function;
/**
- * Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle.
+ * Base Class for running archive/clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycles.
*/
public abstract class HoodieAsyncService implements Serializable {
private static final Logger LOG = LogManager.getLogger(HoodieAsyncService.class);
+ private static final long POLLING_SECONDS = 10;
+ // Flag indicating whether an error is incurred in the service
+ protected boolean hasError;
// Flag to track if the service is started.
private boolean started;
// Flag indicating shutdown is externally requested
@@ -70,21 +73,32 @@ protected HoodieAsyncService(boolean runInDaemonMode) {
this.runInDaemonMode = runInDaemonMode;
}
- protected boolean isShutdownRequested() {
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isShutdownRequested() {
return shutdownRequested;
}
- protected boolean isShutdown() {
+ public boolean isShutdown() {
return shutdown;
}
+ public boolean hasError() {
+ return hasError;
+ }
+
/**
* Wait till the service shutdown. If the service shutdown with exception, it will be thrown
- *
+ *
* @throws ExecutionException
* @throws InterruptedException
*/
public void waitForShutdown() throws ExecutionException, InterruptedException {
+ if (future == null) {
+ return;
+ }
try {
future.get();
} catch (ExecutionException ex) {
@@ -102,6 +116,7 @@ public void waitForShutdown() throws ExecutionException, InterruptedException {
public void shutdown(boolean force) {
if (!shutdownRequested || force) {
shutdownRequested = true;
+ shutdown = true;
if (executor != null) {
if (force) {
executor.shutdownNow();
@@ -125,6 +140,10 @@ public void shutdown(boolean force) {
* @param onShutdownCallback
*/
public void start(Function onShutdownCallback) {
+ if (started) {
+      LOG.warn("The async service has already started.");
+ return;
+ }
Pair res = startService();
future = res.getKey();
executor = res.getValue();
@@ -134,8 +153,6 @@ public void start(Function onShutdownCallback) {
/**
* Service implementation.
- *
- * @return
*/
protected abstract Pair startService();
@@ -146,6 +163,9 @@ public void start(Function onShutdownCallback) {
*/
@SuppressWarnings("unchecked")
private void shutdownCallback(Function callback) {
+ if (future == null) {
+ return;
+ }
future.whenComplete((resp, error) -> {
if (null != callback) {
callback.apply(null != error);
@@ -166,8 +186,8 @@ public boolean isRunInDaemonMode() {
public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException {
try {
queueLock.lock();
- while (!isShutdown() && (pendingInstants.size() > numPending)) {
- consumed.await();
+ while (!isShutdown() && !hasError() && (pendingInstants.size() > numPending)) {
+ consumed.await(POLLING_SECONDS, TimeUnit.SECONDS);
}
} finally {
queueLock.unlock();
@@ -190,8 +210,8 @@ public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) {
* @throws InterruptedException
*/
HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
- LOG.info("Waiting for next instant upto 10 seconds");
- HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
+ LOG.info(String.format("Waiting for next instant up to %d seconds", POLLING_SECONDS));
+ HoodieInstant instant = pendingInstants.poll(POLLING_SECONDS, TimeUnit.SECONDS);
if (instant != null) {
try {
queueLock.lock();
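A throwaway subclass, illustrative only, that exercises the lifecycle hardening above: start() is now idempotent and waitForShutdown() returns quietly when startService() never produced a future. The generic Pair<CompletableFuture, ExecutorService> return type is assumed from the base class.

import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.common.util.collection.Pair;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class NoopAsyncService extends HoodieAsyncService {
  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

  NoopAsyncService() {
    super(false); // not in daemon mode
  }

  @Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    // A real service would launch clean/compact/cluster work here.
    CompletableFuture future = CompletableFuture.supplyAsync(() -> true, executor);
    return Pair.of(future, executor);
  }

  public static void main(String[] args) throws Exception {
    NoopAsyncService service = new NoopAsyncService();
    service.start(null);
    service.start(null);       // second call only logs a warning and returns
    service.waitForShutdown(); // safe even if the future was never created
    service.shutdown(false);
  }
}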
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java
new file mode 100644
index 0000000000000..6a53d30063c1d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/HoodieAsyncTableService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.RunsTableService;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.function.Function;
+
+public abstract class HoodieAsyncTableService extends HoodieAsyncService implements RunsTableService {
+
+ protected HoodieWriteConfig writeConfig;
+
+ protected HoodieAsyncTableService() {
+ }
+
+ protected HoodieAsyncTableService(HoodieWriteConfig writeConfig) {
+ this.writeConfig = writeConfig;
+ }
+
+ protected HoodieAsyncTableService(HoodieWriteConfig writeConfig, boolean runInDaemonMode) {
+ super(runInDaemonMode);
+ this.writeConfig = writeConfig;
+ }
+
+ @Override
+ public void start(Function onShutdownCallback) {
+ if (!tableServicesEnabled(writeConfig)) {
+ return;
+ }
+ super.start(onShutdownCallback);
+ }
+}
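The new intermediate class threads the write config through and gates start() on the global table-service switch, so engine-specific services no longer re-check the flag themselves. An illustrative subclass; the class name and the generic return type of startService() are assumptions:

import org.apache.hudi.async.HoodieAsyncTableService;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExampleAsyncTableService extends HoodieAsyncTableService {
  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();

  ExampleAsyncTableService(HoodieWriteConfig writeConfig) {
    super(writeConfig);
  }

  @Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    // Only reached when tableServicesEnabled(writeConfig) is true; otherwise start() is a no-op.
    CompletableFuture future = CompletableFuture.supplyAsync(() -> true, executor);
    return Pair.of(future, executor);
  }
}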
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
similarity index 80%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
index 34234f546ed19..648ce805b0825 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractClusteringClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseClusterer.java
@@ -28,13 +28,13 @@
/**
* Client will run one round of clustering.
*/
-public abstract class AbstractClusteringClient implements Serializable {
+public abstract class BaseClusterer implements Serializable {
private static final long serialVersionUID = 1L;
- protected transient AbstractHoodieWriteClient clusteringClient;
+ protected transient BaseHoodieWriteClient clusteringClient;
- public AbstractClusteringClient(AbstractHoodieWriteClient clusteringClient) {
+ public BaseClusterer(BaseHoodieWriteClient clusteringClient) {
this.clusteringClient = clusteringClient;
}
@@ -49,7 +49,7 @@ public AbstractClusteringClient(AbstractHoodieWriteClient clustering
* Update the write client used by async clustering.
* @param writeClient
*/
- public void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+ public void updateWriteClient(BaseHoodieWriteClient writeClient) {
this.clusteringClient = writeClient;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
similarity index 78%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
index c80b34a3ef656..88737dbcf1d7e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseCompactor.java
@@ -27,19 +27,19 @@
/**
* Run one round of compaction.
*/
-public abstract class AbstractCompactor implements Serializable {
+public abstract class BaseCompactor implements Serializable {
private static final long serialVersionUID = 1L;
- protected transient AbstractHoodieWriteClient compactionClient;
+ protected transient BaseHoodieWriteClient compactionClient;
- public AbstractCompactor(AbstractHoodieWriteClient compactionClient) {
+ public BaseCompactor(BaseHoodieWriteClient compactionClient) {
this.compactionClient = compactionClient;
}
public abstract void compact(HoodieInstant instant) throws IOException;
- public void updateWriteClient(AbstractHoodieWriteClient writeClient) {
+ public void updateWriteClient(BaseHoodieWriteClient writeClient) {
this.compactionClient = writeClient;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
similarity index 91%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 350fe0c9bf7e0..3f208a0f86a09 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -40,9 +40,9 @@
* Abstract class taking care of holding common member variables (FileSystem, SparkContext, HoodieConfigs) Also, manages
* embedded timeline-server if enabled.
*/
-public abstract class AbstractHoodieClient implements Serializable, AutoCloseable {
+public abstract class BaseHoodieClient implements Serializable, AutoCloseable {
- private static final Logger LOG = LogManager.getLogger(AbstractHoodieClient.class);
+ private static final Logger LOG = LogManager.getLogger(BaseHoodieClient.class);
protected final transient FileSystem fs;
protected final transient HoodieEngineContext context;
@@ -59,11 +59,11 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
private transient Option timelineServer;
private final boolean shouldStopTimelineServer;
- protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, Option.empty());
}
- protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
+ protected BaseHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig,
Option timelineServer) {
this.hadoopConf = context.getHadoopConf().get();
this.fs = FSUtils.getFs(clientConfig.getBasePath(), hadoopConf);
@@ -134,7 +134,8 @@ protected void initWrapperFSMetrics() {
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
- .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
+ .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
}
public Option getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
similarity index 87%
rename from hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c9162de9e9ca1..7b67ff54a2aa5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,11 +18,14 @@
package org.apache.hudi.client;
+import org.apache.hudi.async.AsyncArchiveService;
+import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
@@ -66,7 +69,6 @@
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
@@ -98,14 +100,15 @@
* @param Type of keys
* @param Type of outputs
*/
-public abstract class AbstractHoodieWriteClient extends AbstractHoodieClient {
+public abstract class BaseHoodieWriteClient extends BaseHoodieClient
+ implements RunsTableService {
protected static final String LOOKUP_STR = "lookup";
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+ private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
protected final transient HoodieMetrics metrics;
- private final transient HoodieIndex index;
+  private final transient HoodieIndex<?, ?> index;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
@@ -114,6 +117,7 @@ public abstract class AbstractHoodieWriteClient>> lastCompletedTxnAndMetadata = Option.empty();
@@ -123,7 +127,7 @@ public abstract class AbstractHoodieWriteClient timelineService) {
super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config);
@@ -142,7 +146,7 @@ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig
this.txnManager = new TransactionManager(config, fs);
}
- protected abstract HoodieIndex createIndex(HoodieWriteConfig writeConfig);
+  protected abstract HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig);
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
@@ -359,7 +363,7 @@ public void rollbackFailedBootstrap() {
* table for the very first time (e.g: converting an existing table to Hoodie).
*
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
- * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}
+ * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
@@ -372,7 +376,7 @@ public void rollbackFailedBootstrap() {
* table for the very first time (e.g: converting an existing table to Hoodie).
*
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
- * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
+ * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}. Optionally
* it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
* {@link BulkInsertPartitioner}.
*
@@ -392,7 +396,7 @@ public abstract O bulkInsert(I records, final String instantTime,
* duplicates if needed.
*
* This implementation uses sortBy (which does range partitioning based on reservoir sampling) and attempts to control
- * the numbers of files with less memory compared to the {@link AbstractHoodieWriteClient#insert(I, String)}. Optionally
+ * the numbers of files with less memory compared to the {@link BaseHoodieWriteClient#insert(I, String)}. Optionally
* it allows users to specify their own partitioner. If specified then it will be used for repartitioning records. See
* {@link BulkInsertPartitioner}.
*
@@ -430,6 +434,11 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
} else {
this.asyncCleanerService.start(null);
}
+ if (null == this.asyncArchiveService) {
+ this.asyncArchiveService = AsyncArchiveService.startAsyncArchiveIfEnabled(this);
+ } else {
+ this.asyncArchiveService.start(null);
+ }
}
/**
@@ -455,16 +464,17 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata me
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
autoCleanOnCommit();
- if (config.isAutoArchive()) {
- archive(table);
- }
+ autoArchiveOnCommit(table);
} finally {
this.heartbeatClient.stop(instantTime);
}
}
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option