Skip to content

Commit b03d138

Browse files
authored
Lift retention lease expiration to index shard (#38380)
This commit lifts the control of when retention leases are expired to index shard. In this case, we move expiration to an explicit action rather than a side-effect of calling ReplicationTracker#getRetentionLeases. This explicit action is invoked on a timer. If any retention leases expire, then we hard sync the retention leases to the replicas. Otherwise, we proceed with a background sync.
1 parent 4a15e2b commit b03d138

File tree

7 files changed

+192
-212
lines changed

7 files changed

+192
-212
lines changed

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
121121
private volatile AsyncRefreshTask refreshTask;
122122
private volatile AsyncTranslogFSync fsyncTask;
123123
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
124-
private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask;
124+
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
125125

126126
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
127127
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -198,7 +198,7 @@ public IndexService(
198198
this.refreshTask = new AsyncRefreshTask(this);
199199
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
200200
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
201-
this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this);
201+
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
202202
rescheduleFsyncTask(indexSettings.getTranslogDurability());
203203
}
204204

@@ -289,7 +289,7 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
289289
fsyncTask,
290290
trimTranslogTask,
291291
globalCheckpointTask,
292-
retentionLeaseBackgroundSyncTask);
292+
retentionLeaseSyncTask);
293293
}
294294
}
295295
}
@@ -788,8 +788,8 @@ private void maybeSyncGlobalCheckpoints() {
788788
sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint");
789789
}
790790

791-
private void backgroundSyncRetentionLeases() {
792-
sync(IndexShard::backgroundSyncRetentionLeases, "retention lease");
791+
private void syncRetentionLeases() {
792+
sync(IndexShard::syncRetentionLeases, "retention lease");
793793
}
794794

795795
private void sync(final Consumer<IndexShard> sync, final String source) {
@@ -812,11 +812,11 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
812812
&& e instanceof IndexShardClosedException == false) {
813813
logger.warn(
814814
new ParameterizedMessage(
815-
"{} failed to execute background {} sync", shard.shardId(), source), e);
815+
"{} failed to execute {} sync", shard.shardId(), source), e);
816816
}
817817
},
818818
ThreadPool.Names.SAME,
819-
"background " + source + " sync");
819+
source + " sync");
820820
} catch (final AlreadyClosedException | IndexShardClosedException e) {
821821
// the shard was closed concurrently, continue
822822
}
@@ -957,15 +957,15 @@ public String toString() {
957957
}
958958
}
959959

960-
final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask {
960+
final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask {
961961

962-
AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) {
962+
AsyncRetentionLeaseSyncTask(final IndexService indexService) {
963963
super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings()));
964964
}
965965

966966
@Override
967967
protected void runInternal() {
968-
indexService.backgroundSyncRetentionLeases();
968+
indexService.syncRetentionLeases();
969969
}
970970

971971
@Override
@@ -975,7 +975,7 @@ protected String getThreadPool() {
975975

976976
@Override
977977
public String toString() {
978-
return "retention_lease_background_sync";
978+
return "retention_lease_sync";
979979
}
980980

981981
}

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,10 @@ public long getRetentionLeaseMillis() {
339339
return retentionLeaseMillis;
340340
}
341341

342+
private void setRetentionLeaseMillis(final TimeValue retentionLease) {
343+
this.retentionLeaseMillis = retentionLease.millis();
344+
}
345+
342346
private volatile boolean warmerEnabled;
343347
private volatile int maxResultWindow;
344348
private volatile int maxInnerResultWindow;
@@ -523,6 +527,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
523527
scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline);
524528
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
525529
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled);
530+
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING, this::setRetentionLeaseMillis);
526531
}
527532

528533
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2929
import org.elasticsearch.cluster.routing.ShardRouting;
3030
import org.elasticsearch.common.SuppressForbidden;
31+
import org.elasticsearch.common.collect.Tuple;
3132
import org.elasticsearch.common.io.stream.StreamInput;
3233
import org.elasticsearch.common.io.stream.StreamOutput;
3334
import org.elasticsearch.common.io.stream.Writeable;
@@ -155,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
155156
private final LongSupplier currentTimeMillisSupplier;
156157

157158
/**
158-
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
159-
* retention lease sync action, to sync retention leases to replicas.
159+
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
160+
* retention leases to replicas.
160161
*/
161-
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
162+
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
162163

