diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 1868cc3..02ad7ea 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -414,6 +414,13 @@ private void RegisterAsProcessed(String shardId, String sequenceNumber) { ShardInfo shardInfo = shardRegister.get(shardId); String currentSeqNo = shardInfo.getLastCommittedRecordSeqNo(); + // shardInfo could be null if the AWS KCL lib has removed it from the registry + if (shardInfo == null) { + LOGGER.info("Skipping commitRecord: ShardID: {} lastPushedSequenceNumber: {}" + + " because shardId is not present any longer in the shardRegister", shardId, sequenceNumber); + return; + } + // Prevent issues with out of order registration. // Since we "commit" some records before sending to Kafka(and them being committed) in some cases. if (currentSeqNo != null && !currentSeqNo.equals("")) { @@ -440,4 +447,4 @@ ArrayBlockingQueue getEventsQueue() { SourceInfo getSourceInfo() { return sourceInfo; } -} \ No newline at end of file +}