diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index 560b5ffbad305..a43fcd5ad4bf9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -21,9 +21,11 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
+import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, CompactionCommitEvent> {
   /**
    * Write Client.
    */
-  private transient HoodieFlinkWriteClient writeClient;
+  private transient HoodieFlinkWriteClient<?> writeClient;
 
   /**
    * Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
     if (asyncCompaction) {
       // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
       executor.execute(
-          () -> doCompaction(instantTime, compactionOperation, collector),
+          () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
           (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
           "Execute compaction for instant %s from task %d", instantTime, taskID);
     } else {
       // executes the compaction task synchronously for batch mode.
       LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
-      doCompaction(instantTime, compactionOperation, collector);
+      doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
     }
   }
 
-  private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
-    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
+  private void doCompaction(String instantTime,
+                            CompactionOperation compactionOperation,
+                            Collector<CompactionCommitEvent> collector,
+                            HoodieWriteConfig writeConfig) throws IOException {
+    HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
     List<WriteStatus> writeStatuses = compactor.compact(
         new HoodieFlinkCopyOnWriteTable<>(
-            writeClient.getConfig(),
+            writeConfig,
             writeClient.getEngineContext(),
             writeClient.getHoodieTable().getMetaClient()),
         writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
+  private HoodieWriteConfig reloadWriteConfig() throws Exception {
+    HoodieWriteConfig writeConfig = writeClient.getConfig();
+    CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
+    return writeConfig;
+  }
+
   @VisibleForTesting
   public void setExecutor(NonThrownExecutor executor) {
     this.executor = executor;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index d04937bf7d66f..74629f9b0942f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -106,6 +107,18 @@ public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
     conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
   }
 
+  /**
+   * Sets up the avro schema string into the given {@code HoodieWriteConfig}
+   * by reading from the hoodie table metadata.
+   *
+   * @param writeConfig The HoodieWriteConfig
+   */
+  public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
+    writeConfig.setSchema(tableAvroSchema.toString());
+  }
+
   /**
    * Infers the changelog mode based on the data file schema(including metadata fields).
    *
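
Note (illustrative, not part of the patch): the change above makes the async compaction path refresh the write config's avro schema from the table metadata before each compaction, so a long-running streaming job presumably picks up the latest table schema rather than the one cached when the write client was created, while the synchronous batch path keeps passing writeClient.getConfig() as before. The minimal Java sketch below restates that reload flow outside of Flink; the class and method names (CompactionSchemaReloadSketch, refreshSchema, reload) are hypothetical, and only the Hudi calls that appear in the patch are used.

import org.apache.avro.Schema;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical helper; mirrors the patched flow for illustration only.
public class CompactionSchemaReloadSketch {

  // Same steps as the new CompactionUtil#setAvroSchema(HoodieWriteConfig, HoodieTableMetaClient):
  // resolve the current table avro schema (without metadata fields) and push it into the config.
  public static void refreshSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
    Schema tableAvroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(false);
    writeConfig.setSchema(tableAvroSchema.toString());
  }

  // Same steps as CompactFunction#reloadWriteConfig: fetch the client's config and
  // refresh its schema from the table meta client before the compaction runs.
  public static HoodieWriteConfig reload(HoodieFlinkWriteClient<?> writeClient) throws Exception {
    HoodieWriteConfig writeConfig = writeClient.getConfig();
    refreshSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
    return writeConfig;
  }
}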