104 changes: 104 additions & 0 deletions docker/demo/config/test-suite/insert-overwrite-table.yaml
@@ -0,0 +1,104 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: insert-overwrite-table.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: none
first_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: first_insert
second_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: first_upsert
second_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: second_insert
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: second_upsert
first_insert_overwrite_table:
config:
record_size: 1000
repeat_count: 10
num_records_insert: 10
type: SparkInsertOverwriteTableNode
deps: first_hive_sync
delete_all_input_except_last:
config:
delete_input_data_except_latest: true
type: DeleteInputDatasetNode
deps: first_insert_overwrite_table
third_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: delete_all_input_except_last
third_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: third_insert
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: third_upsert
second_validate:
config:
validate_full_data: true
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
deps: second_hive_sync
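
The step to note above is first_insert_overwrite_table: SparkInsertOverwriteTableNode exercises Hudi's table-level overwrite, where the incoming batch replaces the entire table contents. For reference, a minimal sketch of the same operation through the public Spark datasource API, reusing the record-key and partition-path fields from test-metadata.properties below; the class wrapper, table name, and base path are illustrative, not the test suite's own code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hedged sketch, not the test-suite node itself: the datasource-level
// equivalent of the operation first_insert_overwrite_table exercises.
// The written batch replaces the whole table, not just touched partitions.
public class InsertOverwriteTableSketch {
  public static void write(Dataset<Row> batch, String basePath) {
    batch.write().format("hudi")
        .option("hoodie.datasource.write.operation", "insert_overwrite_table")
        .option("hoodie.datasource.write.recordkey.field", "_row_key")
        .option("hoodie.datasource.write.partitionpath.field", "timestamp")
        .option("hoodie.table.name", "table1") // illustrative
        .mode(SaveMode.Append)
        .save(basePath);
  }
}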
106 changes: 106 additions & 0 deletions docker/demo/config/test-suite/insert-overwrite.yaml
@@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: insert-overwrite.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:

first_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: none
first_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: first_insert
second_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: first_upsert
second_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: second_insert
first_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: second_upsert
first_insert_overwrite:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10
type: SparkInsertOverwriteNode
deps: first_hive_sync
delete_all_input_except_last:
config:
delete_input_data_except_latest: true
type: DeleteInputDatasetNode
deps: first_insert_overwrite
third_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: delete_all_input_except_last
third_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: third_insert
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: third_upsert
second_validate:
config:
validate_full_data: true
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
deps: second_hive_sync
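
This DAG mirrors insert-overwrite-table.yaml, with one functional difference: SparkInsertOverwriteNode overwrites only the partitions touched by the incoming batch, leaving the rest of the table intact. Against the sketch after the previous file, the change at the datasource level would be the operation value alone — hedged the same way, with illustrative names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hedged sketch: partition-scoped overwrite. Only partitions present in
// the incoming batch are replaced; untouched partitions keep their data.
public class InsertOverwriteSketch {
  public static void write(Dataset<Row> batch, String basePath) {
    batch.write().format("hudi")
        .option("hoodie.datasource.write.operation", "insert_overwrite")
        .option("hoodie.table.name", "table1") // illustrative, as above
        .mode(SaveMode.Append)
        .save(basePath);
  }
}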
73 changes: 73 additions & 0 deletions docker/demo/config/test-suite/spark-clustering.yaml
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: spark-clustering.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: none
first_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: first_insert
second_insert:
config:
record_size: 1000
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 10000
type: SparkInsertNode
deps: first_upsert
second_upsert:
config:
record_size: 1000
num_partitions_insert: 10
num_records_insert: 1000
repeat_count: 1
num_records_upsert: 8000
num_partitions_upsert: 10
type: SparkUpsertNode
deps: second_insert
first_delete:
config:
num_partitions_delete: 10
num_records_delete: 16000
type: SparkDeleteNode
deps: second_upsert
second_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_delete
second_validate:
config:
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
deps: second_hive_sync
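
Note that spark-clustering.yaml contains no explicit clustering node; the DAG only inserts, upserts, and deletes. Clustering is presumably triggered inline through the clustering properties in test-metadata.properties below. A hedged sketch of the corresponding write-side options follows — the explicit hoodie.clustering.inline=true switch is an assumption, since only the max-commits, sort-columns, and lookback settings appear in the properties file:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hedged sketch of inline clustering options pairing with the settings in
// test-metadata.properties. hoodie.clustering.inline=true is an assumption;
// it does not appear in the properties shown in this diff.
public class InlineClusteringSketch {
  public static void write(Dataset<Row> batch, String basePath) {
    batch.write().format("hudi")
        .option("hoodie.clustering.inline", "true") // assumption
        .option("hoodie.clustering.inline.max.commits", "1")
        .option("hoodie.clustering.plan.strategy.sort.columns", "_row_key")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}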
56 changes: 56 additions & 0 deletions docker/demo/config/test-suite/test-metadata.properties
@@ -0,0 +1,56 @@

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100

hoodie.metadata.enable=true

hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector

hoodie.datasource.hive_sync.skip_ro_suffix=true

hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp

hoodie.clustering.plan.strategy.sort.columns=_row_key
hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
hoodie.clustering.inline.max.commits=1

hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
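
The TimestampBasedKeyGenerator settings above interpret the timestamp record field as UNIX seconds and render it as a yyyy/MM/dd partition path. A minimal, self-contained illustration of that mapping (UTC is an assumption here; the generator's actual timezone is governed by its own configuration):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustration of the configured mapping: UNIX_TIMESTAMP seconds
// -> "yyyy/MM/dd" partition path. UTC is assumed for the example.
public class PartitionPathIllustration {
  public static void main(String[] args) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
    long unixSeconds = 1600000000L; // 2020-09-13T12:26:40Z
    System.out.println(fmt.format(new Date(unixSeconds * 1000L))); // 2020/09/13
  }
}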

6 changes: 2 additions & 4 deletions docker/demo/config/test-suite/test.properties
@@ -19,6 +19,8 @@ hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100

hoodie.metadata.enable=false

hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
@@ -32,10 +34,6 @@ hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp

hoodie.clustering.plan.strategy.sort.columns=_row_key
hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
hoodie.clustering.inline.max.commits=1

hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
@@ -96,6 +96,8 @@ public static class Config {
private static String NUM_ROLLBACKS = "num_rollbacks";
private static String ENABLE_ROW_WRITING = "enable_row_writing";
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
private static String VALIDATE_FULL_DATA = "validate_full_data";
private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";

// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
@@ -206,10 +208,18 @@ public boolean isDeleteInputData() {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
}

public boolean isDeleteInputDataExceptLatest() {
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA_EXCEPT_LATEST, false).toString());
}

public boolean isValidateHive() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
}

public boolean isValidateFullData() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
}

public int getIterationCountToExecute() {
return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString());
}
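
For context, the two new getters back the delete_all_input_except_last and validate_full_data steps in the YAML DAGs above. A hedged sketch of how a consuming node might branch on them — the method and its surroundings are hypothetical, not this PR's actual node code:

// Hypothetical consumer of the two new Config flags; assumes the Config
// class shown above. Not the PR's actual DeleteInputDatasetNode /
// ValidateDatasetNode implementations.
void execute(Config config) {
  if (config.isDeleteInputDataExceptLatest()) {
    // remove every input batch directory except the most recent one,
    // so downstream inserts still have fresh input to consume
  }
  if (config.isValidateFullData()) {
    // compare the entire table contents against all generated input,
    // rather than validating only the latest batch
  }
}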