Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -87,44 +88,55 @@ protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex index, Hoo
* Commit changes performed at the given instantTime marker.
*/
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
  // No extra metadata is attached. The action type recorded for the commit is the one
  // reported by a freshly created meta client.
  HoodieTableMetaClient metaClient = createMetaClient(false);
  String actionType = metaClient.getCommitActionType();
  return commit(instantTime, writeStatuses, Option.empty(), actionType);
}

/**
 * Completes the changes performed at the given instant time under the specified action type,
 * without attaching any extra metadata.
 *
 * @param instantTime instant time of the commit being completed
 * @param writeStatuses write statuses produced by the write
 * @param commitActionType action type to record for the commit
 * @return the result of the delegated commit call
 */
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, String commitActionType) {
  final Option<Map<String, String>> noExtraMetadata = Option.empty();
  return commit(instantTime, writeStatuses, noExtraMetadata, commitActionType);
}

/**
*
* Commit changes performed at the given instantTime marker.
*/
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
Option<Map<String, String>> extraMetadata) {
Option<Map<String, String>> extraMetadata) {
HoodieTableMetaClient metaClient = createMetaClient(false);
String actionType = metaClient.getCommitActionType();
return commit(instantTime, writeStatuses, extraMetadata, actionType);
}

/**
 * Completes the changes performed at the given instant time under the specified action type.
 *
 * @param instantTime instant time of the commit being completed
 * @param writeStatuses write statuses produced by the write; their stats are collected
 * @param extraMetadata optional key/value pairs passed through to the commit
 * @param commitActionType action type to record for the commit
 * @return the result of {@code commitStats}
 */
public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
    Option<Map<String, String>> extraMetadata, String commitActionType) {
  List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
  // The caller-supplied action type must be forwarded; otherwise it would be silently ignored.
  return commitStats(instantTime, stats, extraMetadata, commitActionType);
}

public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
LOG.info("Committing " + instantTime);
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieTableMetaClient metaClient = createMetaClient(false);
String actionType = metaClient.getCommitActionType();
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);

HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));

HoodieCommitMetadata metadata = CommitUtils.buildCommitMetadata(stats, extraMetadata, operationType, config.getSchema());
// Finalize write
finalizeWrite(table, instantTime, stats);

// add in extra metadata
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
metadata.setOperationType(operationType);

try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
postCommit(table, metadata, instantTime, extraMetadata);
emitCommitMetrics(instantTime, metadata, actionType);
emitCommitMetrics(instantTime, metadata, commitActionType);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,22 @@ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
return postWrite(result, instantTime, table);
}

/**
 * Removes all existing records from the affected partitions and inserts the given HoodieRecords into the table.
 *
 * @param records HoodieRecords to insert
 * @param instantTime Instant time of the commit
 * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
 */
public JavaRDD<WriteStatus> insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
  final WriteOperationType operation = WriteOperationType.INSERT_OVERWRITE;
  HoodieTable<T> hoodieTable = getTableAndInitCtx(operation, instantTime);
  // The schema is validated before the operation type is recorded and the write is issued.
  hoodieTable.validateInsertSchema();
  setOperationType(operation);
  this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
  HoodieWriteMetadata writeMetadata = hoodieTable.insertOverwrite(jsc, instantTime, records);
  return postWrite(writeMetadata, instantTime, hoodieTable);
}

/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
* de-duped and non existent keys will be removed before deleting.
Expand Down Expand Up @@ -576,7 +592,8 @@ public String startCommit() {
rollbackPendingCommits();
}
String instantTime = HoodieActiveTimeline.createNewInstantTime();
startCommit(instantTime);
HoodieTableMetaClient metaClient = createMetaClient(true);
startCommit(instantTime, metaClient.getCommitActionType());
return instantTime;
}

Expand All @@ -586,15 +603,23 @@ public String startCommit() {
* @param instantTime Instant time to be generated
*/
public void startCommitWithTime(String instantTime) {
  // Resolve the action type from a freshly created meta client, then delegate to the
  // overload that takes an explicit action type.
  final String resolvedActionType = createMetaClient(true).getCommitActionType();
  startCommitWithTime(instantTime, resolvedActionType);
}

/**
 * Starts a new commit for a write operation (insert/update/delete) at the given instant time,
 * using the specified action type.
 *
 * @param instantTime instant time to be generated
 * @param actionType action type to use for the new instant
 */
public void startCommitWithTime(String instantTime, String actionType) {
  // NOTE : Need to ensure that rollback is done before a new commit is started
  if (rollbackPending) {
    // Only rollback inflight commit/delta-commits. Do not touch compaction commits
    rollbackPendingCommits();
  }
  // Start exactly once, forwarding the caller-supplied action type.
  startCommit(instantTime, actionType);
}

private void startCommit(String instantTime) {
private void startCommit(String instantTime, String actionType) {
LOG.info("Generate a new instant time " + instantTime);
HoodieTableMetaClient metaClient = createMetaClient(true);
// if there are pending compactions, their instantTime must not be greater than that of this instant time
Expand All @@ -603,7 +628,7 @@ private void startCommit(String instantTime) {
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, actionType,
instantTime));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.MergeHelper;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
Expand Down Expand Up @@ -124,6 +125,12 @@ public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instan
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}

/**
 * Runs an insert-overwrite for the given records at the given instant time by delegating to an
 * {@code InsertOverwriteCommitActionExecutor} built from this table and its config.
 */
@Override
public HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records) {
return new InsertOverwriteCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String i
public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);

