59 changes: 49 additions & 10 deletions docker/demo/config/test-suite/complex-dag-cow.yaml
@@ -59,15 +59,13 @@ first_hive_sync:
deps: first_upsert
first_hive_query:
config:
hive_props:
prop2: "set spark.yarn.queue="
prop3: "set hive.strict.checks.large.query=false"
prop4: "set hive.stats.autogather=false"
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 11600
result2: 11300
type: HiveQueryNode
deps: first_hive_sync
second_upsert:
@@ -82,14 +80,55 @@ second_upsert:
deps: first_hive_query
second_hive_query:
config:
hive_props:
prop2: "set mapred.job.queue.name="
prop3: "set hive.strict.checks.large.query=false"
prop4: "set hive.stats.autogather=false"
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 11900
result2: 11600
type: HiveQueryNode
deps: second_upsert
fourth_insert:
config:
record_size: 70000
num_insert_partitions: 1
repeat_count: 1
num_records_insert: 1000
deps: second_hive_query
type: InsertNode
third_hive_query:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 12600
type: HiveQueryNode
deps: fourth_insert
first_delete:
config:
record_size: 70000
num_partitions_delete: 1
num_records_delete: 200
deps: third_hive_query
type: DeleteNode
fourth_hive_sync:
config:
queue_name: "adhoc"
engine: "mr"
type: HiveSyncNode
deps: first_delete
fourth_hive_query:
config:
queue_name: "adhoc"
engine: "mr"
hive_queries:
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
result1: 0
query2: "select count(*) from testdb.table1"
result2: 12400
type: HiveQueryNode
deps: fourth_hive_sync
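
For reference, the expected counts in the new query nodes follow directly from the workload sizes above; a quick sketch of the arithmetic (plain Java, the class and field names are illustrative only):
```
// Row-count expectations through the new nodes, derived from the workloads above.
class ExpectedCounts {
  static final long AFTER_SECOND_UPSERT = 11600;                          // second_hive_query: result2
  static final long AFTER_FOURTH_INSERT = AFTER_SECOND_UPSERT + 1 * 1000; // repeat_count x num_records_insert = 12600
  static final long AFTER_FIRST_DELETE  = AFTER_FOURTH_INSERT - 200;      // num_records_delete = 12400
}
```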
6 changes: 5 additions & 1 deletion docker/demo/config/test-suite/source.avsc
@@ -46,6 +46,10 @@
}, {
"name" : "fare",
"type" : "double"
} ]
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}]
}

4 changes: 4 additions & 0 deletions docker/demo/config/test-suite/target.avsc
@@ -46,6 +46,10 @@
}, {
"name" : "fare",
"type" : "double"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}, {
"name" : "haversine_distance",
"type" : "double"
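
Both schemas now carry `_hoodie_is_deleted`, the marker that Hudi's default payload uses to treat an upserted record as a delete. A minimal sketch of flipping the flag on an existing record (the helper class and method are illustrative, not part of this change):
```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

class SoftDeleteSketch {
  // Copies a record and sets the soft-delete marker; upserting the copy removes the row.
  static GenericRecord markDeleted(GenericRecord existing, Schema schema) {
    GenericRecord copy = new GenericData.Record(schema);
    schema.getFields().forEach(f -> copy.put(f.name(), existing.get(f.name())));
    copy.put("_hoodie_is_deleted", true); // default is false, per the schema diff above
    return copy;
  }
}
```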
27 changes: 27 additions & 0 deletions docker/demo/config/test-suite/test.properties
@@ -0,0 +1,27 @@
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100

hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector

hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp

hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
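
The key-generator settings above (`UNIX_TIMESTAMP` with a `yyyy/MM/dd` output format) map the epoch-seconds `timestamp` field to the slash-encoded partition path that `SlashEncodedDayPartitionValueExtractor` later parses during Hive sync. A standalone sketch of that mapping (not the generator's actual code, and assuming UTC):
```
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class PartitionPathSketch {
  // Mirrors timestamp.type=UNIX_TIMESTAMP with output.dateformat=yyyy/MM/dd.
  static String partitionPath(long epochSeconds) {
    return DateTimeFormatter.ofPattern("yyyy/MM/dd")
        .withZone(ZoneOffset.UTC)
        .format(Instant.ofEpochSecond(epochSeconds));
  }
  // Example: partitionPath(1589414400L) -> "2020/05/14"
}
```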

39 changes: 2 additions & 37 deletions hudi-integ-test/README.md
@@ -178,41 +178,6 @@ Copy the integration tests jar into the docker container
docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar adhoc-2:/opt
```

Copy the following test properties file:
```
echo '
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100

hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector

hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp

hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd

hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
hoodie.datasource.hive_sync.skip_ro_suffix=true
' > test.properties

docker cp test.properties adhoc-2:/opt
```

```
docker exec -it adhoc-2 /bin/bash
```
@@ -254,7 +219,7 @@ spark-submit \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
--target-table table1 \
--props test.properties \
--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--input-file-size 125829120 \
@@ -293,7 +258,7 @@ spark-submit \
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
--target-table table1 \
--props test.properties \
--props file:/var/hoodie/ws/docker/demo/config/test-suite/test.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
--input-file-size 125829120 \
@@ -18,17 +18,19 @@

package org.apache.hudi.integ.testsuite.configuration;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;

/**
* Configuration to hold the delta output type and delta input format.
@@ -59,8 +61,7 @@ public Configuration getConfiguration() {
}

/**
* Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of
* actions that can be executed in parallel.
* Represents any kind of workload operation for new data. Each workload also contains a set of Option sequence of actions that can be executed in parallel.
*/
public static class Config {

@@ -73,10 +74,12 @@ public static class Config {
public static final String HIVE_PROPERTIES = "hive_props";
private static String NUM_RECORDS_INSERT = "num_records_insert";
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
private static String NUM_RECORDS_DELETE = "num_records_delete";
private static String REPEAT_COUNT = "repeat_count";
private static String RECORD_SIZE = "record_size";
private static String NUM_PARTITIONS_INSERT = "num_partitions_insert";
private static String NUM_PARTITIONS_UPSERT = "num_partitions_upsert";
private static String NUM_PARTITIONS_DELETE = "num_partitions_delete";
private static String NUM_FILES_UPSERT = "num_files_upsert";
private static String FRACTION_UPSERT_PER_FILE = "fraction_upsert_per_file";
private static String DISABLE_GENERATE = "disable_generate";
@@ -103,6 +106,10 @@ public long getNumRecordsUpsert() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_UPSERT, 0).toString());
}

public long getNumRecordsDelete() {
return Long.valueOf(configsMap.getOrDefault(NUM_RECORDS_DELETE, 0).toString());
}

public int getRecordSize() {
return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString());
}
@@ -123,6 +130,10 @@ public int getStartPartition() {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}

public int getNumDeletePartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_DELETE, 1).toString());
[Review comment, Contributor] What does default value "1" mean in this case?
}

public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
}
@@ -192,6 +203,11 @@ public Builder withNumRecordsToUpdate(long numRecordsUpsert) {
return this;
}

public Builder withNumRecordsToDelete(long numRecordsDelete) {
this.configsMap.put(NUM_RECORDS_DELETE, numRecordsDelete);
return this;
}

public Builder withNumInsertPartitions(int numInsertPartitions) {
this.configsMap.put(NUM_PARTITIONS_INSERT, numInsertPartitions);
return this;
@@ -202,6 +218,11 @@ public Builder withNumUpsertPartitions(int numUpsertPartitions) {
return this;
}

public Builder withNumDeletePartitions(int numDeletePartitions) {
this.configsMap.put(NUM_PARTITIONS_DELETE, numDeletePartitions);
return this;
}

public Builder withNumUpsertFiles(int numUpsertFiles) {
this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles);
return this;
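
A sketch of wiring the new delete knobs through the builder. Only the `withNumRecordsToDelete` and `withNumDeletePartitions` setters appear in this hunk; the entry point and terminal call (`newBuilder()` / `build()`) are assumed here:
```
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;

class DeleteConfigSketch {
  // Builds a delete workload matching the first_delete node in the DAG above.
  static Config deleteWorkload() {
    return Config.newBuilder()           // assumed entry point, not shown in this diff
        .withNumRecordsToDelete(200)     // num_records_delete
        .withNumDeletePartitions(1)      // num_partitions_delete
        .build();                        // assumed terminal call
  }
}
```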
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.integ.testsuite.converter;

import org.apache.hudi.integ.testsuite.generator.DeleteGeneratorIterator;
import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator;

import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;

import java.util.List;

public class DeleteConverter implements Converter<GenericRecord, GenericRecord> {

private final String schemaStr;
private final int minPayloadSize;

public DeleteConverter(String schemaStr, int minPayloadSize) {
this.schemaStr = schemaStr;
this.minPayloadSize = minPayloadSize;
}

@Override
public JavaRDD<GenericRecord> convert(JavaRDD<GenericRecord> inputRDD) {
return inputRDD.mapPartitions(recordItr -> new LazyRecordGeneratorIterator(new DeleteGeneratorIterator(recordItr,
schemaStr, minPayloadSize)));
}
}
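
A brief usage sketch for the new converter; the wrapper class below is illustrative, while the constructor and `convert` signature are taken from the class itself:
```
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.integ.testsuite.converter.DeleteConverter;
import org.apache.spark.api.java.JavaRDD;

class DeleteConverterUsage {
  // Turns previously written records into delete payloads of at least minPayloadSize bytes.
  static JavaRDD<GenericRecord> toDeletes(JavaRDD<GenericRecord> existing, String schemaStr) {
    DeleteConverter converter = new DeleteConverter(schemaStr, 1024); // 1024-byte minimum payload (illustrative)
    return converter.convert(existing);
  }
}
```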
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.integ.testsuite.dag.nodes;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;

import org.apache.spark.api.java.JavaRDD;

/**
* Delete node to assist in issuing deletes.
*/
public class DeleteNode extends InsertNode {

public DeleteNode(Config config) {
super(config);
}

@Override
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).count();
}
}

@Override
protected JavaRDD<WriteStatus> ingest(HoodieTestSuiteWriter hoodieTestSuiteWriter, Option<String> commitTime)
throws Exception {
if (!config.isDisableIngest()) {
log.info("Deleting input data {}", this.getName());
this.result = hoodieTestSuiteWriter.upsert(commitTime);
}
return this.result;
}
}