@@ -125,7 +125,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.");

private HoodieClusteringConfig() {
public static final ConfigProperty<Boolean> CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA = ConfigProperty
.key("hoodie.clustering.preserve.commit.metadata")
.defaultValue(false)
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

public HoodieClusteringConfig() {
super();
}

@@ -214,6 +220,11 @@ public Builder withAsyncClustering(Boolean asyncClustering) {
return this;
}

public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) {
clusteringConfig.setValue(CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA, String.valueOf(preserveHoodieCommitMetadata));
return this;
}

public HoodieClusteringConfig build() {
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
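
Not part of the diff: a minimal usage sketch of the new flag, assuming the usual HoodieWriteConfig.newBuilder()/withClusteringConfig(...) entry points and a hypothetical base path.

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class PreserveCommitMetadataExample {
  public static HoodieWriteConfig buildWriteConfig() {
    // Hypothetical example: keep the original _hoodie_commit_time when clustering rewrites data.
    return HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_trips") // placeholder base path
        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
            .withAsyncClustering(true) // existing builder method shown above
            .withPreserveHoodieCommitMetadata(true) // builder method added in this change
            .build())
        .build();
    // Equivalent raw key: hoodie.clustering.preserve.commit.metadata=true (defaults to false)
  }
}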
@@ -700,6 +700,10 @@ public boolean isAsyncClusteringEnabled() {
return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
}

public boolean isPreserveHoodieCommitMetadata() {
return getBoolean(HoodieClusteringConfig.CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA);
}

public boolean isClusteringEnabled() {
// TODO: future support async clustering
return inlineClusteringEnabled() || isAsyncClusteringEnabled();
@@ -25,12 +25,22 @@

public class CreateHandleFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {

private boolean preserveMetadata = false;

public CreateHandleFactory() {
this(false);
}

public CreateHandleFactory(boolean preserveMetadata) {
this.preserveMetadata = preserveMetadata;
}

@Override
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {

return new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
getNextFileId(fileIdPrefix), taskContextSupplier);
getNextFileId(fileIdPrefix), taskContextSupplier, preserveMetadata);
}
}
}
@@ -59,18 +59,33 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
private boolean preserveHoodieMetadata = false;

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier);
taskContextSupplier, false);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
boolean preserveHoodieMetadata) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier, preserveHoodieMetadata);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier) {
this(config, instantTime, hoodieTable, partitionPath, fileId, overriddenSchema, taskContextSupplier, false);
}

public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
taskContextSupplier);
this.preserveHoodieMetadata = preserveHoodieMetadata;
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.setStat(new HoodieWriteStat());
@@ -119,7 +134,11 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
if (preserveHoodieMetadata) {
fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
}
// update the new location of record, so we know where to find it next
record.unseal();
record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
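
Not part of the diff: my reading of the branch above, as a commented sketch. As I understand the writer API, writeAvroWithMetadata stamps the outgoing record with this handle's instantTime (plus sequence number and file name) before writing, while writeAvro writes the rewritten record's fields untouched, which is what lets a clustering rewrite keep the original hoodie_commit_time.

// Sketch only; method names are taken from the diff, comments reflect my understanding.
if (preserveHoodieMetadata) {
  // keep whatever _hoodie_commit_time / _hoodie_commit_seqno the record already carries
  fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
} else {
  // restamp the record with this handle's instantTime before it is written
  fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
}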
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
* A HoodieCreateHandle which writes all data into a single file.
* <p>
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
public class HoodieUnboundedCreateHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieCreateHandle<T, I, K, O> {

private static final Logger LOG = LogManager.getLogger(HoodieUnboundedCreateHandle.class);

public HoodieUnboundedCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
boolean preserveHoodieMetadata) {
super(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
taskContextSupplier, preserveHoodieMetadata);
}

@Override
public boolean canWrite(HoodieRecord record) {
return true;
}
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io;

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A SingleFileHandleCreateFactory is used to write all data in the spark partition into a single data file.
* <p>
* Please use this with caution. This can end up creating very large files if not used correctly.
*/
public class SingleFileHandleCreateFactory<T extends HoodieRecordPayload, I, K, O> extends WriteHandleFactory<T, I, K, O> {

private AtomicBoolean isHandleCreated = new AtomicBoolean(false);
private String fileId;
private boolean preserveHoodieMetadata;

public SingleFileHandleCreateFactory(String fileId, boolean preserveHoodieMetadata) {
super();
this.fileId = fileId;
this.preserveHoodieMetadata = preserveHoodieMetadata;
}

@Override
public HoodieWriteHandle<T, I, K, O> create(final HoodieWriteConfig hoodieConfig, final String commitTime,
final HoodieTable<T, I, K, O> hoodieTable, final String partitionPath,
final String fileIdPrefix, TaskContextSupplier taskContextSupplier) {

if (isHandleCreated.compareAndSet(false, true)) {
return new HoodieUnboundedCreateHandle(hoodieConfig, commitTime, hoodieTable, partitionPath,
fileId, // ignore idPfx, always use same fileId
taskContextSupplier, preserveHoodieMetadata);
}

throw new HoodieIOException("Fixed handle create is only expected to be invoked once");
}
}
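
Not part of the diff: a hypothetical caller-side sketch of the single-use contract enforced above (writeConfig, instantTime, hoodieTable, partitionPath and taskContextSupplier are placeholders; raw types are used for brevity, as elsewhere in this PR).

// Hypothetical usage: one factory per target file group, handing out exactly one handle.
SingleFileHandleCreateFactory factory = new SingleFileHandleCreateFactory("fileGroup-0001-0", true);
HoodieWriteHandle handle = factory.create(writeConfig, instantTime, hoodieTable, partitionPath,
    "ignoredPrefix", taskContextSupplier); // fileIdPrefix is ignored; the fixed fileId is used
// A second create() call fails the compareAndSet guard and throws
// HoodieIOException("Fixed handle create is only expected to be invoked once").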
@@ -18,25 +18,27 @@

package org.apache.hudi.table.action.cluster.strategy;

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.Map;

/**
* Pluggable implementation for writing data into new file groups based on ClusteringPlan.
*/
public abstract class ClusteringExecutionStrategy<T extends HoodieRecordPayload,I,K,O> implements Serializable {
public abstract class ClusteringExecutionStrategy<T extends HoodieRecordPayload, I, K, O> implements Serializable {
Inline review comment (Owner Author):
@satishkotha One change in this class is that I have moved the transform() method to corresponding strategy classes. There is some duplicate code but due to virtual key changes I had to do that. Otherwise, we need significant refactoring i.e. move the HoodieSparkKeyGeneratorFactory and several key generators to hudi-client-common instead of hudi-spark-client.

private static final Logger LOG = LogManager.getLogger(ClusteringExecutionStrategy.class);

private final HoodieTable<T,I,K,O> hoodieTable;
private final HoodieEngineContext engineContext;
private final HoodieTable<T, I, K, O> hoodieTable;
private final transient HoodieEngineContext engineContext;
private final HoodieWriteConfig writeConfig;

public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
@@ -50,10 +52,9 @@ public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engine
* file groups created is bounded by numOutputGroups.
* Note that commit is not done as part of strategy. commit is callers responsibility.
*/
public abstract O performClustering(final I inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema);
public abstract HoodieWriteMetadata<O> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime);

protected HoodieTable<T,I,K, O> getHoodieTable() {
protected HoodieTable<T, I, K, O> getHoodieTable() {
return this.hoodieTable;
}
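
Not part of the diff: a hypothetical Spark-side subclass, only to illustrate the new performClustering signature; the class name and body are illustrative and are not the PR's actual strategy implementation.

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import org.apache.spark.api.java.JavaRDD;

public class ExampleSparkClusteringStrategy<T extends HoodieRecordPayload>
    extends ClusteringExecutionStrategy<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {

  public ExampleSparkClusteringStrategy(HoodieTable table, HoodieEngineContext engineContext,
                                        HoodieWriteConfig writeConfig) {
    super(table, engineContext, writeConfig);
  }

  @Override
  public HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringPlan clusteringPlan,
                                                                     Schema schema, String instantTime) {
    // Read the file groups listed in clusteringPlan's input groups, rewrite them into new file
    // groups, and return the resulting write metadata. Committing the replacecommit stays with
    // the caller, per the Javadoc above.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}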

@@ -97,6 +97,7 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata())
.build());
}
}
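
Not part of the diff: a hedged sketch of how the flag carried on the plan would be read back on the execution side. The getter name is assumed from the Avro builder setter used above, and the executor wiring itself is not in this excerpt.

// Hypothetical executor-side wiring: pull the flag off the deserialized clustering plan and
// pass it to the handle factories shown earlier in this PR. targetFileId is a placeholder.
boolean preserve = Boolean.TRUE.equals(clusteringPlan.getPreserveHoodieMetadata());
CreateHandleFactory bulkInsertHandleFactory = new CreateHandleFactory(preserve);
SingleFileHandleCreateFactory singleFileHandleFactory = new SingleFileHandleCreateFactory(targetFileId, preserve);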
@@ -43,5 +43,6 @@ public abstract O bulkInsert(I inputRecords, String instantTime,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism);
int parallelism,
boolean preserveMetadata);
}
@@ -216,7 +216,7 @@ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMeta
}
}

protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
return Collections.emptyMap();
}

@@ -330,7 +330,7 @@ public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, Hoodie
List<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(statuses));
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
commitOnAutoCommit(result);
}
}
@@ -71,7 +71,7 @@ public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(final List<HoodieRecord
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism());
List<WriteStatus> writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false);
//update index
((BaseJavaCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
@@ -85,7 +85,8 @@ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> inputRecords,
boolean performDedupe,
Option<BulkInsertPartitioner<T>> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism) {
int parallelism,
boolean preserveHoodieMetadata) {

// De-dupe/merge if needed
List<HoodieRecord<T>> dedupedRecords = inputRecords;
@@ -64,9 +64,9 @@ protected String getCommitActionType() {
}

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
return context.mapToPair(
writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
writeResult.getWriteStatuses().stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toList()),
partitionPath ->
Pair.of(partitionPath, getAllExistingFileIds(partitionPath)), 1
);
@@ -27,6 +27,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.util.HashMap;
import java.util.List;
@@ -48,7 +49,7 @@ protected List<String> getAllExistingFileIds(String partitionPath) {
}

@Override
protected Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeResult) {
Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context,
table.getMetaClient().getBasePath(), config.useFileListingMetadata(),
@@ -70,6 +70,7 @@
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@@ -357,11 +358,13 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstan
private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<WriteStatus> writeStatuses,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
String clusteringCommitTime) {

List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());

List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
if (!writeStatuses.filter(WriteStatus::hasErrors).isEmpty()) {
if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) {
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStatuses.filter(WriteStatus::hasErrors).map(WriteStatus::getFileId).collect());
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}
finalizeWrite(table, clusteringCommitTime, writeStats);
try {
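
Not part of the diff: the reworked error check, factored into a standalone helper to make the new flow explicit. Failures are now derived from the commit metadata's write stats rather than by re-collecting the WriteStatus RDD; types and imports match those already used in SparkRDDWriteClient per this diff.

private static void validateClusteringWrites(HoodieReplaceCommitMetadata metadata) {
  List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().values().stream()
      .flatMap(List::stream)
      .collect(Collectors.toList());
  if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
    String failedFiles = writeStats.stream()
        .filter(s -> s.getTotalWriteErrors() > 0L)
        .map(HoodieWriteStat::getFileId)
        .collect(Collectors.joining(","));
    throw new HoodieClusteringException("Clustering failed to write to files:" + failedFiles);
  }
}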