Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
val primaryKeys = hoodieCatalogTable.tableConfig.getRecordKeyFieldProp.split(",")
// Only records that are not included in the target table can be inserted
val insertSourceDF = sourceDF.join(targetDF, primaryKeys,"leftanti")
executeInsertOnly(insertSourceDF, parameters)

// The column order may change after the left anti join, so restore the column order of the source dataframe
val cols = removeMetaFields(sourceDF).columns
executeInsertOnly(insertSourceDF.select(cols.head, cols.tail:_*), parameters)
}
sparkSession.catalog.refreshTable(targetTableIdentify.unquotedString)
Seq.empty[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,41 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}

test("Test Merge Into For Source Table With Different Column Order") {
  withTempDir { tmp =>
    val tableName = generateTableName

    // Set up a partitioned MOR table backed by a temp directory.
    val createTableSql =
      s"""
         | create table $tableName (
         | id int,
         | name string,
         | price double,
         | ts long,
         | dt string
         | ) using hudi
         | tblproperties (
         | type = 'mor',
         | primaryKey = 'id',
         | preCombineField = 'ts'
         | )
         | partitioned by(dt)
         | location '${tmp.getCanonicalPath}'
       """.stripMargin
    spark.sql(createTableSql)

    // The source sub-query deliberately lists its columns in a different
    // order than the target schema (name before id). The row satisfies the
    // insert condition (id % 2 = 1), so it must be inserted with the values
    // mapped to the correct target columns despite the reordering.
    val mergeSql =
      s"""
         | merge into $tableName as t0
         | using (
         | select 'a1' as name, 1 as id, 10 as price, 1000 as ts, '2021-03-21' as dt
         | ) as s0
         | on t0.id = s0.id
         | when not matched and s0.id % 2 = 1 then insert *
       """.stripMargin
    spark.sql(mergeSql)

    // Verify the row landed with values in the target's column order,
    // not the source's.
    checkAnswer(s"select id,name,price,dt from $tableName")(
      Seq(1, "a1", 10, "2021-03-21")
    )
  }
}
}