@@ -39,6 +39,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -94,6 +95,18 @@ public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactio
return jsc.emptyRDD();
}
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);

// Here we first use the table schema as the reader schema to read the log files,
// because in the case of MergeInto the schema in config.getSchema may not be
// the same as the table schema.
try {
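// getTableAvroSchema(false) resolves the latest table schema without the Hudi metadata
// fields; they are added back when the reader schema is built in compact() below.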
Schema readerSchema = schemaUtil.getTableAvroSchema(false);
config.setSchema(readerSchema.toString());
} catch (Exception e) {
// If there is no commit in the table yet, the schema cannot be resolved; just ignore the exception.
}

// Compacting is very similar to applying updates to an existing file.
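// The HoodieSparkCopyOnWriteTable created below provides the update/insert handles used to rewrite the base files during compaction.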
HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
@@ -108,7 +121,6 @@ public JavaRDD<WriteStatus> compact(HoodieEngineContext context, HoodieCompactio
private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
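// The reader schema must include the Hudi metadata fields, since the records in the base and log files were written with them.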

Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames()
+ " for commit " + instantTime);
@@ -575,7 +575,6 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price from $tableName")(
Seq(1, "a1", 10.0)
)

spark.sql(
s"""
| merge into $tableName
@@ -593,4 +592,54 @@
}
}
}

test("Test MergeInto For MOR With Compaction On") {
withTempDir { tmp =>
val tableName = generateTableName
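// Inline compaction is enabled so the MergeInto update below goes through compaction;
// by default inline compaction triggers after 5 delta commits.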
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| ts long
|) using hudi
| location '${tmp.getCanonicalPath}'
| options (
| primaryKey ='id',
| type = 'mor',
| preCombineField = 'ts',
| hoodie.compact.inline = 'true'
| )
""".stripMargin)
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1000),
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4",10.0, 1000)
)

spark.sql(
s"""
|merge into $tableName h0
|using (
| select 4 as id, 'a4' as name, 11 as price, 1000 as ts
| ) s0
| on h0.id = s0.id
| when matched then update set *
|""".stripMargin)

// The merge above is the 5th commit, which triggers inline compaction; verify the update survives it.
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 10.0, 1000),
Seq(2, "a2", 10.0, 1000),
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 11.0, 1000)
)
}
}
}