@@ -149,10 +149,27 @@ public void setSuccessRecordKeys(List<String> successRecordKeys) {
this.successRecordKeys = successRecordKeys;
}

public double getFailureFraction() {
return failureFraction;
}

public boolean isTrackSuccessRecords() {
return trackSuccessRecords;
}

@Override
public String toString() {
return "PartitionPath " + partitionPath + ", FileID " + fileId + ", Success records "
+ totalRecords + ", errored Rows " + totalErrorRecords
+ ", global error " + (globalError != null);
}

public WriteStatus toWriteStatus() {
WriteStatus status = new WriteStatus(trackSuccessRecords, failureFraction);
status.setFileId(fileId);
status.setTotalRecords(totalRecords);
status.setPartitionPath(partitionPath);
status.setStat(stat);
return status;
}
}
@@ -325,9 +325,9 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When true, spins up an instance of the timeline server (meta server that serves cached file listings, statistics),"
+ "running on each writer's driver process, accepting requests during the write from executors.");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
public static final ConfigProperty<Boolean> EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED = ConfigProperty
.key("hoodie.embed.timeline.server.reuse.enabled")
.defaultValue("false")
.defaultValue(false)
.withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)"
+ "to avoid startup costs. This should rarely be changed.");

@@ -1084,7 +1084,7 @@ public boolean isEmbeddedTimelineServerEnabled() {
}

public boolean isEmbeddedTimelineServerReuseEnabled() {
return Boolean.parseBoolean(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED));
return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
Contributor:
We need to do getBooleanOrDefault, otherwise it might NPE (due to unboxing)

Contributor Author:
EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED is a ConfigProperty with a default value, so getBoolean will handle this.

public <T> Boolean getBoolean(ConfigProperty<T> configProperty) {
    if (configProperty.hasDefaultValue()) {
      return getBooleanOrDefault(configProperty);
    }
    Option<Object> rawValue = getRawValue(configProperty);
    return rawValue.map(v -> Boolean.parseBoolean(v.toString())).orElse(null);
  }
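
For illustration, a minimal, self-contained sketch of the unboxing concern raised above (not part of the PR or of Hudi): if a lookup returns a null Boolean and the caller's return type is the primitive boolean, the auto-unboxing throws a NullPointerException. The class name, map-backed lookup, and property key below are hypothetical.

import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal illustration of the unboxing NPE; not Hudi code.
public class UnboxingNpeSketch {

  private static final Map<String, String> PROPS = new HashMap<>(); // key intentionally unset

  // Stand-in for a config lookup that returns null when the key is absent and has no default.
  static Boolean getBoolean(String key) {
    String raw = PROPS.get(key);
    return raw == null ? null : Boolean.parseBoolean(raw);
  }

  // Primitive boolean return type, like isEmbeddedTimelineServerReuseEnabled().
  static boolean isFeatureEnabled() {
    // Auto-unboxing of a null Boolean throws a NullPointerException on this return.
    return getBoolean("hoodie.example.feature.enabled");
  }

  public static void main(String[] args) {
    try {
      isFeatureEnabled();
    } catch (NullPointerException e) {
      System.out.println("NPE from unboxing a null Boolean: " + e);
    }
  }
}

Since EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED declares defaultValue(false), the quoted getBoolean delegates to getBooleanOrDefault and never returns null here, which is the author's point.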

}

public int getEmbeddedTimelineServerPort() {


@@ -35,6 +35,8 @@
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.List;
import java.util.Map;
@@ -53,6 +55,18 @@ public SparkConsistentBucketClusteringExecutionStrategy(HoodieTable table, Hoodi
super(table, engineContext, writeConfig);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
throw new HoodieClusteringException("Not implement yet");
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime,
Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
@@ -19,6 +19,7 @@

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -35,6 +36,8 @@
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.List;
import java.util.Map;
@@ -54,15 +57,40 @@ public SparkSingleFileSortExecutionStrategy(HoodieTable table,
super(table, engineContext, writeConfig);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
}
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);

HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();

// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
Contributor:
Let's duplicate the comment from the original method as well


return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean preserveHoodieMetadata,
Map<String, String> extraMetadata) {
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
if (numOutputGroups != 1 || fileGroupIdList.size() != 1) {
throw new HoodieClusteringException("Expect only one file group for strategy: " + getClass().getName());
}
@@ -75,6 +103,6 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
}
}
@@ -18,6 +18,7 @@

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -33,6 +34,8 @@
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.List;
import java.util.Map;
@@ -53,17 +56,40 @@ public SparkSortAndSizeExecutionStrategy(HoodieTable table,
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata,
final Map<String, String> extraMetadata) {
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime, Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();

newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));

return HoodieDatasetBulkInsertHelper.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
getRowPartitioner(strategyParams, schema), numOutputGroups, shouldPreserveHoodieMetadata);
}

@Override
public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords,
final int numOutputGroups,
final String instantTime,
final Map<String, String> strategyParams,
final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList,
final boolean shouldPreserveHoodieMetadata,
final Map<String, String> extraMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);

HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
}
}
@@ -29,41 +29,31 @@
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;
import java.util.List;

/**
* A partitioner that does spatial curve optimization sorting based on specified column values for each RDD partition.
* support z-curve optimization, hilbert will come soon.
* @param <T> HoodieRecordPayload type
*/
public class RDDSpatialCurveSortPartitioner<T extends HoodieRecordPayload>
implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
extends SpatialCurveSortPartitionerBase<JavaRDD<HoodieRecord<T>>> {

private final transient HoodieSparkEngineContext sparkEngineContext;
private final String[] orderByColumns;
private final SerializableSchema schema;
private final HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy;
private final HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType;

public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
String[] orderByColumns,
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
Schema schema) {
super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
this.sparkEngineContext = sparkEngineContext;
this.orderByColumns = orderByColumns;
this.layoutOptStrategy = layoutOptStrategy;
this.curveCompositionStrategyType = curveCompositionStrategyType;
this.schema = new SerializableSchema(schema);
}

@@ -91,27 +81,4 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
return hoodieRecord;
});
}

private Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
Contributor:
Thanks for cleaning that up!

if (orderByColumns.length == 0) {
// No-op
return dataset;
}

List<String> orderedCols = Arrays.asList(orderByColumns);

switch (curveCompositionStrategyType) {
case DIRECT:
return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
case SAMPLE:
return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
default:
throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
}
}

@Override
public boolean arePartitionRecordsSorted() {
return true;
}
}
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class RowSpatialCurveSortPartitioner extends SpatialCurveSortPartitionerBase<Dataset<Row>> {

public RowSpatialCurveSortPartitioner(HoodieWriteConfig config) {
super(config.getClusteringSortColumns(), config.getLayoutOptimizationStrategy(), config.getLayoutOptimizationCurveBuildMethod());
}

public RowSpatialCurveSortPartitioner(String[] orderByColumns,
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType) {
super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
}

@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
return reorder(records, outputPartitions);
}
}
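
For reference, a hedged usage sketch of the new RowSpatialCurveSortPartitioner (not part of the PR): it builds a small Dataset<Row> with two sort columns and reorders it along a Z-order curve. The SparkSession setup, column names, and partition count are assumptions for illustration.

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RowSpatialCurveSortPartitionerSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("row-spatial-curve-sort-sketch")
        .master("local[2]")
        .getOrCreate();

    // Hypothetical input: 1000 rows with two numeric sort columns, c1 and c2.
    Dataset<Row> input = spark.range(0, 1000).selectExpr("id as c1", "id % 97 as c2");

    RowSpatialCurveSortPartitioner partitioner = new RowSpatialCurveSortPartitioner(
        new String[] {"c1", "c2"},
        HoodieClusteringConfig.LayoutOptimizationStrategy.ZORDER,
        HoodieClusteringConfig.SpatialCurveCompositionStrategyType.DIRECT);

    // Reorder the rows along the space-filling curve into 4 output partitions.
    Dataset<Row> reordered = partitioner.repartitionRecords(input, 4);
    reordered.show(5, false);

    spark.stop();
  }
}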