@@ -233,6 +233,9 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieC
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
     table.getMetaClient().reloadActiveTimeline();
 
+    // Disable auto commit. Strategy is only expected to write data in new files.
+    config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
+
     final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
     HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
         (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
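A minimal sketch of what the centralized flag buys here, assuming config is the same shared HoodieWriteConfig whose properties the strategies later copy via getProps(): any derived config inherits the disabled auto commit, so the strategies no longer need to set it themselves (see the removals in the hunks below).

// Illustrative only, assuming the builder copies the supplied properties and that
// shouldAutoCommit() reads AUTO_COMMIT_ENABLE:
HoodieWriteConfig derived = HoodieWriteConfig.newBuilder()
    .withProps(config.getProps())
    .build();
// The flag set above travels with the props, so the clustering strategies only write
// new files and leave finishing the replacecommit to this executor.
boolean autoCommit = derived.shouldAutoCommit();   // expected to be false at this point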
@@ -37,7 +37,6 @@

 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * Clustering Strategy based on following.
@@ -60,13 +59,12 @@ public List<WriteStatus> performClusteringWithRecordList(
       final String instantTime, final Map<String, String> strategyParams, final Schema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
-    Properties props = getWriteConfig().getProps();
-    props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
-    // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
-    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
-    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
+
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
-        .withEngineType(EngineType.JAVA).withProps(props).build();
+        .withBulkInsertParallelism(numOutputGroups)
+        .withEngineType(EngineType.JAVA)
+        .withProps(getWriteConfig().getProps()).build();
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
   }
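A hedged sketch of the difference between the old and new pattern in this strategy (variable names as in the hunk above): if getWriteConfig().getProps() hands back the live properties backing the parent config, put(...) calls on it leak clustering-only overrides into every later write, whereas deriving a fresh config and calling setValue on that copy keeps the overrides local, assuming the builder copies the supplied properties.

// Old pattern (sketch): mutates the shared Properties, so the parent write config
// would also pick up the clustering-only overrides.
Properties props = getWriteConfig().getProps();
props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));

// New pattern (as in the hunk): derive a separate config and override only that copy.
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
    .withBulkInsertParallelism(numOutputGroups)
    .withEngineType(EngineType.JAVA)
    .withProps(getWriteConfig().getProps())
    .build();
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
    String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));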
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.action.commit;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -111,7 +112,7 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,

     FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
         config.getFileIdPrefixProviderClassName(),
-        config.getProps());
+        new TypedProperties(config.getProps()));
 
     List<WriteStatus> writeStatuses = new ArrayList<>();

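A short sketch of why the wrapper matters, assuming ReflectionUtils.loadClass resolves a constructor from the runtime types of its arguments and that FileIdPrefixProvider implementations declare a TypedProperties constructor: handing over the raw java.util.Properties would not match that constructor, while wrapping it does.

// Illustrative only; mirrors the call in the hunk above.
// TypedProperties(Properties) wraps the existing config properties, so the provider sees
// the same values but through the constructor type it is assumed to declare.
TypedProperties providerProps = new TypedProperties(config.getProps());
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
    config.getFileIdPrefixProviderClassName(), providerProps);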
@@ -38,7 +38,6 @@

 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * This strategy is similar to {@link SparkSortAndSizeExecutionStrategy} with the difference being that
@@ -67,13 +66,12 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
       throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
     }
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
-    Properties props = getWriteConfig().getProps();
-    props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
-    // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
-    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
+
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
     // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
-    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
-    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
   }
@@ -36,7 +36,6 @@

 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 /**
  * Clustering Strategy based on following.
@@ -58,12 +57,11 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<
       final String instantTime, final Map<String, String> strategyParams, final Schema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
     LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
-    Properties props = getWriteConfig().getProps();
-    props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), String.valueOf(numOutputGroups));
-    // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
-    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
-    props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
-    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withBulkInsertParallelism(numOutputGroups)
+        .withProps(getWriteConfig().getProps()).build();
+    newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
         .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
   }
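For contrast, the two Spark strategies override the Parquet size limit on their derived configs differently (restated from the hunks above, not new behavior): the sort-and-size strategy caps each output file at the clustering target size, while the single-file-group strategy lifts the cap entirely because it must emit exactly one file per clustering group.

// Sort-and-size strategy: respect the configured clustering target file size.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
    String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));

// Single-file-group strategy: effectively unlimited, so the single handle never rolls over.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));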