163164
/**
164165
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -177,43 +178,42 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
177178
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
178179

179180
/**
180-
* Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired,
181-
* and if any have expired, syncs the retention leases to any replicas.
181+
* Get all retention leases tracked on this shard.
182182
*
183183
* @return the retention leases
184184
*/
185185
public RetentionLeases getRetentionLeases() {
186-
final boolean wasPrimaryMode;
187-
final RetentionLeases nonExpiredRetentionLeases;
188-
synchronized (this) {
189-
if (primaryMode) {
190-
// the primary calculates the non-expired retention leases and syncs them to replicas
191-
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
192-
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
193-
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
194-
.leases()
195-
.stream()
196-
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
197-
if (partitionByExpiration.get(true) == null) {
198-
// early out as no retention leases have expired
199-
return retentionLeases;
200-
}
201-
final Collection<RetentionLease> nonExpiredLeases =
202-
partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
203-
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
204-
}
205-
/*
206-
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
207-
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
208-
* non-expired retention leases, instead receiving them on syncs from the primary.
209-
*/
210-
wasPrimaryMode = primaryMode;
211-
nonExpiredRetentionLeases = retentionLeases;
186+
return getRetentionLeases(false).v2();
187+
}
188+
189+
/**
190+
* If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
191+
* expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
192+
* primary shard calculates which leases are expired; syncing the resulting retention leases to replicas is left to the caller. If the
193+
* expire leases parameter is true, this replication tracker must be in primary mode.
194+
*
195+
* @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
196+
*/
197+
public synchronized Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
198+
if (expireLeases == false) {
199+
return Tuple.tuple(false, retentionLeases);
212200
}
213-
if (wasPrimaryMode) {
214-
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
201+
assert primaryMode;
202+
// the primary calculates the non-expired retention leases and syncs them to replicas
203+
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
204+
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
205+
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
206+
.leases()
207+
.stream()
208+
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
209+
if (partitionByExpiration.get(true) == null) {
210+
// early out as no retention leases have expired
211+
return Tuple.tuple(false, retentionLeases);
215212
}
216-
return nonExpiredRetentionLeases;
213+
final Collection<RetentionLease> nonExpiredLeases =
214+
partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
215+
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
216+
return Tuple.tuple(true, retentionLeases);
217217
}
218218

219219
/**
@@ -246,7 +246,7 @@ public RetentionLease addRetentionLease(
246246
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
247247
currentRetentionLeases = retentionLeases;
248248
}
249-
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
249+
onAddRetentionLease.accept(currentRetentionLeases, listener);
250250
return retentionLease;
251251
}
252252

@@ -563,7 +563,7 @@ private static long inSyncCheckpointStates(
563563
* @param indexSettings the index settings
564564
* @param operationPrimaryTerm the current primary term
565565
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
566-
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
566+
* @param onAddRetentionLease a callback when a new retention lease is created
567567
*/
568568
public ReplicationTracker(
569569
final ShardId shardId,
@@ -573,7 +573,7 @@ public ReplicationTracker(
573573
final long globalCheckpoint,
574574
final LongConsumer onGlobalCheckpointUpdated,
575575
final LongSupplier currentTimeMillisSupplier,
576-
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
576+
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
577577
super(shardId, indexSettings);
578578
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
579579
this.shardAllocationId = allocationId;
@@ -585,7 +585,7 @@ public ReplicationTracker(
585585
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
586586
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
587587
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
588-
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
588+
this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
589589
this.pendingInSync = new HashSet<>();
590590
this.routingTable = null;
591591
this.replicationGroup = null;

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1892,13 +1892,26 @@ public void addGlobalCheckpointListener(
18921892
}
18931893

18941894
/**
1895-
* Get all non-expired retention leases tracked on this shard.
1895+
* Get all retention leases tracked on this shard.
18961896
*
18971897
* @return the retention leases
18981898
*/
18991899
public RetentionLeases getRetentionLeases() {
1900+
return getRetentionLeases(false).v2();
1901+
}
1902+
1903+
/**
1904+
* If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates
1905+
* expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the
1906+
* primary shard calculates which leases are expired; this method does not itself sync the retention leases to replicas. If the
1907+
* expire leases parameter is true, this shard must be in primary mode.
1908+
*
1909+
* @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases
1910+
*/
1911+
public Tuple<Boolean, RetentionLeases> getRetentionLeases(final boolean expireLeases) {
1912+
assert expireLeases == false || assertPrimaryMode();
19001913
verifyNotClosed();
1901-
return replicationTracker.getRetentionLeases();
1914+
return replicationTracker.getRetentionLeases(expireLeases);
19021915
}
19031916

19041917
public RetentionLeaseStats getRetentionLeaseStats() {
@@ -1956,10 +1969,15 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases
19561969
/**
19571970
* Expires retention leases, then syncs them to all replicas: a hard sync if any lease expired, otherwise a background sync.
19581971
*/
1959-
public void backgroundSyncRetentionLeases() {
1972+
public void syncRetentionLeases() {
19601973
assert assertPrimaryMode();
19611974
verifyNotClosed();
1962-
retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases());
1975+
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
1976+
if (retentionLeases.v1()) {
1977+
retentionLeaseSyncer.sync(shardId, retentionLeases.v2(), ActionListener.wrap(() -> {}));
1978+
} else {
1979+
retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2());
1980+
}
19631981
}
19641982

19651983
/**

0 commit comments

Comments
 (0)