Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KCL 1.X - Guardrails to avoid checkpoint corruption during resharding #779

Merged
merged 2 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,29 @@ private void applicationCheckpointAndVerification() {
.withCheckpointer(recordProcessorCheckpointer);
recordProcessor.shutdown(shardEndShutdownInput);

final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
boolean successfullyCheckpointedShardEnd = false;

final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
KinesisClientLease leaseFromDdb = null;
try {
leaseFromDdb = leaseCoordinator.getLeaseManager().getLease(shardInfo.getShardId());
} catch (Exception e) {
LOG.error("Shard " + shardInfo.getShardId() + " : Unable to get lease entry for shard to verify shard end checkpointing.", e);
}

if (leaseFromDdb != null && leaseFromDdb.getCheckpoint() != null) {
successfullyCheckpointedShardEnd = leaseFromDdb.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END);
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
if (!leaseFromDdb.getCheckpoint().equals(lastCheckpointValue)) {
LOG.error("Shard " + shardInfo.getShardId() +
" : Checkpoint information mismatch between authoritative source and local cache. " +
"This does not affect the application flow, but cut a ticket to Kinesis when you see this. " +
"Authoritative entry : " + leaseFromDdb.getCheckpoint() + " Cache entry : " + lastCheckpointValue);
}
} else {
LOG.error("Shard " + shardInfo.getShardId() + " : No lease checkpoint entry for shard to verify shard end checkpointing. Lease Entry : " + leaseFromDdb);
}

if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
if (!successfullyCheckpointedShardEnd) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
Expand Down Expand Up @@ -126,6 +127,19 @@ public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(Kinesi
return baseSerializer.getDynamoLeaseOwnerExpectation(lease);
}

@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(KinesisClientLease lease) {
Map<String, ExpectedAttributeValue> result = baseSerializer.getDynamoLeaseCheckpointExpectation(lease);
ExpectedAttributeValue eav;

if (!lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
eav = new ExpectedAttributeValue(DynamoUtils.createAttributeValue(ExtendedSequenceNumber.SHARD_END.getSequenceNumber()));
eav.setComparisonOperator(ComparisonOperator.NE);
result.put(CHECKPOINT_SEQUENCE_NUMBER_KEY, eav);
}
return result;
}

@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
return baseSerializer.getDynamoNonexistantExpectation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion
LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
cleanedUpCompletedLease = true;
}
} else {
cleanupFailureReason = "Configuration/Interval condition not satisfied to execute lease cleanup this cycle";
}
if (!cleanedUpCompletedLease && !alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
// throws ResourceNotFoundException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.TimeUnit;

import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ExpectedAttributeValue;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.leases.util.DynamoUtils;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -582,7 +583,9 @@ public boolean updateLease(T lease)
UpdateItemRequest request = new UpdateItemRequest();
request.setTableName(table);
request.setKey(serializer.getDynamoHashKey(lease));
request.setExpected(serializer.getDynamoLeaseCounterExpectation(lease));
Map<String, ExpectedAttributeValue> expectations = serializer.getDynamoLeaseCounterExpectation(lease);
expectations.putAll(serializer.getDynamoLeaseCheckpointExpectation(lease));
request.setExpected(expectations);
Copy link
Contributor

@joshua-kim joshua-kim Jan 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is also used for leaseRenewal/leaseStealing codepath, where we're heartbeating the lease counter/owner. We don't necessarily expect SHARD_END in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this being used anywhere other than checkpoint code path. And we don't expect SHARD_END here. We expect it not to present when we write a non SHARD_END checkpoint.


Map<String, AttributeValueUpdate> updates = serializer.getDynamoLeaseCounterUpdate(lease);
updates.putAll(serializer.getDynamoUpdateLeaseUpdate(lease));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(Lease
return result;
}

@Override
public Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(final Lease lease) {
return new HashMap<>();
}

@Override
public Map<String, ExpectedAttributeValue> getDynamoNonexistantExpectation() {
Map<String, ExpectedAttributeValue> result = new HashMap<String, ExpectedAttributeValue>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public interface ILeaseSerializer<T extends Lease> {
*/
public Map<String, ExpectedAttributeValue> getDynamoLeaseOwnerExpectation(T lease);

/**
* @param lease
* @return the attribute value map asserting that the checkpoint state is as expected.
*/
default Map<String, ExpectedAttributeValue> getDynamoLeaseCheckpointExpectation(T lease) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe good to rename this to something like getDynamoLeaseShardEndCheckpointExpectation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to keep this generic, so that we can add any future state expectations. Also this would set expectation based on checkpoint in the lease.

throw new UnsupportedOperationException("DynamoLeaseCheckpointExpectation is not implemented");
}

/**
* @return the attribute value map asserting that a lease does not exist.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,9 @@ public final void testConsumeShardWithTransientTerminateError() throws Exception
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(currentLease);

RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
Expand Down Expand Up @@ -714,7 +716,10 @@ public final void testConsumeShardWithShardEnd() throws Exception {
parentShardIds.add("parentShardId");
KinesisClientLease currentLease = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease.setCheckpoint(new ExtendedSequenceNumber("testSequenceNumbeer"));
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease);
KinesisClientLease currentLease1 = createLease(streamShardId, "leaseOwner", parentShardIds);
currentLease1.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(streamShardId)).thenReturn(currentLease, currentLease, currentLease1);

when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);

TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -211,11 +212,16 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception {
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;

KinesisClientLease currentLease = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease currentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease1.setCheckpoint(new ExtendedSequenceNumber("3298"));
KinesisClientLease currentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
currentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END);

KinesisClientLease adjacentParentLease = createLease("ShardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease);
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseCoordinator.getCurrentlyHeldLease(defaultShardId)).thenReturn( currentLease1);
// 6 times as part of parent lease get in failure mode and then two times in actual execution
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease1, currentLease1, currentLease1, currentLease1,
currentLease1, currentLease1, currentLease1, currentLease2);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease);

// Make first 5 attempts with partial parent info in lease table
Expand Down Expand Up @@ -267,7 +273,7 @@ public final void testCallWhenParentInfoNotPresentInLease() throws Exception {
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator, never()).dropLease(currentLease);
verify(leaseCoordinator, never()).dropLease(currentLease1);
}

@Test
Expand Down Expand Up @@ -337,6 +343,11 @@ public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() thr
public final void testCallWhenShardEnd() throws Exception {
RecordProcessorCheckpointer checkpointer = mock(RecordProcessorCheckpointer.class);
when(checkpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
final KinesisClientLease parentLease1 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease1.setCheckpoint(new ExtendedSequenceNumber("3298"));
final KinesisClientLease parentLease2 = createLease(defaultShardId, "leaseOwner", Collections.emptyList());
parentLease2.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
when(leaseManager.getLease(defaultShardId)).thenReturn(parentLease1).thenReturn(parentLease2);
boolean cleanupLeasesOfCompletedShards = false;
boolean ignoreUnexpectedChildShards = false;

Expand Down