Skip to content

Commit

Permalink
KCLRecordProcessor handles multiple shutdown calls gracefully
Browse files Browse the repository at this point in the history
In some occasions KCL library calls shutdown multiple times for the
same shards KCLRecordProcessor. We have to make sure it doesn't fail
because closed dependencies.
  • Loading branch information
ebartkus committed Jul 12, 2019
1 parent 9c4daca commit fbd6754
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,19 @@ public void shutdown(ShutdownInput shutdownInput) {
* Called on ShutdownReason.TERMINATE
*/
private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateException, ShutdownException, InterruptedException {
if (!lastProcessedSeqNo.isEmpty()) {
ShardInfo processorRegister = shardRegister.get(this.shardId);

while (!processorRegister.getLastCommittedRecordSeqNo().equals(this.lastProcessedSeqNo)) {
LOGGER.info(
"Shard ended. Waiting for all data table: {} from shard: {} to be committed. " +
"lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}",
tableName,
shardId,
processorRegister.getLastCommittedRecordSeqNo(),
this.lastProcessedSeqNo);
Thread.sleep(1000);
if (lastProcessedSeqNo != null && !lastProcessedSeqNo.isEmpty()) {
ShardInfo processorRegister = shardRegister.getOrDefault(this.shardId, null);
if (processorRegister != null) {
while (!processorRegister.getLastCommittedRecordSeqNo().equals(this.lastProcessedSeqNo)) {
LOGGER.info(
"Shard ended. Waiting for all data table: {} from shard: {} to be committed. " +
"lastCommittedRecordSeqNo: {} lastProcessedSeqNo: {}",
tableName,
shardId,
processorRegister.getLastCommittedRecordSeqNo(),
this.lastProcessedSeqNo);
Thread.sleep(500);
}
}
}

Expand All @@ -231,7 +232,10 @@ private void onTerminate(ShutdownInput shutdownInput) throws InvalidStateExcepti
"Shard ended. All data committed. Checkpoint and proceed to next one. Table: {} ShardID: {}",
tableName,
shardId);
shutdownInput.getCheckpointer().checkpoint();
IRecordProcessorCheckpointer checkpointer = shutdownInput.getCheckpointer();
if (checkpointer != null) {
checkpointer.checkpoint();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,27 @@ void shutdownWaitsForLastRecordToBeCommittedOnShardEndBeforeCheckpoint() throws
verify(checkpointer, only()).checkpoint();
}

@Test
void shutdownSucceddsIfCalledMultipleTimes() throws InvalidStateException, ShutdownException {
// Arrange
IRecordProcessorCheckpointer checkpointer = Mockito.mock(IRecordProcessorCheckpointer.class);
ProcessRecordsInput processRecordsInput = getProcessRecordsInput("SQ1").withCheckpointer(checkpointer);
shardRegister.get(shardId).setLastCommittedRecordSeqNo("SQ1");

// Act
processor.processRecords(processRecordsInput);
ShutdownInput shutdownInput = new ShutdownInput()
.withShutdownReason(ShutdownReason.TERMINATE)
.withCheckpointer(checkpointer);

processor.shutdown(shutdownInput);
processor.shutdown(shutdownInput);

// Assert
verify(checkpointer, times(2)).checkpoint();
assertFalse(shardRegister.containsKey(shardId));
}

@Test
void shutdownCheckpointsIfNoDataWasReceivedFromThisShard() throws InvalidStateException, ShutdownException {
// Arrange
Expand Down

0 comments on commit fbd6754

Please sign in to comment.