/**
 * Logically deletes all existing records and inserts a batch of new records into the Hoodie table at the
 * supplied instantTime.
 *
 * @param jsc Java Spark context
 * @param instantTime instant time of the operation
 * @param records HoodieRecords to insert
 * @return HoodieWriteMetadata for the operation
 */
public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records);

/**
 * Returns the {@code config} field.
 */
public HoodieWriteConfig getConfig() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

package org.apache.hudi.table;

import org.apache.hadoop.conf.Configuration;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -65,6 +66,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -275,6 +277,7 @@ public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws H
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
deleteReplacedFiles(hoodieInstant);
try {
deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
Expand All @@ -301,6 +304,44 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
}
}

/**
 * Deletes the files recorded as replaced by the given instant.
 * <p>
 * Nothing happens unless the instant is completed and the active timeline's completed-and-replace
 * timeline contains an instant with the same timestamp. For that instant, the commit metadata is read
 * and, per partition, the file ids listed in its replace stats are handed to {@code deleteFileGroups}.
 *
 * @param instant instant that is being archived
 * @throws HoodieCommitException if reading the commit metadata raises an {@link IOException}
 */
private void deleteReplacedFiles(HoodieInstant instant) {
  // only delete files for completed instants
  if (instant.isCompleted()) {
    metaClient.getActiveTimeline().getCompletedAndReplaceTimeline()
        .filter(candidate -> candidate.getTimestamp().equals(instant.getTimestamp()))
        .firstInstant()
        .ifPresent(replaceInstant -> {
          try {
            HoodieCommitMetadata replaceMetadata = HoodieCommitMetadata.fromBytes(
                metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
                HoodieCommitMetadata.class);

            replaceMetadata.getPartitionToReplaceStats().forEach((partitionPath, replacedStats) -> {
              Set<String> replacedFileIds =
                  replacedStats.stream().map(stat -> stat.getFileId()).collect(Collectors.toSet());
              deleteFileGroups(partitionPath, replacedFileIds, instant);
            });
          } catch (IOException ioe) {
            throw new HoodieCommitException("Failed to archive because cannot delete replace files", ioe);
          }
        });
  }
}

/**
 * Deletes every file in the given partition whose file id is contained in {@code fileIdsToDelete}.
 * <p>
 * This is best effort: an {@link IOException} raised while listing or deleting is logged and not
 * rethrown, and a delete that reports failure is logged as a warning.
 *
 * @param partitionPath partition path, resolved against the table base path
 * @param fileIdsToDelete file ids whose files should be removed
 * @param instant instant being archived; used for logging only
 */
private void deleteFileGroups(String partitionPath, Set<String> fileIdsToDelete, HoodieInstant instant) {
  if (fileIdsToDelete.isEmpty()) {
    // Nothing to remove, so skip the directory listing.
    return;
  }
  try {
    FileStatus[] statuses = metaClient.getFs().listStatus(FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPath));
    for (FileStatus status : statuses) {
      String fileId = FSUtils.getFileIdFromFilePath(status.getPath());
      if (fileIdsToDelete.contains(fileId)) {
        LOG.info("Delete " + status.getPath() + " to archive " + instant);
        // Use the explicit non-recursive overload instead of the deprecated, implicitly recursive
        // delete(Path), so a matching directory can never be removed wholesale.
        boolean deleted = metaClient.getFs().delete(status.getPath(), false);
        if (!deleted) {
          LOG.warn("Could not delete replaced file " + status.getPath() + " while archiving " + instant);
        }
      }
    }
  } catch (IOException e) {
    LOG.error("unable to delete file groups that are replaced", e);
  }
}

private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
Expand All @@ -327,7 +368,8 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieI
break;
}
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.DELTA_COMMIT_ACTION: {
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.REPLACE_ACTION: {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class);
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@

package org.apache.hudi.table.action;

import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.Option;

import org.apache.spark.api.java.JavaRDD;

import java.time.Duration;
import java.util.List;

/**
* Contains metadata, write-statuses and latency times corresponding to a commit/delta-commit action.
Expand Down
Loading