diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 8ee0c163409a5..708a6471b825e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -107,10 +107,33 @@ public void testSchemaCompatibilityBasic() throws Exception {
     assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
         "Swapped fields are not compatible");
 
-    String typeChangeSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+    String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
         + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
-    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchema),
-        "Field type change is not compatible");
+    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed),
+        "Incompatible field type change is not allowed");
+
+    // Array of allowed schema field type transitions
+    String[][] allowedFieldChanges = {
+        {"string", "bytes"}, {"bytes", "string"},
+        {"int", "long"}, {"int", "float"}, {"long", "float"},
+        {"int", "double"}, {"float", "double"}, {"long", "double"}};
+    for (String[] fieldChange : allowedFieldChanges) {
+      String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
+      String toSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
+      assertTrue(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema),
+          "Compatible field type change is not allowed");
+      // string <-> bytes is compatible in both directions, so only check the reverse direction for numeric promotions
+      if (!fieldChange[0].equals("bytes") && !fieldChange[1].equals("bytes")) {
+        assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, fromSchema),
+            "Incompatible field type change is allowed");
+      }
+    }
+
+    // Names and aliases should match
+    String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
+    assertFalse(TableSchemaResolver.isSchemaCompatible(fromSchema, toSchema), "Field names should match");
+    assertFalse(TableSchemaResolver.isSchemaCompatible(toSchema, fromSchema), "Field names should match");
+
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED),
         "Added field with default is compatible (Evolved Schema)");
 
@@ -474,6 +497,7 @@ private HoodieWriteConfig getWriteConfig(String schema) {
         .build();
   }
 
+  @Override
   protected HoodieTableType getTableType() {
     return tableType;
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 372b3936b5b2f..181edd3caaf6f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -296,7 +296,7 @@ public MessageType convertAvroSchemaToParquet(Schema schema) {
   public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
     if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
       // record names must match:
-      if (!SchemaCompatibility.schemaNameEquals(oldSchema, newSchema)) {
+      if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
         return false;
       }
 
@@ -329,9 +329,11 @@ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
       // All fields in the newSchema record can be populated from the oldSchema record
       return true;
     } else {
-      // Use the checks implemented by
+      // Use the checks implemented by Avro
+      // newSchema is the schema which will be used to read the records written earlier using oldSchema. Hence, in the
+      // check below, use newSchema as the reader schema and oldSchema as the writer schema.
       org.apache.avro.SchemaCompatibility.SchemaPairCompatibility compatResult =
-          org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(oldSchema, newSchema);
+          org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
       return compatResult.getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
     }
   }
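
The argument-order fix above hinges on Avro's reader/writer model: the evolved (new) schema must be able to read records written with the old schema, so it belongs in the reader position. Below is a minimal standalone sketch against plain Avro, separate from the patch itself, with a made-up record name and field to illustrate why the order matters; it uses only the same org.apache.avro.SchemaCompatibility API the patch calls.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class ReaderWriterOrderSketch {
  public static void main(String[] args) {
    // Writer schema: records already on disk carry an int "amount" field.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
            + "[{\"name\":\"amount\",\"type\":\"int\"}]}");
    // Reader schema: the evolved table schema widens "amount" to long.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
            + "[{\"name\":\"amount\",\"type\":\"long\"}]}");

    // newSchema reads data written with oldSchema, so it goes in the
    // reader (first) position -- the order the patch establishes.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(newSchema, oldSchema).getType());
    // prints COMPATIBLE: Avro promotes int to long

    // The pre-patch order flips the verdict: an int reader cannot
    // represent long data, so the check reports the pair incompatible.
    System.out.println(SchemaCompatibility
        .checkReaderWriterCompatibility(oldSchema, newSchema).getType());
    // prints INCOMPATIBLE
  }
}

The same asymmetry is what the new test loop exercises: each numeric promotion (int to long, float to double, etc.) is compatible only in the forward direction, while string and bytes are interchangeable both ways.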