@@ -32,6 +32,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
@@ -40,7 +41,6 @@
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -49,6 +49,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -91,40 +92,46 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses) {
   }
 
   /**
    *
    * Commit changes performed at the given instantTime marker.
    */
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
-      Option<Map<String, String>> extraMetadata) {
-    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, stats, extraMetadata);
-  }
-
-  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata) {
-    LOG.info("Committing " + instantTime);
+      Option<Map<String, String>> extraMetadata) {
     HoodieTableMetaClient metaClient = createMetaClient(false);
     String actionType = metaClient.getCommitActionType();
+    return commit(instantTime, writeStatuses, extraMetadata, actionType, Collections.emptyMap());
+  }
+
+  /**
+   * Complete changes performed at the given instantTime marker with specified action.
+   */
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
+    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+      String commitActionType) {
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap());
+  }
+
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+      String commitActionType, Map<String, List<String>> partitionToReplaceFileIds) {
+    LOG.info("Committing " + instantTime + " action " + commitActionType);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
-
+    HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
     // Finalize write
     finalizeWrite(table, instantTime, stats);
-
-    // add in extra metadata
-    if (extraMetadata.isPresent()) {
-      extraMetadata.get().forEach(metadata::addMetadata);
-    }
-    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
-    metadata.setOperationType(operationType);
-
     try {
-      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, instantTime),
+      activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
      postCommit(table, metadata, instantTime, extraMetadata);
-      emitCommitMetrics(instantTime, metadata, actionType);
+      emitCommitMetrics(instantTime, metadata, commitActionType);
      LOG.info("Committed " + instantTime);
    } catch (IOException e) {
      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
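
To make the new overloads concrete: a caller that has performed a replace-type write can now commit with an explicit action type and hand over the file groups it superseded. A minimal sketch, assuming `writeClient` is an instance of this client and that `instantTime`, `statuses` (the `JavaRDD<WriteStatus>` from the write) and `replacedFileIds` (partition path to replaced file IDs) are already in scope; the extra-metadata key/value is made up for illustration:

```java
// Sketch only: drives the new 5-argument commitStats with an explicit action type.
List<HoodieWriteStat> stats = statuses.map(WriteStatus::getStat).collect();
boolean committed = writeClient.commitStats(
    instantTime,
    stats,
    Option.of(Collections.singletonMap("checkpoint", "batch-42")), // optional extra metadata; key/value are illustrative
    HoodieTimeline.REPLACE_COMMIT_ACTION,                          // persisted as a replacecommit instead of a plain commit
    replacedFileIds);                                              // file groups readers should no longer see
```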
@@ -320,6 +320,22 @@ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
return postWrite(result, instantTime, table);
}

/**
* Removes all existing records from the affected partitions and inserts the given HoodieRecords into the table.
*
* @param records HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return HoodieWriteResult - carrying the RDD of WriteStatus to inspect errors and counts, plus the file IDs replaced in each partition
*/
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
setOperationType(WriteOperationType.INSERT_OVERWRITE);
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this, instantTime);
HoodieWriteMetadata result = table.insertOverwrite(jsc, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
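
A usage sketch for the new API. The client class name (`HoodieWriteClient`), its constructor and the driver variables (`jsc`, `writeConfig`, `records`) are assumptions for illustration, written as if inside a context generic over the record payload type `T`; only `startCommitWithTime`, `insertOverwrite`, the new commit overload and the `HoodieWriteResult` accessors come from this PR:

```java
// Illustrative driver, not part of this PR.
HoodieWriteClient<T> client = new HoodieWriteClient<>(jsc, writeConfig); // assumed constructor
String instantTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);

// Replaces whatever file groups currently exist in the partitions touched by `records`.
HoodieWriteResult result = client.insertOverwrite(records, instantTime);

long errorCount = result.getWriteStatuses().filter(WriteStatus::hasErrors).count();
if (errorCount == 0) {
  client.commit(instantTime, result.getWriteStatuses(), Option.empty(),
      HoodieTimeline.REPLACE_COMMIT_ACTION, result.getPartitionToReplaceFileIds());
}
```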

/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
* de-duped and non existent keys will be removed before deleting.
@@ -576,7 +592,7 @@ public String startCommit() {
       rollbackPendingCommits();
     }
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    startCommit(instantTime);
+    startCommitWithTime(instantTime);
     return instantTime;
   }
 
@@ -586,24 +602,39 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, metaClient.getCommitActionType(), metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  public void startCommitWithTime(String instantTime, String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    startCommitWithTime(instantTime, actionType, metaClient);
+  }
+
+  /**
+   * Completes a new commit time for a write operation (insert/update/delete) with specified action.
+   */
+  private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
     // NOTE : Need to ensure that rollback is done before a new commit is started
     if (rollbackPending) {
       // Only rollback inflight commit/delta-commits. Do not touch compaction commits
       rollbackPendingCommits();
     }
-    startCommit(instantTime);
+    startCommit(instantTime, actionType, metaClient);
   }
 
-  private void startCommit(String instantTime) {
-    LOG.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = createMetaClient(true);
+  private void startCommit(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
+    LOG.info("Generate a new instant time: " + instantTime + " action: " + actionType);
     // if there are pending compactions, their instantTime must not be greater than that of this instant time
     metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->
        ValidationUtils.checkArgument(
            HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
            "Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
                + latestPending + ", Ingesting at " + instantTime));
-    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(),
+    metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(State.REQUESTED, actionType,
        instantTime));
  }
 
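The practical effect of threading `actionType` through these methods: callers can now open a REQUESTED instant whose action differs from the table's default commit action. A small sketch, with `writeClient` assumed to be an instance of this client:

```java
// Default path, unchanged: the action comes from metaClient.getCommitActionType()
// (commit for copy-on-write tables, deltacommit for merge-on-read tables).
String commitInstant = writeClient.startCommit();

// New path: explicitly open a replacecommit for an insert-overwrite style write.
String replaceInstant = HoodieActiveTimeline.createNewInstantTime();
writeClient.startCommitWithTime(replaceInstant, HoodieTimeline.REPLACE_COMMIT_ACTION);
```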
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client;

import org.apache.spark.api.java.JavaRDD;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Result of a write operation.
*/
public class HoodieWriteResult implements Serializable {

private JavaRDD<WriteStatus> writeStatuses;
private Map<String, List<String>> partitionToReplaceFileIds;

public HoodieWriteResult(JavaRDD<WriteStatus> writeStatuses) {
this(writeStatuses, Collections.emptyMap());
}

public HoodieWriteResult(JavaRDD<WriteStatus> writeStatuses, Map<String, List<String>> partitionToReplaceFileIds) {
this.writeStatuses = writeStatuses;
this.partitionToReplaceFileIds = partitionToReplaceFileIds;
}

public JavaRDD<WriteStatus> getWriteStatuses() {
return this.writeStatuses;
}

public void setWriteStatuses(final JavaRDD<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}

public Map<String, List<String>> getPartitionToReplaceFileIds() {
return this.partitionToReplaceFileIds;
}

public void setPartitionToReplaceFileIds(final Map<String, List<String>> partitionToReplaceFileIds) {
this.partitionToReplaceFileIds = partitionToReplaceFileIds;
}

@Override
public String toString() {
return "HoodieWriteResult{"
+ "writeStatuses=" + writeStatuses
+ ", partitionToReplaceFileIds=" + partitionToReplaceFileIds
+ '}';
}
}
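
How the two constructors are meant to divide the work, as far as this PR shows (variable names are illustrative): plain writes wrap only the status RDD, while replace-type writes also carry the file groups they superseded.

```java
// Plain insert/upsert: nothing is replaced, so the mapping defaults to an empty map.
HoodieWriteResult upsertResult = new HoodieWriteResult(upsertStatuses);

// Insert overwrite: pair the statuses with the partition -> replaced-file-IDs mapping,
// which the caller later hands to commit(...) when completing the replacecommit.
HoodieWriteResult overwriteResult = new HoodieWriteResult(overwriteStatuses, partitionToReplaceFileIds);
```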
@@ -46,6 +46,7 @@
import org.apache.hudi.table.action.commit.BulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.DeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.InsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.MergeHelper;
import org.apache.hudi.table.action.commit.UpsertCommitActionExecutor;
@@ -123,6 +124,12 @@ public HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instan
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}

@Override
public HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records) {
return new InsertOverwriteCommitActionExecutor<>(jsc, config, this, instantTime, records).execute();
}

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(JavaSparkContext jsc, String instantTime, Option<Map<String, String>> extraMetadata) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
@@ -213,6 +213,12 @@ public abstract HoodieWriteMetadata insertPrepped(JavaSparkContext jsc, String i
public abstract HoodieWriteMetadata bulkInsertPrepped(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);

/**
* Logically delete all existing records and insert a batch of new records into the Hoodie table at the supplied instantTime.
*/
public abstract HoodieWriteMetadata insertOverwrite(JavaSparkContext jsc, String instantTime,
JavaRDD<HoodieRecord<T>> records);

public HoodieWriteConfig getConfig() {
return config;
}
@@ -18,20 +18,20 @@

package org.apache.hudi.table;

import org.apache.hadoop.conf.Configuration;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -44,6 +44,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -275,6 +276,11 @@ public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws H
LOG.info("Wrapper schema " + wrapperSchema.toString());
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
boolean deleteSuccess = deleteReplacedFileGroups(jsc, hoodieInstant);
if (!deleteSuccess) {
// throw error and stop archival if deleting replaced file groups failed.
throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName());
}
try {
deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
@@ -301,6 +307,29 @@ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant in
}
}

private boolean deleteReplacedFileGroups(JavaSparkContext jsc, HoodieInstant instant) {
if (!instant.isCompleted() || !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// only delete files for completed replace instants
return true;
}

TableFileSystemView fileSystemView = this.table.getFileSystemView();
List<String> replacedPartitions = getReplacedPartitions(instant);
return ReplaceArchivalHelper.deleteReplacedFileGroups(jsc, metaClient, fileSystemView, instant, replacedPartitions);
}

private List<String> getReplacedPartitions(HoodieInstant instant) {
try {
HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);

return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet());
} catch (IOException e) {
throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
}
}
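
For context on what `getReplacedPartitions` is parsing: a completed replacecommit records, per partition, the file IDs it logically replaced, and archival uses that mapping to physically remove those file groups before the instant leaves the active timeline. A minimal sketch of reading the mapping off the timeline (assumes `metaClient` and a completed `replaceInstant` are in scope; the logging is illustrative):

```java
HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
    metaClient.getActiveTimeline().getInstantDetails(replaceInstant).get(),
    HoodieReplaceCommitMetadata.class);

// partition path -> file IDs that readers no longer see and that archival may now delete
replaceMetadata.getPartitionToReplaceFileIds()
    .forEach((partition, fileIds) -> LOG.info("Replaced in " + partition + ": " + fileIds));
```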

private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
if (records.size() > 0) {
Map<HeaderMetadataType, String> header = new HashMap<>();
@@ -334,6 +363,13 @@ private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieI
archivedMetaWrapper.setActionType(ActionType.commit.name());
break;
}
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
break;
}
case HoodieTimeline.ROLLBACK_ACTION: {
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieRollbackMetadata.class));