Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for faster shard reallocation on graceful shutdown #1005

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ target/
AwsCredentials.properties
.idea
*.iml

.sdkmanrc
.vscode
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,22 @@ public class CommonCalculations {
public static long getRenewerTakerIntervalMillis(long leaseDurationMillis, long epsilonMillis) {
return leaseDurationMillis / 3 - epsilonMillis;
}

/**
* Convenience method for calculating lease taker intervals in milliseconds.
*
* @param leaseTakerIntervalMillis Current value for interval (from default or overriden).
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @return lease taker interval.
*/
public static long getLeaseTakerIntervalMillis(
long leaseTakerIntervalMillis, long leaseDurationMillis, long epsilonMillis
) {
if (leaseTakerIntervalMillis > 0) {
return leaseTakerIntervalMillis;
}

return (leaseDurationMillis + epsilonMillis) * 2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,8 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory,
leaseCleanupManager,
schemaRegistryDecoder
schemaRegistryDecoder,
leaseManagementConfig.evictLeaseOnShutdown()
);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class Lease {
/*
* See javadoc for System.nanoTime - summary:
*
*
* Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two
* values will be very large. We will consider leases to be expired if they are more than a year old.
*/
Expand Down Expand Up @@ -111,7 +111,7 @@ public class Lease {

/**
* Copy constructor, used by clone().
*
*
* @param lease lease to copy
*/
protected Lease(Lease lease) {
Expand Down Expand Up @@ -164,7 +164,7 @@ public Set<String> parentShardIds() {
/**
* Updates this Lease's mutable, application-specific fields based on the passed-in lease object. Does not update
* fields that are internal to the leasing library (leaseKey, leaseOwner, leaseCounter).
*
*
* @param lease
*/
public void update(final Lease lease) {
Expand Down Expand Up @@ -195,9 +195,16 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
* @return true if this lease is unassigned (no assigned owner), false otherwise.
*/
public boolean isUnassigned() {
return leaseOwner == null;
}

/**
* Sets lastCounterIncrementNanos
*
*
* @param lastCounterIncrementNanos last renewal in nanoseconds since the epoch
*/
public void lastCounterIncrementNanos(Long lastCounterIncrementNanos) {
Expand All @@ -206,7 +213,7 @@ public void lastCounterIncrementNanos(Long lastCounterIncrementNanos) {

/**
* Sets concurrencyToken.
*
*
* @param concurrencyToken may not be null
*/
public void concurrencyToken(@NonNull final UUID concurrencyToken) {
Expand All @@ -215,7 +222,7 @@ public void concurrencyToken(@NonNull final UUID concurrencyToken) {

/**
* Sets leaseKey. LeaseKey is immutable once set.
*
*
* @param leaseKey may not be null.
*/
public void leaseKey(@NonNull final String leaseKey) {
Expand All @@ -227,7 +234,7 @@ public void leaseKey(@NonNull final String leaseKey) {

/**
* Sets leaseCounter.
*
*
* @param leaseCounter may not be null
*/
public void leaseCounter(@NonNull final Long leaseCounter) {
Expand Down Expand Up @@ -303,7 +310,7 @@ public void hashKeyRange(HashKeyRangeForLease hashKeyRangeForLease) {

/**
* Sets leaseOwner.
*
*
* @param leaseOwner may be null.
*/
public void leaseOwner(String leaseOwner) {
Expand All @@ -312,12 +319,10 @@ public void leaseOwner(String leaseOwner) {

/**
* Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics.
*
*
* @return A deep copy of this object.
*/
public Lease copy() {
return new Lease(this);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,18 @@ public class LeaseManagementConfig {
private long listShardsCacheAllowedAgeInSeconds = 30;
private int cacheMissWarningModulus = 250;

/**
* Interval at which the lease taker will execute.
* If unspecified, an interval will be calculated based on the lease duration.
*/
private long leaseTakerIntervalMillis = -1L;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposing this property to clients.


/**
* If leases should be evicted or not on shutdown requested.
* By default, leases are not evicted.
*/
private boolean evictLeaseOnShutdown = false;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New property. If false, same behavior as today which is wait for lease to be expired before taking it.


private MetricsFactory metricsFactory = new NullMetricsFactory();

@Deprecated
Expand Down Expand Up @@ -326,6 +338,7 @@ public LeaseManagementFactory leaseManagementFactory() {
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
leaseTakerIntervalMillis,
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
Expand Down Expand Up @@ -361,6 +374,7 @@ public LeaseManagementFactory leaseManagementFactory(final LeaseSerializer lease
executorService(),
failoverTimeMillis(),
epsilonMillis(),
leaseTakerIntervalMillis,
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import software.amazon.kinesis.metrics.MetricsUtil;

import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
import static software.amazon.kinesis.common.CommonCalculations.getLeaseTakerIntervalMillis;

/**
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
Expand Down Expand Up @@ -108,12 +109,13 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final long epsilonMillis,
final long leaseTakerIntervalMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, leaseTakerIntervalMillis,
maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
Expand Down Expand Up @@ -144,6 +146,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
final String workerIdentifier,
final long leaseDurationMillis,
final long epsilonMillis,
final long leaseTakerIntervalMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
Expand All @@ -158,7 +161,7 @@ public DynamoDBLeaseCoordinator(final LeaseRefresher leaseRefresher,
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
this.takerIntervalMillis = getLeaseTakerIntervalMillis(leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis);
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}
Expand Down
Loading