137 changes: 113 additions & 24 deletions hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -26,6 +26,7 @@
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -39,6 +40,7 @@
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieDataFile;
import org.apache.hudi.common.model.HoodieKey;
@@ -94,6 +96,8 @@
public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {

private static Logger logger = LogManager.getLogger(HoodieWriteClient.class);
private static final String UPDATE_STR = "update";
private static final String LOOKUP_STR = "lookup";
private final boolean rollbackInFlight;
private final transient HoodieMetrics metrics;
private final transient HoodieIndex<T> index;
@@ -103,18 +107,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private transient Timer.Context indexTimer = null;

/**
* @param jsc
* @param clientConfig
* @throws Exception
*
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) throws Exception {
this(jsc, clientConfig, false);
}

/**
* @param jsc
* @param clientConfig
* @param rollbackInFlight
*
*/
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) {
this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc));
@@ -150,7 +150,7 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
indexTimer = metrics.getIndexCtx();
JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
@@ -159,7 +159,7 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
* Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
*/
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(records);
HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -168,7 +168,7 @@ public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final Strin
indexTimer = metrics.getIndexCtx();
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
return upsertRecordsInternal(taggedRecords, commitTime, table, true);
} catch (Throwable e) {
@@ -189,7 +189,7 @@ public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final Strin
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, true);
} catch (Throwable e) {
@@ -211,7 +211,7 @@ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(records);
HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -238,7 +238,7 @@ public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final Strin
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT_PREPPED);
try {
return upsertRecordsInternal(preppedRecords, commitTime, table, false);
} catch (Throwable e) {
@@ -281,7 +281,7 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final S
*/
public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(records);
HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT);
try {
// De-dupe/merge if needed
JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -314,7 +314,7 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final S
*/
public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime,
Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED);
try {
return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
} catch (Throwable e) {
@@ -325,6 +325,46 @@ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
}
}

/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime. The {@link HoodieKey}s will
* be deduped, and non-existent keys will be filtered out before deleting.
*
* @param keys {@link List} of {@link HoodieKey}s to be deleted
* @param commitTime Commit time handle
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime) {
HoodieTable<T> table = getTableAndInitCtx(OperationType.DELETE);
try {
// De-dupe/merge if needed
JavaRDD<HoodieKey> dedupedKeys =
config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys;

JavaRDD<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
indexTimer = metrics.getIndexCtx();
// perform index lookup to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
// filter out non-existent keys/records
JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown());
if (!taggedValidRecords.isEmpty()) {
metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
return upsertRecordsInternal(taggedValidRecords, commitTime, table, true);
} else {
// if the entire set of keys is non-existent
JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(Collections.EMPTY_LIST, 1);
commitOnAutoCommit(commitTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
return writeStatusRDD;
}
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
throw (HoodieUpsertException) e;
}
throw new HoodieUpsertException("Failed to delete for commit time " + commitTime, e);
}
}
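As a point of reference, here is a minimal caller-side sketch of the new delete API. It is not part of this diff: the surrounding setup (JavaSparkContext, HoodieWriteConfig) is assumed, and startCommit()/commit() are taken from the existing HoodieWriteClient surface.

// Hedged usage sketch, not part of this change. Assumes a configured JavaSparkContext and
// HoodieWriteConfig; startCommit()/commit() are assumed from the existing client API.
public static void deleteKeys(JavaSparkContext jsc, HoodieWriteConfig config) throws Exception {
  HoodieWriteClient<EmptyHoodieRecordPayload> writeClient = new HoodieWriteClient<>(jsc, config);
  List<HoodieKey> keysToDelete = Arrays.asList(
      new HoodieKey("record-uuid-1", "2019/11/01"),
      new HoodieKey("record-uuid-2", "2019/11/02"));
  JavaRDD<HoodieKey> keyRdd = jsc.parallelize(keysToDelete, 2);

  String commitTime = writeClient.startCommit();
  JavaRDD<WriteStatus> statuses = writeClient.delete(keyRdd, commitTime);
  // Keys that do not exist in the table are filtered out inside delete(); with auto-commit
  // disabled, finish the write with writeClient.commit(commitTime, statuses).
  long failedRecords = statuses.filter(WriteStatus::hasErrors).count();
  writeClient.close();
}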

private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
final JavaRDD<HoodieRecord<T>> repartitionedRecords;
@@ -366,10 +406,7 @@ private void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRD

private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records,
int parallelism) {
if (condition) {
return deduplicateRecords(records, parallelism);
}
return records;
return condition ? deduplicateRecords(records, parallelism) : records;
}

/**
@@ -451,7 +488,7 @@ private JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> w
indexTimer = metrics.getIndexCtx();
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
indexTimer = null;
// Trigger the insert and collect statuses
commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
@@ -501,6 +538,7 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
if (extraMetadata.isPresent()) {
extraMetadata.get().forEach(metadata::addMetadata);
}
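// record the write schema in the commit metadata so that later schema-less operations (e.g. deletes) can recover it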
metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());

try {
activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
@@ -929,8 +967,6 @@ public void close() {
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
*
* @throws HoodieIOException
*/
public void clean() throws HoodieIOException {
cleanClient.clean();
@@ -942,7 +978,6 @@ public void clean() throws HoodieIOException {
* cleaned)
*
* @param startCleanTime Cleaner Instant Timestamp
* @return
* @throws HoodieIOException in case of any IOException
*/
protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
@@ -1088,6 +1123,20 @@ JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, in
}, parallelism).map(Tuple2::_2);
}

/**
* Deduplicates the given {@link HoodieKey}s; for a global index, keys are reduced by record key alone.
*/
JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
if (isIndexingGlobal) {
return keys.keyBy(HoodieKey::getRecordKey)
.reduceByKey((key1, key2) -> key1)
.values();
} else {
return keys.distinct();
}
}

/**
* Cleanup all inflight commits
*/
@@ -1101,9 +1150,13 @@ private void rollbackInflightCommits() {
}
}

private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
private HoodieTable getTableAndInitCtx(OperationType operationType) {
HoodieTableMetaClient metaClient = createMetaClient(true);
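// deletes may be issued without a write schema in the config, so recover it from the last completed commit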
if (operationType == OperationType.DELETE) {
setWriteSchemaFromLastInstant(metaClient);
}
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
writeContext = metrics.getCommitCtx();
} else {
@@ -1112,6 +1165,30 @@ private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
return table;
}

/**
* Sets the write schema from the last completed instant, since delete operations may not have a schema set in the config.
*/
private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
try {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Option<HoodieInstant> lastInstant =
activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
} else {
throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
}
} else {
throw new HoodieIOException("Deletes issued without any prior commits");
}
} catch (IOException e) {
throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
}
}

/**
* Compaction specific private methods
*/
@@ -1323,4 +1400,16 @@ private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetada
}
}

/**
* The type of write operation being performed.
*/
enum OperationType {
INSERT,
INSERT_PREPPED,
UPSERT,
UPSERT_PREPPED,
DELETE,
BULK_INSERT,
BULK_INSERT_PREPPED
}
}
@@ -51,6 +51,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -59,6 +60,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
@@ -119,6 +122,10 @@ public String getSchema() {
return props.getProperty(AVRO_SCHEMA);
}

public void setSchema(String schemaStr) {
props.setProperty(AVRO_SCHEMA, schemaStr);
}

public String getTableName() {
return props.getProperty(TABLE_NAME);
}
@@ -143,6 +150,10 @@ public int getUpsertShuffleParallelism() {
return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
}

public int getDeleteShuffleParallelism() {
return Integer.parseInt(props.getProperty(DELETE_PARALLELISM));
}

public int getRollbackParallelism() {
return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
}
@@ -159,6 +170,10 @@ public boolean shouldCombineBeforeUpsert() {
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_UPSERT_PROP));
}

public boolean shouldCombineBeforeDelete() {
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
}

public StorageLevel getWriteStatusStorageLevel() {
return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
}
@@ -666,11 +681,14 @@ public HoodieWriteConfig build() {
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
DEFAULT_COMBINE_BEFORE_INSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
DEFAULT_COMBINE_BEFORE_UPSERT);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
DEFAULT_COMBINE_BEFORE_DELETE);
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
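On the configuration side, the two new properties above follow the same pattern as the existing parallelism and combine settings. A minimal sketch of overriding them (the builder calls are assumptions based on the existing HoodieWriteConfig builder, not part of this diff):

// Hedged configuration sketch, not part of this change. Property keys come from the diff above;
// newBuilder()/withPath()/withProps() are assumed from the existing HoodieWriteConfig builder API.
Properties props = new Properties();
props.setProperty("hoodie.delete.shuffle.parallelism", "200"); // otherwise falls back to DEFAULT_PARALLELISM
props.setProperty("hoodie.combine.before.delete", "true");     // default "true": dedupe keys before deleting

HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample-table")  // assumed base path for the table
    .withProps(props)                      // assumed builder hook for raw properties
    .build();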
@@ -72,7 +72,7 @@ public abstract JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecord

/**
* Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
* present)
* present).
*/
public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) throws HoodieIndexException;