Merged. Changes from all commits.
HoodieFlinkWriteClient.java
@@ -58,6 +58,7 @@
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
+ import org.apache.hudi.util.FlinkClientUtil;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
@@ -174,7 +175,7 @@ public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime
/**
* Removes all existing records from the partitions affected and inserts the given HoodieRecords into the table.
*
- * @param records HoodieRecords to insert
+ * @param records     HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts
*/
@@ -194,7 +195,7 @@ public List<WriteStatus> insertOverwrite(
/**
* Removes all existing records of the Hoodie table and inserts the given HoodieRecords into the table.
*
- * @param records HoodieRecords to insert
+ * @param records     HoodieRecords to insert
* @param instantTime Instant time of the commit
* @return list of WriteStatus to inspect errors and counts
*/
@@ -235,7 +236,7 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.DELETE, instantTime);
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
-   HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context,instantTime, keys);
+   HoodieWriteMetadata<List<WriteStatus>> result = table.delete(context, instantTime, keys);
return postWrite(result, instantTime, table);
}

@@ -391,11 +392,11 @@ public void cleanHandlesGracefully() {
/**
* Get or create a new write handle in order to reuse the file handles.
*
- * @param record The first record in the bucket
- * @param config Write config
- * @param instantTime The instant time
- * @param table The table
- * @param recordItr Record iterator
+ * @param record      The first record in the bucket
+ * @param config      Write config
+ * @param instantTime The instant time
+ * @param table       The table
+ * @param recordItr   Record iterator
* @return Existing write handle or create a new one
*/
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
@@ -454,7 +455,8 @@ public String getLastPendingInstant(HoodieTableType tableType) {
}

public String getLastPendingInstant(String actionType) {
-   HoodieTimeline unCompletedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterInflightsAndRequested();
+   HoodieTimeline unCompletedTimeline = FlinkClientUtil.createMetaClient(basePath)
+       .getCommitsTimeline().filterInflightsAndRequested();
return unCompletedTimeline.getInstants()
.filter(x -> x.getAction().equals(actionType))
.map(HoodieInstant::getTimestamp)
@@ -465,7 +467,8 @@ public String getLastPendingInstant(String actionType) {

public String getLastCompletedInstant(HoodieTableType tableType) {
final String commitType = CommitUtils.getCommitActionType(tableType);
-   HoodieTimeline completedTimeline = getHoodieTable().getMetaClient().getCommitsTimeline().filterCompletedInstants();
+   HoodieTimeline completedTimeline = FlinkClientUtil.createMetaClient(basePath)
+       .getCommitsTimeline().filterCompletedInstants();
return completedTimeline.getInstants()
.filter(x -> x.getAction().equals(commitType))
.map(HoodieInstant::getTimestamp)
@@ -475,8 +478,7 @@ public String getLastCompletedInstant(HoodieTableType tableType) {
}

public void transitionRequestedToInflight(String commitType, String inFlightInstant) {
-   HoodieFlinkTable<T> table = getHoodieTable();
-   HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+   HoodieActiveTimeline activeTimeline = FlinkClientUtil.createMetaClient(basePath).getActiveTimeline();
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED, commitType, inFlightInstant);
activeTimeline.transitionRequestedToInflight(requested, Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
hudi-flink/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -18,6 +18,8 @@

package org.apache.hudi.util;

+ import org.apache.hudi.common.table.HoodieTableMetaClient;
+
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
@@ -29,6 +31,13 @@
*/
public class FlinkClientUtil {

+ /**
+  * Creates the meta client.
+  */
+ public static HoodieTableMetaClient createMetaClient(String basePath) {
+   return HoodieTableMetaClient.builder().setBasePath(basePath).setConf(FlinkClientUtil.getHadoopConf()).build();
+ }
+
/**
* Parses the file name from path.
*/
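With this helper, the timeline lookups in HoodieFlinkWriteClient above build a fresh HoodieTableMetaClient from the filesystem on every call instead of reusing the meta client cached inside HoodieFlinkTable, presumably so each lookup observes instants the coordinator created after the cached table was built. A minimal sketch of the resulting call pattern, using only calls that appear in this change (the basePath variable and the pending/completed names are illustrative):

  // Build a fresh meta client; its timeline reflects the instant files
  // currently on storage, not a previously cached view.
  HoodieTableMetaClient metaClient = FlinkClientUtil.createMetaClient(basePath);

  // Pending (requested or inflight) commits:
  HoodieTimeline pending = metaClient.getCommitsTimeline().filterInflightsAndRequested();

  // Completed commits:
  HoodieTimeline completed = metaClient.getCommitsTimeline().filterCompletedInstants();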
155 changes: 122 additions & 33 deletions hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -32,11 +32,14 @@
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
- import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
+ import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.table.action.commit.FlinkWriteHelper;
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.annotation.VisibleForTesting;
+ import org.apache.flink.api.common.state.ListState;
+ import org.apache.flink.api.common.state.ListStateDescriptor;
+ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -54,6 +57,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+ import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
@@ -76,20 +80,18 @@
* starts a new instant on the time line when a checkpoint triggers, the coordinator checkpoints always
* start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
*
- * <p>In order to improve the throughput, The function process thread does not block data buffering
- * after the checkpoint thread starts flushing the existing data buffer. So there is possibility that the next checkpoint
- * batch was written to current checkpoint. When a checkpoint failure triggers the write rollback, there may be some duplicate records
- * (e.g. the eager write batch), the semantics is still correct using the UPSERT operation.
+ * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until
+ * the current checkpoint succeeds and the coordinator starts a new instant. Any error during metadata committing triggers a job failure;
+ * when the job recovers from a failure, the write function re-sends the write metadata to the coordinator to check whether the metadata
+ * can be re-committed. Thus, if an unexpected error happens during instant committing, the coordinator retries the commit when the job
+ * recovers.
*
* <p><h2>Fault Tolerance</h2>
*
- * <p>The operator coordinator checks and commits the last instant then starts a new one when a checkpoint finished successfully.
- * The operator rolls back the written data and throws to trigger a failover when any error occurs.
- * This means one Hoodie instant may span one or more checkpoints(some checkpoints notifications may be skipped).
- * If a checkpoint timed out, the next checkpoint would help to rewrite the left buffer data (clean the buffer in the last
- * step of the #flushBuffer method).
- *
- * <p>The operator coordinator would try several times when committing the write status.
+ * <p>The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finishes successfully.
+ * It rolls back any inflight instant before it starts a new instant; this means one Hoodie instant spans exactly one checkpoint.
+ * The write function blocks data buffer flushing for up to the configured checkpoint timeout
+ * before it throws an exception, and any checkpoint failure finally triggers a job failure.
*
* <p>Note: The function task requires the input stream be shuffled by the file IDs.
*
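Read together, the rewritten docs above describe one checkpoint cycle per instant. A rough sketch of that cycle, assembled from the methods in this change (scheduling simplified; the ordering is one reading of the code):

  // 1. processElement()  -> bufferRecord(record)      buffer while an instant is open
  // 2. snapshotState()   -> flushRemaining(false)     flush buffers, send WriteMetadataEvent,
  //                                                   then set confirming = true to block flushes
  //                         reloadWriteMetaState()    snapshot the sent event into operator state
  // 3. the coordinator commits the instant and starts a new REQUESTED instant
  // 4. next flush        -> instantToWrite() sees the new instant and resets confirming
  // 5. on recovery       -> initializeState() re-sends the uncommitted event via
  //                         restoreWriteMetadata(), or a bootstrap event if there is none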
@@ -162,6 +164,16 @@ public class StreamWriteFunction<K, I, O>
*/
private volatile boolean confirming = false;

+ /**
+  * List state of the write metadata events.
+  */
+ private transient ListState<WriteMetadataEvent> writeMetadataState;
+
+ /**
+  * Write status list for the current checkpoint.
+  */
+ private List<WriteStatus> writeStatuses;
+
/**
* Constructs a StreamingSinkFunction.
*
@@ -173,27 +185,43 @@ public StreamWriteFunction(Configuration config) {

@Override
public void open(Configuration parameters) throws IOException {
-   this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-   this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
-   this.actionType = CommitUtils.getCommitActionType(
-       WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
-       HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
this.tracer = new TotalSizeTracer(this.config);
initBuffer();
initWriteFunction();
}

@Override
- public void initializeState(FunctionInitializationContext context) {
-   // no operation
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+   this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+   this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
+   this.actionType = CommitUtils.getCommitActionType(
+       WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
+       HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
+
+   this.writeStatuses = new ArrayList<>();
+   this.writeMetadataState = context.getOperatorStateStore().getListState(
+       new ListStateDescriptor<>(
+           "write-metadata-state",
+           TypeInformation.of(WriteMetadataEvent.class)
+       ));
+
+   if (context.isRestored()) {
+     restoreWriteMetadata();
+   } else {
+     sendBootstrapEvent();
+   }
+   // blocks flushing until the coordinator starts a new instant
+   this.confirming = true;
+ }

@Override
- public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
// Based on the fact that the coordinator starts the checkpoint first,
// it would check the validity.
// wait for the buffer data flush out and request a new instant
flushRemaining(false);
+   // Reload the snapshot state as the current state.
+   reloadWriteMetaState();
}

@Override
Expand All @@ -215,6 +243,7 @@ public void close() {
public void endInput() {
flushRemaining(true);
this.writeClient.cleanHandles();
+   this.writeStatuses.clear();
}

// -------------------------------------------------------------------------
@@ -274,6 +303,49 @@ private void initWriteFunction() {
}
}

+ private void restoreWriteMetadata() throws Exception {
+   String lastInflight = this.writeClient.getLastPendingInstant(this.actionType);
+   boolean eventSent = false;
+   for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+     if (Objects.equals(lastInflight, event.getInstantTime())) {
+       // The checkpoint succeeded but the meta did not commit,
+       // re-commit the inflight instant
+       this.eventGateway.sendEventToCoordinator(event);
+       LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
+       eventSent = true;
+     }
+   }
+   if (!eventSent) {
+     sendBootstrapEvent();
+   }
+ }
+
+ private void sendBootstrapEvent() {
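+   // Note: an empty instant time and an empty write-status list mark this as a
+   // pure bootstrap event, i.e. the subtask has no uncommitted metadata for the
+   // coordinator to replay (contrast with restoreWriteMetadata() above).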
+   WriteMetadataEvent event = WriteMetadataEvent.builder()
+       .taskID(taskID)
+       .writeStatus(Collections.emptyList())
+       .instantTime("")
+       .isBootstrap(true)
+       .build();
+   this.eventGateway.sendEventToCoordinator(event);
+   LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
+ }
+
+ /**
+  * Reload the write metadata state as the current checkpoint.
+  */
+ private void reloadWriteMetaState() throws Exception {
+   this.writeMetadataState.clear();
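+   // The event is flagged as a bootstrap event; on recovery, restoreWriteMetadata()
+   // may re-send it so the coordinator can decide whether the instant still needs
+   // committing (see the class-level docs on re-commit after failure).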
+   WriteMetadataEvent event = WriteMetadataEvent.builder()
+       .taskID(taskID)
+       .instantTime(currentInstant)
+       .writeStatus(new ArrayList<>(writeStatuses))
+       .isBootstrap(true)
+       .build();
+   this.writeMetadataState.add(event);
+   writeStatuses.clear();
+ }
+
/**
* Represents a data item in the buffer, this is needed to reduce the
* memory footprint.
@@ -477,23 +549,23 @@ private void bufferRecord(HoodieRecord<?> value) {
bucket.records.add(item);
}

- @SuppressWarnings("unchecked, rawtypes")
- private boolean flushBucket(DataBucket bucket) {
-   String instant = this.writeClient.getLastPendingInstant(this.actionType);
-
-   if (instant == null) {
-     // in case there are empty checkpoints that has no input data
-     LOG.info("No inflight instant when flushing data, skip.");
-     return false;
-   }
+ private boolean hasData() {
+   return this.buckets.size() > 0
+       && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
+ }
+
+ private String instantToWrite() {
+   String instant = this.writeClient.getLastPendingInstant(this.actionType);
// if exactly-once semantics turns on,
// waits for the checkpoint notification until the checkpoint timeout threshold hits.
if (confirming) {
long waitingTime = 0L;
long ckpTimeout = config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT);
long interval = 500L;
-     while (instant == null || instant.equals(this.currentInstant)) {
+     // wait condition:
+     // 1. there is no inflight instant
+     // 2. the inflight instant does not change and the checkpoint has buffering data
+     while (instant == null || (instant.equals(this.currentInstant) && hasData())) {
// sleep for a while
try {
if (waitingTime > ckpTimeout) {
@@ -511,6 +583,18 @@ private boolean flushBucket(DataBucket bucket) {
// successfully.
confirming = false;
}
+   return instant;
+ }
+
+ @SuppressWarnings("unchecked, rawtypes")
+ private boolean flushBucket(DataBucket bucket) {
+   String instant = instantToWrite();
+
+   if (instant == null) {
+     // in case there are empty checkpoints that has no input data
+     LOG.info("No inflight instant when flushing data, skip.");
+     return false;
+   }

List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
@@ -520,20 +604,22 @@ private boolean flushBucket(DataBucket bucket) {
bucket.preWrite(records);
final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
records.clear();
-   final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+   final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus)
.isLastBatch(false)
.isEndInput(false)
.build();

this.eventGateway.sendEventToCoordinator(event);
+   writeStatuses.addAll(writeStatus);
return true;
}

@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
-   this.currentInstant = this.writeClient.getLastPendingInstant(this.actionType);
+   this.currentInstant = instantToWrite();
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
throw new HoodieException("No inflight instant when flushing data!");
@@ -560,17 +646,20 @@ private void flushRemaining(boolean isEndInput) {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
}
-   final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
+   final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.isLastBatch(true)
.isEndInput(isEndInput)
.build();

this.eventGateway.sendEventToCoordinator(event);
this.buckets.clear();
this.tracer.reset();
this.writeClient.cleanHandles();
+   this.writeStatuses.addAll(writeStatus);
+   // blocks flushing until the coordinator starts a new instant
+   this.confirming = true;
}
}
hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperator.java
@@ -51,7 +51,7 @@ void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
}

@Override
- public void endInput() throws Exception {
+ public void endInput() {
sinkFunction.endInput();
}
}