diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 961965353b7ff..24400c5eda90a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -39,6 +39,7 @@ import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.HoodiePendingRollbackInfo; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -276,15 +277,21 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient()); String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse(""); FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient()); - if (!historySchemaStr.isEmpty()) { - InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), - SerDeHelper.parseSchemas(historySchemaStr)); + if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) { + InternalSchema internalSchema; Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema())); - InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema); + if (historySchemaStr.isEmpty()) { + internalSchema = AvroInternalSchemaConverter.convert(avroSchema); + internalSchema.setSchemaId(Long.parseLong(instantTime)); + } else { + internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), + SerDeHelper.parseSchemas(historySchemaStr)); + } + InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema); if (evolvedSchema.equals(internalSchema)) { metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema)); //TODO save history schema by metaTable - schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr); + schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr.isEmpty() ? 
SerDeHelper.inheritSchemas(evolvedSchema, "") : historySchemaStr); } else { evolvedSchema.setSchemaId(Long.parseLong(instantTime)); String newSchemaStr = SerDeHelper.toJson(evolvedSchema); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java index 3e2d8abdd7466..5d1a55453d162 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java @@ -100,7 +100,7 @@ public void runMerge(HoodieTable>, HoodieData GenericRecord = if (sameSchema) identity else { - val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr) - rewriteRecord(_, readerAvroSchema) + HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema) } // Since caller might request to get records in a different ("evolved") schema, we will be rewriting from diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 66066040275bf..fa65461bfdb0e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -745,15 +745,18 @@ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema * * @param oldRecord oldRecord to be rewritten + * @param oldAvroSchema old avro schema. * @param newSchema newSchema used to rewrite oldRecord * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema) * @param fieldNames track the full name of visited field when we travel new schema. 
* @return newRecord for new Schema */ - private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map renameCols, Deque fieldNames) { + private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvroSchema, Schema newSchema, Map renameCols, Deque fieldNames) { if (oldRecord == null) { return null; } + // try to get real schema for union type + Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord); switch (newSchema.getType()) { case RECORD: if (!(oldRecord instanceof IndexedRecord)) { @@ -761,39 +764,32 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch } IndexedRecord indexedRecord = (IndexedRecord) oldRecord; List fields = newSchema.getFields(); - Map helper = new HashMap<>(); - + GenericData.Record newRecord = new GenericData.Record(newSchema); for (int i = 0; i < fields.size(); i++) { Schema.Field field = fields.get(i); String fieldName = field.name(); fieldNames.push(fieldName); if (oldSchema.getField(field.name()) != null) { Schema.Field oldField = oldSchema.getField(field.name()); - helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames)); + newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames)); } else { String fieldFullName = createFullName(fieldNames); - String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\."); - String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1]; + String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, ""); // deal with rename - if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) { + if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) { // find rename - Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema); - helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames)); - } - } - fieldNames.pop(); - } - GenericData.Record newRecord = new GenericData.Record(newSchema); - for (int i = 0; i < fields.size(); i++) { - if (helper.containsKey(i)) { - newRecord.put(i, helper.get(i)); - } else { - if (fields.get(i).defaultVal() instanceof JsonProperties.Null) { - newRecord.put(i, null); + Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema); + newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames)); } else { - newRecord.put(i, fields.get(i).defaultVal()); + // deal with default value + if (fields.get(i).defaultVal() instanceof JsonProperties.Null) { + newRecord.put(i, null); + } else { + newRecord.put(i, fields.get(i).defaultVal()); + } } } + fieldNames.pop(); } return newRecord; case ARRAY: @@ -1028,4 +1024,8 @@ public GenericRecord next() { } }; } + + public static GenericRecord rewriteRecordDeep(GenericRecord oldRecord, Schema newSchema) { + return rewriteRecordWithNewSchema(oldRecord, newSchema, Collections.EMPTY_MAP); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java index cc62bcc32824f..917cfe621f11e 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -36,6 +36,13 @@ public class HoodieCommonConfig extends HoodieConfig { .defaultValue(false) .withDocumentation("Enables support for Schema Evolution feature"); + public static final ConfigProperty<Boolean> RECONCILE_SCHEMA = ConfigProperty + .key("hoodie.datasource.write.reconcile.schema") + .defaultValue(false) + .withDocumentation("When an incoming batch of writes contains records with an older schema while the latest table " + + "schema has evolved, enabling this config upgrades those records to the latest table schema (default values " + + "are injected into the missing fields). If disabled, the write batch fails."); + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty .key("hoodie.common.spillable.diskmap.type") .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 9687136444eeb..16a264e06ddcd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -57,8 +57,8 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; +import java.util.Collections; import java.util.Deque; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -380,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, Option keySpec Option schemaOption = getMergedSchema(dataBlock); while (recordIterator.hasNext()) { IndexedRecord currentRecord = recordIterator.next(); - IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord; + IndexedRecord record = schemaOption.isPresent() ?
HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), Collections.emptyMap()) : currentRecord; processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN, this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName)); totalLogRecords.incrementAndGet(); diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java index bcea9b957b3ea..cd9bae0541cdc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java @@ -68,10 +68,7 @@ public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchem } public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) { - this.fileSchema = fileSchema; - this.querySchema = querySchema; - this.ignoreRequiredAttribute = ignoreRequiredAttribute; - this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema; + this(fileSchema, querySchema, ignoreRequiredAttribute, useColumnTypeFromFileSchema, true); } /** @@ -151,14 +148,15 @@ private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldFie Types.Field fieldFromFileSchema = fileSchema.findField(fieldId); String nameFromFileSchema = fieldFromFileSchema.name(); String nameFromQuerySchema = querySchema.findField(fieldId).name(); + String finalFieldName = useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema; Type typeFromFileSchema = fieldFromFileSchema.type(); // Current design mechanism guarantees nestedType change is not allowed, so no need to consider. if (newType.isNestedType()) { return Types.Field.get(oldField.fieldId(), oldField.isOptional(), - useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc()); + finalFieldName, newType, oldField.doc()); } else { return Types.Field.get(oldField.fieldId(), oldField.isOptional(), - useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc()); + finalFieldName, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java index e57fce4357b25..520a6b9ec75b2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java @@ -33,37 +33,33 @@ * Utility methods to support evolve old avro schema based on a given schema. */ public class AvroSchemaEvolutionUtils { + /** - * Support evolution from a new avroSchema. - * Now hoodie support implicitly add columns when hoodie write operation, - * This ability needs to be preserved, so implicitly evolution for internalSchema should supported. - * - * @param evolvedSchema implicitly evolution of avro when hoodie write operation - * @param oldSchema old internalSchema - * @param supportPositionReorder support position reorder - * @return evolution Schema + * Support reconcile from a new avroSchema. 
+ * 1) incoming data is missing columns that are already defined in the table -> null values will be injected into those columns + * 2) incoming data contains new columns that are not yet defined in the table -> the new columns will be added to the table schema + * 3) incoming data is missing columns that are already defined in the table and also carries new columns -> + * the new columns will be added to the table schema and the missing columns will be injected with null values + * 4) nested schema changes are supported. + * Note: + * the incoming schema must not carry delete/rename semantics. + * for example: incoming schema: int a, int b, int d; oldTableSchema: int a, int b, int c, int d + * column c must be treated as missing, not as deleted. + * @param incomingSchema avro schema of the incoming write batch (implicit evolution during a hoodie write operation) + * @param oldTableSchema current table InternalSchema + * @return the reconciled InternalSchema */ - public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) { - InternalSchema evolvedInternalSchema = AvroInternalSchemaConverter.convert(evolvedSchema); + public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) { + InternalSchema inComingInternalSchema = AvroInternalSchemaConverter.convert(incomingSchema); // do check, only support add column evolution - List colNamesFromEvolved = evolvedInternalSchema.getAllColsFullName(); - List colNamesFromOldSchema = oldSchema.getAllColsFullName(); - List diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromEvolved.contains(f)).collect(Collectors.toList()); + List colNamesFromIncoming = inComingInternalSchema.getAllColsFullName(); + List colNamesFromOldSchema = oldTableSchema.getAllColsFullName(); + List diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromIncoming.contains(f)).collect(Collectors.toList()); + List newFields = new ArrayList<>(); - if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) { - // no changes happen - if (supportPositionReorder) { - evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name()))); - return new InternalSchema(newFields); - } - return oldSchema; - } - // try to find all added columns - if (diffFromOldSchema.size() != 0) { - throw new UnsupportedOperationException("Cannot evolve schema implicitly, find delete/rename operation"); + if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) { + return oldTableSchema; } - - List diffFromEvolutionSchema = colNamesFromEvolved.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList()); + List diffFromEvolutionSchema = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList()); // Remove redundancy from diffFromEvolutionSchema. // for example, now we add a struct col in evolvedSchema, the struct col is " user struct " // when we do diff operation: user, user.name, user.age will appeared in the resultSet which is redundancy, user.name and user.age should be excluded.
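To make the reconcile contract above concrete, here is a small, self-contained sketch in plain Avro showing the expected input and output schemas and the null injection for a missing column. It is illustrative only (the record and field names are made up) and does not go through Hudi's InternalSchema or AvroInternalSchemaConverter machinery.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ReconcileSemanticsSketch {
  public static void main(String[] args) {
    // current table schema: a, b, c
    Schema tableSchema = SchemaBuilder.record("trip").fields()
        .optionalInt("a").optionalString("b").optionalLong("c")
        .endRecord();
    // incoming batch schema: a, b, d (c is merely missing, d is new; no delete/rename semantics)
    Schema incomingSchema = SchemaBuilder.record("trip").fields()
        .optionalInt("a").optionalString("b").optionalDouble("d")
        .endRecord();
    // expected reconciled schema: every existing table column is kept and the new column is appended
    Schema reconciledSchema = SchemaBuilder.record("trip").fields()
        .optionalInt("a").optionalString("b").optionalLong("c").optionalDouble("d")
        .endRecord();
    // an incoming record rewritten against the reconciled schema gets null for the column it did not carry
    GenericRecord rewritten = new GenericData.Record(reconciledSchema);
    rewritten.put("a", 1);
    rewritten.put("b", "x");
    rewritten.put("c", null); // injected default for the missing column
    rewritten.put("d", 2.0d);
    System.out.println(GenericData.get().validate(reconciledSchema, rewritten)); // prints true
  }
}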
@@ -77,29 +73,27 @@ public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, // find redundancy, skip it continue; } - finalAddAction.put(evolvedInternalSchema.findIdByName(name), name); + finalAddAction.put(inComingInternalSchema.findIdByName(name), name); } - TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldSchema); + TableChanges.ColumnAddChange addChange = TableChanges.ColumnAddChange.get(oldTableSchema); finalAddAction.entrySet().stream().forEach(f -> { String name = f.getValue(); int splitPoint = name.lastIndexOf("."); String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : ""; String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name; - addChange.addColumns(parentName, rawName, evolvedInternalSchema.findType(name), null); + // try to infer add position. + java.util.Optional inferPosition = + colNamesFromIncoming.stream().filter(c -> + c.lastIndexOf(".") == splitPoint + && c.startsWith(parentName) + && inComingInternalSchema.findIdByName(c) > inComingInternalSchema.findIdByName(name) + && oldTableSchema.findIdByName(c) > 0).sorted((s1, s2) -> oldTableSchema.findIdByName(s1) - oldTableSchema.findIdByName(s2)).findFirst(); + addChange.addColumns(parentName, rawName, inComingInternalSchema.findType(name), null); + inferPosition.map(i -> addChange.addPositionChange(name, i, "before")); }); - InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange); - if (supportPositionReorder) { - evolvedInternalSchema.getRecord().fields().forEach(f -> newFields.add(oldSchema.getRecord().field(f.name()))); - return new InternalSchema(newFields); - } else { - return res; - } - } - - public static InternalSchema evolveSchemaFromNewAvroSchema(Schema evolvedSchema, InternalSchema oldSchema) { - return evolveSchemaFromNewAvroSchema(evolvedSchema, oldSchema, false); + return SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java index a784b409b8f2f..c799c236d0db0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java @@ -273,7 +273,7 @@ public static String createFullName(String name, Deque fieldNames) { * * @param oldSchema oldSchema * @param newSchema newSchema which modified from oldSchema - * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema) + * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameLastPartFromOldSchema) */ public static Map collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) { List colNamesFromWriteSchema = oldSchema.getAllColsFullName(); @@ -281,6 +281,9 @@ public static Map collectRenameCols(InternalSchema oldSchema, In int filedIdFromWriteSchema = oldSchema.findIdByName(f); // try to find the cols which has the same id, but have different colName; return newSchema.getAllIds().contains(filedIdFromWriteSchema) && !newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f); - }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e)); + }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> { + int lastDotIndex = e.lastIndexOf("."); + return e.substring(lastDotIndex == -1 ? 
0 : lastDotIndex + 1); + })); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index bd0254da3dc6e..f2c02d627f131 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -27,12 +27,14 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -98,6 +100,12 @@ public class TestHoodieAvroUtils { + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":[" + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}"; + private static String SCHEMA_WITH_NESTED_FIELD_RENAMED = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":[" + + "{\"name\":\"fn\",\"type\":\"string\"}," + + "{\"name\":\"ln\",\"type\":\"string\"}," + + "{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":[" + + "{\"name\":\"fn\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"ln\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}"; + @Test public void testPropsPresent() { Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA)); @@ -342,4 +350,26 @@ public void testGetNestedFieldSchema() throws IOException { assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(rec3.getSchema(), "student.firstname")); assertEquals(Schema.create(Schema.Type.STRING), getNestedFieldSchemaFromWriteSchema(nestedSchema, "student.firstname")); } + + @Test + public void testReWriteAvroRecordWithNewSchema() { + Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD); + GenericRecord rec3 = new GenericData.Record(nestedSchema); + rec3.put("firstname", "person1"); + rec3.put("lastname", "person2"); + GenericRecord studentRecord = new GenericData.Record(rec3.getSchema().getField("student").schema()); + studentRecord.put("firstname", "person1"); + studentRecord.put("lastname", "person2"); + rec3.put("student", studentRecord); + + Schema nestedSchemaRename = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_RENAMED); + Map colRenames = new HashMap<>(); + colRenames.put("fn", "firstname"); + colRenames.put("ln", "lastname"); + colRenames.put("ss", "student"); + colRenames.put("ss.fn", "firstname"); + colRenames.put("ss.ln", "lastname"); + GenericRecord studentRecordRename = HoodieAvroUtils.rewriteRecordWithNewSchema(rec3, nestedSchemaRename, colRenames); + Assertions.assertEquals(GenericData.get().validate(nestedSchemaRename, studentRecordRename), true); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java index 3850ef07b90a3..6126c479c6154 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java @@ -38,6 +38,7 @@ 
import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +46,17 @@ public class TestAvroSchemaEvolutionUtils { + String schemaStr = "{\"type\":\"record\",\"name\":\"newTableName\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"data\"," + + "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"preferences\",\"type\":[\"null\"," + + "{\"type\":\"record\",\"name\":\"newTableName_preferences\",\"fields\":[{\"name\":\"feature1\"," + + "\"type\":\"boolean\"},{\"name\":\"feature2\",\"type\":[\"null\",\"boolean\"],\"default\":null}]}]," + + "\"default\":null},{\"name\":\"locations\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"record\"," + + "\"name\":\"newTableName_locations\",\"fields\":[{\"name\":\"lat\",\"type\":\"float\"},{\"name\":\"long\"," + + "\"type\":\"float\"}]}}},{\"name\":\"points\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\"," + + "{\"type\":\"record\",\"name\":\"newTableName_points\",\"fields\":[{\"name\":\"x\",\"type\":\"long\"}," + + "{\"name\":\"y\",\"type\":\"long\"}]}]}],\"default\":null},{\"name\":\"doubles\",\"type\":{\"type\":\"array\",\"items\":\"double\"}}," + + "{\"name\":\"properties\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",\"string\"]}],\"default\":null}]}"; + @Test public void testPrimitiveTypes() { Schema[] avroPrimitives = new Schema[] { @@ -146,16 +158,6 @@ public void testArrayType() { @Test public void testComplexConvert() { - String schemaStr = "{\"type\":\"record\",\"name\":\"newTableName\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"data\"," - + "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"preferences\",\"type\":[\"null\"," - + "{\"type\":\"record\",\"name\":\"newTableName_preferences\",\"fields\":[{\"name\":\"feature1\"," - + "\"type\":\"boolean\"},{\"name\":\"feature2\",\"type\":[\"null\",\"boolean\"],\"default\":null}]}]," - + "\"default\":null},{\"name\":\"locations\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"record\"," - + "\"name\":\"newTableName_locations\",\"fields\":[{\"name\":\"lat\",\"type\":\"float\"},{\"name\":\"long\"," - + "\"type\":\"float\"}]}}},{\"name\":\"points\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[\"null\"," - + "{\"type\":\"record\",\"name\":\"newTableName_points\",\"fields\":[{\"name\":\"x\",\"type\":\"long\"}," - + "{\"name\":\"y\",\"type\":\"long\"}]}]}],\"default\":null},{\"name\":\"doubles\",\"type\":{\"type\":\"array\",\"items\":\"double\"}}," - + "{\"name\":\"properties\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[\"null\",\"string\"]}],\"default\":null}]}"; Schema schema = new Schema.Parser().parse(schemaStr); InternalSchema internalSchema = new InternalSchema(Types.Field.get(0, false, "id", Types.IntType.get()), @@ -284,7 +286,7 @@ public void testReWriteRecordWithTypeChanged() { .updateColumnType("col6", Types.StringType.get()); InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange); Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName()); - GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>()); + GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, Collections.emptyMap()); Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true); } @@ -349,9 +351,26 @@ public void 
testReWriteNestRecord() { ); Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName()); - GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>()); + GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, Collections.emptyMap()); // test the correctly of rewrite Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true); + + // test rewrite with rename + InternalSchema internalSchema = AvroInternalSchemaConverter.convert(schema); + // do change rename operation + TableChanges.ColumnUpdateChange updateChange = TableChanges.ColumnUpdateChange.get(internalSchema); + updateChange + .renameColumn("id", "idx") + .renameColumn("data", "datax") + .renameColumn("preferences.feature1", "f1") + .renameColumn("preferences.feature2", "f2") + .renameColumn("locations.value.lat", "lt"); + InternalSchema internalSchemaRename = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange); + Schema avroSchemaRename = AvroInternalSchemaConverter.convert(internalSchemaRename, schema.getName()); + Map renameCols = InternalSchemaUtils.collectRenameCols(internalSchema, internalSchemaRename); + GenericRecord avroRecordRename = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, avroSchemaRename, renameCols); + // test the correctly of rewrite + Assertions.assertEquals(GenericData.get().validate(avroSchemaRename, avroRecordRename), true); } @Test @@ -395,7 +414,7 @@ public void testEvolutionSchemaFromNewAvroSchema() { ); evolvedRecord = (Types.RecordType)InternalSchemaBuilder.getBuilder().refreshNewId(evolvedRecord, new AtomicInteger(0)); Schema evolvedAvroSchema = AvroInternalSchemaConverter.convert(evolvedRecord, "test1"); - InternalSchema result = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(evolvedAvroSchema, oldSchema); + InternalSchema result = AvroSchemaEvolutionUtils.reconcileSchema(evolvedAvroSchema, oldSchema); Types.RecordType checkedRecord = Types.RecordType.get( Types.Field.get(0, false, "id", Types.IntType.get()), Types.Field.get(1, true, "data", Types.StringType.get()), @@ -419,4 +438,37 @@ public void testEvolutionSchemaFromNewAvroSchema() { ); Assertions.assertEquals(result.getRecord(), checkedRecord); } + + @Test + public void testReconcileSchema() { + // simple schema test + // a: boolean, b: int, c: long, d: date + Schema schema = create("simple", + new Schema.Field("a", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.BOOLEAN)), null, JsonProperties.NULL_VALUE), + new Schema.Field("b", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.INT)), null, JsonProperties.NULL_VALUE), + new Schema.Field("c", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.LONG)), null, JsonProperties.NULL_VALUE), + new Schema.Field("d", AvroInternalSchemaConverter.nullableSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), null, JsonProperties.NULL_VALUE)); + // a: boolean, c: long, c_1: long, d: date + Schema incomingSchema = create("simpleIncoming", + new Schema.Field("a", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.BOOLEAN)), null, JsonProperties.NULL_VALUE), + new Schema.Field("a1", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.LONG)), null, JsonProperties.NULL_VALUE), + new Schema.Field("c", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.LONG)), null, JsonProperties.NULL_VALUE), + 
new Schema.Field("c1", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.LONG)), null, JsonProperties.NULL_VALUE), + new Schema.Field("c2", AvroInternalSchemaConverter.nullableSchema(Schema.create(Schema.Type.LONG)), null, JsonProperties.NULL_VALUE), + new Schema.Field("d", AvroInternalSchemaConverter.nullableSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), null, JsonProperties.NULL_VALUE), + new Schema.Field("d1", AvroInternalSchemaConverter.nullableSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), null, JsonProperties.NULL_VALUE), + new Schema.Field("d2", AvroInternalSchemaConverter.nullableSchema(LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT))), null, JsonProperties.NULL_VALUE)); + + Schema simpleCheckSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"simpleReconcileSchema\",\"fields\":[{\"name\":\"a\",\"type\":[\"null\",\"boolean\"],\"default\":null}," + + "{\"name\":\"b\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"a1\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"c\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"c1\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"c2\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"d\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}," + + "{\"name\":\"d1\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}," + + "{\"name\":\"d2\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}"); + + Schema simpleReconcileSchema = AvroInternalSchemaConverter.convert(AvroSchemaEvolutionUtils + .reconcileSchema(incomingSchema, AvroInternalSchemaConverter.convert(schema)), "simpleReconcileSchema"); + Assertions.assertEquals(simpleReconcileSchema, simpleCheckSchema); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 8ff82746b0ceb..02fc4567c3fea 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -399,12 +399,7 @@ object DataSourceWriteOptions { .defaultValue(classOf[HiveSyncTool].getName) .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.") - val RECONCILE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty - .key("hoodie.datasource.write.reconcile.schema") - .defaultValue(false) - .withDocumentation("When a new batch of write has records with old schema, but latest table schema got " - + "evolved, this config will upgrade the records to leverage latest table schema(default values will be " - + "injected to missing fields). If not, the write batch would fail.") + val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA // HIVE SYNC SPECIFIC CONFIGS // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. 
Using upper-cases causes diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 510a45899bce5..a90e6b8e8e653 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -40,6 +40,7 @@ import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool} import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} @@ -242,16 +243,29 @@ object HoodieSparkSqlWriter { classOf[org.apache.avro.Schema])) var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean + && internalSchemaOpt.isEmpty) { + // force apply full schema evolution. + internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) + } if (reconcileSchema) { schema = lastestSchema } if (internalSchemaOpt.isDefined) { - schema = { - val newSparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema)) - AvroConversionUtils.convertStructTypeToAvroSchema(newSparkSchema, structName, nameSpace) - + // Apply schema evolution. + val mergedSparkSchema = if (!reconcileSchema) { + AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema)) + } else { + // Auto merge write schema and read schema. 
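+ // When reconcile is enabled, the incoming write schema is merged into the table's current InternalSchema:
+ // columns missing from this batch are retained (and later read back as null) and brand-new columns are appended,
+ // instead of only canonicalizing column nullability against the latest table schema as in the branch above.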
+ val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get) + AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName)) } + schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace) + } + + if (reconcileSchema && internalSchemaOpt.isEmpty) { + schema = lastestSchema } validateSchemaForHoodieIsDeleted(schema) sparkContext.getConf.registerAvroSchemas(schema) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala index e71973f94a164..01a81976cf751 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala @@ -199,9 +199,7 @@ class TestHoodieSparkUtils { fail("createRdd should fail, because records don't have a column which is not nullable in the passed in schema") } catch { case e: Exception => - val cause = e.getCause - assertTrue(cause.isInstanceOf[SchemaCompatibilityException]) - assertTrue(e.getMessage.contains("Unable to validate the rewritten record {\"innerKey\": \"innerKey1_2\", \"innerValue\": 2} against schema")) + assertTrue(e.getMessage.contains("null of string in field new_nested_col of test_namespace.test_struct_name.nullableInnerStruct of union")) } spark.stop() } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index 15fed579bba41..b64d386f1fb4a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -19,10 +19,13 @@ package org.apache.spark.sql.hudi import org.apache.hadoop.fs.Path import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload +import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession} +import org.apache.spark.sql.functions.{arrays_zip, col} +import org.apache.spark.sql.{Row, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -460,4 +463,65 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } } + + test("Test schema auto evolution") { + withTempDir { tmp => + Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType => + val tableName = generateTableName + val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" + if (HoodieSparkUtils.gteqSpark3_1) { + + val dataGen = new HoodieTestDataGenerator + val schema = HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA + val records1 = RawTripTestPayload.recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 1000, schema)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + // drop tip_history.element.amount, city_to_state, distance_in_meters, drivers + val orgStringDf = inputDF1.drop("city_to_state", 
"distance_in_meters", "drivers") + .withColumn("tip_history", arrays_zip(col("tip_history.currency"))) + spark.sql("set hoodie.schema.on.read.enable=true") + + val hudiOptions = Map[String,String]( + HoodieWriteConfig.TABLE_NAME -> tableName, + DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType, + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + "hoodie.schema.on.read.enable" -> "true", + "hoodie.datasource.write.reconcile.schema" -> "true", + DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true" + ) + + orgStringDf.write + .format("org.apache.hudi") + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .options(hudiOptions) + .mode(SaveMode.Overwrite) + .save(tablePath) + + val oldView = spark.read.format("hudi").load(tablePath) + oldView.show(false) + + val records2 = RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesAsPerSchema("002", 100, schema)).toList + val inputD2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + val updatedStringDf = inputD2.drop("fare").drop("height") + val checkRowKey = inputD2.select("_row_key").collectAsList().map(_.getString(0)).get(0) + + updatedStringDf.write + .format("org.apache.hudi") + .options(hudiOptions) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .option("hoodie.datasource.write.reconcile.schema", "true") + .mode(SaveMode.Append) + .save(tablePath) + spark.read.format("hudi").load(tablePath).registerTempTable("newView") + val checkResult = spark.sql(s"select tip_history.amount,city_to_state,distance_in_meters,fare,height from newView where _row_key='$checkRowKey' ") + .collect().map(row => (row.isNullAt(0), row.isNullAt(1), row.isNullAt(2), row.isNullAt(3), row.isNullAt(4))) + assertResult((false, false, false, true, true))(checkResult(0)) + checkAnswer(spark.sql(s"select fare,height from newView where _row_key='$checkRowKey'").collect())( + Seq(null, null) + ) + } + } + } + } }