Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
// it would check the validity.
this.onCheckpointing = true;
// wait for the buffer data flush out and request a new instant
flushBuffer(true);
flushBuffer(true, false);
// signal the task thread to start buffering
addToBufferCondition.signal();
} finally {
Expand Down Expand Up @@ -221,7 +221,7 @@ public void notifyCheckpointComplete(long checkpointId) {
* End input action for batch source.
*/
public void endInput() {
flushBuffer(true);
flushBuffer(true, true);
this.writeClient.cleanHandles();
}

Expand Down Expand Up @@ -333,13 +333,13 @@ private void putDataIntoBuffer(I value) {
private void flushBufferOnCondition(I value) {
boolean needFlush = this.detector.detect(value);
if (needFlush) {
flushBuffer(false);
flushBuffer(false, false);
this.detector.reset();
}
}

@SuppressWarnings("unchecked, rawtypes")
private void flushBuffer(boolean isFinalBatch) {
private void flushBuffer(boolean isLastBatch, boolean isEndInput) {
this.currentInstant = this.writeClient.getInflightAndRequestedInstant(this.config.get(FlinkOptions.TABLE_TYPE));
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
Expand All @@ -364,8 +364,14 @@ private void flushBuffer(boolean isFinalBatch) {
LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
writeStatus = Collections.emptyList();
}
this.eventGateway.sendEventToCoordinator(
new BatchWriteSuccessEvent(this.taskID, currentInstant, writeStatus, isFinalBatch));
final BatchWriteSuccessEvent event = BatchWriteSuccessEvent.builder()
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.isLastBatch(isLastBatch)
.isEndInput(isEndInput)
.build();
this.eventGateway.sendEventToCoordinator(event);
this.buffer.clear();
this.currentInstant = "";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,6 @@ public class StreamWriteOperatorCoordinator
*/
private final int parallelism;

/**
* Whether the coordinator executes with the bounded data set.
*/
private final boolean isBounded;

/**
* Whether needs to schedule compaction task on finished checkpoints.
*/
Expand All @@ -117,16 +112,13 @@ public class StreamWriteOperatorCoordinator
*
* @param conf The config options
* @param parallelism The operator task number
* @param isBounded Whether the input data source is bounded
*/
public StreamWriteOperatorCoordinator(
Configuration conf,
int parallelism,
boolean isBounded) {
int parallelism) {
this.conf = conf;
this.parallelism = parallelism;
this.needsScheduleCompaction = StreamerUtil.needsScheduleCompaction(conf);
this.isBounded = isBounded;
}

@Override
Expand All @@ -143,11 +135,6 @@ public void start() throws Exception {

@Override
public void close() {
if (isBounded) {
// start to commit the instant.
checkAndCommitWithRetry();
// no compaction scheduling for batch mode
}
// teardown the resource
if (writeClient != null) {
writeClient.close();
Expand Down Expand Up @@ -216,6 +203,11 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
} else {
this.eventBuffer[event.getTaskID()] = event;
}
if (event.isEndInput() && checkReady()) {
// start to commit the instant.
doCommit();
// no compaction scheduling for batch mode
}
}

@Override
Expand Down Expand Up @@ -424,12 +416,10 @@ public HoodieFlinkWriteClient getWriteClient() {
public static class Provider implements OperatorCoordinator.Provider {
private final OperatorID operatorId;
private final Configuration conf;
private final boolean isBounded;

public Provider(OperatorID operatorId, Configuration conf, boolean isBounded) {
public Provider(OperatorID operatorId, Configuration conf) {
this.operatorId = operatorId;
this.conf = conf;
this.isBounded = isBounded;
}

@Override
Expand All @@ -439,7 +429,7 @@ public OperatorID getOperatorId() {

@Override
public OperatorCoordinator create(Context context) {
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism(), isBounded);
return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,11 @@ public class StreamWriteOperatorFactory<I>

private final StreamWriteOperator<I> operator;
private final Configuration conf;
private final boolean isBounded;

public StreamWriteOperatorFactory(
Configuration conf) {
this(conf, false);
}

public StreamWriteOperatorFactory(
Configuration conf,
boolean isBounded) {
public StreamWriteOperatorFactory(Configuration conf) {
super(new StreamWriteOperator<>(conf));
this.operator = (StreamWriteOperator<I>) getOperator();
this.conf = conf;
this.isBounded = isBounded;
}

@Override
Expand All @@ -70,7 +61,7 @@ public <T extends StreamOperator<Object>> T createStreamOperator(StreamOperatorP

@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, isBounded);
return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* An operator event to mark successful checkpoint batch write.
Expand All @@ -36,13 +37,13 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
private final int taskID;
private final String instantTime;
private boolean isLastBatch;

public BatchWriteSuccessEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses) {
this(taskID, instantTime, writeStatuses, false);
}
/**
* Flag saying whether the event comes from the end of input, e.g. the source
* is bounded, there are two cases in which this flag should be set to true:
* 1. batch execution mode
* 2. bounded stream source such as VALUES
*/
private final boolean isEndInput;

/**
* Creates an event.
Expand All @@ -55,15 +56,24 @@ public BatchWriteSuccessEvent(
* if true, the whole data set of the checkpoint
* has been flushed successfully
*/
public BatchWriteSuccessEvent(
private BatchWriteSuccessEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses,
boolean isLastBatch) {
boolean isLastBatch,
boolean isEndInput) {
this.taskID = taskID;
this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses);
this.isLastBatch = isLastBatch;
this.isEndInput = isEndInput;
}

/**
* Returns the builder for {@link BatchWriteSuccessEvent}.
*/
public static Builder builder() {
return new Builder();
}

public List<WriteStatus> getWriteStatuses() {
Expand All @@ -82,6 +92,10 @@ public boolean isLastBatch() {
return isLastBatch;
}

public boolean isEndInput() {
return isEndInput;
}

/**
* Merges this event with given {@link BatchWriteSuccessEvent} {@code other}.
*
Expand All @@ -101,4 +115,51 @@ public void mergeWith(BatchWriteSuccessEvent other) {
public boolean isReady(String currentInstant) {
return isLastBatch && this.instantTime.equals(currentInstant);
}

// -------------------------------------------------------------------------
// Builder
// -------------------------------------------------------------------------

/**
* Builder for {@link BatchWriteSuccessEvent}.
*/
public static class Builder {
private List<WriteStatus> writeStatus;
private Integer taskID;
private String instantTime;
private boolean isLastBatch = false;
private boolean isEndInput = false;

public BatchWriteSuccessEvent build() {
Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus);
return new BatchWriteSuccessEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput);
}

public Builder taskID(int taskID) {
this.taskID = taskID;
return this;
}

public Builder instantTime(String instantTime) {
this.instantTime = instantTime;
return this;
}

public Builder writeStatus(List<WriteStatus> writeStatus) {
this.writeStatus = writeStatus;
return this;
}

public Builder isLastBatch(boolean isLastBatch) {
this.isLastBatch = isLastBatch;
return this;
}

public Builder isEndInput(boolean isEndInput) {
this.isEndInput = isEndInput;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", context.getTable().getPartitionKeys()));
TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema());
inferAvroSchema(conf, tableSchema.toRowDataType().notNull().getLogicalType());
return new HoodieTableSink(conf, tableSchema, context.isBounded());
return new HoodieTableSink(conf, tableSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,18 @@ public class HoodieTableSink implements AppendStreamTableSink<RowData>, Partitio

private final Configuration conf;
private final TableSchema schema;
private final boolean isBounded;

public HoodieTableSink(Configuration conf, TableSchema schema, boolean isBounded) {
public HoodieTableSink(Configuration conf, TableSchema schema) {
this.conf = conf;
this.schema = schema;
this.isBounded = isBounded;
}

@Override
public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
// Read from kafka source
RowType rowType = (RowType) this.schema.toRowDataType().notNull().getLogicalType();
int numWriteTasks = this.conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf, isBounded);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);

DataStream<Object> pipeline = dataStream
.map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class TestStreamWriteOperatorCoordinator {
@BeforeEach
public void before() throws Exception {
coordinator = new StreamWriteOperatorCoordinator(
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2, false);
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), 2);
coordinator.start();
}

Expand All @@ -75,14 +75,22 @@ void testInstantState() {
WriteStatus writeStatus = new WriteStatus(true, 0.1D);
writeStatus.setPartitionPath("par1");
writeStatus.setStat(new HoodieWriteStat());
OperatorEvent event0 =
new BatchWriteSuccessEvent(0, instant, Collections.singletonList(writeStatus), true);
OperatorEvent event0 = BatchWriteSuccessEvent.builder()
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true)
.build();

WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
writeStatus1.setPartitionPath("par2");
writeStatus1.setStat(new HoodieWriteStat());
OperatorEvent event1 =
new BatchWriteSuccessEvent(1, instant, Collections.singletonList(writeStatus1), true);
OperatorEvent event1 = BatchWriteSuccessEvent.builder()
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1))
.isLastBatch(true)
.build();
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);

Expand Down Expand Up @@ -115,7 +123,11 @@ public void testCheckpointAndRestore() throws Exception {
public void testReceiveInvalidEvent() {
CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
OperatorEvent event = new BatchWriteSuccessEvent(0, "abc", Collections.emptyList());
OperatorEvent event = BatchWriteSuccessEvent.builder()
.taskID(0)
.instantTime("abc")
.writeStatus(Collections.emptyList())
.build();
assertThrows(IllegalStateException.class,
() -> coordinator.handleEventFromOperator(0, event),
"Receive an unexpected event for instant abc from task 0");
Expand All @@ -126,7 +138,11 @@ public void testCheckpointCompleteWithRetry() {
final CompletableFuture<byte[]> future = new CompletableFuture<>();
coordinator.checkpointCoordinator(1, future);
String inflightInstant = coordinator.getInstant();
OperatorEvent event = new BatchWriteSuccessEvent(0, inflightInstant, Collections.emptyList());
OperatorEvent event = BatchWriteSuccessEvent.builder()
.taskID(0)
.instantTime(inflightInstant)
.writeStatus(Collections.emptyList())
.build();
coordinator.handleEventFromOperator(0, event);
assertThrows(HoodieException.class,
() -> coordinator.checkpointComplete(1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
this.gateway = new MockOperatorEventGateway();
this.conf = conf;
// one function
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1, false);
this.coordinator = new StreamWriteOperatorCoordinator(conf, 1);
this.functionInitializationContext = new MockFunctionInitializationContext();
this.compactFunctionWrapper = new CompactFunctionWrapper(this.conf);
}
Expand Down
Loading