Skip to content

Commit

Permalink
Avoid early expiration of an pending future due to delayed pinning (f…
Browse files Browse the repository at this point in the history
…ixes #1623)
  • Loading branch information
ben-manes committed Jan 14, 2025
1 parent 25405d6 commit 9b65365
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1929,18 +1929,6 @@ public void run() {
accessOrderWindowDeque().offerLast(node);
}
}

// Ensure that in-flight async computation cannot expire (reset on a completion callback)
if (isComputingAsync(node.getValue())) {
synchronized (node) {
if (!Async.isReady((CompletableFuture<?>) node.getValue())) {
long expirationTime = expirationTicker().read() + ASYNC_EXPIRY;
setVariableTime(node, expirationTime);
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
}
}
}
}
}

Expand Down Expand Up @@ -2311,6 +2299,9 @@ public void putAll(Map<? extends K, ? extends V> map) {
node = nodeFactory.newNode(key, keyReferenceQueue(),
value, valueReferenceQueue(), newWeight, now);
setVariableTime(node, expireAfterCreate(key, value, expiry, now));
long expirationTime = isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now;
setAccessTime(node, expirationTime);
setWriteTime(node, expirationTime);
}
prior = data.putIfAbsent(node.getKeyReference(), node);
if (prior == null) {
Expand Down Expand Up @@ -2391,21 +2382,22 @@ public void putAll(Map<? extends K, ? extends V> map) {
varTime = expireAfterUpdate(prior, key, value, expiry, now);
}

long expirationTime = isComputingAsync(value) ? (now + ASYNC_EXPIRY) : now;
if (mayUpdate) {
exceedsTolerance =
(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)
|| (expiresVariable()
&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);
setWriteTime(prior, expirationTime);

prior.setValue(value, valueReferenceQueue());
prior.setWeight(newWeight);
setWriteTime(prior, now);

discardRefresh(prior.getKeyReference());
}

setVariableTime(prior, varTime);
setAccessTime(prior, now);
setAccessTime(prior, expirationTime);
}

if (expired) {
Expand All @@ -2422,9 +2414,6 @@ public void putAll(Map<? extends K, ? extends V> map) {
} else if (!onlyIfAbsent && exceedsTolerance) {
afterWrite(new UpdateTask(prior, weightedDifference));
} else {
if (mayUpdate) {
setWriteTime(prior, now);
}
afterRead(prior, now, /* recordHit= */ false);
}

Expand Down Expand Up @@ -2548,9 +2537,11 @@ public boolean remove(Object key, Object value) {
n.setValue(value, valueReferenceQueue());
n.setWeight(weight);

long expirationTime = isComputingAsync(value) ? (now[0] + ASYNC_EXPIRY) : now[0];
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);

discardRefresh(k);
return n;
}
Expand Down Expand Up @@ -2605,9 +2596,10 @@ public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefre
n.setValue(newValue, valueReferenceQueue());
n.setWeight(weight);

long expirationTime = isComputingAsync(newValue) ? (now[0] + ASYNC_EXPIRY) : now[0];
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);

if (shouldDiscardRefresh) {
discardRefresh(k);
Expand Down Expand Up @@ -2697,6 +2689,9 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
var created = nodeFactory.newNode(key, keyReferenceQueue(),
newValue[0], valueReferenceQueue(), weight[1], now[0]);
setVariableTime(created, expireAfterCreate(key, newValue[0], expiry(), now[0]));
long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0];
setAccessTime(created, expirationTime);
setWriteTime(created, expirationTime);
return created;
}

Expand Down Expand Up @@ -2730,8 +2725,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
n.setWeight(weight[1]);

setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
if (isComputingAsync(newValue[0])) {
long expirationTime = now[0] + ASYNC_EXPIRY;
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
} else {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
}
discardRefresh(k);
return n;
}
Expand Down Expand Up @@ -2867,9 +2868,12 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
long varTime = expireAfterCreate(key, newValue[0], expiry, now[0]);
var created = nodeFactory.newNode(keyRef, newValue[0],
valueReferenceQueue(), weight[1], now[0]);

long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0];
setAccessTime(created, expirationTime);
setWriteTime(created, expirationTime);
setVariableTime(created, varTime);
setAccessTime(created, now[0]);
setWriteTime(created, now[0]);

discardRefresh(key);
return created;
}
Expand Down Expand Up @@ -2920,9 +2924,11 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
n.setValue(newValue[0], valueReferenceQueue());
n.setWeight(weight[1]);

long expirationTime = isComputingAsync(newValue[0]) ? (now[0] + ASYNC_EXPIRY) : now[0];
setAccessTime(n, expirationTime);
setWriteTime(n, expirationTime);
setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);

discardRefresh(kr);
return n;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2926,6 +2926,30 @@ public void expireAfterRead_disabled(BoundedLocalCache<Int, Int> cache, CacheCon
assertThat(duration).isEqualTo(expiresAt);
}

@Test(dataProvider = "caches")
@CacheSpec(mustExpireWithAnyOf = { AFTER_WRITE, VARIABLE },
expiry = { CacheExpiry.DISABLED, CacheExpiry.WRITE },
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE)
public void expireAfterWrite_writeTime(AsyncCache<Int, Int> cache, CacheContext context) {
var localCache = asBoundedLocalCache(cache);
localCache.setDrainStatusRelease(PROCESSING_TO_REQUIRED);

var future = new CompletableFuture<Int>();
cache.put(context.absentKey(), future);
assertThat(localCache.writeBuffer).isNotEmpty();

context.ticker().advance(Duration.ofMinutes(2));
future.complete(context.absentValue());
assertThat(localCache.writeBuffer).isNotEmpty();

localCache.setDrainStatusRelease(REQUIRED);
assertThat(cache.getIfPresent(context.absentKey())).isNotNull();
context.ticker().advance(Duration.ofSeconds(45));
assertThat(cache.getIfPresent(context.absentKey())).isNotNull();
context.ticker().advance(Duration.ofSeconds(15));
assertThat(cache.getIfPresent(context.absentKey())).isNull();
}

@Test
public void fixedExpireAfterWrite() {
int key = 1;
Expand Down

0 comments on commit 9b65365

Please sign in to comment.