@@ -39,6 +39,7 @@
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -276,15 +277,21 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty()) {
InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
if (historySchemaStr.isEmpty()) {
internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
internalSchema.setSchemaId(Long.parseLong(instantTime));
} else {
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr);
} else {
evolvedSchema.setSchemaId(Long.parseLong(instantTime));
String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
@@ -100,7 +100,7 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check for implicitly added columns and position reordering (Spark SQL may change column order)
InternalSchema querySchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(readSchema, querySchemaOpt.get(), true);
InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
if (writeInternalSchema.isEmptySchema()) {
@@ -21,7 +21,6 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecord
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord
@@ -39,8 +38,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Literal}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import java.util.Properties

import org.apache.hudi.avro.HoodieAvroUtils

import scala.collection.JavaConverters._

object HoodieSparkUtils extends SparkAdapterSupport {
@@ -162,11 +163,11 @@ object HoodieSparkUtils extends SparkAdapterSupport {
if (rows.isEmpty) {
Iterator.empty
} else {
val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
val transform: GenericRecord => GenericRecord =
if (sameSchema) identity
else {
val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
rewriteRecord(_, readerAvroSchema)
Reviewer comment (Contributor): @xiarixiaoyao since we're changing this, shall we also revisit all the other places that use rewriteRecord and consider rebasing them onto the new methods?

Reviewer comment (Contributor): BTW, one thing the new API misses is that rewriteRecord previously validated that the record adheres to the new schema, while the new method does not (this obscures issues when a conversion does not follow Avro evolution rules).

HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema)
}

// Since caller might request to get records in a different ("evolved") schema, we will be rewriting from
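To make the reviewer's point about validation concrete, here is a minimal sketch (not part of this PR) of how a caller could re-add a conformance check around the new deep-rewrite API. The wrapper class and method name are hypothetical; only HoodieAvroUtils.rewriteRecordDeep (added in this PR) and Avro's GenericData.validate are assumed to exist as used.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

// Hypothetical wrapper: rewrite a record into the reader schema, then validate it,
// so conversions that do not follow Avro evolution rules fail fast instead of
// surfacing later as corrupt data.
public class ValidatingRewriteExample {
  public static GenericRecord rewriteAndValidate(GenericRecord oldRecord, Schema readerSchema) {
    GenericRecord rewritten = HoodieAvroUtils.rewriteRecordDeep(oldRecord, readerSchema);
    if (!GenericData.get().validate(readerSchema, rewritten)) {
      throw new IllegalStateException(
          "Rewritten record does not conform to schema " + readerSchema.getFullName());
    }
    return rewritten;
  }
}
```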
42 changes: 21 additions & 21 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -745,55 +745,51 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord,
* b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
*
* @param oldRecord oldRecord to be rewritten
* @param oldAvroSchema old avro schema.
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols map of renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @param fieldNames tracks the full name of the field currently visited while traversing the new schema.
* @return newRecord for new Schema
*/
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
if (oldRecord == null) {
return null;
}
// try to get real schema for union type
Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
switch (newSchema.getType()) {
case RECORD:
if (!(oldRecord instanceof IndexedRecord)) {
throw new IllegalArgumentException("cannot rewrite record with different type");
}
IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
List<Schema.Field> fields = newSchema.getFields();
Map<Integer, Object> helper = new HashMap<>();

GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
String fieldName = field.name();
fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
String fieldFullName = createFullName(fieldNames);
String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
// deal with rename
if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
// find rename
Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
}
}
fieldNames.pop();
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
if (helper.containsKey(i)) {
newRecord.put(i, helper.get(i));
} else {
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
} else {
newRecord.put(i, fields.get(i).defaultVal());
// deal with default value
if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
newRecord.put(i, null);
} else {
newRecord.put(i, fields.get(i).defaultVal());
}
}
}
fieldNames.pop();
}
return newRecord;
case ARRAY:
@@ -1028,4 +1024,8 @@ public GenericRecord next() {
}
};
}

public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) {
return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP);
}
}
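As a rough usage sketch of the rewrite APIs touched above (the schemas, field names, and values below are made up for illustration; the method signatures are the ones shown in this diff): rewriteRecordDeep rewrites a record into a target schema, filling new fields from their declared defaults, while rewriteRecordWithNewSchema additionally consults a rename map keyed by the column name in the new schema.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RewriteRecordExample {
  public static void main(String[] args) {
    // Old file schema: (id int, nm string); new reader schema renames nm -> name and adds a nullable age.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"nm\",\"type\":\"string\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Person\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"name\",\"type\":\"string\",\"default\":\"\"},"
            + "{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("id", 1);
    oldRecord.put("nm", "alice");

    // The rename map is keyed by the column name in the new schema: name <- nm.
    Map<String, String> renameCols = Collections.singletonMap("name", "nm");
    GenericRecord renamed = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
    System.out.println(renamed);   // {"id": 1, "name": "alice", "age": null}

    // Without renames, missing fields fall back to their declared defaults ("" and null here).
    GenericRecord defaulted = HoodieAvroUtils.rewriteRecordDeep(oldRecord, newSchema);
    System.out.println(defaulted); // {"id": 1, "name": "", "age": null}
  }
}
```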
@@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig {
.defaultValue(false)
.withDocumentation("Enables support for Schema Evolution feature");

public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema")
.defaultValue(false)
.withDocumentation("When a new batch of write has records with old schema, but latest table schema got "
+ "evolved, this config will upgrade the records to leverage latest table schema(default values will be "
+ "injected to missing fields). If not, the write batch would fail.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
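For context, a minimal Spark datasource write that opts into the new behavior might look like the following sketch. The table name, record key, and precombine field are placeholders; only hoodie.datasource.write.reconcile.schema comes from this PR, the other option keys are standard Hudi write configs.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class ReconcileSchemaWriteExample {
  // Writes a batch whose schema may lag behind (or extend) the current table schema.
  public static void upsert(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")                    // placeholder table name
        .option("hoodie.datasource.write.recordkey.field", "id")    // placeholder record key
        .option("hoodie.datasource.write.precombine.field", "ts")   // placeholder precombine field
        .option("hoodie.datasource.write.reconcile.schema", "true") // new config from this PR
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```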
@@ -57,8 +57,8 @@
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -380,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpec
Option<Schema> schemaOption = getMergedSchema(dataBlock);
while (recordIterator.hasNext()) {
IndexedRecord currentRecord = recordIterator.next();
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), Collections.emptyMap()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
totalLogRecords.incrementAndGet();
@@ -68,10 +68,7 @@ public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchem
}

public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
this.fileSchema = fileSchema;
this.querySchema = querySchema;
this.ignoreRequiredAttribute = ignoreRequiredAttribute;
this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
this(fileSchema, querySchema, ignoreRequiredAttribute, useColumnTypeFromFileSchema, true);
}

/**
@@ -151,14 +148,15 @@ private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldFie
Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
String nameFromFileSchema = fieldFromFileSchema.name();
String nameFromQuerySchema = querySchema.findField(fieldId).name();
String finalFieldName = useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema;
Type typeFromFileSchema = fieldFromFileSchema.type();
// The current design guarantees that nested-type changes are not allowed, so they need not be considered here.
if (newType.isNestedType()) {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
finalFieldName, newType, oldField.doc());
} else {
return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
finalFieldName, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
}
}

@@ -33,37 +33,33 @@
* Utility methods to support evolve old avro schema based on a given schema.
*/
public class AvroSchemaEvolutionUtils {

/**
* Support evolution from a new avroSchema.
* Now hoodie support implicitly add columns when hoodie write operation,
* This ability needs to be preserved, so implicitly evolution for internalSchema should supported.
*
* @param evolvedSchema implicitly evolution of avro when hoodie write operation
* @param oldSchema old internalSchema
* @param supportPositionReorder support position reorder
* @return evolution Schema
* Reconcile an incoming Avro schema against the current table schema:
* 1) if the incoming data is missing columns that are already defined in the table, null values are injected into those columns;
* 2) if the incoming data contains new columns that are not yet defined in the table, those columns are added to the table schema;
* 3) if the incoming data both misses existing columns and carries new columns, the new columns are added to the table schema
* and the missing columns are injected with null values;
* 4) nested schema changes are supported.
* Notice:
* the incoming schema must not carry delete/rename semantics.
* For example, with incoming schema (int a, int b, int d) and old table schema (int a, int b, int c, int d),
* column c must be treated as missing, not deleted.
* @param incomingSchema Avro schema of the incoming write batch (implicit evolution during a Hudi write operation)
* @param oldTableSchema current table InternalSchema
* @return the reconciled InternalSchema
*/
public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
InternalSchema evolvedInternalSchema = AvroInternalSchemaConverter.convert(evolvedSchema);
public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
InternalSchema inComingInternalSchema = AvroInternalSchemaConverter.convert(incomingSchema);
// sanity check: only add-column evolution is supported here
List<String> colNamesFromEvolved = evolvedInternalSchema.getAllColsFullName();
List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
List<String> colNamesFromIncoming = inComingInternalSchema.getAllColsFullName();
List<String> colNamesFromOldSchema = oldTableSchema.getAllColsFullName();
List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromIncoming.contains(f)).collect(Collectors.toList());
List<Types.Field> newFields = new ArrayList<>();
if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
// no changes happen
if (supportPositionReorder) {
evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name())));
return new InternalSchema(newFields);
}
return oldSchema;
}
// try to find all added columns
if (diffFromOldSchema.size() != 0) {
throw new UnsupportedOperationException("Cannot evolve schema implicitly, find delete/rename operation");
if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
return oldTableSchema;
}

List<String> diffFromEvolutionSchema = colNamesFromEvolved.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
List<String> diffFromEvolutionSchema = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
// Remove redundant entries from diffFromEvolutionSchema.
// For example, if a struct column "user struct<name:string, age:int>" is added to the evolved schema,
// the diff contains user, user.name and user.age; user.name and user.age are redundant and should be excluded.
@@ -77,29 +73,27 @@ public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema,
// find redundancy, skip it
continue;
}
finalAddAction.put(evolvedInternalSchema.findIdByName(name), name);
finalAddAction.put(inComingInternalSchema.findIdByName(name), name);
}

TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldSchema);
TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldTableSchema);
finalAddAction.entrySet().stream().forEach(f -> {
String name = f.getValue();
int splitPoint = name.lastIndexOf(".");
String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name;
addChange.addColumns(parentName, rawName, evolvedInternalSchema.findType(name), null);
// try to infer add position.
java.util.Optional<String> inferPosition =
colNamesFromIncoming.stream().filter(c ->
c.lastIndexOf(".") == splitPoint
&& c.startsWith(parentName)
&& inComingInternalSchema.findIdByName(c) > inComingInternalSchema.findIdByName(name)
&& oldTableSchema.findIdByName(c) > 0).sorted((s1, s2) -> oldTableSchema.findIdByName(s1) - oldTableSchema.findIdByName(s2)).findFirst();
addChange.addColumns(parentName, rawName, inComingInternalSchema.findType(name), null);
inferPosition.map(i -> addChange.addPositionChange(name, i, "before"));
});

InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange);
if (supportPositionReorder) {
evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name())));
return new InternalSchema(newFields);
} else {
return res;
}
}

public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema) {
return evolveSchemaFromNewAvroSchema(evolvedSchema, oldSchema, false);
return SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
}
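A small sketch of the reconcile semantics described in the Javadoc above: the incoming batch drops column c and introduces column d, and the reconciled table schema keeps c while gaining d. The schemas are illustrative, the import paths are assumptions, and the printed ordering is only the expected outcome; the method calls themselves are the ones appearing in this diff.

```java
import org.apache.avro.Schema;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;

public class ReconcileSchemaExample {
  public static void main(String[] args) {
    // Current table schema: (a int, b int, c int).
    Schema tableAvroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"tbl\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"int\"},"
            + "{\"name\":\"b\",\"type\":\"int\"},"
            + "{\"name\":\"c\",\"type\":\"int\"}]}");
    // Incoming batch schema: c is missing (not deleted) and d is new.
    Schema incomingAvroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"tbl\",\"fields\":["
            + "{\"name\":\"a\",\"type\":\"int\"},"
            + "{\"name\":\"b\",\"type\":\"int\"},"
            + "{\"name\":\"d\",\"type\":[\"null\",\"int\"],\"default\":null}]}");

    InternalSchema oldTableSchema = AvroInternalSchemaConverter.convert(tableAvroSchema);
    InternalSchema reconciled = AvroSchemaEvolutionUtils.reconcileSchema(incomingAvroSchema, oldTableSchema);
    // The reconciled schema still contains a, b and c, and now also contains the added column d.
    System.out.println(reconciled.getAllColsFullName()); // expected roughly: [a, b, c, d]
  }
}
```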

/**