@@ -767,14 +767,14 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldAvr
       Schema.Field field = fields.get(i);
       String fieldName = field.name();
       fieldNames.push(fieldName);
-      if (oldSchema.getField(field.name()) != null) {
+      if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
         Schema.Field oldField = oldSchema.getField(field.name());
         newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
       } else {
         String fieldFullName = createFullName(fieldNames);
         String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
         // deal with rename
-        if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
+        if (oldSchema.getField(fieldNameFromOldSchema) != null) {
           // find rename
           Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
           newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
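The guard added above matters when a column is renamed to a name that previously existed in the old schema (e.g. drop col1, then rename col2 to col1): without it, the new col1 matches the dropped old col1 and never reaches the rename branch. Below is a minimal standalone sketch of that decision, not part of this patch; the class name, schemas, and renameCols contents are illustrative assumptions, with renameCols keyed new field name -> old field name as in rewriteRecordWithNewSchema.

// Sketch only: shows which branch each new-schema field takes under the old
// condition versus the guarded condition. Schemas and map contents are made up
// to mirror the "drop col1, rename col2 to col1" scenario from the new test.
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class RenameGuardSketch {

  public static void main(String[] args) {
    // Old schema: id, col1, col2, ts.
    Schema oldSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .requiredString("col1")
        .requiredString("col2")
        .requiredLong("ts")
        .endRecord();
    // New schema after "drop column col1; rename column col2 to col1".
    Schema newSchema = SchemaBuilder.record("rec").fields()
        .requiredInt("id")
        .requiredString("col1") // formerly col2
        .requiredLong("ts")
        .endRecord();

    // Assumed rename mapping: new field name -> old field name.
    Map<String, String> renameCols = Collections.singletonMap("col1", "col2");

    for (Schema.Field field : newSchema.getFields()) {
      boolean oldCondition = oldSchema.getField(field.name()) != null;
      boolean newCondition = oldCondition && !renameCols.containsKey(field.name());
      String oldBranch = oldCondition ? "copy old '" + field.name() + "'" : "rename lookup";
      String newBranch = newCondition ? "copy old '" + field.name() + "'" : "rename lookup";
      System.out.printf("%-4s old condition -> %-16s guarded condition -> %s%n",
          field.name(), oldBranch, newBranch);
    }
    // For col1 the old condition copies the value of the *dropped* old col1,
    // while the guarded condition falls through to the rename lookup and reads
    // old col2 instead, which is what the new test below asserts.
  }
}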
@@ -385,6 +385,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Alter Table multiple times") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  col1 string,
+               |  col2 string,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+             """.stripMargin)
+          spark.sql(s"show create table ${tableName}").show(false)
+          spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
+
+          // Rename to a previously existing column name + insert
+          spark.sql(s"alter table ${tableName} drop column col1")
+          spark.sql(s"alter table ${tableName} rename column col2 to col1")
+
+          spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
+          checkAnswer(spark.sql(s"select col1 from ${tableName} order by id").collect())(
+            Seq("bbb"), Seq("aaa")
+          )
+        }
+      }
+    }
+  }
+
   test("Test Alter Table complex") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>