Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private void sendBootstrapEvent() {
.taskID(taskID)
.writeStatus(Collections.emptyList())
.instantTime("")
.isBootstrap(true)
.bootstrap(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
Expand All @@ -342,7 +342,7 @@ private void reloadWriteMetaState() throws Exception {
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(new ArrayList<>(writeStatuses))
.isBootstrap(true)
.bootstrap(true)
.build();
this.writeMetadataState.add(event);
writeStatuses.clear();
Expand Down Expand Up @@ -617,8 +617,8 @@ private boolean flushBucket(DataBucket bucket) {
.taskID(taskID)
.instantTime(instant) // the write instant may shift but the event still use the currentInstant.
.writeStatus(writeStatus)
.isLastBatch(false)
.isEndInput(false)
.lastBatch(false)
.endInput(false)
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand All @@ -627,7 +627,7 @@ private boolean flushBucket(DataBucket bucket) {
}

@SuppressWarnings("unchecked, rawtypes")
private void flushRemaining(boolean isEndInput) {
private void flushRemaining(boolean endInput) {
this.currentInstant = instantToWrite(hasData());
if (this.currentInstant == null) {
// in case there are empty checkpoints that has no input data
Expand Down Expand Up @@ -659,8 +659,8 @@ private void flushRemaining(boolean isEndInput) {
.taskID(taskID)
.instantTime(currentInstant)
.writeStatus(writeStatus)
.isLastBatch(true)
.isEndInput(isEndInput)
.lastBatch(true)
.endInput(endInput)
.build();

this.eventGateway.sendEventToCoordinator(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ public class CommitAckEvent implements OperatorEvent {

private static final CommitAckEvent INSTANCE = new CommitAckEvent();

// default constructor for efficient serialization
public CommitAckEvent() {}

public static CommitAckEvent getInstance() {
return INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,50 +34,53 @@ public class WriteMetadataEvent implements OperatorEvent {
private static final long serialVersionUID = 1L;

private List<WriteStatus> writeStatuses;
private final int taskID;
private int taskID;
private String instantTime;
private boolean isLastBatch;
private boolean lastBatch;

/**
* Flag saying whether the event comes from the end of input, e.g. the source
* is bounded, there are two cases in which this flag should be set to true:
* 1. batch execution mode
* 2. bounded stream source such as VALUES
*/
private final boolean isEndInput;
private boolean endInput;

/**
* Flag saying whether the event comes from bootstrap of a write function.
*/
private final boolean isBootstrap;
private boolean bootstrap;

/**
* Creates an event.
*
* @param taskID The task ID
* @param instantTime The instant time under which to write the data
* @param writeStatuses The write statues list
* @param isLastBatch Whether the event reports the last batch
* @param lastBatch Whether the event reports the last batch
* within an checkpoint interval,
* if true, the whole data set of the checkpoint
* has been flushed successfully
* @param isBootstrap Whether the event comes from the bootstrap
* @param bootstrap Whether the event comes from the bootstrap
*/
private WriteMetadataEvent(
int taskID,
String instantTime,
List<WriteStatus> writeStatuses,
boolean isLastBatch,
boolean isEndInput,
boolean isBootstrap) {
boolean lastBatch,
boolean endInput,
boolean bootstrap) {
this.taskID = taskID;
this.instantTime = instantTime;
this.writeStatuses = new ArrayList<>(writeStatuses);
this.isLastBatch = isLastBatch;
this.isEndInput = isEndInput;
this.isBootstrap = isBootstrap;
this.lastBatch = lastBatch;
this.endInput = endInput;
this.bootstrap = bootstrap;
}

// default constructor for efficient serialization
public WriteMetadataEvent() {}

/**
* Returns the builder for {@link WriteMetadataEvent}.
*/
Expand All @@ -89,24 +92,48 @@ public List<WriteStatus> getWriteStatuses() {
return writeStatuses;
}

public void setWriteStatuses(List<WriteStatus> writeStatuses) {
this.writeStatuses = writeStatuses;
}

public int getTaskID() {
return taskID;
}

public void setTaskID(int taskID) {
this.taskID = taskID;
}

public String getInstantTime() {
return instantTime;
}

public boolean isLastBatch() {
return isLastBatch;
public void setInstantTime(String instantTime) {
this.instantTime = instantTime;
}

public boolean isEndInput() {
return isEndInput;
return endInput;
}

public void setEndInput(boolean endInput) {
this.endInput = endInput;
}

public boolean isBootstrap() {
return isBootstrap;
return bootstrap;
}

public void setBootstrap(boolean bootstrap) {
this.bootstrap = bootstrap;
}

public boolean isLastBatch() {
return lastBatch;
}

public void setLastBatch(boolean lastBatch) {
this.lastBatch = lastBatch;
}

/**
Expand All @@ -118,7 +145,7 @@ public void mergeWith(WriteMetadataEvent other) {
ValidationUtils.checkArgument(this.taskID == other.taskID);
// the instant time could be monotonically increasing
this.instantTime = other.instantTime;
this.isLastBatch |= other.isLastBatch; // true if one of the event isLastBatch true.
this.lastBatch |= other.lastBatch; // true if one of the event isLastBatch true.
List<WriteStatus> statusList = new ArrayList<>();
statusList.addAll(this.writeStatuses);
statusList.addAll(other.writeStatuses);
Expand All @@ -129,7 +156,7 @@ public void mergeWith(WriteMetadataEvent other) {
* Returns whether the event is ready to commit.
*/
public boolean isReady(String currentInstant) {
return isLastBatch && this.instantTime.equals(currentInstant);
return lastBatch && this.instantTime.equals(currentInstant);
}

// -------------------------------------------------------------------------
Expand All @@ -143,15 +170,15 @@ public static class Builder {
private List<WriteStatus> writeStatus;
private Integer taskID;
private String instantTime;
private boolean isLastBatch = false;
private boolean isEndInput = false;
private boolean isBootstrap = false;
private boolean lastBatch = false;
private boolean endInput = false;
private boolean bootstrap = false;

public WriteMetadataEvent build() {
Objects.requireNonNull(taskID);
Objects.requireNonNull(instantTime);
Objects.requireNonNull(writeStatus);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, isLastBatch, isEndInput, isBootstrap);
return new WriteMetadataEvent(taskID, instantTime, writeStatus, lastBatch, endInput, bootstrap);
}

public Builder taskID(int taskID) {
Expand All @@ -169,18 +196,18 @@ public Builder writeStatus(List<WriteStatus> writeStatus) {
return this;
}

public Builder isLastBatch(boolean isLastBatch) {
this.isLastBatch = isLastBatch;
public Builder lastBatch(boolean lastBatch) {
this.lastBatch = lastBatch;
return this;
}

public Builder isEndInput(boolean isEndInput) {
this.isEndInput = isEndInput;
public Builder endInput(boolean endInput) {
this.endInput = endInput;
return this;
}

public Builder isBootstrap(boolean isBootstrap) {
this.isBootstrap = isBootstrap;
public Builder bootstrap(boolean bootstrap) {
this.bootstrap = bootstrap;
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public void before() throws Exception {
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.bootstrap(true)
.build();

final WriteMetadataEvent event1 = WriteMetadataEvent.builder()
.taskID(1)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.bootstrap(true)
.build();

coordinator.handleEventFromOperator(0, event0);
Expand All @@ -106,7 +106,7 @@ void testInstantState() {
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true)
.lastBatch(true)
.build();

WriteStatus writeStatus1 = new WriteStatus(false, 0.2D);
Expand All @@ -116,7 +116,7 @@ void testInstantState() {
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1))
.isLastBatch(true)
.lastBatch(true)
.build();
coordinator.handleEventFromOperator(0, event0);
coordinator.handleEventFromOperator(1, event1);
Expand Down Expand Up @@ -184,7 +184,7 @@ public void testCheckpointCompleteWithPartialEvents() {
.taskID(1)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus1))
.isLastBatch(true)
.lastBatch(true)
.build();
coordinator.handleEventFromOperator(1, event1);
assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(2),
Expand All @@ -207,7 +207,7 @@ public void testHiveSyncInvoked() throws Exception {
.taskID(0)
.instantTime("")
.writeStatus(Collections.emptyList())
.isBootstrap(true)
.bootstrap(true)
.build();

coordinator.handleEventFromOperator(0, event0);
Expand All @@ -223,7 +223,7 @@ public void testHiveSyncInvoked() throws Exception {
.taskID(0)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.isLastBatch(true)
.lastBatch(true)
.build();

coordinator.handleEventFromOperator(0, event1);
Expand Down