@@ -18,8 +18,6 @@

package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -78,6 +76,9 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -242,11 +243,11 @@ protected void commit(HoodieTable table, String commitActionType, String instant
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}

protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}

protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -397,7 +398,7 @@ protected void rollbackFailedBootstrap() {
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsert(I records, final String instantTime,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);


/**
@@ -417,7 +418,7 @@ public abstract O bulkInsert(I records, final String instantTime,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
Option<BulkInsertPartitioner> bulkInsertPartitioner);

/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
Expand Down Expand Up @@ -458,7 +459,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
* @param hoodieTable Hoodie Table
* @return Write Status
*/
protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable);

/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
@@ -468,7 +469,7 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
@@ -480,7 +481,7 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
}
}

protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -524,15 +525,15 @@ protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommi
}
}

protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
protected void runAnyPendingCompactions(HoodieTable table) {
table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight compaction at instant " + instant);
compact(instant.getTimestamp(), true);
});
}

protected void runAnyPendingClustering(HoodieTable<T, I, K, O> table) {
protected void runAnyPendingClustering(HoodieTable table) {
table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
if (instantPlan.isPresent()) {
@@ -558,7 +559,7 @@ protected void autoCleanOnCommit() {
}
}

protected void autoArchiveOnCommit(HoodieTable<T, I, K, O> table) {
protected void autoArchiveOnCommit(HoodieTable table) {
if (!config.isAutoArchive()) {
return;
}
@@ -808,7 +809,7 @@ public HoodieCleanMetadata clean(boolean skipLocking) {
* and keep increasing unbounded over time.
* @param table table to commit on.
*/
protected void archive(HoodieTable<T, I, K, O> table) {
protected void archive(HoodieTable table) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -937,7 +938,7 @@ public abstract void commitCompaction(String compactionInstantTime, HoodieCommit
/**
* Commit Compaction and track metrics.
*/
protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String compactionCommitTime);
protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);

/**
* Get inflight timeline, excluding compaction and clustering.
@@ -1223,7 +1224,7 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
return scheduleClustering(extraMetadata);
}

protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
@@ -1238,7 +1239,7 @@ protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieT
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
*/
protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(context, instantTime, stats);
@@ -1273,7 +1274,7 @@ public HoodieMetrics getMetrics() {
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);

/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1288,14 +1289,14 @@ public HoodieMetrics getMetrics() {
* <li>Initializing metrics contexts</li>
* </ul>
*/
protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}

HoodieTable<T, I, K, O> table;
HoodieTable table;

this.txnManager.beginTransaction();
try {
@@ -1381,7 +1382,7 @@ public void close() {
this.txnManager.close();
}

private void setWriteTimer(HoodieTable<T, I, K, O> table) {
private void setWriteTimer(HoodieTable table) {
String commitType = table.getMetaClient().getCommitActionType();
if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
@@ -180,7 +180,7 @@ public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, Strin
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
I records, Option<BulkInsertPartitioner> bulkInsertPartitioner);

/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -237,7 +237,7 @@ public abstract HoodieWriteMetadata<O> insertPrepped(HoodieEngineContext context
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
I preppedRecords, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);

/**
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
@@ -28,7 +28,7 @@
/**
* When file groups are under clustering, writes to these file groups need to be checked.
*/
public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
public abstract class UpdateStrategy<T extends HoodieRecordPayload, I> {

protected final HoodieEngineContext engineContext;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
@@ -34,15 +34,15 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);

/**
* Only write input records. Does not change timeline/index. Return information about new files created.
*/
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe,
Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);
@@ -7,19 +7,20 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -28,16 +29,12 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
@@ -48,69 +45,64 @@
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
private SparkDeleteHelper() {
public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
private HoodieDeleteHelper() {
}

private static class DeleteHelperHolder {
private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();
private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
}

public static SparkDeleteHelper newInstance() {
return DeleteHelperHolder.SPARK_DELETE_HELPER;
public static HoodieDeleteHelper newInstance() {
return DeleteHelperHolder.HOODIE_DELETE_HELPER;
}

@Override
public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
boolean isIndexingGlobal = table.getIndex().isGlobal();
if (isIndexingGlobal) {
return keys.keyBy(HoodieKey::getRecordKey)
.reduceByKey((key1, key2) -> key1, parallelism)
.values();
return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
} else {
return keys.distinct(parallelism);
}
}

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime,
JavaRDD<HoodieKey> keys,
HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> deleteExecutor) {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);

public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
HoodieData<HoodieKey> keys,
HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
try {
HoodieWriteMetadata result = null;
JavaRDD<HoodieKey> dedupedKeys = keys;
HoodieData<HoodieKey> dedupedKeys = keys;
final int parallelism = config.getDeleteShuffleParallelism();
if (config.shouldCombineBeforeDelete()) {
// De-dupe/merge if needed
dedupedKeys = deduplicateKeys(keys, table, parallelism);
} else if (!keys.partitions().isEmpty()) {
} else if (!keys.isEmpty()) {
dedupedKeys = keys.repartition(parallelism);
}

JavaRDD<HoodieRecord<T>> dedupedRecords =
HoodieData<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
Instant beginTag = Instant.now();
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = HoodieJavaRDD.getJavaRDD(
table.getIndex().tagLocation(HoodieJavaRDD.of(dedupedRecords), context, table));
HoodieData<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());

// filter out non-existent keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
HoodieWriteMetadata<HoodieData<WriteStatus>> result;
if (!taggedValidRecords.isEmpty()) {
result = deleteExecutor.execute(taggedValidRecords);
result.setIndexLookupDuration(tagLocationDuration);
} else {
// if the entire set of keys is non-existent
deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
result = new HoodieWriteMetadata();
result.setWriteStatuses(jsc.emptyRDD());
result = new HoodieWriteMetadata<>();
result.setWriteStatuses(context.emptyHoodieData());
deleteExecutor.commitOnAutoCommit(result);
}
return result;
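
For context on the HoodieDeleteHelper change above, here is a minimal sketch of the engine-agnostic key-deduplication pattern the new code uses. It is an illustration inferred from the diff, not actual Hudi source: the wrapper class name is hypothetical, and it assumes the HoodieData methods used in the diff (distinctWithKey, distinct) and the index's isGlobal flag behave as their usage there suggests.

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.table.HoodieTable;

// Sketch only (hypothetical class), mirroring HoodieDeleteHelper#deduplicateKeys from the diff above.
final class DeleteKeyDedupSketch {

  static HoodieData<HoodieKey> dedup(HoodieData<HoodieKey> keys, HoodieTable<?, ?, ?, ?> table, int parallelism) {
    if (table.getIndex().isGlobal()) {
      // A global index resolves records by record key alone, so collapse duplicates on the record key.
      return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
    }
    // A non-global index needs the full HoodieKey (record key + partition path) to stay unique.
    return keys.distinct(parallelism);
  }
}

Compared with the Spark-specific keyBy/reduceByKey/values chain it replaces, the same intent is expressed once against HoodieData, so any engine implementation can reuse it.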