Merged
@@ -41,13 +41,13 @@
import java.util.Collections;
import java.util.Map;

public abstract class BaseClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {

private static final Logger LOG = LogManager.getLogger(BaseClusteringPlanActionExecutor.class);
private static final Logger LOG = LogManager.getLogger(ClusteringPlanActionExecutor.class);

private final Option<Map<String, String>> extraMetadata;

public BaseClusteringPlanActionExecutor(HoodieEngineContext context,
public ClusteringPlanActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
@@ -18,31 +18,44 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;

import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -53,6 +66,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
@@ -200,4 +215,65 @@ protected abstract Iterator<List<WriteStatus>> handleInsert(String idPfx,

protected abstract Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr) throws IOException;

protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
Contributor: The BaseCommitActionExecutor responsibilities are a bit confusing: it handles the regular write operations such as insert and upsert, and now, with this patch, clustering as well. What about compaction, then?

Should we make a new base class for table services?

Contributor: I think the goal is to revamp the commit executors and the write pipeline altogether later on, so the refactoring here is limited to code reuse. @xushiyan is that the case?

Member Author: @danny0405 agreed, it does look like there are some mixed responsibilities here. I'll make a clearer separation in https://issues.apache.org/jira/browse/HUDI-2439
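For illustration only, here is a rough sketch of the kind of split discussed above; the interface name and shape are hypothetical and exist neither in this PR nor in the Hudi codebase:

import org.apache.hudi.table.action.HoodieWriteMetadata;

// Hypothetical contract for table-service executors (clustering, compaction, cleaning),
// kept separate from the regular insert/upsert write path.
public interface TableServiceCommitActionExecutor<O> {
  HoodieWriteMetadata<O> execute();  // run the table service and produce its commit metadata
  String getCommitActionType();      // e.g. replacecommit for clustering
}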

// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();

final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
(ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
.performClustering(clusteringPlan, schema, instantTime);
HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
validateWriteResult(clusteringPlan, writeMetadata);
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}

private HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
Instant indexStartTime = Instant.now();
// Update the index back
HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
}

private Map<String, List<String>> getPartitionToReplacedFileIds(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());

// Special case: for SparkSingleFileSortExecutionStrategy, input file groups are always reported
// as replaced, even when the clustering write produced a file with the same file group id.
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> "org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy"
.equals(config.getClusteringExecutionStrategyClass())
|| !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(HoodieFileGroupId::getPartitionPath, Collectors.mapping(HoodieFileGroupId::getFileId, Collectors.toList())));
}

/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private void validateWriteResult(HoodieClusteringPlan clusteringPlan, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
if (writeMetadata.getWriteStatuses().isEmpty()) {
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
}

}
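As the javadoc of validateWriteResult notes, the check can be extended beyond "at least one file written". Below is a minimal sketch of a record-count validation that could sit alongside it in this executor; the method and its expectedRecords parameter are hypothetical, while HoodieWriteStat#getNumWrites() is the existing per-file counter of written records (the referenced types are already imported in this class):

// Hypothetical extension, not part of this PR: compare records written against an expected count.
private void validateRecordCount(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, long expectedRecords) {
  long writtenRecords = writeMetadata.getWriteStats().get().stream()
      .mapToLong(HoodieWriteStat::getNumWrites)
      .sum();
  if (writtenRecords != expectedRecords) {
    throw new HoodieClusteringException("Clustering for " + instantTime + " wrote " + writtenRecords
        + " records, expected " + expectedRecords);
  }
}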
@@ -24,6 +24,8 @@
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.JavaTaskContextSupplier;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ClusteringOperation;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -71,7 +73,7 @@
* Clustering strategy for Java engine.
*/
public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
extends ClusteringExecutionStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
extends ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
Contributor: Is this Java-specific class going to be removed as a follow-up?

Member Author: @yihua yes, I should open another PR to deal with ClusteringExecutionStrategy and its subclasses; that can be a good separation.


private static final Logger LOG = LogManager.getLogger(JavaExecutionStrategy.class);

@@ -81,7 +83,7 @@ public JavaExecutionStrategy(
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> performClustering(
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(
HoodieClusteringPlan clusteringPlan, Schema schema, String instantTime) {
// execute clustering for each group and collect WriteStatus
List<WriteStatus> writeStatusList = new ArrayList<>();
@@ -90,8 +92,8 @@ public HoodieWriteMetadata<List<WriteStatus>> performClustering(
inputGroup, clusteringPlan.getStrategy().getStrategyParams(),
Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
instantTime)));
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(writeStatusList);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
writeMetadata.setWriteStatuses(HoodieList.of(writeStatusList));
return writeMetadata;
}
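Aside, not part of the diff: the change above wraps the Java engine's in-memory List<WriteStatus> into the engine-agnostic HoodieData via HoodieList.of, so the shared executeClustering() path in BaseCommitActionExecutor can consume it. A minimal round-trip illustration (the class name is made up for the example):

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieList;

import java.util.ArrayList;
import java.util.List;

public class HoodieDataRoundTripExample {
  public static void main(String[] args) {
    List<WriteStatus> writeStatusList = new ArrayList<>();              // engine-specific form
    HoodieData<WriteStatus> statuses = HoodieList.of(writeStatusList);  // wrap for the shared code path
    List<WriteStatus> unwrapped = statuses.collectAsList();             // back to a plain List for Java-engine APIs
    System.out.println("Round-tripped " + unwrapped.size() + " write statuses");
  }
}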

@@ -49,7 +49,7 @@
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.JavaClusteringPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.cluster.JavaExecuteClusteringCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.JavaBulkInsertPreppedCommitActionExecutor;
@@ -70,10 +71,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;

@@ -192,7 +193,7 @@ public HoodieWriteMetadata<List<WriteStatus>> compact(HoodieEngineContext contex

@Override
public Option<HoodieClusteringPlan> scheduleClustering(final HoodieEngineContext context, final String instantTime, final Option<Map<String, String>> extraMetadata) {
return new JavaClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
return new ClusteringPlanActionExecutor<>(context, config, this, instantTime, extraMetadata).execute();
}

@Override

This file was deleted.

@@ -19,46 +19,32 @@

package org.apache.hudi.table.action.cluster;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;

import org.apache.avro.Schema;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class JavaExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseJavaCommitActionExecutor<T> {

private final HoodieClusteringPlan clusteringPlan;

public JavaExecuteClusteringCommitActionExecutor(
HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime) {
public JavaExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
String instantTime) {
super(context, config, table, instantTime, WriteOperationType.CLUSTER);
this.clusteringPlan = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), HoodieTimeline.getReplaceCommitRequestedInstant(instantTime))
@@ -68,56 +54,13 @@ public JavaExecuteClusteringCommitActionExecutor(

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
table.getMetaClient().reloadActiveTimeline();

final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
HoodieWriteMetadata<List<WriteStatus>> writeMetadata = (
(ClusteringExecutionStrategy<T, List<HoodieRecord<? extends HoodieRecordPayload>>, List<HoodieKey>, List<WriteStatus>>)
ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
.performClustering(clusteringPlan, schema, instantTime);
List<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
List<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
writeMetadata.setWriteStats(statuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()));
writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(writeMetadata));
validateWriteResult(writeMetadata);
commitOnAutoCommit(writeMetadata);
if (!writeMetadata.getCommitMetadata().isPresent()) {
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
writeMetadata.setCommitMetadata(Option.of(commitMetadata));
}
return writeMetadata;
}

/**
* Validate actions taken by clustering. In the first implementation, we validate at least one new file is written.
* But we can extend this to add more validation. E.g. number of records read = number of records written etc.
* We can also make these validations in BaseCommitActionExecutor to reuse pre-commit hooks for multiple actions.
*/
private void validateWriteResult(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
if (writeMetadata.getWriteStatuses().isEmpty()) {
throw new HoodieClusteringException("Clustering plan produced 0 WriteStatus for " + instantTime
+ " #groups: " + clusteringPlan.getInputGroups().size() + " expected at least "
+ clusteringPlan.getInputGroups().stream().mapToInt(HoodieClusteringGroup::getNumOutputFileGroups).sum()
+ " write statuses");
}
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = executeClustering(clusteringPlan);
List<WriteStatus> transformedWriteStatuses = writeMetadata.getWriteStatuses().collectAsList();
return writeMetadata.clone(transformedWriteStatuses);
}

@Override
protected String getCommitActionType() {
return HoodieTimeline.REPLACE_COMMIT_ACTION;
}

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
Set<HoodieFileGroupId> newFilesWritten = writeMetadata.getWriteStats().get().stream()
.map(s -> new HoodieFileGroupId(s.getPartitionPath(), s.getFileId())).collect(Collectors.toSet());
return ClusteringUtils.getFileGroupsFromClusteringPlan(clusteringPlan)
.filter(fg -> !newFilesWritten.contains(fg))
.collect(Collectors.groupingBy(fg -> fg.getPartitionPath(), Collectors.mapping(fg -> fg.getFileId(), Collectors.toList())));
}
}