diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 057b4a6f61299..f9dc297342230 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -59,7 +59,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
       "hoodie.clustering.plan.partition.filter.mode";
 
   // Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
-  public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
+  private static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";
 
   public static final ConfigProperty DAYBASED_LOOKBACK_PARTITIONS = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions")
@@ -190,63 +190,88 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
 
   /**
-   * Using space-filling curves to optimize the layout of table to boost query performance.
-   * The table data which sorted by space-filling curve has better aggregation;
-   * combine with min-max filtering, it can achieve good performance improvement.
-   *
-   * Notice:
-   * when we use this feature, we need specify the sort columns.
-   * The more columns involved in sorting, the worse the aggregation, and the smaller the query performance improvement.
-   * Choose the filter columns which commonly used in query sql as sort columns.
-   * It is recommend that 2 ~ 4 columns participate in sorting.
+   * @deprecated this setting has no effect. Please refer to clustering configuration, as well as
+   * {@link #LAYOUT_OPTIMIZE_STRATEGY} config to enable advanced record layout optimization strategies
    */
   public static final ConfigProperty LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
       .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
       .defaultValue(false)
       .sinceVersion("0.10.0")
-      .withDocumentation("Enable use z-ordering/space-filling curves to optimize the layout of table to boost query performance. "
-          + "This parameter takes precedence over clustering strategy set using " + EXECUTION_STRATEGY_CLASS_NAME.key());
+      .deprecatedAfter("0.11.0")
+      .withDocumentation("This setting has no effect. Please refer to clustering configuration, as well as "
+          + "LAYOUT_OPTIMIZE_STRATEGY config to enable advanced record layout optimization strategies");
 
-  public static final ConfigProperty LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
+  /**
+   * Determines ordering strategy for records layout optimization.
+   * Currently, the following strategies are supported: "linear", "z-order", "hilbert".
+   *
+   * NOTE: "z-order", "hilbert" strategies may consume considerably more compute than "linear".
+   * Make sure to perform small-scale local testing for your dataset before applying globally.
+   */
+  public static final ConfigProperty LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
      .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "strategy")
-      .defaultValue("z-order")
+      .defaultValue("linear")
       .sinceVersion("0.10.0")
-      .withDocumentation("Type of layout optimization to be applied, current only supports `z-order` and `hilbert` curves.");
+      .withDocumentation("Determines ordering strategy used in records layout optimization. "
" + + "Currently supported strategies are \"linear\", \"z-order\" and \"hilbert\" values are supported."); /** - * There exists two method to build z-curve. - * one is directly mapping sort cols to z-value to build z-curve; - * we can find this method in Amazon DynamoDB https://aws.amazon.com/cn/blogs/database/tag/z-order/ - * the other one is Boundary-based Interleaved Index method which we proposed. simply call it sample method. - * Refer to rfc-28 for specific algorithm flow. - * Boundary-based Interleaved Index method has better generalization, but the build speed is slower than direct method. + * NOTE: This setting only has effect if {@link #LAYOUT_OPTIMIZE_STRATEGY} value is set to + * either "z-order" or "hilbert" (ie leveraging space-filling curves) + * + * Currently, two methods to order records along the curve are supported "build" and "sample": + * + * + * + * NOTE: Boundary-based interleaved Index method has better generalization, + * but is slower than direct method. + * + * Please refer to RFC-28 for specific elaboration on both flows. + * + * [1] https://aws.amazon.com/cn/blogs/database/tag/z-order/ */ - public static final ConfigProperty LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD = ConfigProperty + public static final ConfigProperty LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD = ConfigProperty .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "curve.build.method") .defaultValue("direct") .sinceVersion("0.10.0") - .withDocumentation("Controls how data is sampled to build the space filling curves. two methods: `direct`,`sample`." - + "The direct method is faster than the sampling, however sample method would produce a better data layout."); + .withDocumentation("Controls how data is sampled to build the space-filling curves. " + + "Two methods: \"direct\", \"sample\". The direct method is faster than the sampling, " + + "however sample method would produce a better data layout."); + /** - * Doing sample for table data is the first step in Boundary-based Interleaved Index method. - * larger sample number means better optimize result, but more memory consumption + * NOTE: This setting only has effect if {@link #LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD} value + * is set to "sample" + * + * Determines target sample size used by the Boundary-based Interleaved Index method. + * Larger sample size entails better layout optimization outcomes, at the expense of higher memory + * footprint. */ - public static final ConfigProperty LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty + public static final ConfigProperty LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "build.curve.sample.size") .defaultValue("200000") .sinceVersion("0.10.0") - .withDocumentation("when setting" + LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD.key() + " to `sample`, the amount of sampling to be done." - + "Large sample size leads to better results, at the expense of more memory usage."); + .withDocumentation("Determines target sample size used by the Boundary-based Interleaved Index method " + + "of building space-filling curve. Larger sample size entails better layout optimization outcomes, " + + "at the expense of higher memory footprint."); /** - * The best way to use Z-order/Space-filling curves is to cooperate with Data-Skipping - * with data-skipping query engine can greatly reduce the number of table files to be read. 
-   * otherwise query engine can only do row-group skipping for files (parquet/orc)
+   * @deprecated this setting has no effect
    */
   public static final ConfigProperty LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE = ConfigProperty
       .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "data.skipping.enable")
       .defaultValue(true)
       .sinceVersion("0.10.0")
+      .deprecatedAfter("0.11.0")
       .withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");
 
   public static final ConfigProperty ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT = ConfigProperty
@@ -516,18 +541,13 @@ public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering)
       return this;
     }
 
-    public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
-      clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable));
-      return this;
-    }
-
     public Builder withDataOptimizeStrategy(String strategy) {
       clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy);
       return this;
     }
 
     public Builder withDataOptimizeBuildCurveStrategy(String method) {
-      clusteringConfig.setValue(LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD, method);
+      clusteringConfig.setValue(LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD, method);
       return this;
     }
 
@@ -536,11 +556,6 @@ public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
       return this;
     }
 
-    public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
-      clusteringConfig.setValue(LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE, String.valueOf(dataSkipping));
-      return this;
-    }
-
     public HoodieClusteringConfig build() {
       clusteringConfig.setDefaultValue(
           PLAN_STRATEGY_CLASS_NAME, getDefaultPlanStrategyClassName(engineType));
@@ -578,21 +593,21 @@ private String getDefaultExecutionStrategyClassName(EngineType engineType) {
 
   /**
    * Type of a strategy for building Z-order/Hilbert space-filling curves.
   */
-  public enum BuildCurveStrategyType {
+  public enum SpatialCurveCompositionStrategyType {
     DIRECT("direct"),
     SAMPLE("sample");
 
-    private static final Map VALUE_TO_ENUM_MAP =
-        TypeUtils.getValueToEnumMap(BuildCurveStrategyType.class, e -> e.value);
+    private static final Map VALUE_TO_ENUM_MAP =
+        TypeUtils.getValueToEnumMap(SpatialCurveCompositionStrategyType.class, e -> e.value);
 
     private final String value;
 
-    BuildCurveStrategyType(String value) {
+    SpatialCurveCompositionStrategyType(String value) {
      this.value = value;
    }
 
-    public static BuildCurveStrategyType fromValue(String value) {
-      BuildCurveStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value);
+    public static SpatialCurveCompositionStrategyType fromValue(String value) {
+      SpatialCurveCompositionStrategyType enumValue = VALUE_TO_ENUM_MAP.get(value);
       if (enumValue == null) {
         throw new HoodieException(String.format("Invalid value (%s)", value));
       }
@@ -605,6 +620,7 @@ public static BuildCurveStrategyType fromValue(String value) {
    * Layout optimization strategies such as Z-order/Hilbert space-curves, etc
    */
   public enum LayoutOptimizationStrategy {
+    LINEAR("linear"),
     ZORDER("z-order"),
     HILBERT("hilbert");
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5a49897732aba..37af347d407ad 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1288,30 +1288,21 @@ public String getClusteringSortColumns() {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
   }
 
-  /**
-   * Data layout optimize properties.
-   */
-  public boolean isLayoutOptimizationEnabled() {
-    return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE);
-  }
-
-  public String getLayoutOptimizationStrategy() {
-    return getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY);
+  public HoodieClusteringConfig.LayoutOptimizationStrategy getLayoutOptimizationStrategy() {
+    return HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(
+        getStringOrDefault(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY)
+    );
   }
 
-  public HoodieClusteringConfig.BuildCurveStrategyType getLayoutOptimizationCurveBuildMethod() {
-    return HoodieClusteringConfig.BuildCurveStrategyType.fromValue(
-        getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD));
+  public HoodieClusteringConfig.SpatialCurveCompositionStrategyType getLayoutOptimizationCurveBuildMethod() {
+    return HoodieClusteringConfig.SpatialCurveCompositionStrategyType.fromValue(
+        getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD));
   }
 
   public int getLayoutOptimizationSampleSize() {
     return getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
   }
 
-  public boolean isDataSkippingEnabled() {
-    return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE);
-  }
-
   /**
    * index properties.
    */
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 9b2aad3ebafa1..f4cecbeaadb7a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -305,7 +305,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD
          s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
     }
+    final HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
     try {
       this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
+      finalizeWrite(table, clusteringCommitTime, writeStats);
-      writeTableMetadataForTableServices(table, metadata,clusteringInstant);
-      // Update outstanding metadata indexes
-      if (config.isLayoutOptimizationEnabled()
-          && !config.getClusteringSortColumns().isEmpty()) {
-        table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
-      }
+      // Update table's metadata (table)
+      updateTableMetadata(table, metadata, clusteringInstant);
+      // Update table's metadata indexes
+      // NOTE: This overlaps w/ metadata table (above) and will be reconciled in the future
+      table.updateMetadataIndexes(context, writeStats, clusteringCommitTime);
+      LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata);
+
       table.getActiveTimeline().transitionReplaceInflightToComplete(
           HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -412,13 +415,13 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD
-  private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
-                                                  HoodieInstant hoodieInstant) {
+  private void updateTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
+                                   HoodieInstant hoodieInstant) {
     boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-    table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
-        w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+    table.getMetadataWriter(hoodieInstant.getTimestamp())
+        .ifPresent(writer -> writer.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index c88b848ddf8cf..d3c92e21175c5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,6 +18,10 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -39,11 +43,12 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
-import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveOptimizationSortPartitioner;
+import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -54,11 +59,6 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -134,16 +134,28 @@ public abstract JavaRDD performClusteringWithRecordsRDD(final JavaR
    * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty.
   */
  protected Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> getPartitioner(Map strategyParams, Schema schema) {
-    if (getWriteConfig().isLayoutOptimizationEnabled()) {
-      // sort input records by z-order/hilbert
-      return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(),
-          getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema)));
-    } else if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
-      return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema), getWriteConfig().isConsistentLogicalTimestampEnabled()));
-    } else {
-      return Option.empty();
-    }
+    Option<String[]> orderByColumnsOpt =
+        Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
+            .map(listStr -> listStr.split(","));
+
+    return orderByColumnsOpt.map(orderByColumns -> {
+      HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
+      switch (layoutOptStrategy) {
+        case ZORDER:
+        case HILBERT:
+          return new RDDSpatialCurveSortPartitioner(
+              (HoodieSparkEngineContext) getEngineContext(),
+              orderByColumns,
+              layoutOptStrategy,
+              getWriteConfig().getLayoutOptimizationCurveBuildMethod(),
+              HoodieAvroUtils.addMetadataFields(schema));
+        case LINEAR:
+          return new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema),
+              getWriteConfig().isConsistentLogicalTimestampEnabled());
+        default:
+          throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
+      }
+    });
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
index 98bf9151fc9ef..1d18c5a456f96 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobExecutionStrategy.java
@@ -124,8 +124,8 @@ private Stream runClusteringForGroup(ClusteringGroupInfo clustering
     Iterator<List<WriteStatus>> writeStatuses = performClusteringWithRecordsIterator(inputRecords, clusteringOps.getNumOutputGroups(), instantTime,
         strategyParams, schema.get(), inputFileIds, preserveHoodieMetadata, taskContextSupplier);
 
-    Iterable<List<WriteStatus>> writestatusIterable = () -> writeStatuses;
-    return StreamSupport.stream(writestatusIterable.spliterator(), false)
+    Iterable<List<WriteStatus>> writeStatusIterable = () -> writeStatuses;
+    return StreamSupport.stream(writeStatusIterable.spliterator(), false)
         .flatMap(writeStatusList -> writeStatusList.stream());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
similarity index 51%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index ca7dfa3e7f2cd..c5e110055debe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -28,8 +28,8 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sort.SpaceCurveSortingHelper;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
@@ -38,80 +38,74 @@
 import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 
 /**
  * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
  * support z-curve optimization, hilbert will come soon.
  * @param <T> HoodieRecordPayload type
  */
-public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPayload>
+public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
-  private final HoodieSparkEngineContext sparkEngineContext;
-  private final SerializableSchema serializableSchema;
-  private final HoodieWriteConfig config;
 
-  public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext sparkEngineContext, HoodieWriteConfig config, Schema schema) {
+  private final HoodieSparkEngineContext sparkEngineContext;
+  private final String[] orderByColumns;
+  private final Schema schema;
+  private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
+  private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;
+
+  public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
+                                        String[] orderByColumns,
+                                        HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
+                                        HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
+                                        Schema schema) {
     this.sparkEngineContext = sparkEngineContext;
-    this.config = config;
-    this.serializableSchema = new SerializableSchema(schema);
+    this.orderByColumns = orderByColumns;
+    this.layoutOptStrategy = layoutOptStrategy;
+    this.curveCompositionStrategyType = curveCompositionStrategyType;
+    this.schema = schema;
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
-    JavaRDD preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
-    return preparedRecord.map(record -> {
-      String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-      String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
-      HoodieKey hoodieKey = new HoodieKey(key, partition);
-      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
-      return hoodieRecord;
-    });
-  }
-
-  private JavaRDD prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
     SerializableSchema serializableSchema = new SerializableSchema(schema);
-    JavaRDD genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
-    Dataset originDF =
+    JavaRDD genericRecordsRDD =
+        records.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
+
+    Dataset sourceDataset =
         AvroConversionUtils.createDataFrame(
-            genericRecordJavaRDD.rdd(),
+            genericRecordsRDD.rdd(),
             schema.toString(),
             sparkEngineContext.getSqlContext().sparkSession()
         );
 
-    Dataset sortedDF = reorder(originDF, numOutputGroups);
-
-    return HoodieSparkUtils.createRdd(sortedDF, schema.getName(),
-        schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
+    Dataset sortedDataset = reorder(sourceDataset, outputSparkPartitions);
+
+    return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace(), false, Option.empty())
+        .toJavaRDD()
+        .map(record -> {
+          String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+          String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+          HoodieKey hoodieKey = new HoodieKey(key, partition);
+          HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
+          return hoodieRecord;
+        });
   }
 
-  private Dataset reorder(Dataset originDF, int numOutputGroups) {
-    String orderedColumnsListConfig = config.getClusteringSortColumns();
-
-    if (isNullOrEmpty(orderedColumnsListConfig) || numOutputGroups <= 0) {
+  private Dataset reorder(Dataset dataset, int numOutputGroups) {
+    if (orderByColumns.length == 0) {
       // No-op
-      return originDF;
+      return dataset;
     }
 
-    List orderedCols =
-        Arrays.stream(orderedColumnsListConfig.split(","))
-            .map(String::trim)
-            .collect(Collectors.toList());
-
-    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy =
-        HoodieClusteringConfig.LayoutOptimizationStrategy.fromValue(config.getLayoutOptimizationStrategy());
-
-    HoodieClusteringConfig.BuildCurveStrategyType curveBuildStrategyType = config.getLayoutOptimizationCurveBuildMethod();
+    List orderedCols = Arrays.asList(orderByColumns);
 
-    switch (curveBuildStrategyType) {
+    switch (curveCompositionStrategyType) {
       case DIRECT:
-        return SpaceCurveSortingHelper.orderDataFrameByMappingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
+        return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
       case SAMPLE:
-        return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(originDF, layoutOptStrategy, orderedCols, numOutputGroups);
+        return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
       default:
-        throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveBuildStrategyType));
+        throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
     }
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
index d92bac4d84714..5d7ae1e46bc62 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/columnstats/ColumnStatsIndexHelper.java
@@ -262,10 +262,10 @@ public static void updateColumnStatsIndexFor(
     //  │   │   ├── .parquet
     //  │   │   └── ...
     //
-    // If index is currently empty (no persisted tables), we simply create one
-    // using clustering operation's commit instance as it's name
     Path newIndexTablePath = new Path(indexFolderPath, commitTime);
 
+    // If index is currently empty (no persisted tables), we simply create one
+    // using clustering operation's commit instance as it's name
     if (!fs.exists(new Path(indexFolderPath))) {
       newColStatsIndexDf.repartition(1)
           .write()
@@ -326,6 +326,9 @@ public static void updateColumnStatsIndexFor(
         .repartition(1)
         .write()
         .format("parquet")
+        // NOTE: We intend to potentially overwrite index-table from the previous Clustering
+        //       operation that has failed to commit
+        .mode("overwrite")
         .save(newIndexTablePath.toString());
 
     // Clean up residual col-stats-index tables that have might have been dangling since
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index aa9a924ed6925..12c0483bf3e5d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -184,13 +184,6 @@ private void updateColumnsStatsIndex(
     String basePath = metaClient.getBasePath();
     String indexPath = metaClient.getColumnStatsIndexPath();
 
-    List completedCommits =
-        metaClient.getCommitsTimeline()
-            .filterCompletedInstants()
-            .getInstants()
-            .map(HoodieInstant::getTimestamp)
-            .collect(Collectors.toList());
-
     List touchedFiles =
         updatedFilesStats.stream()
             .map(s -> new Path(basePath, s.getPath()).toString())
@@ -214,6 +207,13 @@ private void updateColumnsStatsIndex(
             new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields()
         );
 
+    List completedCommits =
+        metaClient.getCommitsTimeline()
+            .filterCompletedInstants()
+            .getInstants()
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toList());
+
     ColumnStatsIndexHelper.updateColumnStatsIndexFor(
         sparkEngineContext.getSqlContext().sparkSession(),
         AvroConversionUtils.convertAvroSchemaToStructType(tableWriteSchema),
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 1e1d887906c99..058e7c357594d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -119,7 +119,8 @@ object DataSourceReadOptions {
     .key("hoodie.enable.data.skipping")
     .defaultValue(true)
     .sinceVersion("0.10.0")
-    .withDocumentation("enable data skipping to boost query after doing z-order optimize for current table")
+    .withDocumentation("Enables data-skipping allowing queries to leverage indexes to reduce the search space by " +
+      "skipping over files")
 
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
similarity index 82%
rename from hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala
rename to hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
index e453953ff11e2..98495c6278f1c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSpaceCurveLayoutOptimization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala
@@ -37,7 +37,7 @@ import scala.collection.JavaConversions._
 import scala.util.Random
 
 @Tag("functional")
-class TestSpaceCurveLayoutOptimization extends HoodieClientTestBase {
+class TestLayoutOptimization extends HoodieClientTestBase {
   var spark: SparkSession = _
 
   val sourceTableSchema =
@@ -79,7 +79,13 @@ class TestSpaceCurveLayoutOptimization extends HoodieClientTestBase {
 
   @ParameterizedTest
   @MethodSource(Array("testLayoutOptimizationParameters"))
-  def testLayoutOptimizationFunctional(tableType: String): Unit = {
+  def testLayoutOptimizationFunctional(tableType: String,
+                                       layoutOptimizationStrategy: String,
+                                       spatialCurveCompositionStrategy: String): Unit = {
+    val curveCompositionStrategy =
+      Option(spatialCurveCompositionStrategy)
+        .getOrElse(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.defaultValue())
+
     val targetRecordsCount = 10000
     // Bulk Insert Operation
     val records = recordsToStrings(dataGen.generateInserts("001", targetRecordsCount)).toList
@@ -98,8 +104,9 @@ class TestSpaceCurveLayoutOptimization extends HoodieClientTestBase {
       .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
       .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
       .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
-      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
+      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), layoutOptimizationStrategy)
+      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), curveCompositionStrategy)
+      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat,begin_lon")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
@@ -162,14 +169,20 @@ class TestSpaceCurveLayoutOptimization extends HoodieClientTestBase {
     }
   }
 
-object TestSpaceCurveLayoutOptimization {
+object TestLayoutOptimization {
   def testLayoutOptimizationParameters(): java.util.stream.Stream[Arguments] = {
     java.util.stream.Stream.of(
-      arguments("COPY_ON_WRITE", "hilbert"),
-      arguments("COPY_ON_WRITE", "z-order"),
-      arguments("MERGE_ON_READ", "hilbert"),
-      arguments("MERGE_ON_READ", "z-order")
+      arguments("COPY_ON_WRITE", "linear", null),
+      arguments("COPY_ON_WRITE", "z-order", "direct"),
+      arguments("COPY_ON_WRITE", "z-order", "sample"),
+      arguments("COPY_ON_WRITE", "hilbert", "direct"),
+      arguments("COPY_ON_WRITE", "hilbert", "sample"),
+
+      arguments("MERGE_ON_READ", "linear", null),
+      arguments("MERGE_ON_READ", "z-order", "direct"),
+      arguments("MERGE_ON_READ", "z-order", "sample"),
+      arguments("MERGE_ON_READ", "hilbert", "direct"),
+      arguments("MERGE_ON_READ", "hilbert", "sample")
     )
   }
 }
-
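
For reviewers, a minimal sketch of how the reworked layout-optimization options above are expected to be exercised from the Spark datasource write path, mirroring TestLayoutOptimization; the base path, table name, record-key/partition fields, input source and sort columns are illustrative placeholders and not part of this change.

```scala
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.sql.{SaveMode, SparkSession}

object LayoutOptimizationUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-layout-optimization-sketch")
      .master("local[2]")
      .getOrCreate()

    // Illustrative input: any DataFrame containing the columns referenced below
    val df = spark.read.json("/tmp/input.json")

    df.write.format("hudi")
      .option("hoodie.table.name", "layout_opt_demo")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      // Run clustering inline so the layout optimization is applied on this write
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      // Ordering strategy: "linear" (default), "z-order" or "hilbert"
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), "z-order")
      // Only relevant for "z-order"/"hilbert": "direct" (faster) or "sample" (better layout)
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), "direct")
      // Columns the records are ordered by during clustering
      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), "begin_lat,begin_lon")
      .mode(SaveMode.Overwrite)
      .save("/tmp/layout_opt_demo")
  }
}
```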