18 changes: 17 additions & 1 deletion hudi-integ-test/README.md
@@ -471,4 +471,20 @@ hdfs dfs -rm -r /user/hive/warehouse/hudi-integ-test-suite/input/
As of now, "ValidateDatasetNode" uses the Spark data source and Hive tables for comparison. Hence, COW tables and the
real-time view of MOR tables can be tested.


To run test suite jobs that validate all versions of a schema, supply a DAG with insert and upsert nodes for every schema version to be evaluated, along with the "--saferSchemaEvolution" flag indicating that the job is for schema validation. The first run of the job populates the dataset with data files for every schema version and performs an upsert operation to verify schema evolution.

The second and subsequent runs verify that data can be inserted with the latest schema version and perform an upsert operation to evolve all older schema versions (created by earlier runs) to the latest version.

Sample DAG:
```
rollback with num_rollbacks = 2
insert with schema_version = <version>
....
upsert with fraction_upsert_per_file = 0.5
```

Spark submit with the flag:
```
--saferSchemaEvolution
```
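
For example, the flag is appended to the usual test suite spark-submit invocation. This is a minimal sketch of such an invocation: the bundle jar path and the other options are placeholders for whatever your existing test suite job already passes, while the main class and the flag itself come from this change.
```
spark-submit \
  --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
  <path-to-hudi-integ-test-bundle.jar> \
  <existing test suite options: props, input/target base paths, workload DAG yaml, ...> \
  --saferSchemaEvolution
```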

hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -18,17 +18,26 @@

package org.apache.hudi.integ.testsuite;

import org.apache.avro.Schema;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.integ.testsuite.dag.DagUtils;
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.dag.WriterContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode;
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
import org.apache.hudi.integ.testsuite.dag.scheduler.SaferSchemaDagScheduler;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
@@ -47,6 +56,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer}, this class does not extend it since we do not want to create a dependency
@@ -79,6 +90,7 @@ public class HoodieTestSuiteJob {
private transient HiveConf hiveConf;

private BuiltinKeyGenerator keyGenerator;
private transient HoodieTableMetaClient metaClient;

public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
@@ -92,13 +104,11 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) DataSourceUtils.createKeyGenerator(props);

if (!fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient.withPropertyBuilder()
metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder("archived")
.initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
}

if (cfg.cleanInput) {
Path inputPath = new Path(cfg.inputBasePath);
@@ -115,6 +125,28 @@ public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throw
}
}

int getSchemaVersionFromCommit(int nthCommit) throws Exception {
int version = 0;
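// 0 is returned when the schema version cannot be read from the commit metadata (see the catch block below); the SaferSchema optimization is then skipped.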
try {
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitsTimeline();
// Pick up the schema version from the nth commit from the end (the most recent insert/upsert will be rolled back).
HoodieInstant prevInstant = timeline.nthFromLastInstant(nthCommit).get();
HoodieCommitMetadata commit = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(prevInstant).get(),
HoodieCommitMetadata.class);
Map<String, String> extraMetadata = commit.getExtraMetadata();
String avroSchemaStr = extraMetadata.get(HoodieCommitMetadata.SCHEMA_KEY);
Schema avroSchema = new Schema.Parser().parse(avroSchemaStr);
version = Integer.parseInt(avroSchema.getObjectProp("schemaVersion").toString());
// DAG will generate & ingest data for 2 versions (n-th version being validated, n-1).
log.info(String.format("Last used schemaVersion from latest commit file was %d. Optimizing the DAG.", version));
} catch (Exception e) {
// failed to open the commit to read schema version.
// continue executing the DAG without any changes.
log.info("Last used schemaVersion could not be validated from commit file. Skipping SaferSchema Optimization.");
}
return version;
}

private static HiveConf getDefaultHiveConf(Configuration cfg) {
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(cfg);
@@ -150,8 +182,22 @@ public void runTestSuite() {
long startTime = System.currentTimeMillis();
WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
writerContext.initContext(jsc);
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc);
dagScheduler.schedule();
if (this.cfg.saferSchemaEvolution) {
int numRollbacks = 2; // rollback most recent upsert/insert, by default.

// if root is RollbackNode, get num_rollbacks
List<DagNode> root = workflowDag.getNodeList();
if (!root.isEmpty() && root.get(0) instanceof RollbackNode) {
numRollbacks = root.get(0).getConfig().getNumRollbacks();
}

int version = getSchemaVersionFromCommit(numRollbacks - 1);
SaferSchemaDagScheduler dagScheduler = new SaferSchemaDagScheduler(workflowDag, writerContext, jsc, version);
dagScheduler.schedule();
} else {
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc);
dagScheduler.schedule();
}
log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
} catch (Exception e) {
log.error("Failed to run Test Suite ", e);
@@ -211,5 +257,10 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
@Parameter(names = {"--clean-output"}, description = "Clean the output folders and delete all files within it "
+ "before starting the Job")
public Boolean cleanOutput = false;

@Parameter(names = {"--saferSchemaEvolution"}, description = "Optimize the DAG for safer schema evolution."
+ "(If not provided, assumed to be false.)",
required = false)
public Boolean saferSchemaEvolution = false;
}
}
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java
@@ -92,6 +92,8 @@ public static class Config {
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
private static String SCHEMA_VERSION = "schema_version";
private static String NUM_ROLLBACKS = "num_rollbacks";

private Map<String, Object> configsMap;

@@ -131,6 +133,14 @@ public int getNumUpsertPartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_UPSERT, 0).toString());
}

public int getSchemaVersion() {
return Integer.valueOf(configsMap.getOrDefault(SCHEMA_VERSION, Integer.MAX_VALUE).toString());
}

public int getNumRollbacks() {
return Integer.valueOf(configsMap.getOrDefault(NUM_ROLLBACKS, 1).toString());
}

public int getStartPartition() {
return Integer.valueOf(configsMap.getOrDefault(START_PARTITION, 0).toString());
}
@@ -140,7 +150,7 @@ public int getNumDeletePartitions() {
}

public int getNumUpsertFiles() {
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 0).toString());
return Integer.valueOf(configsMap.getOrDefault(NUM_FILES_UPSERT, 1).toString());
}

public double getFractionUpsertPerFile() {
@@ -248,6 +258,16 @@ public Builder withNumDeletePartitions(int numDeletePartitions) {
return this;
}

public Builder withSchemaVersion(int version) {
this.configsMap.put(SCHEMA_VERSION, version);
return this;
}

public Builder withNumRollbacks(int numRollbacks) {
this.configsMap.put(NUM_ROLLBACKS, numRollbacks);
return this;
}

public Builder withNumUpsertFiles(int numUpsertFiles) {
this.configsMap.put(NUM_FILES_UPSERT, numUpsertFiles);
return this;
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java
@@ -18,6 +18,19 @@

package org.apache.hudi.integ.testsuite.dag;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -190,18 +203,24 @@ private static Map<String, Object> convertJsonNodeToMap(JsonNode node) {

private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) {
List<Pair<String, Integer>> queries = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
queries.add(Pair.of(queriesItr.next().getValue().textValue(), queriesItr.next().getValue().asInt()));
try {
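// Wrap the {queryN: <query>, resultN: <result>} object in a single-element array so the List-typed HiveQueryDeserializer registered on getHiveQueryMapper() can parse it.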
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
queries = (List<Pair<String, Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class);
} catch (Exception e) {
e.printStackTrace();
}
return queries;
}

private static List<String> getProperties(Entry<String, JsonNode> entry) {
List<String> properties = new ArrayList<>();
Iterator<Entry<String, JsonNode>> queriesItr = entry.getValue().fields();
while (queriesItr.hasNext()) {
properties.add(queriesItr.next().getValue().textValue());
try {
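// As in getHiveQueries: wrap the {propN: <property>} object in an array so the List-typed HivePropertyDeserializer can parse it.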
List<JsonNode> flattened = new ArrayList<>();
flattened.add(entry.getValue());
properties = (List<String>)getHivePropertyMapper().readValue(flattened.toString(), List.class);
} catch (Exception e) {
e.printStackTrace();
}
return properties;
}
@@ -226,6 +245,22 @@ private static Object getValue(JsonNode node) {
private static JsonNode createJsonNode(DagNode node, String type) throws IOException {
JsonNode configNode = MAPPER.readTree(node.getConfig().toString());
JsonNode jsonNode = MAPPER.createObjectNode();
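// Re-serialize hive query/property configs with the custom mappers below so that the same mappers can read them back in getHiveQueries/getProperties.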
Iterator<Entry<String, JsonNode>> itr = configNode.fields();
while (itr.hasNext()) {
Entry<String, JsonNode> entry = itr.next();
switch (entry.getKey()) {
case DeltaConfig.Config.HIVE_QUERIES:
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES,
MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
break;
case DeltaConfig.Config.HIVE_PROPERTIES:
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES,
MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
break;
default:
break;
}
}
((ObjectNode) jsonNode).put(DeltaConfig.Config.CONFIG_NAME, configNode);
((ObjectNode) jsonNode).put(DeltaConfig.Config.TYPE, type);
((ObjectNode) jsonNode).put(DeltaConfig.Config.DEPENDENCIES, getDependencyNames(node));
@@ -248,4 +283,101 @@ public static String toString(InputStream inputStream) throws IOException {
return result.toString("utf-8");
}

private static ObjectMapper getHiveQueryMapper() {
SimpleModule module = new SimpleModule();
ObjectMapper queryMapper = new ObjectMapper();
module.addSerializer(List.class, new HiveQuerySerializer());
module.addDeserializer(List.class, new HiveQueryDeserializer());
queryMapper.registerModule(module);
return queryMapper;
}

private static final class HiveQuerySerializer extends JsonSerializer<List> {
Integer index = 0;
@Override
public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException {
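// Write the (query, expected result) pairs as one flat object: {query0:<query>, result0:<result>, query1:<query>, result1:<result>, ...}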
gen.writeStartObject();
for (Pair pair : (List<Pair>)pairs) {
gen.writeStringField("query" + index, pair.getLeft().toString());
gen.writeNumberField("result" + index, Integer.parseInt(pair.getRight().toString()));
index++;
}
gen.writeEndObject();
}
}

private static final class HiveQueryDeserializer extends JsonDeserializer<List> {
@Override
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
List<Pair<String, Integer>> pairs = new ArrayList<>();
String query = "";
Integer result = 0;
// [{query0:<query>, result0:<result>,query1:<query>, result1:<result>}]
while (!parser.isClosed()) {
JsonToken jsonToken = parser.nextToken();
if (jsonToken.equals(JsonToken.END_ARRAY)) {
break;
}
if (JsonToken.FIELD_NAME.equals(jsonToken)) {
String fieldName = parser.getCurrentName();
parser.nextToken();

if (fieldName.contains("query")) {
query = parser.getValueAsString();
} else if (fieldName.contains("result")) {
result = parser.getValueAsInt();
pairs.add(Pair.of(query, result));
}
}
}
return pairs;
}
}

private static ObjectMapper getHivePropertyMapper() {
SimpleModule module = new SimpleModule();
ObjectMapper propMapper = new ObjectMapper();
module.addSerializer(List.class, new HivePropertySerializer());
module.addDeserializer(List.class, new HivePropertyDeserializer());
propMapper.registerModule(module);
return propMapper;
}

private static final class HivePropertySerializer extends JsonSerializer<List> {
Integer index = 0;
@Override
public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException {
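// Write the properties as one flat object: {prop0:<property>, prop1:<property>, ...}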
gen.writeStartObject();
for (String prop : (List<String>)props) {
gen.writeStringField("prop" + index, prop);
index++;
}
gen.writeEndObject();
}
}

private static final class HivePropertyDeserializer extends JsonDeserializer<List> {
@Override
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
List<String> props = new ArrayList<>();
String prop = "";
// [{prop0:<property>,...}]
while (!parser.isClosed()) {
JsonToken jsonToken = parser.nextToken();
if (jsonToken.equals(JsonToken.END_ARRAY)) {
break;
}
if (JsonToken.FIELD_NAME.equals(jsonToken)) {
String fieldName = parser.getCurrentName();
parser.nextToken();

if (parser.getCurrentName().contains("prop")) {
prop = parser.getValueAsString();
props.add(prop);
}
}
}
return props;
}
}
}