@@ -96,8 +96,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I

private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
- private transient AsyncCleanerService asyncCleanerService;
protected final boolean rollbackPending;
+ protected transient AsyncCleanerService asyncCleanerService;

/**
* Create a write client, without cleaning up failed/inflight commits.
@@ -134,7 +134,6 @@ public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig
this.metrics = new HoodieMetrics(config, config.getTableName());
this.rollbackPending = rollbackPending;
this.index = createIndex(writeConfig);
- syncTableMetadata();
}

protected abstract HoodieIndex<T, I, K, O> createIndex(HoodieWriteConfig writeConfig);
@@ -368,6 +367,19 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
*/
public abstract O delete(K keys, final String instantTime);

+ /**
+  * Common method containing steps to be performed before write (upsert/insert/..) operations.
+  *
+  * @param instantTime Instant Time
+  * @param writeOperationType Type of the write operation being performed
+  */
+ protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
+   setOperationType(writeOperationType);
+   syncTableMetadata();
+   this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ }

/**
* Common method containing steps to be performed after write (upsert/insert/..) operations including auto-commit.
* @param result Commit Action Result
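For orientation, here is a minimal sketch (not part of the diff) of the shape each concrete write method takes once it delegates to the new preWrite hook. It mirrors the refactored SparkRDDWriteClient.upsert further down, with the index-lookup metrics update omitted:

```java
// Illustrative sketch only: mirrors the refactored SparkRDDWriteClient.upsert(...) below.
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
  HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
      getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
  table.validateUpsertSchema();
  // preWrite() consolidates setOperationType(), syncTableMetadata() and the async-cleaner
  // start that were previously spread across the constructor and each write method.
  preWrite(instantTime, WriteOperationType.UPSERT);
  HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
  return postWrite(result, instantTime, table);
}
```

One consequence of moving syncTableMetadata() out of the constructor: the metadata table is now synced once per write operation, at preWrite time, rather than once when the client is constructed.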
@@ -36,7 +36,6 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
- import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -365,7 +364,6 @@ private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync);

// Read each instant in order and sync it to metadata table
- final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline();
for (HoodieInstant instant : instantsToSync) {
LOG.info("Syncing instant " + instant + " to metadata table");

@@ -106,8 +106,7 @@ public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTim
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.UPSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -125,8 +124,7 @@ public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTim
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -158,7 +156,7 @@ public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedR
public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
- setOperationType(WriteOperationType.DELETE);
+ preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}
@@ -97,8 +97,7 @@ public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.UPSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -112,8 +111,7 @@ public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecor
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.UPSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata<List<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -123,8 +121,7 @@ public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTim
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -138,8 +135,7 @@ public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecor
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.INSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata<List<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -169,7 +165,7 @@ public List<WriteStatus> delete(List<HoodieKey> keys,
String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
- setOperationType(WriteOperationType.DELETE);
+ preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}
@@ -45,7 +45,7 @@ public HoodieSparkCompactor(AbstractHoodieWriteClient<T, JavaRDD<HoodieRecord<T>
@Override
public void compact(HoodieInstant instant) throws IOException {
LOG.info("Compactor executing compaction " + instant);
- SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>)compactionClient;
+ SparkRDDWriteClient<T> writeClient = (SparkRDDWriteClient<T>) compactionClient;
JavaRDD<WriteStatus> res = writeClient.compact(instant.getTimestamp());
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status");
long numWriteErrors = res.collect().stream().filter(WriteStatus::hasErrors).count();
@@ -140,8 +140,7 @@ public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String inst
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.UPSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.UPSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
@@ -154,8 +153,7 @@ public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
table.validateUpsertSchema();
- setOperationType(WriteOperationType.UPSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.UPSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -165,8 +163,7 @@ public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String inst
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
return postWrite(result, instantTime, table);
}
@@ -176,8 +173,7 @@ public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.INSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
return postWrite(result, instantTime, table);
}
@@ -192,8 +188,7 @@ public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppe
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.INSERT_OVERWRITE);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE);
HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -209,8 +204,7 @@ public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.INSERT_OVERWRITE_TABLE);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE);
HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}
@@ -225,8 +219,7 @@ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.BULK_INSERT);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.BULK_INSERT);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
return postWrite(result, instantTime, table);
}
@@ -236,23 +229,22 @@ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> pr
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime);
table.validateInsertSchema();
- setOperationType(WriteOperationType.BULK_INSERT_PREPPED);
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
+ preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
return postWrite(result, instantTime, table);
}

@Override
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
- setOperationType(WriteOperationType.DELETE);
+ preWrite(instantTime, WriteOperationType.DELETE);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.delete(context,instantTime, keys);
return postWrite(result, instantTime, table);
}

public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime);
- setOperationType(WriteOperationType.DELETE_PARTITION);
+ preWrite(instantTime, WriteOperationType.DELETE_PARTITION);
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context,instantTime, partitions);
return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
}