Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -774,11 +774,15 @@ private Set<String> getMetadataPartitionsToUpdate() {
// fetch partitions to update from table config
Set<String> partitionsToUpdate = getCompletedMetadataPartitions(dataMetaClient.getTableConfig());
// add inflight indexes as well because the file groups have already been initialized, so writers can log updates
// NOTE: Async HoodieIndexer can move some partition to inflight. While that partition is still being built,
// the regular ingestion writers should not be blocked. They can go ahead and log updates to the metadata partition.
// Instead of depending on enabledPartitionTypes, the table config becomes the source of truth for which partitions to update.
partitionsToUpdate.addAll(getInflightMetadataPartitions(dataMetaClient.getTableConfig()));
if (!partitionsToUpdate.isEmpty()) {
return partitionsToUpdate;
}
// fallback to all enabled partitions if table config returned no partitions
LOG.warn("There are no partitions to update according to table config. Falling back to enabled partition types in the write config.");
return getEnabledPartitionTypes().stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,8 @@ public static class PropertyBuilder {
private HoodieTimelineTimeZone commitTimeZone;
private Boolean partitionMetafileUseBaseFormat;
private Boolean dropPartitionColumnsWhenWrite;
private String metadataPartitions;
private String inflightMetadataPartitions;

/**
* Persist the configs that is written at the first time, and should not be changed.
Expand Down Expand Up @@ -825,6 +827,16 @@ public PropertyBuilder setDropPartitionColumnsWhenWrite(Boolean dropPartitionCol
return this;
}

public PropertyBuilder setMetadataPartitions(String partitions) {
this.metadataPartitions = partitions;
return this;
}

public PropertyBuilder setInflightMetadataPartitions(String partitions) {
this.inflightMetadataPartitions = partitions;
return this;
}

public PropertyBuilder set(String key, Object value) {
if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
this.others.put(key, value);
Expand Down Expand Up @@ -927,6 +939,14 @@ public PropertyBuilder fromProperties(Properties properties) {
if (hoodieConfig.contains(HoodieTableConfig.DROP_PARTITION_COLUMNS)) {
setDropPartitionColumnsWhenWrite(hoodieConfig.getBoolean(HoodieTableConfig.DROP_PARTITION_COLUMNS));
}

if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS)) {
setMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS));
}

if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
}
return this;
}

Expand Down Expand Up @@ -1012,6 +1032,14 @@ public Properties build() {
if (null != dropPartitionColumnsWhenWrite) {
tableConfig.setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, Boolean.toString(dropPartitionColumnsWhenWrite));
}

if (null != metadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS, metadataPartitions);
}

if (null != inflightMetadataPartitions) {
tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
}
return tableConfig.getProps();
}

Expand Down