diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 17c19cebee991..246a5eef527c6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -39,6 +39,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -94,6 +95,18 @@ public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactio
       return jsc.emptyRDD();
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+
+    // Here we first use the table schema as the reader schema to read the
+    // log files. This is because in the case of MergeInto, config.getSchema may not
+    // be the same as the table schema.
+    try {
+      Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+      config.setSchema(readerSchema.toString());
+    } catch (Exception e) {
+      // If there is no commit in the table, just ignore the exception.
+    }
+
     // Compacting is very similar to applying updates to existing file
     HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
     List<CompactionOperation> operations = compactionPlan.getOperations().stream()
@@ -108,7 +121,6 @@ private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
                                     HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
     FileSystem fs = metaClient.getFs();
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
     LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
         + " for commit " + instantTime);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 969d07b850463..5d4a76d89b5dc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -575,7 +575,6 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       checkAnswer(s"select id, name, price from $tableName")(
         Seq(1, "a1", 10.0)
       )
-
       spark.sql(
         s"""
            | merge into $tableName
@@ -593,4 +592,54 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       }
     }
   }
+
+  test("Test MergeInto For MOR With Compaction On") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.compact.inline = 'true'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4", 10.0, 1000)
+      )
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           |  select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+           | ) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+
+      // The 5th commit will trigger the inline compaction.
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4", 11.0, 1000)
+      )
+    }
+  }
 }