Merged
@@ -227,6 +227,15 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue("true")
.withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility.");

public static final ConfigProperty<String> SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP = ConfigProperty
.key("hoodie.datasource.write.schema.allow.auto.evolution.column.drop")
.defaultValue("false")
.sinceVersion("0.13.0")
.withDocumentation("Controls whether table's schema is allowed to automatically evolve when "
+ "incoming batch's schema can have any of the columns dropped. By default, Hudi will not "
+ "allow this kind of (auto) schema evolution. Set this config to true to allow table's "
+ "schema to be updated automatically when columns are dropped from the new incoming batch.");

public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.insert.shuffle.parallelism")
.defaultValue("0")
@@ -1086,6 +1095,10 @@ public boolean shouldValidateAvroSchema() {
return getBoolean(AVRO_SCHEMA_VALIDATE_ENABLE);
}

public boolean shouldAllowAutoEvolutionColumnDrop() {
return getBooleanOrDefault(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP);
}

public String getTableName() {
return getString(TBL_NAME);
}
@@ -2451,6 +2464,11 @@ public Builder withAvroSchemaValidate(boolean enable) {
return this;
}

public Builder withAllowAutoEvolutionColumnDrop(boolean shouldAllowDroppedColumns) {
writeConfig.setValue(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, String.valueOf(shouldAllowDroppedColumns));
return this;
}
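
A minimal usage sketch (not part of this change) of the new builder option; the base path, schema string, and table name below are placeholders:

// Build a write config that lets the table schema drop columns present in the
// incoming batch's schema; Avro schema validation itself stays enabled.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi/trips")        // placeholder base path
    .withSchema(writerSchemaJson)       // placeholder: Avro schema as a JSON string
    .forTable("trips")                  // placeholder table name
    .withAvroSchemaValidate(true)
    .withAllowAutoEvolutionColumnDrop(true)
    .build();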

public Builder forTable(String tableName) {
writeConfig.setValue(TBL_NAME, tableName);
return this;
@@ -800,7 +800,7 @@ public TaskContextSupplier getTaskContextSupplier() {
*/
private void validateSchema() throws HoodieUpsertException, HoodieInsertException {

if (!config.shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
if (!shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) {
// Check not required
return;
}
@@ -812,7 +812,7 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields());
isValid = isSchemaCompatible(tableSchema, writerSchema);
isValid = isSchemaCompatible(tableSchema, writerSchema, config.shouldAllowAutoEvolutionColumnDrop());
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
}
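
To make the effect of the new flag concrete, a toy illustration (schemas invented for this example; Schema and AvroSchemaUtils imports as in the test file below) of the three-argument compatibility check used above:

// Table schema has two columns; writer schema drops "tip".
Schema tableSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"tip\",\"type\":\"double\"}]}");
Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

AvroSchemaUtils.isSchemaCompatible(tableSchema, writerSchema, false); // false: dropped column rejected
AvroSchemaUtils.isSchemaCompatible(tableSchema, writerSchema, true);  // true: dropped column tolerated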
@@ -1010,4 +1010,12 @@ public HoodieTableMetadata getMetadataTable() {
public Runnable getPreExecuteRunnable() {
return Functions.noop();
}

private boolean shouldValidateAvroSchema() {
// TODO(HUDI-4772) re-enable validation for the case where partition columns
// are dropped from the data files, once the write schema handling is fixed
Boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns();

return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns;
}
}
@@ -18,8 +18,6 @@

package org.apache.hudi.client;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -34,10 +32,17 @@
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.List;
@@ -79,36 +84,35 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {

@Test
public void testSchemaCompatibilityBasic() {
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA, false),
"Same schema is compatible");

String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA
+ MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX;
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema),
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema, false),
"Reordered fields are compatible");
assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA),
assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA, false),
"Reordered fields are compatible");

String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", "tip_future");

// NOTE: Even though a rename could be carried out as a "column drop" plus a "column add"
// (both of which are legitimate operations), no data carry-over will occur (exactly b/c
// it's the old column being dropped and a new one being added)
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema),
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, false),
"Renaming fields is essentially: dropping old field, created a new one");
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, true),
"Renaming fields is essentially: dropping old field, created a new one");

String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", "triprec_renamed");
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema),
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema, false),
"Renamed record name is not compatible");

String swappedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA.replace("city_to_state", "fare")
+ FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema),
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema, false),
"Swapped fields are not compatible");

String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX;
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed),
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed, false),
"Incompatible field type change is not allowed");

// Array of allowed schema field type transitions
@@ -119,43 +123,42 @@ public void testSchemaCompatibilityBasic() {
for (String[] fieldChange : allowedFieldChanges) {
String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX;
String toSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX;
assertTrue(isSchemaCompatible(fromSchema, toSchema),
assertTrue(isSchemaCompatible(fromSchema, toSchema, false),
"Compatible field type change is not allowed");
if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) {
assertFalse(isSchemaCompatible(toSchema, fromSchema),
assertFalse(isSchemaCompatible(toSchema, fromSchema, false),
"Incompatible field type change is allowed");
}
}

// Names and aliases should match
String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX;
assertFalse(isSchemaCompatible(fromSchema, toSchema), "Field names should match");
assertFalse(isSchemaCompatible(toSchema, fromSchema), "Field names should match");
assertFalse(isSchemaCompatible(fromSchema, toSchema, false), "Field names should match");
assertFalse(isSchemaCompatible(toSchema, fromSchema, false), "Field names should match");


assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED),
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, false),
"Added field with default is compatible (Evolved Schema)");

String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field")
+ TRIP_SCHEMA_SUFFIX;
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema, false),
"Multiple added fields with defaults are compatible");

assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX, false),
"Added field without default and not nullable is not compatible (Evolved Schema)");

assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA),
assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA, false),
"Added nullable field is compatible (Evolved Schema)");
}

@Test
public void testMORTable() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testMORTable(boolean shouldAllowDroppedColumns) throws Exception {
tableType = HoodieTableType.MERGE_ON_READ;

// Create the table
@@ -165,7 +168,7 @@ public void testMORTable() throws Exception {
.setTimelineLayoutVersion(VERSION_1)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());

HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA, shouldAllowDroppedColumns);
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);

// Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -194,20 +197,26 @@ public void testMORTable() throws Exception {
checkReadRecords("000", numRecords);

// Insert with evolved schema (column dropped) is allowed only when auto evolution with column drop is enabled
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
// We cannot use insertBatch directly here because we want to insert records
// with an evolved schema, and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA.
writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
try {
writeBatch(client, "005", "004", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false);
assertTrue(shouldAllowDroppedColumns);
} catch (HoodieInsertException e) {
Contributor: after L209, we should also do, before the catch block:

assertTrue(shouldAllowDroppedColumns);

assertFalse(shouldAllowDroppedColumns);
return;
}

// Update with evolved schema (column dropped) is allowed
// Update with evolved schema (column dropped) might be allowed, depending on the config set.
updateBatch(hoodieDevolvedWriteConfig, client, "006", "005", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 2 * numRecords, 0);

// Insert with an evolved schema is allowed
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);

// We cannot use insertBatch directly here because we want to insert records
@@ -230,19 +239,28 @@

// Now try updating w/ the original schema (should succeed)
client = getHoodieWriteClient(hoodieWriteConfig);
updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
try {
updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9);
assertTrue(shouldAllowDroppedColumns);
} catch (HoodieUpsertException e) {
Contributor: similar comment as above

assertFalse(shouldAllowDroppedColumns);
}
}

@Test
public void testCopyOnWriteTable() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Exception {
// Create the table
HoodieTableMetaClient.withPropertyBuilder()
.fromMetaClient(metaClient)
.setTimelineLayoutVersion(VERSION_1)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());

HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build();
HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA)
.withRollbackUsingMarkers(false)
.withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns)
.build();
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);

// Initial inserts with TRIP_EXAMPLE_SCHEMA
@@ -266,19 +284,25 @@ public void testCopyOnWriteTable() throws Exception {
checkReadRecords("000", numRecords);

// Inserting records w/ new evolved schema (w/ tip column dropped)
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns);
client = getHoodieWriteClient(hoodieDevolvedWriteConfig);
final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED);
writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
try {
writeBatch(client, "004", "003", Option.empty(), "003", numRecords,
(String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false);
assertTrue(shouldAllowDroppedColumns);
} catch (HoodieInsertException e) {
assertFalse(shouldAllowDroppedColumns);
Contributor: ditto

return;
}

// Updating records w/ new evolved schema
updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
numUpdateRecords, 2 * numRecords, 5);

// Inserting with evolved schema is allowed
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns);
client = getHoodieWriteClient(hoodieEvolvedWriteConfig);
final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("006", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED);
// We cannot use insertBatch directly here because we want to insert records
@@ -299,9 +323,14 @@

// Now try updating w/ the original schema (should succeed)
client = getHoodieWriteClient(hoodieWriteConfig);
updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
numUpdateRecords, 3 * numRecords, 8);
try {
updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(),
initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true,
numUpdateRecords, 3 * numRecords, 8);
assertTrue(shouldAllowDroppedColumns);
} catch (HoodieUpsertException e) {
assertFalse(shouldAllowDroppedColumns);
Contributor: ditto

}
}

private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException {
@@ -362,8 +391,8 @@ private List<HoodieRecord> convertToSchema(List<HoodieRecord> records, String sc
}).collect(Collectors.toList());
}

private HoodieWriteConfig getWriteConfig(String schema) {
return getWriteConfigBuilder(schema).build();
private HoodieWriteConfig getWriteConfig(String schema, boolean shouldAllowDroppedColumns) {
return getWriteConfigBuilder(schema).withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns).build();
}

private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
@@ -373,8 +402,8 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
.withAvroSchemaValidate(true);
}

private static boolean isSchemaCompatible(String oldSchema, String newSchema) {
return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
private static boolean isSchemaCompatible(String oldSchema, String newSchema, boolean shouldAllowDroppedColumns) {
return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema), shouldAllowDroppedColumns);
}

@Override