@@ -59,7 +59,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
"hoodie.clustering.plan.partition.filter.mode";

// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
private static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";

public static final ConfigProperty<String> DAYBASED_LOOKBACK_PARTITIONS = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions")
@@ -190,63 +190,88 @@ public class HoodieClusteringConfig extends HoodieConfig {
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

/**
* Using space-filling curves to optimize the layout of table to boost query performance.
* The table data which sorted by space-filling curve has better aggregation;
* combine with min-max filtering, it can achieve good performance improvement.
*
* Notice:
* when we use this feature, we need specify the sort columns.
* The more columns involved in sorting, the worse the aggregation, and the smaller the query performance improvement.
* Choose the filter columns which commonly used in query sql as sort columns.
* It is recommend that 2 ~ 4 columns participate in sorting.
* @deprecated this setting has no effect. Please refer to clustering configuration, as well as
* {@link #LAYOUT_OPTIMIZE_STRATEGY} config to enable advanced record layout optimization strategies
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
.defaultValue(false)
.sinceVersion("0.10.0")
.withDocumentation("Enable use z-ordering/space-filling curves to optimize the layout of table to boost query performance. "
+ "This parameter takes precedence over clustering strategy set using " + EXECUTION_STRATEGY_CLASS_NAME.key());
.deprecatedAfter("0.11.0")
.withDocumentation("This setting has no effect. Please refer to clustering configuration, as well as "
+ "LAYOUT_OPTIMIZE_STRATEGY config to enable advanced record layout optimization strategies");

public static final ConfigProperty LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
/**
* Determines the ordering strategy for records layout optimization.
* Currently, the following strategies are supported:
* <ul>
* <li>Linear: simply orders records lexicographically</li>
* <li>Z-order: orders records along Z-order spatial-curve</li>
* <li>Hilbert: orders records along Hilbert's spatial-curve</li>
* </ul>
*
* NOTE: "z-order", "hilbert" strategies may consume considerably more compute, than "linear".
* Make sure to perform small-scale local testing for your dataset before applying globally.
*/
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "strategy")
.defaultValue("z-order")
.defaultValue("linear")
.sinceVersion("0.10.0")
.withDocumentation("Type of layout optimization to be applied, current only supports `z-order` and `hilbert` curves.");
.withDocumentation("Determines ordering strategy used in records layout optimization. "
+ "Currently supported strategies are \"linear\", \"z-order\" and \"hilbert\" values are supported.");

/**
* There exists two method to build z-curve.
* one is directly mapping sort cols to z-value to build z-curve;
* we can find this method in Amazon DynamoDB https://aws.amazon.com/cn/blogs/database/tag/z-order/
* the other one is Boundary-based Interleaved Index method which we proposed. simply call it sample method.
* Refer to rfc-28 for specific algorithm flow.
* Boundary-based Interleaved Index method has better generalization, but the build speed is slower than direct method.
* NOTE: This setting only has effect if {@link #LAYOUT_OPTIMIZE_STRATEGY} value is set to
* either "z-order" or "hilbert" (ie leveraging space-filling curves)
*
* Currently, two methods of ordering records along the curve are supported, "direct" and "sample":
*
* <ul>
* <li>Direct: entails that the spatial curve will be built in full, "filling in" all of the individual
* points corresponding to each individual record (the approach described in the Amazon DynamoDB blog [1])</li>
* <li>Sample: leverages the boundary-based interleaved index method proposed in RFC-28</li>
* </ul>
*
* NOTE: The boundary-based interleaved index method has better generalization,
* but is slower than the direct method.
*
* Please refer to RFC-28 for specific elaboration on both flows.
*
* [1] https://aws.amazon.com/cn/blogs/database/tag/z-order/
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD = ConfigProperty
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "curve.build.method")
.defaultValue("direct")
.sinceVersion("0.10.0")
.withDocumentation("Controls how data is sampled to build the space filling curves. two methods: `direct`,`sample`."
+ "The direct method is faster than the sampling, however sample method would produce a better data layout.");
.withDocumentation("Controls how data is sampled to build the space-filling curves. "
+ "Two methods: \"direct\", \"sample\". The direct method is faster than the sampling, "
+ "however sample method would produce a better data layout.");

/**
* Doing sample for table data is the first step in Boundary-based Interleaved Index method.
* larger sample number means better optimize result, but more memory consumption
* NOTE: This setting only has effect if {@link #LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD} value
* is set to "sample"
*
* Determines target sample size used by the Boundary-based Interleaved Index method.
* Larger sample size entails better layout optimization outcomes, at the expense of higher memory
* footprint.
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "build.curve.sample.size")
.defaultValue("200000")
.sinceVersion("0.10.0")
.withDocumentation("when setting" + LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD.key() + " to `sample`, the amount of sampling to be done."
+ "Large sample size leads to better results, at the expense of more memory usage.");
.withDocumentation("Determines target sample size used by the Boundary-based Interleaved Index method "
+ "of building space-filling curve. Larger sample size entails better layout optimization outcomes, "
+ "at the expense of higher memory footprint.");

/**
* The best way to use Z-order/Space-filling curves is to cooperate with Data-Skipping
* with data-skipping query engine can greatly reduce the number of table files to be read.
* otherwise query engine can only do row-group skipping for files (parquet/orc)
* @deprecated this setting has no effect
*/
public static final ConfigProperty LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE = ConfigProperty
.key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "data.skipping.enable")
.defaultValue(true)
.sinceVersion("0.10.0")
.deprecatedAfter("0.11.0")
.withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");

public static final ConfigProperty<Boolean> ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT = ConfigProperty
@@ -516,18 +541,13 @@ public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering)
return this;
}

public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
return this;
}

public Builder withDataOptimizeStrategy(String strategy) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy);
return this;
}

public Builder withDataOptimizeBuildCurveStrategy(String method) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD, method);
clusteringConfig.setValue(LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD, method);
return this;
}

@@ -536,11 +556,6 @@ public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
return this;
}

public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
clusteringConfig.setValue(LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE, String.valueOf(dataSkipping));
return this;
}

public HoodieClusteringConfig build() {
clusteringConfig.setDefaultValue(
PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
@@ -578,21 +593,21 @@ private String getDefaultExecutionStrategyClassName(EngineType engineType) {
/**
* Type of a strategy for building Z-order/Hilbert space-filling curves.
*/
public enum BuildCurveStrategyType {
public enum SpatialCurveCompositionStrategyType {
DIRECT("direct"),
SAMPLE("sample");

private static final Map<String, BuildCurveStrategyType> VALUE_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(BuildCurveStrategyType.class, e -> e.value);
private static final Map<String, SpatialCurveCompositionStrategyType> VALUE_TO_ENUM_MAP =
TypeUtils.getValueToEnumMap(SpatialCurveCompositionStrategyType.class, e -> e.value);

private final String value;

BuildCurveStrategyType(String value) {
SpatialCurveCompositionStrategyType(String value) {
this.value = value;
}

public static BuildCurveStrategyType fromValue(String value) {
BuildCurveStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value);
public static SpatialCurveCompositionStrategyType fromValue(String value) {
SpatialCurveCompositionStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value);
if (enumValue == null) {
throw new HoodieException(String.format("Invalid value (%s)", value));
}
@@ -605,6 +620,7 @@ public static BuildCurveStrategyType fromValue(String value) {
* Layout optimization strategies such as Z-order/Hilbert space-curves, etc
*/
public enum LayoutOptimizationStrategy {
LINEAR("linear"),
ZORDER("z-order"),
HILBERT("hilbert");

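For reference, a small sketch (illustrative only, not part of this diff; the class name and the invalid value "bulk" are made up) of how the renamed SpatialCurveCompositionStrategyType resolves its two supported values and rejects anything else with a HoodieException, matching the fromValue implementation above.

import org.apache.hudi.config.HoodieClusteringConfig.SpatialCurveCompositionStrategyType;
import org.apache.hudi.exception.HoodieException;

public class CurveBuildMethodExample {
  public static void main(String[] args) {
    System.out.println(SpatialCurveCompositionStrategyType.fromValue("direct")); // DIRECT
    System.out.println(SpatialCurveCompositionStrategyType.fromValue("sample")); // SAMPLE
    try {
      SpatialCurveCompositionStrategyType.fromValue("bulk"); // unsupported value
    } catch (HoodieException e) {
      System.out.println("Rejected: " + e.getMessage()); // "Invalid value (bulk)"
    }
  }
}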
@@ -1288,30 +1288,21 @@ public String getClusteringSortColumns() {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
}

/**
* Data layout optimize properties.
*/
public boolean isLayoutOptimizationEnabled() {
return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE);
}

public String getLayoutOptimizationStrategy() {
return getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY);
public HoodieClusteringConfig.LayoutOptimizationStrategy getLayoutOptimizationStrategy() {
return HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(
getStringOrDefault(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY)
);
}

public HoodieClusteringConfig.BuildCurveStrategyType getLayoutOptimizationCurveBuildMethod() {
return HoodieClusteringConfig.BuildCurveStrategyType.fromValue(
getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD));
public HoodieClusteringConfig.SpatialCurveCompositionStrategyType getLayoutOptimizationCurveBuildMethod() {
return HoodieClusteringConfig.SpatialCurveCompositionStrategyType.fromValue(
getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD));
}

public int getLayoutOptimizationSampleSize() {
return getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
}
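A small illustrative helper (not part of this change; the class and method names are hypothetical) showing how a caller could read the resolved layout-optimization settings through the getters above. Note that getLayoutOptimizationStrategy now falls back to the "linear" default via getStringOrDefault instead of requiring the key to be set.

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class LayoutOptimizeSettings {
  // Summarizes the layout-optimization settings resolved from a write config.
  static String describe(HoodieWriteConfig writeConfig) {
    HoodieClusteringConfig.LayoutOptimizationStrategy strategy = writeConfig.getLayoutOptimizationStrategy();
    HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveMethod =
        writeConfig.getLayoutOptimizationCurveBuildMethod();
    int sampleSize = writeConfig.getLayoutOptimizationSampleSize();
    return strategy + " / " + curveMethod + " / sampleSize=" + sampleSize;
  }
}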

public boolean isDataSkippingEnabled() {
return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE);
}

/**
* index properties.
*/
@@ -305,7 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadataForTableServices(table, metadata, compactionInstant);
updateTableMetadata(table, metadata, compactionInstant);
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
@@ -378,17 +378,20 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}

final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());

finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata,clusteringInstant);
// Update outstanding metadata indexes
if (config.isLayoutOptimizationEnabled()
&& !config.getClusteringSortColumns().isEmpty()) {
table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
}
// Update table's metadata (table)
updateTableMetadata(table, metadata, clusteringInstant);
// Update table's metadata indexes
// NOTE: This overlaps w/ metadata table (above) and will be reconciled in the future
table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);

LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);

table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -412,13 +415,13 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
private void updateTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
table.getMetadataWriter(hoodieInstant.getTimestamp())
.ifPresent(writer -> writer.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
}

@Override
@@ -18,6 +18,10 @@

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -39,11 +43,12 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
import org.apache.hudi.io.IOUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -54,11 +59,6 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -134,16 +134,28 @@ public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaR
* @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
*/
protected Option<BulkInsertPartitioner<T>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
if (getWriteConfig().isLayoutOptimizationEnabled()) {
// sort input records by z-order/hilbert
return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
} else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()));
} else {
return Option.empty();
}
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));

return orderByColumnsOpt.map(orderByColumns -> {
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
switch (layoutOptStrategy) {
case ZORDER:
case HILBERT:
return new RDDSpatialCurveSortPartitioner(
(HoodieSparkEngineContext) getEngineContext(),
orderByColumns,
layoutOptStrategy,
getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
HoodieAvroUtils.addMetadataFields(schema));
case LINEAR:
return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
getWriteConfig().isConsistentLogicalTimestampEnabled());
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
});
}
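To make the dispatch above concrete, here is a hypothetical strategy-params setup (the sort-column list "city,ts" and the class name are purely illustrative) together with the partitioner each layout-optimization strategy resolves to.

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.config.HoodieClusteringConfig;

public class PartitionerSelectionExample {
  public static void main(String[] args) {
    // Hypothetical clustering strategy params carrying the sort columns.
    Map<String, String> strategyParams = new HashMap<>();
    strategyParams.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "city,ts");

    // With sort columns present, getPartitioner(...) above returns:
    //   LINEAR           -> RDDCustomColumnsSortPartitioner (plain lexicographic sort)
    //   ZORDER / HILBERT -> RDDSpatialCurveSortPartitioner (space-filling-curve sort)
    // Without sort columns, it returns Option.empty() and records are not re-ordered.
    System.out.println(strategyParams);
  }
}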

/**