@@ -21,9 +21,11 @@
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
@@ -51,7 +53,7 @@ public class CompactFunction extends ProcessFunction<CompactionPlanEvent, Compac
/**
* Write Client.
*/
private transient HoodieFlinkWriteClient writeClient;
private transient HoodieFlinkWriteClient<?> writeClient;

/**
* Whether to execute compaction asynchronously.
@@ -89,21 +91,24 @@ public void processElement(CompactionPlanEvent event, Context context, Collector
if (asyncCompaction) {
// executes the compaction task asynchronously so as not to block checkpoint barrier propagation.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector),
() -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
doCompaction(instantTime, compactionOperation, collector);
doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
}
}

private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
HoodieFlinkMergeOnReadTableCompactor compactor = new HoodieFlinkMergeOnReadTableCompactor();
private void doCompaction(String instantTime,
CompactionOperation compactionOperation,
Collector<CompactionCommitEvent> collector,
HoodieWriteConfig writeConfig) throws IOException {
HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
List<WriteStatus> writeStatuses = compactor.compact(
new HoodieFlinkCopyOnWriteTable<>(
writeClient.getConfig(),
writeConfig,
writeClient.getEngineContext(),
writeClient.getHoodieTable().getMetaClient()),
writeClient.getHoodieTable().getMetaClient(),
@@ -114,6 +119,12 @@ private void doCompaction(String instantTime, CompactionOperation compactionOper
collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
}

private HoodieWriteConfig reloadWriteConfig() throws Exception {
HoodieWriteConfig writeConfig = writeClient.getConfig();
CompactionUtil.setAvroSchema(writeConfig, writeClient.getHoodieTable().getMetaClient());
return writeConfig;
}

Contributor:
No need to create the write client again; just update the schema of the write config. And in which case do we need schema evolution? Schema evolution is not supported now ~

Contributor Author:
OK, I will update this and wait for schema evolution to be supported.

Contributor:
I guess writeConfig.setSchema should work, so no need to rebuild it ~


@VisibleForTesting
public void setExecutor(NonThrownExecutor executor) {
this.executor = executor;
13 changes: 13 additions & 0 deletions hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -26,6 +26,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sink.compact.FlinkCompactionConfig;
@@ -106,6 +107,18 @@ public static void setAvroSchema(Configuration conf, HoodieTableMetaClient metaC
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, tableAvroSchema.toString());
}

/**
* Sets up the avro schema string into the given {@code HoodieWriteConfig}
* through reading from the hoodie table metadata.
*
* @param writeConfig The {@code HoodieWriteConfig} to set the schema on
* @param metaClient  The hoodie table meta client used to resolve the table schema
*/
public static void setAvroSchema(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) throws Exception {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchema(false);
writeConfig.setSchema(tableAvroSchema.toString());
}

/**
* Infers the changelog mode based on the data file schema(including metadata fields).
*
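
For reference, a minimal standalone sketch of how the new CompactionUtil.setAvroSchema helper could be exercised against an existing table. CompactionUtil.setAvroSchema, HoodieWriteConfig, and HoodieTableMetaClient come from this PR and the Hudi codebase; the example class name, the basePath argument handling, and the minimal builder wiring are illustrative assumptions only, not part of the change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.util.CompactionUtil;

public class SetAvroSchemaExample {

  public static void main(String[] args) throws Exception {
    // Path of an existing Hudi table (assumption: passed in by the caller).
    String basePath = args[0];

    // Resolve the table metadata; the helper reads the latest table schema from it.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .build();

    // A write config that may still carry the schema captured when the job started.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .build();

    // Refresh the schema on the existing config in place; no new write client is created.
    CompactionUtil.setAvroSchema(writeConfig, metaClient);
    System.out.println("Refreshed schema: " + writeConfig.getSchema());
  }
}

As the review discussion above concludes, the schema is refreshed on the existing write config rather than by rebuilding the write client, which keeps the per-task compaction path cheap.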