diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java index 815ef4892e68f..808eda5071e1f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java @@ -149,10 +149,27 @@ public void setSuccessRecordKeys(List successRecordKeys) { this.successRecordKeys = successRecordKeys; } + public double getFailureFraction() { + return failureFraction; + } + + public boolean isTrackSuccessRecords() { + return trackSuccessRecords; + } + @Override public String toString() { return "PartitionPath " + partitionPath + ", FileID " + fileId + ", Success records " + totalRecords + ", errored Rows " + totalErrorRecords + ", global error " + (globalError != null); } + + public WriteStatus toWriteStatus() { + WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction); + status.setFileId(fileId); + status.setTotalRecords(totalRecords); + status.setPartitionPath(partitionPath); + status.setStat(stat); + return status; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 78c8d677523a2..7f805410ea962 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -325,9 +325,9 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics)," + "running on each writer's driver process, accepting requests during the write from executors."); - public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty + public static final ConfigProperty EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty .key("hoodie.embed.timeline.server.reuse.enabled") - .defaultValue("false") + .defaultValue(false) .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)" + "to avoid startup costs. 
This should rarely be changed."); @@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() { } public boolean isEmbeddedTimelineServerReuseEnabled() { - return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED)); + return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED); } public int getEmbeddedTimelineServerPort() { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 5bd2da23451aa..0c52a7cc49b7e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.clustering.run.strategy; +import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -37,6 +38,7 @@ import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FutureUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -47,8 +49,11 @@ import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory; +import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner; +import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner; +import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner; import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; @@ -68,9 +73,16 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.sources.BaseRelation; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -95,13 +107,22 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext @Override public HoodieWriteMetadata> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) { JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext()); + boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false); // execute clustering for each group async and collect WriteStatus Stream> writeStatusesStream = FutureUtils.allOf( clusteringPlan.getInputGroups().stream() - .map(inputGroup -> 
runClusteringForGroupAsync(inputGroup, - clusteringPlan.getStrategy().getStrategyParams(), - Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), - instantTime)) + .map(inputGroup -> { + if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", false)) { + return runClusteringForGroupAsyncAsRow(inputGroup, + clusteringPlan.getStrategy().getStrategyParams(), + shouldPreserveMetadata, + instantTime); + } + return runClusteringForGroupAsync(inputGroup, + clusteringPlan.getStrategy().getStrategyParams(), + shouldPreserveMetadata, + instantTime); + }) .collect(Collectors.toList())) .join() .stream(); @@ -113,33 +134,62 @@ public HoodieWriteMetadata> performClustering(final Hood return writeMetadata; } + /** + * Execute clustering to write inputRecords into new files based on strategyParams. + * Different from {@link performClusteringWithRecordsRDD}, this method takes {@link Dataset} + * as input. + */ + public abstract HoodieData performClusteringWithRecordsAsRow(final Dataset inputRecords, + final int numOutputGroups, + final String instantTime, + final Map strategyParams, + final Schema schema, + final List fileGroupIdList, + final boolean shouldPreserveHoodieMetadata, + final Map extraMetadata); + /** * Execute clustering to write inputRecords into new files as defined by rules in strategy parameters. * The number of new file groups created is bounded by numOutputGroups. * Note that commit is not done as part of strategy. commit is callers responsibility. * - * @param inputRecords RDD of {@link HoodieRecord}. - * @param numOutputGroups Number of output file groups. - * @param instantTime Clustering (replace commit) instant time. - * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. - * @param schema Schema of the data including metadata fields. - * @param fileGroupIdList File group id corresponding to each out group. - * @param preserveHoodieMetadata Whether to preserve commit metadata while clustering. + * @param inputRecords RDD of {@link HoodieRecord}. + * @param numOutputGroups Number of output file groups. + * @param instantTime Clustering (replace commit) instant time. + * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. + * @param schema Schema of the data including metadata fields. + * @param fileGroupIdList File group id corresponding to each out group. + * @param shouldPreserveHoodieMetadata Whether to preserve commit metadata while clustering. * @return RDD of {@link WriteStatus}.
*/ - public abstract HoodieData performClusteringWithRecordsRDD(final HoodieData> inputRecords, final int numOutputGroups, final String instantTime, - final Map strategyParams, final Schema schema, - final List fileGroupIdList, final boolean preserveHoodieMetadata, - final Map extraMetadata); + public abstract HoodieData performClusteringWithRecordsRDD(final HoodieData> inputRecords, + final int numOutputGroups, + final String instantTime, + final Map strategyParams, + final Schema schema, + final List fileGroupIdList, + final boolean shouldPreserveHoodieMetadata, + final Map extraMetadata); + + protected BulkInsertPartitioner> getRowPartitioner(Map strategyParams, + Schema schema) { + return getPartitioner(strategyParams, schema, true); + } + + protected BulkInsertPartitioner>> getRDDPartitioner(Map strategyParams, + Schema schema) { + return getPartitioner(strategyParams, schema, false); + } /** * Create {@link BulkInsertPartitioner} based on strategy params. * * @param strategyParams Strategy parameters containing columns to sort the data by when clustering. * @param schema Schema of the data including metadata fields. - * @return {@link RDDCustomColumnsSortPartitioner} if sort columns are provided, otherwise empty. */ - protected BulkInsertPartitioner>> getPartitioner(Map strategyParams, Schema schema) { + private BulkInsertPartitioner getPartitioner(Map strategyParams, + Schema schema, + boolean isRowPartitioner) { Option orderByColumnsOpt = Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key())) .map(listStr -> listStr.split(",")); @@ -149,26 +199,27 @@ protected BulkInsertPartitioner>> getPartitioner(Map> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map strategyParams, - boolean preserveHoodieMetadata, String instantTime) { + boolean preserveHoodieMetadata, String instantTime) { return CompletableFuture.supplyAsync(() -> { JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext()); HoodieData> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime); @@ -181,6 +232,25 @@ private CompletableFuture> runClusteringForGroupAsync(Ho }); } + /** + * Submit job to execute clustering for the group, directly using the spark native Row representation. + */ + private CompletableFuture> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup, + Map strategyParams, + boolean shouldPreserveHoodieMetadata, + String instantTime) { + return CompletableFuture.supplyAsync(() -> { + JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext()); + Dataset inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime); + Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema())); + List inputFileIds = clusteringGroup.getSlices().stream() + .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId())) + .collect(Collectors.toList()); + return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata, + clusteringGroup.getExtraMetadata()); + }); + } + /** * Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any). */ @@ -275,6 +345,66 @@ private HoodieData> readRecordsForGroupBaseFiles(JavaSparkContex .map(record -> transform(record, writeConfig))); } + /** + * Get dataset of all records for the group. 
This includes all records from file slice (Apply updates from log files, if any). + */ + private Dataset readRecordsForGroupAsRow(JavaSparkContext jsc, + HoodieClusteringGroup clusteringGroup, + String instantTime) { + List clusteringOps = clusteringGroup.getSlices().stream() + .map(ClusteringOperation::create).collect(Collectors.toList()); + boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); + SQLContext sqlContext = new SQLContext(jsc.sc()); + + Path[] baseFilePaths = clusteringOps + .stream() + .map(op -> { + ArrayList readPaths = new ArrayList<>(); + if (op.getBootstrapFilePath() != null) { + readPaths.add(op.getBootstrapFilePath()); + } + if (op.getDataFilePath() != null) { + readPaths.add(op.getDataFilePath()); + } + return readPaths; + }) + .flatMap(Collection::stream) + .filter(path -> !path.isEmpty()) + .map(Path::new) + .toArray(Path[]::new); + + HashMap params = new HashMap<>(); + params.put("hoodie.datasource.query.type", "snapshot"); + params.put("as.of.instant", instantTime); + + Path[] paths; + if (hasLogFiles) { + String compactionFraction = Option.ofNullable(getWriteConfig().getString("compaction.memory.fraction")) + .orElse("0.75"); + params.put("compaction.memory.fraction", compactionFraction); + + Path[] deltaPaths = clusteringOps + .stream() + .filter(op -> !op.getDeltaFilePaths().isEmpty()) + .flatMap(op -> op.getDeltaFilePaths().stream()) + .map(Path::new) + .toArray(Path[]::new); + paths = CollectionUtils.combine(baseFilePaths, deltaPaths); + } else { + paths = baseFilePaths; + } + + String readPathString = String.join(",", Arrays.stream(paths).map(Path::toString).toArray(String[]::new)); + params.put("hoodie.datasource.read.paths", readPathString); + // Building HoodieFileIndex needs this param to decide query path + params.put("glob.paths", readPathString); + + // Let Hudi relations fetch the schema from the table itself + BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter() + .createRelation(sqlContext, getHoodieTable().getMetaClient(), null, paths, params); + return sqlContext.baseRelationToDataFrame(relation); + } + /** * Stream to array conversion with generic type is not straightforward. * Implement a utility method to abstract high level logic.
This needs to be improved in future diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java index 23c4b04577dbe..78cf449499562 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java @@ -35,6 +35,8 @@ import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import java.util.List; import java.util.Map; @@ -53,6 +55,18 @@ public SparkConsistentBucketClusteringExecutionStrategy(HoodieTable table, Hoodi super(table, engineContext, writeConfig); } + @Override + public HoodieData performClusteringWithRecordsAsRow(Dataset inputRecords, + int numOutputGroups, + String instantTime, + Map strategyParams, + Schema schema, + List fileGroupIdList, + boolean shouldPreserveHoodieMetadata, + Map extraMetadata) { + throw new HoodieClusteringException("Not implemented yet"); + } + @Override public HoodieData performClusteringWithRecordsRDD(HoodieData> inputRecords, int numOutputGroups, String instantTime, Map strategyParams, Schema schema, List fileGroupIdList, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java index b994244c5314f..f2ae9a922d811 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java @@ -19,6 +19,7 @@ package org.apache.hudi.client.clustering.run.strategy; +import org.apache.hudi.HoodieDatasetBulkInsertHelper; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -35,6 +36,8 @@ import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import java.util.List; import java.util.Map; @@ -54,15 +57,40 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table, super(table, engineContext, writeConfig); } + @Override + public HoodieData performClusteringWithRecordsAsRow(Dataset inputRecords, + int numOutputGroups, + String instantTime, + Map strategyParams, + Schema schema, + List fileGroupIdList, + boolean shouldPreserveHoodieMetadata, + Map extraMetadata) { + if (numOutputGroups != 1 || fileGroupIdList.size() != 1) { + throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName()); + } + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() + .withBulkInsertParallelism(numOutputGroups) + .withProps(getWriteConfig().getProps()).build(); + + // Since clustering will write to single
file group using HoodieUnboundedCreateHandle, set max file size to a large value. + newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE)); + + return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, + getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata); + } + @Override public HoodieData performClusteringWithRecordsRDD(HoodieData> inputRecords, - int numOutputGroups, - String instantTime, - Map strategyParams, - Schema schema, - List fileGroupIdList, - boolean preserveHoodieMetadata, - Map extraMetadata) { + int numOutputGroups, + String instantTime, + Map strategyParams, + Schema schema, + List fileGroupIdList, + boolean shouldPreserveHoodieMetadata, + Map extraMetadata) { if (numOutputGroups != 1 || fileGroupIdList.size() != 1) { throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName()); } @@ -75,6 +103,6 @@ public HoodieData performClusteringWithRecordsRDD(HoodieData) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, - false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata)); + false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java index cee923cf9b96d..35c8f288bc891 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.clustering.run.strategy; +import org.apache.hudi.HoodieDatasetBulkInsertHelper; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,6 +34,8 @@ import org.apache.avro.Schema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import java.util.List; import java.util.Map; @@ -53,17 +56,40 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table, } @Override - public HoodieData performClusteringWithRecordsRDD(final HoodieData> inputRecords, final int numOutputGroups, - final String instantTime, final Map strategyParams, final Schema schema, - final List fileGroupIdList, final boolean preserveHoodieMetadata, - final Map extraMetadata) { + public HoodieData performClusteringWithRecordsAsRow(Dataset inputRecords, + int numOutputGroups, + String instantTime, Map strategyParams, + Schema schema, + List fileGroupIdList, + boolean shouldPreserveHoodieMetadata, + Map extraMetadata) { + LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); + HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() + .withBulkInsertParallelism(numOutputGroups) + .withProps(getWriteConfig().getProps()).build(); + + 
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); + + return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, + getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata); + } + + @Override + public HoodieData performClusteringWithRecordsRDD(final HoodieData> inputRecords, + final int numOutputGroups, + final String instantTime, + final Map strategyParams, + final Schema schema, + final List fileGroupIdList, + final boolean shouldPreserveHoodieMetadata, + final Map extraMetadata) { LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime); HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder() .withBulkInsertParallelism(numOutputGroups) .withProps(getWriteConfig().getProps()).build(); newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes())); - return (HoodieData) SparkBulkInsertHelper.newInstance() - .bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata)); + return (HoodieData) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), + newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata)); } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java index 50a0a534f881b..2ab9107fa54b5 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java @@ -29,8 +29,6 @@ import org.apache.hudi.common.model.RewriteAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.sort.SpaceCurveSortingHelper; -import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -38,32 +36,24 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import java.util.Arrays; -import java.util.List; - /** * A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition. * support z-curve optimization, hilbert will come soon. 
* @param HoodieRecordPayload type */ public class RDDSpatialCurveSortPartitioner - implements BulkInsertPartitioner>> { + extends SpatialCurveSortPartitionerBase>> { private final transient HoodieSparkEngineContext sparkEngineContext; - private final String[] orderByColumns; private final SerializableSchema schema; - private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; - private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext, String[] orderByColumns, HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType, Schema schema) { + super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType); this.sparkEngineContext = sparkEngineContext; - this.orderByColumns = orderByColumns; - this.layoutOptStrategy = layoutOptStrategy; - this.curveCompositionStrategyType = curveCompositionStrategyType; this.schema = new SerializableSchema(schema); } @@ -91,27 +81,4 @@ public JavaRDD> repartitionRecords(JavaRDD> reco return hoodieRecord; }); } - - private Dataset reorder(Dataset dataset, int numOutputGroups) { - if (orderByColumns.length == 0) { - // No-op - return dataset; - } - - List orderedCols = Arrays.asList(orderByColumns); - - switch (curveCompositionStrategyType) { - case DIRECT: - return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups); - case SAMPLE: - return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups); - default: - throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType)); - } - } - - @Override - public boolean arePartitionRecordsSorted() { - return true; - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java new file mode 100644 index 0000000000000..1217477c9d817 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowSpatialCurveSortPartitioner.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +public class RowSpatialCurveSortPartitioner extends SpatialCurveSortPartitionerBase> { + + public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) { + super(config.getClusteringSortColumns(), config.getLayoutOptimizationStrategy(), config.getLayoutOptimizationCurveBuildMethod()); + } + + public RowSpatialCurveSortPartitioner(String[] orderByColumns, + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, + HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) { + super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType); + } + + @Override + public Dataset repartitionRecords(Dataset records, int outputPartitions) { + return reorder(records, outputPartitions); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java new file mode 100644 index 0000000000000..96048f2782bc1 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/SpatialCurveSortPartitionerBase.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.sort.SpaceCurveSortingHelper; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.Arrays; +import java.util.List; + +public abstract class SpatialCurveSortPartitionerBase implements BulkInsertPartitioner { + + private final String[] orderByColumns; + private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy; + private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType; + + public SpatialCurveSortPartitionerBase(String orderByColumns, + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, + HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) { + if (orderByColumns != null) { + this.orderByColumns = Arrays.stream(orderByColumns.split(",")) + .map(String::trim).toArray(String[]::new); + } else { + throw new IllegalArgumentException("The config " + + HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key() + " must be provided"); + } + this.layoutOptStrategy = layoutOptStrategy; + this.curveCompositionStrategyType = curveCompositionStrategyType; + } + + public SpatialCurveSortPartitionerBase(String[] orderByColumns, + HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy, + HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) { + this.orderByColumns = orderByColumns; + this.layoutOptStrategy = layoutOptStrategy; + this.curveCompositionStrategyType = curveCompositionStrategyType; + } + + /** + * Maps the specified sort columns onto a single dimension while preserving data locality.
+ */ + protected Dataset reorder(Dataset dataset, int numOutputGroups) { + if (orderByColumns.length == 0) { + // No-op + return dataset; + } + + List orderedCols = Arrays.asList(orderByColumns); + + switch (curveCompositionStrategyType) { + case DIRECT: + return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups); + case SAMPLE: + return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups); + default: + throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType)); + } + } + + @Override + public boolean arePartitionRecordsSorted() { + return true; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index e7c6ccd6fadb1..9da04f72600b7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -70,6 +70,8 @@ public class HoodieRowCreateHandle implements Serializable { private final UTF8String commitTime; private final Function seqIdGenerator; + private final boolean shouldPreserveHoodieMetadata; + private final HoodieTimer currTimer; protected final HoodieInternalRowFileWriter fileWriter; @@ -84,6 +86,20 @@ public HoodieRowCreateHandle(HoodieTable table, long taskId, long taskEpochId, StructType structType) { + this(table, writeConfig, partitionPath, fileId, instantTime, taskPartitionId, taskId, taskEpochId, + structType, false); + } + + public HoodieRowCreateHandle(HoodieTable table, + HoodieWriteConfig writeConfig, + String partitionPath, + String fileId, + String instantTime, + int taskPartitionId, + long taskId, + long taskEpochId, + StructType structType, + boolean shouldPreserveHoodieMetadata) { this.partitionPath = partitionPath; this.table = table; this.writeConfig = writeConfig; @@ -104,6 +120,8 @@ public HoodieRowCreateHandle(HoodieTable table, this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(), writeConfig.getWriteStatusFailureFraction()); + this.shouldPreserveHoodieMetadata = shouldPreserveHoodieMetadata; + writeStatus.setPartitionPath(partitionPath); writeStatus.setFileId(fileId); writeStatus.setStat(new HoodieWriteStat()); @@ -154,12 +172,14 @@ private void writeRow(InternalRow row) { UTF8String recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD); UTF8String partitionPath = row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD); // This is the only meta-field that is generated dynamically, hence conversion b/w - // [[String]] and [[UTF8String]] is unavoidable - UTF8String seqId = UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement())); + // [[String]] and [[UTF8String]] is unavoidable if preserveHoodieMetadata is false + UTF8String seqId = shouldPreserveHoodieMetadata ? row.getUTF8String(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD_ORD) + : UTF8String.fromString(seqIdGenerator.apply(GLOBAL_SEQ_NO.getAndIncrement())); + UTF8String writeCommitTime = shouldPreserveHoodieMetadata ? 
row.getUTF8String(HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD) + : commitTime; - InternalRow updatedRow = new HoodieInternalRow(commitTime, seqId, recordKey, + InternalRow updatedRow = new HoodieInternalRow(writeCommitTime, seqId, recordKey, partitionPath, fileName, row, true); - try { fileWriter.writeRow(recordKey, updatedRow); // NOTE: To avoid conversion on the hot-path we only convert [[UTF8String]] into [[String]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java similarity index 90% rename from hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java index f7918cf3fd9f1..12e9dda81a5bc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.hudi.internal; +package org.apache.hudi.table.action.commit; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.HoodieInternalWriteStatus; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; @@ -66,6 +65,7 @@ public class BulkInsertDataInternalWriterHelper { private final String fileIdPrefix; private final Map handles = new HashMap<>(); private final boolean populateMetaFields; + private final boolean shouldPreserveHoodieMetadata; private final Option keyGeneratorOpt; private final boolean simpleKeyGen; private final int simplePartitionFieldIndex; @@ -81,6 +81,13 @@ public class BulkInsertDataInternalWriterHelper { public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, boolean populateMetaFields, boolean arePartitionRecordsSorted) { + this(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, taskEpochId, structType, + populateMetaFields, arePartitionRecordsSorted, false); + } + + public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, + String instantTime, int taskPartitionId, long taskId, long taskEpochId, StructType structType, + boolean populateMetaFields, boolean arePartitionRecordsSorted, boolean shouldPreserveHoodieMetadata) { this.hoodieTable = hoodieTable; this.writeConfig = writeConfig; this.instantTime = instantTime; @@ -89,6 +96,7 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo this.taskEpochId = taskEpochId; this.structType = structType; this.populateMetaFields = populateMetaFields; + this.shouldPreserveHoodieMetadata = shouldPreserveHoodieMetadata; this.arePartitionRecordsSorted = arePartitionRecordsSorted; this.fileIdPrefix = UUID.randomUUID().toString(); @@ -118,7 +126,8 @@ public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, HoodieWriteCo private Option getKeyGenerator(Properties properties) { TypedProperties typedProperties = new TypedProperties(); typedProperties.putAll(properties); - if 
(properties.get(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()).equals(NonpartitionedKeyGenerator.class.getName())) { + if (Option.ofNullable(properties.get(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key())) + .map(v -> v.equals(NonpartitionedKeyGenerator.class.getName())).orElse(false)) { return Option.empty(); // Do not instantiate NonPartitionKeyGen } else { try { @@ -199,7 +208,7 @@ private HoodieRowCreateHandle getRowCreateHandle(String partitionPath) throws IO private HoodieRowCreateHandle createHandle(String partitionPath) { return new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), - instantTime, taskPartitionId, taskId, taskEpochId, structType); + instantTime, taskPartitionId, taskId, taskEpochId, structType, shouldPreserveHoodieMetadata); } private String getNextFileId() { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala similarity index 73% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index 09f1fac2c874d..296abaf4f5e27 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -17,14 +17,18 @@ package org.apache.hudi +import org.apache.hudi.client.WriteStatus import org.apache.hudi.client.model.HoodieInternalRow import org.apache.hudi.common.config.TypedProperties -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.data.HoodieData +import org.apache.hudi.common.engine.TaskContextSupplier +import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface} -import org.apache.hudi.table.BulkInsertPartitioner +import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} +import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue} @@ -33,8 +37,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row} import org.apache.spark.unsafe.types.UTF8String -import scala.collection.JavaConverters.asScalaBufferConverter -import scala.collection.mutable +import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter} object HoodieDatasetBulkInsertHelper extends Logging { @@ -55,7 +58,7 @@ object HoodieDatasetBulkInsertHelper extends Logging { val populateMetaFields = config.populateMetaFields() val schema = df.schema - val keyGeneratorClassName = config.getStringOrThrow(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME, + val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required") val prependedRdd: RDD[InternalRow] = @@ -105,6 +108,52 @@ object HoodieDatasetBulkInsertHelper extends Logging { 
partitioner.repartitionRecords(trimmedDF, config.getBulkInsertShuffleParallelism) } + /** + * Perform bulk insert for [[Dataset]], will not change timeline/index, return + * information about write files. + */ + def bulkInsert(dataset: Dataset[Row], + instantTime: String, + table: HoodieTable[_ <: HoodieRecordPayload[_ <: HoodieRecordPayload[_ <: AnyRef]], _, _, _], + writeConfig: HoodieWriteConfig, + partitioner: BulkInsertPartitioner[Dataset[Row]], + parallelism: Int, + shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = { + val repartitionedDataset = partitioner.repartitionRecords(dataset, parallelism) + val arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted + val schema = dataset.schema + val writeStatuses = repartitionedDataset.queryExecution.toRdd.mapPartitions(iter => { + val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier + val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get + val taskId = taskContextSupplier.getStageIdSupplier.get.toLong + val taskEpochId = taskContextSupplier.getAttemptIdSupplier.get + val writer = new BulkInsertDataInternalWriterHelper( + table, + writeConfig, + instantTime, + taskPartitionId, + taskId, + taskEpochId, + schema, + writeConfig.populateMetaFields, + arePartitionRecordsSorted, + shouldPreserveHoodieMetadata) + + try { + iter.foreach(writer.write) + } catch { + case t: Throwable => + writer.abort() + throw t + } finally { + writer.close() + } + + writer.getWriteStatuses.asScala.map(_.toWriteStatus).iterator + }).collect() + table.getContext.parallelize(writeStatuses.toList.asJava) + } + private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = { val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD) val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD) @@ -149,7 +198,7 @@ object HoodieDatasetBulkInsertHelper extends Logging { } private def getPartitionPathFields(config: HoodieWriteConfig): Seq[String] = { - val keyGeneratorClassName = config.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME) + val keyGeneratorClassName = config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME) val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator] keyGenerator.getPartitionPathFields.asScala } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala similarity index 100% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala index 6d55309779ce4..9fe67f9918d01 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkAdapterSupport.scala @@ -26,6 +26,12 @@ import org.apache.spark.sql.hudi.SparkAdapter */ trait SparkAdapterSupport { + lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter + +} + +object SparkAdapterSupport { + lazy val sparkAdapter: SparkAdapter = { val 
adapterClass = if (HoodieSparkUtils.isSpark3_3) { "org.apache.spark.sql.adapter.Spark3_3Adapter" @@ -39,4 +45,4 @@ trait SparkAdapterSupport { getClass.getClassLoader.loadClass(adapterClass) .newInstance().asInstanceOf[SparkAdapter] } -} +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala similarity index 99% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala index bd7f2f54560e8..edf05f2db2ec9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieUnsafeUtils.scala @@ -84,5 +84,4 @@ object HoodieUnsafeUtils { .map(p => p._1) .collect() } - } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 1dd96958aa0f6..6f9616b669c47 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql.hudi import org.apache.avro.Schema +import org.apache.hadoop.fs.Path import org.apache.hudi.client.utils.SparkRowSerDe +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -32,8 +34,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, Subque import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, LogicalRelation, PartitionedFile, SparkParsePartitionUtil} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql.{HoodieCatalogUtils, HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SparkSession} +import org.apache.spark.sql.{HoodieCatalogUtils, HoodieCatalystExpressionUtils, HoodieCatalystPlansUtils, Row, SQLContext, SparkSession} import org.apache.spark.storage.StorageLevel import java.util.Locale @@ -147,6 +150,15 @@ trait SparkAdapter extends Serializable { */ def createInterpretedPredicate(e: Expression): InterpretedPredicate + /** + * Create Hoodie relation based on globPaths, otherwise use tablePath if it's empty + */ + def createRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + schema: Schema, + globPaths: Array[Path], + parameters: java.util.Map[String, String]): BaseRelation + /** * Create instance of [[HoodieFileScanRDD]] * SPARK-37273 FileScanRDD constructor changed in SPARK 3.3 diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java index 6d4236b048d2d..366d19fe6ebc0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java @@ -166,6 +166,10 @@ public Boolean getBoolean(ConfigProperty configProperty) { return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null); } + public boolean getBooleanOrDefault(String key, boolean defaultVal) { + return Option.ofNullable(props.getProperty(key)).map(Boolean::parseBoolean).orElse(defaultVal); + } + public boolean getBooleanOrDefault(ConfigProperty configProperty) { Option rawValue = getRawValue(configProperty); return rawValue.map(v -> Boolean.parseBoolean(v.toString())) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d025fc44247bf..1f69bea2d8fdb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -765,6 +765,10 @@ public static Map parallelizeFilesProcess( if (subPaths.size() > 0) { SerializableConfiguration conf = new SerializableConfiguration(fs.getConf()); int actualParallelism = Math.min(subPaths.size(), parallelism); + + hoodieEngineContext.setJobStatus(FSUtils.class.getSimpleName(), + "Parallel listing paths " + String.join(",", subPaths)); + result = hoodieEngineContext.mapToPair(subPaths, subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))), actualParallelism); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java index c18d1f333bd31..2a3edafb8f27d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java @@ -61,6 +61,8 @@ public abstract class HoodieRecord implements Serializable { public static int RECORD_KEY_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(RECORD_KEY_METADATA_FIELD); public static int PARTITION_PATH_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(PARTITION_PATH_METADATA_FIELD); public static int FILENAME_META_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(FILENAME_METADATA_FIELD); + public static int COMMIT_TIME_METADATA_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(COMMIT_TIME_METADATA_FIELD); + public static int COMMIT_SEQNO_METADATA_FIELD_ORD = HOODIE_META_COLUMNS_NAME_TO_POS.get(COMMIT_SEQNO_METADATA_FIELD); /** * Identifies the record across the table. diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java index be868ad29b284..44844a8d475ec 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java @@ -183,8 +183,17 @@ public boolean accept(Path path) { metaClientCache.put(baseDir.toString(), metaClient); } - fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, - metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf())); + if (getConf().get("as.of.instant") != null) { + // Build the FileSystemViewManager as of the specified time. This config must be set when old + // version files may be accessed. For example, on the Spark side, "hoodie.datasource.read.paths" + // may contain old version files; if this value is not set, those files will be filtered out.
+ fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, + metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()), + metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get("as.of.instant"))); + } else { + fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext, + metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf())); + } String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder); List latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList()); // populate the cache diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala index 119c61f84bc19..01a28d056c643 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala @@ -162,7 +162,6 @@ class BaseFileOnlyRelation(sqlContext: SQLContext, } else { val readPathsStr = optParams.get(DataSourceReadOptions.READ_PATHS.key) val extraReadPaths = readPathsStr.map(p => p.split(",").toSeq).getOrElse(Seq()) - // NOTE: Spark is able to infer partitioning values from partition path only when Hive-style partitioning // scheme is used. Therefore, we fallback to reading the table as non-partitioned (specifying // partitionColumns = Seq.empty) whenever Hive-style partitioning is not involved diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index 348a6056c8883..c7608c98291be 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -107,47 +107,8 @@ class DefaultSource extends RelationProvider log.info("Obtained hudi table path: " + tablePath) val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() - val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent - val tableType = metaClient.getTableType - val queryType = parameters(QUERY_TYPE.key) - // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain - // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. 
In that - // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema - // from the table itself - val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) { - None - } else { - Option(schema) - } - - log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") - - if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { - new EmptyRelation(sqlContext, metaClient) - } else { - (tableType, queryType, isBootstrappedTable) match { - case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | - (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | - (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => - resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) - - case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) - - case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => - new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient) - - case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => - new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient) - - case (_, _, true) => - new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters) - case (_, _, _) => - throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + - s"isBootstrappedTable: $isBootstrappedTable ") - } - } + DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, parameters) } def getValidCommits(metaClient: HoodieTableMetaClient): String = { @@ -230,6 +191,60 @@ class DefaultSource extends RelationProvider parameters: Map[String, String]): Source = { new HoodieStreamSource(sqlContext, metadataPath, schema, parameters) } +} + +object DefaultSource { + + private val log = LogManager.getLogger(classOf[DefaultSource]) + + def createRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + schema: StructType, + globPaths: Seq[Path], + parameters: Map[String, String]): BaseRelation = { + val tableType = metaClient.getTableType + val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent + val queryType = parameters(QUERY_TYPE.key) + + log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType") + + // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain + // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. 
In that + // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema + // from the table itself + val userSchema = if (isUsingHiveCatalog(sqlContext.sparkSession)) { + None + } else { + Option(schema) + } + + if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) { + new EmptyRelation(sqlContext, metaClient) + } else { + (tableType, queryType, isBootstrappedTable) match { + case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | + (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | + (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => + resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters) + + case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new IncrementalRelation(sqlContext, parameters, userSchema, metaClient) + + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => + new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient) + + case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient) + + case (_, _, true) => + new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters) + + case (_, _, _) => + throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + + s"isBootstrappedTable: $isBootstrappedTable ") + } + } + } private def resolveBaseFileOnlyRelation(sqlContext: SQLContext, globPaths: Seq[Path], diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java similarity index 87% rename from hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java index 5438fbcfc0d98..a343ab3c5ccee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java @@ -17,8 +17,9 @@ * under the License. 
*/ -package org.apache.hudi.table.functional; +package org.apache.hudi.functional; +import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; @@ -45,7 +46,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.util.Arrays; import java.util.List; @@ -59,21 +59,30 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTestHarness { private static Stream testClustering() { + // enableClusteringAsRow, doUpdates, populateMetaFields, preserveCommitMetadata return Stream.of( - Arguments.of(true, true, true), - Arguments.of(true, true, false), - Arguments.of(true, false, true), - Arguments.of(true, false, false), - Arguments.of(false, true, true), - Arguments.of(false, true, false), - Arguments.of(false, false, true), - Arguments.of(false, false, false) + Arguments.of(true, true, true, true), + Arguments.of(true, true, true, false), + Arguments.of(true, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, false, true, false), + Arguments.of(true, false, false, true), + Arguments.of(true, false, false, false), + Arguments.of(false, true, true, true), + Arguments.of(false, true, true, false), + Arguments.of(false, true, false, true), + Arguments.of(false, true, false, false), + Arguments.of(false, false, true, true), + Arguments.of(false, false, true, false), + Arguments.of(false, false, false, true), + Arguments.of(false, false, false, false) ); } @ParameterizedTest @MethodSource - void testClustering(boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { + void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception { // set low compaction small File Size to generate more file groups. HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder() .forTable("test-trip-table") @@ -148,13 +157,22 @@ void testClustering(boolean doUpdates, boolean populateMetaFields, boolean prese assertEquals(allFiles.length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); // Do the clustering and validate - doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen); + doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen, clusteringAsRow); } } + private static Stream testClusteringWithNoBaseFiles() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception { + @MethodSource + void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean doUpdates) throws Exception { // set low compaction small File Size to generate more file groups. 
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder() .forTable("test-trip-table") @@ -217,7 +235,7 @@ void testClusteringWithNoBaseFiles(boolean doUpdates) throws Exception { assertEquals(dataGen.getPartitionPaths().length, hoodieTable.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getLeft).count()); // do the clustering and validate - doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen); + doClusteringAndValidate(client, clusteringCommitTime, metaClient, cfg, dataGen, clusteringAsRow); } } @@ -225,7 +243,12 @@ private void doClusteringAndValidate(SparkRDDWriteClient client, String clusteringCommitTime, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, - HoodieTestDataGenerator dataGen) { + HoodieTestDataGenerator dataGen, + boolean clusteringAsRow) { + if (clusteringAsRow) { + client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), "true"); + } + client.cluster(clusteringCommitTime, true); metaClient = HoodieTableMetaClient.reload(metaClient); final HoodieTable clusteredTable = HoodieSparkTable.create(cfg, context(), metaClient); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 241223be7e333..8ba5db5e5ffbb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -309,6 +309,10 @@ class TestCOWDataSource extends HoodieClientTestBase { .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .mode(SaveMode.Append) .save(basePath) + val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath) + .setLoadActiveTimelineOnLoad(true).build() + + val instantTime = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.findFirst().get().getTimestamp val record1FilePaths = fs.listStatus(new Path(basePath, dataGen.getPartitionPaths.head)) .filter(!_.getPath.getName.contains("hoodie_partition_metadata")) @@ -319,6 +323,13 @@ class TestCOWDataSource extends HoodieClientTestBase { val records2 = dataGen.generateInsertsContainsAllPartitions("002", 20) val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(basePath) + + val inputDF3 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + inputDF3.write.format("org.apache.hudi") .options(commonOpts) // Use bulk insert here to make sure the files have different file groups. 
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) @@ -326,6 +337,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .save(basePath) val hudiReadPathDF = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), instantTime) .option(DataSourceReadOptions.READ_PATHS.key, record1FilePaths) .load() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala index 17715627fef38..7d54959d2ea4c 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLayoutOptimization.scala @@ -25,7 +25,7 @@ import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig} import org.apache.hudi.testutils.HoodieClientTestBase -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions} import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals @@ -85,6 +85,7 @@ class TestLayoutOptimization extends HoodieClientTestBase { @ParameterizedTest @MethodSource(Array("testLayoutOptimizationParameters")) def testLayoutOptimizationFunctional(tableType: String, + clusteringAsRow: String, layoutOptimizationStrategy: String, spatialCurveCompositionStrategy: String): Unit = { val curveCompositionStrategy = @@ -112,6 +113,7 @@ class TestLayoutOptimization extends HoodieClientTestBase { .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600") .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString) .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L)) + .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), clusteringAsRow) .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key(), layoutOptimizationStrategy) .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_SPATIAL_CURVE_BUILD_METHOD.key(), curveCompositionStrategy) .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat,begin_lon") @@ -164,18 +166,29 @@ class TestLayoutOptimization extends HoodieClientTestBase { object TestLayoutOptimization { def testLayoutOptimizationParameters(): java.util.stream.Stream[Arguments] = { + // TableType, enableClusteringAsRow, layoutOptimizationStrategy, spatialCurveCompositionStrategy java.util.stream.Stream.of( - arguments("COPY_ON_WRITE", "linear", null), - arguments("COPY_ON_WRITE", "z-order", "direct"), - arguments("COPY_ON_WRITE", "z-order", "sample"), - arguments("COPY_ON_WRITE", "hilbert", "direct"), - arguments("COPY_ON_WRITE", "hilbert", "sample"), - - arguments("MERGE_ON_READ", "linear", null), - arguments("MERGE_ON_READ", "z-order", "direct"), - arguments("MERGE_ON_READ", "z-order", "sample"), - arguments("MERGE_ON_READ", "hilbert", "direct"), - arguments("MERGE_ON_READ", "hilbert", "sample") + arguments("COPY_ON_WRITE", "true", "linear", null), + arguments("COPY_ON_WRITE", "true", "z-order", "direct"), + arguments("COPY_ON_WRITE", "true", "z-order", "sample"), + arguments("COPY_ON_WRITE", "true", 
"hilbert", "direct"), + arguments("COPY_ON_WRITE", "true", "hilbert", "sample"), + arguments("COPY_ON_WRITE", "false", "linear", null), + arguments("COPY_ON_WRITE", "false", "z-order", "direct"), + arguments("COPY_ON_WRITE", "false", "z-order", "sample"), + arguments("COPY_ON_WRITE", "false", "hilbert", "direct"), + arguments("COPY_ON_WRITE", "false", "hilbert", "sample"), + + arguments("MERGE_ON_READ", "true", "linear", null), + arguments("MERGE_ON_READ", "true", "z-order", "direct"), + arguments("MERGE_ON_READ", "true", "z-order", "sample"), + arguments("MERGE_ON_READ", "true", "hilbert", "direct"), + arguments("MERGE_ON_READ", "true", "hilbert", "sample"), + arguments("MERGE_ON_READ", "false", "linear", null), + arguments("MERGE_ON_READ", "false", "z-order", "direct"), + arguments("MERGE_ON_READ", "false", "z-order", "sample"), + arguments("MERGE_ON_READ", "false", "hilbert", "direct"), + arguments("MERGE_ON_READ", "false", "hilbert", "sample") ) } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java index 8a8a50502913f..9f878dfa572d2 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java @@ -20,6 +20,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.sources.v2.writer.DataWriter; diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index d7d498431b0b6..d4ca8bc402015 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -19,8 +19,10 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema +import org.apache.hadoop.fs.Path import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.hudi.{Spark2HoodieFileScanRDD, Spark2RowSerDe} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark2HoodieFileScanRDD, Spark2RowSerDe} import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -34,9 +36,11 @@ import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql._ +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ +import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.collection.mutable.ArrayBuffer /** @@ -125,6 +129,15 @@ class Spark2Adapter extends SparkAdapter { InterpretedPredicate.create(e) } + override def createRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + schema: Schema, + globPaths: Array[Path], + parameters: java.util.Map[String, String]): BaseRelation = { 
+ val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull + DefaultSource.createRelation(sqlContext, metaClient, dataSchema, globPaths, parameters.asScala.toMap) + } + override def createHoodieFileScanRDD(sparkSession: SparkSession, readFunction: PartitionedFile => Iterator[InternalRow], filePartitions: Seq[FilePartition], diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java index e9163c8244c2e..24157c694ef21 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java @@ -19,8 +19,8 @@ package org.apache.hudi.spark3.internal; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.internal.BulkInsertDataInternalWriterHelper; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.write.DataWriter; diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index 1c8151ef91867..d0fb7ce6e8651 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -17,8 +17,11 @@ package org.apache.spark.sql.adapter -import org.apache.hudi.Spark3RowSerDe +import org.apache.avro.Schema +import org.apache.hadoop.fs.Path +import org.apache.hudi.{AvroConversionUtils, DefaultSource, Spark3RowSerDe} import org.apache.hudi.client.utils.SparkRowSerDe +import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -31,11 +34,13 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SparkSession} +import org.apache.spark.sql.{HoodieSpark3CatalogUtils, Row, SQLContext, SparkSession} import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ +import scala.collection.JavaConverters.mapAsScalaMapConverter import scala.util.control.NonFatal /** @@ -88,6 +93,15 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { Predicate.createInterpreted(e) } + override def createRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + schema: Schema, + globPaths: Array[Path], + parameters: java.util.Map[String, String]): BaseRelation = { + val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull + DefaultSource.createRelation(sqlContext, metaClient, 
dataSchema, globPaths, parameters.asScala.toMap) + } + /** * Converts instance of [[StorageLevel]] to a corresponding string */
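
The relation-resolution logic consolidated into DefaultSource.createRelation above is now reachable from engine-side code through the new SparkAdapter.createRelation overrides. A minimal caller sketch, assuming a hypothetical readTableAsRows helper and a snapshot query; the null schema defers schema resolution to the table itself, matching the Option(schema) handling in DefaultSource.createRelation:

import org.apache.hadoop.fs.Path
import org.apache.hudi.{DataSourceReadOptions, DefaultSource}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper, not part of this patch: resolve a Hudi relation for a snapshot query
// and materialize it as a DataFrame through the standard SQLContext API.
def readTableAsRows(sqlContext: SQLContext, basePath: String): DataFrame = {
  val metaClient = HoodieTableMetaClient.builder()
    .setConf(sqlContext.sparkContext.hadoopConfiguration)
    .setBasePath(basePath)
    .build()
  val params = Map(
    DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  // A null StructType makes userSchema resolve to None, so the relation fetches the schema from the table
  val relation = DefaultSource.createRelation(sqlContext, metaClient, null, Seq.empty[Path], params)
  sqlContext.baseRelationToDataFrame(relation)
}

The Spark 2 and Spark 3 adapters above perform the same delegation, converting the Avro schema with AvroConversionUtils.convertAvroSchemaToStructType and the java.util.Map of parameters via asScala before calling into DefaultSource.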
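
The new clusteringAsRow dimension in the tests above toggles the Spark row-writer path for clustering via DataSourceWriteOptions.ENABLE_ROW_WRITER. A sketch of how a datasource write might opt into it together with inline clustering; inputDF, commonOpts and basePath are assumed to exist as in the tests, and the clustering keys shown are illustrative rather than exhaustive:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.sql.SaveMode

// Bulk insert with inline clustering, executed through the row-writer code path.
inputDF.write.format("org.apache.hudi")
  .options(commonOpts)
  .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option("hoodie.clustering.inline", "true")
  .option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat,begin_lon")
  .mode(SaveMode.Append)
  .save(basePath)

For an existing SparkRDDWriteClient, the Java test above does the equivalent with client.getConfig().setValue(DataSourceWriteOptions.ENABLE_ROW_WRITER(), "true") before invoking client.cluster(...).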