Introduce a hard capacity limit for the objects cache #10629
snazy merged 3 commits into projectnessie:main
Conversation
.expireAfter(
    new Expiry<CacheKeyValue, Secret>() {
maybe switch to Expiry.creating(...)?
That's a nice proposal!
It simplifies the code.
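For illustration, a minimal sketch of what the switch might look like, assuming Caffeine 3.x where Expiry.creating(...) is available; the String/byte[] types and the fixed TTL are stand-ins, not the PR's actual types:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import java.time.Duration;

final class ExpiryCreatingSketch {
  // String/byte[] stand in for the PR's CacheKeyValue/Secret types.
  static Cache<String, byte[]> build(Duration ttl) {
    return Caffeine.newBuilder()
        // Expiry.creating(): the TTL is computed once when the entry is created and is not
        // reset by reads or updates - replacing a hand-written Expiry with three overrides.
        .expireAfter(Expiry.creating((String key, byte[] value) -> ttl))
        .build();
  }
}
```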
.scheduler(Scheduler.systemScheduler())
.maximumWeight(config.capacityMb() * 1024L * 1024L)
.weigher(this::weigher)
.expireAfter(
We need expireAfterCreate and expireAfterUpdate here.
That would require a bigger change, as all supplying functions are defined to provide the expiration timestamp.
Oops, so Expiry.writing?
(But ignore if that's an unhelpful refactor.)
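If both creation and update need to reset the duration (the expireAfterCreate/expireAfterUpdate pair mentioned above), Expiry.writing(...) expresses that; a sketch under the same assumptions as before:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import java.time.Duration;

final class ExpiryWritingSketch {
  static Cache<String, byte[]> build(Duration ttl) {
    return Caffeine.newBuilder()
        // Expiry.writing(): the duration is (re)computed on create and on update,
        // but not on reads.
        .expireAfter(Expiry.writing((String key, byte[] value) -> ttl))
        .build();
  }
}
```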
Resolved review threads (collapsed) on:
- catalog/files/impl/src/main/java/org/projectnessie/catalog/files/s3/StsClientsPool.java (outdated)
- ...rage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java
Scheduler.systemScheduler for Caffeine caches
@ben-manes One problem is that the cache's retained total heap size can grow up to OOMs in case eviction cannot keep up. WDYT of having an optional way to not admit any new cache entries when a bounded cache's capacity would be exceeded?
The branch was force-pushed from 8ab9900 to 97c7765.
To avoid serializing all writes against a single lock for eviction, we decouple the hash map and the eviction policy using a ring buffer. That means most of the time a writer can update the hash table, queue up its work to be replayed on the policy, schedule the policy to be run, and return to the caller. If the write rate exceeds the eviction rate then the buffer fills up, causing back pressure: writers fall back to acquiring the eviction lock and performing the maintenance work themselves. In those cases the writer threads are descheduled on the lock and the eviction is able to do a batch of work when draining the buffer. The write buffer is sized to a bounded maximum.

What often happens is that eviction is stalled not by the write rate but due to a stalled executor (which defaults to ForkJoinPool.commonPool()). I think we can rely on the write buffer causing back pressure, because if the write rate truly exceeds the eviction rate then entry loading is extremely cheap (miss penalty), which makes the cache kind of pointless. Usually the problem is something stalling eviction so it builds up until forced through, so I'd look to see if adjusting your FJP usage or swapping the executor resolves it for you. You might see in your logs the cache's warning when it detects problems: "The cache is experiencing excessive wait times for acquiring the eviction lock. This may indicate that a long-running computation has halted eviction when trying to remove the victim entry. Consider using AsyncCache to decouple the computation from the map operation."
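One way to act on that advice is to hand Caffeine a same-thread executor so maintenance cannot be stalled by a starved common pool. A hedged sketch; the capacity and types are illustrative, not the PR's configuration:

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

final class SameThreadMaintenanceSketch {
  static Cache<Long, byte[]> build(long capacityBytes) {
    return Caffeine.newBuilder()
        .maximumWeight(capacityBytes)
        .weigher((Long key, byte[] value) -> value.length)
        // Run maintenance (including eviction) on the calling thread instead of the default
        // ForkJoinPool.commonPool(), so a stalled executor cannot delay eviction.
        .executor(Runnable::run)
        .build();
  }
}
```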
| "", | ||
| x -> config.capacityMb()); | ||
| meterRegistry.gauge( | ||
| "current_weight", |
Could you make this current_weight_mb, to be comparable to cache_capacity_mb?
Also, how about renaming to cache_weight_mb? "Current" is only true at the time of measurement :)
Can rename to _mb - but I don't think that renaming an existing metric is a good idea ;)
| "", | ||
| x -> currentWeight()); | ||
| meterRegistry.gauge( | ||
| "rejections", singletonList(Tag.of("cache", CACHE_NAME)), "", x -> rejections()); |
Could we rename to cache_rejections to simplify finding it in dashboards?
Two more resolved review threads (collapsed) on ...rage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java.
void cachePut(CacheKeyValue key, CacheKeyValue value) {
  var w = weigher(key, value);
  if (currentWeight.get() + w < admitWeight) {
    currentWeight.addAndGet(w);
I wonder if we could leverage the cache's Policy.Eviction.weightedSize() 🤔
That method could cause inline eviction, so more work on put, but we'd get the benefit of actually evicting stale objects rather than rejecting fresh objects... WDYT?
It's already above the "max weight" bound in such cases.

I'm a bit blind on the actual root cause, but what might happen in some cases is that the CPUs are all totally busy and eviction then cannot keep up. We don't use async loading nor a loader function - it's all "dumb".

I wrote a dumb isolated test with multiple threads (not using FJP) putting objects into a cache. Each thread runs in a tight loop with a Thread.yield().

Not sure whether the write-buffer size is a problem there, because from a "PI times thumb" estimation, I doubt that could exceed the total heap size. But I'm not 100% sure on that. Is there a way to tweak the write-buffer size?

From some experiments locally, it seems that .executor(Runnable::run) is quite helpful. I'll dig more into this. Not sure whether we can get rid of the "overshoot thingy", but maybe we can.
(I'm now in the "fun" situation of asserting behaviors of concurrent things in a unit test... 🤦)
I have a similar tool called Stresser (CI run) which shows some of the internal metrics. CI only runs on a few cores though. It is really easy to starve FJP since it is the hidden default most everywhere, but it has only NUM_CPU threads, which makes it pretty awful for the I/O work it is most often used for due to being CompletableFuture's default.
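As a hedged illustration of that point: for I/O-bound work it usually helps to pass an explicit executor instead of relying on the CompletableFuture default. The pool size and names below are made up:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class AvoidCommonPoolSketch {
  // Dedicated pool for blocking I/O so it does not occupy the NUM_CPU-sized
  // ForkJoinPool.commonPool() that CompletableFuture (and Caffeine) default to.
  private static final ExecutorService IO_POOL = Executors.newFixedThreadPool(32);

  static CompletableFuture<byte[]> loadAsync(Supplier<byte[]> blockingLoad) {
    // The single-argument supplyAsync would use the common pool; this overload does not.
    return CompletableFuture.supplyAsync(blockingLoad, IO_POOL);
  }
}
```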
Oh yea. From some more testing: this is the "tool" I've been initially running to simulate these "ugly" situations (not in Git - just a one-off thingy). Without the protection, the retained heap just keeps growing:

package org.projectnessie.versioned.storage.cache;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.projectnessie.versioned.storage.cache.CaffeineCacheBackend.ARRAY_OVERHEAD;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Ticker;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
public class TestPushHard {
private static final long ONE_MB = 1024 * 1024;
@ParameterizedTest
@MethodSource
void testPushHard(int numAdders, boolean withScheduler) throws Exception {
var rt = Runtime.getRuntime();
var maxMemory = rt.maxMemory();
var capacity = maxMemory / 2;
var limitCapacity = capacity * 4 / 3;
var currentWeight = new AtomicLong();
var success = new AtomicLong();
var reject = new AtomicLong();
var builder =
Caffeine.newBuilder()
.maximumWeight(capacity)
.weigher(TestPushHard::weigh)
.expireAfter(Expiry.creating((k, v) -> Duration.of(1, ChronoUnit.HOURS)))
.removalListener(
(k, v, cause) -> {
currentWeight.addAndGet(-weigh(k, v));
})
.ticker(Ticker.systemTicker());
if (withScheduler) {
builder.scheduler(Scheduler.systemScheduler());
}
var cache = builder.build();
var idGen = new AtomicLong();
var executor = Executors.newFixedThreadPool(numAdders);
try {
var stop = new AtomicBoolean();
for (int i = 0; i < numAdders; i++) {
executor.execute(
() -> {
while (!stop.get()) {
byte[] v = new byte[ThreadLocalRandom.current().nextInt(4096, 65536)];
int weight = weigh(0L, v);
if (currentWeight.get() + weight < limitCapacity) {
cache.put(idGen.incrementAndGet(), v);
currentWeight.addAndGet(weight);
Thread.yield();
success.incrementAndGet();
} else {
reject.incrementAndGet();
}
}
});
}
for (long sec = 0; sec < 20; sec++) {
System.out.printf(
"weight: %dm ok:%d reject:%d - max: %dm free %dm total %dm%n",
currentWeight.get() / ONE_MB,
success.get(),
reject.get(),
rt.maxMemory() / ONE_MB,
rt.freeMemory() / ONE_MB,
rt.totalMemory() / ONE_MB);
Thread.sleep(1000L);
}
stop.set(true);
} finally {
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
static int weigh(Long key, byte[] value) {
return 24 // j.l.Long
+ ARRAY_OVERHEAD
+ value.length;
}
static Stream<Arguments> testPushHard() {
return Stream.of(2, 4, 6, 7, 8, 9, 10)
.flatMap(num -> Stream.of(arguments(num, true), arguments(num, false)));
}
}
The branch was force-pushed from 137b918 to 9dbd3b2.
.cacheCapacityOvershoot(0.1d)
// Production uses Runnable::run, but that lets this test sometimes run way too
// long, so we introduce some delay to simulate the case that eviction cannot keep up.
.executor(t -> delayedExecutor(1, TimeUnit.MILLISECONDS).execute(t))
Note: this delay lets the test hit the relevant seen* assertions pretty quickly.
public class TestCacheOvershoot {
  @InjectSoftAssertions protected SoftAssertions soft;

  @RepeatedTest(5)
Ran this one a couple thousand times locally - no failures.
CaffeineCacheBackend backend1noSpy;
CaffeineCacheBackend backend2noSpy;
CaffeineCacheBackend backend1;
CaffeineCacheBackend backend2;
Need the concrete type here, otherwise Mockito complains about the new cachePut() being seen as an unwanted interaction, so I had to add some .verify() calls for those.
Yeah, large memory allocations make it harder to juggle. Usually it's high load times for small results, like a cache in front of a DB. Regardless, the GC has its own required slack for concurrency, so it's pretty easy to exhaust all of that with large objects. Sometimes soft references are okay as a safety net to let the GC know it can discard, but those rightfully have a bad reputation. The limit protection seems like a good approach for your usage.
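For context, the soft-reference safety net mentioned above maps to Caffeine's softValues(); a minimal sketch (not something this PR uses, and the size bound is illustrative):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

final class SoftValuesSketch {
  static Cache<Long, byte[]> build() {
    return Caffeine.newBuilder()
        .maximumSize(100_000) // illustrative entry-count bound
        // Values become collectible under GC pressure - a safety net with the downsides
        // (unpredictable eviction, GC overhead) alluded to above.
        .softValues()
        .build();
  }
}
```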
I suppose it's worth mentioning that striping is how many caches try to solve this, since eviction requires a global lock, making it single-threaded. Then you have N caches able to evict in parallel to better handle concurrent write throughput. This has the negative effect of reducing the max entry size to 1/Nth, so larger entries can't be retained. Memcached is smarter by segmenting by object size, rather than hashing the LRU space, so it obtains a nicer balance by using dedicated regions (slab allocator). That makes a ton of sense if one is size-aware like your off-heap cache, but it brings lots of oddities if trying to do that for on-heap Java, as you know far better than me. Otherwise caches often forgo trying to optimize eviction, as reads are the common case, but this can also become unusably slow if not handled thoughtfully (e.g. ehcache's crazy behavior).
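A hedged sketch of the striping idea described above (again, not what this PR does): split one logical cache into N independent Caffeine caches so eviction can run in parallel, at the cost of each stripe only holding 1/Nth of the weight budget.

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.ArrayList;
import java.util.List;

final class StripedCacheSketch<K, V> {
  private final List<Cache<K, V>> stripes;

  StripedCacheSketch(int stripeCount, long totalWeightBytes) {
    stripes = new ArrayList<>(stripeCount);
    for (int i = 0; i < stripeCount; i++) {
      stripes.add(
          Caffeine.newBuilder()
              // Each stripe gets 1/Nth of the budget, so no single entry larger than that
              // share can be retained - the downside mentioned above.
              .maximumWeight(totalWeightBytes / stripeCount)
              .weigher((K k, V v) -> 1) // placeholder weigher; real code would estimate heap size
              .build());
    }
  }

  // Spread keys over stripes by hash; each stripe evicts under its own lock.
  private Cache<K, V> stripeFor(K key) {
    return stripes.get(Math.floorMod(key.hashCode(), stripes.size()));
  }

  V getIfPresent(K key) {
    return stripeFor(key).getIfPresent(key);
  }

  void put(K key, V value) {
    stripeFor(key).put(key, value);
  }
}
```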
meterRegistry.gauge(
    "cache_rejected_weight_mb",
Why not a DistributionSummary similar to caffeine's evicted weight metrics?
I'm not sure a "gauge" is correct here, because it's a cumulative value.
True, a DistributionSummary seems better here.
However, I also updated the other metrics to report the values as bytes, and to specify the base unit "bytes" in the metric. The base unit is exposed downstream, so the metrics are shown in the "correct" way.
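A sketch of what that could look like with Micrometer - a DistributionSummary for the cumulative rejected weight plus a gauge with the byte base unit; the metric and tag names here are illustrative, not the PR's final ones:

```java
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.BaseUnits;
import java.util.function.LongSupplier;

final class CacheMetricsSketch {
  final DistributionSummary rejectedWeight;

  CacheMetricsSketch(MeterRegistry registry, LongSupplier currentWeightBytes) {
    // Cumulative distribution of rejected entry weights, reported in bytes.
    rejectedWeight =
        DistributionSummary.builder("cache_rejected_weight")
            .baseUnit(BaseUnits.BYTES)
            .tag("cache", "objects") // illustrative tag
            .register(registry);

    // Point-in-time weight as a gauge, also with the byte base unit so dashboards scale it.
    Gauge.builder("cache_weight", () -> currentWeightBytes.getAsLong())
        .baseUnit(BaseUnits.BYTES)
        .tag("cache", "objects")
        .register(registry);
  }

  void onReject(long weightBytes) {
    rejectedWeight.record(weightBytes);
  }
}
```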
@VisibleForTesting
void cachePut(CacheKeyValue key, CacheKeyValue value) {
  var w = weigher(key, value);
  if (currentWeight.get() + w < admitWeight) {
Suggestion: how about calling cache.policy().eviction().ifPresent(Policy.Eviction::weightedSize) when we're about to reject?
This call will perform any delayed evictions, if I'm not mistaken. It will cause overhead on put, but only in case of overflow. The benefit would be that we'll get rid of stale entries rather than fresh entries.
My local testing shows that with this approach the cache size is contained within the "overshoot" cap. I can show more details tomorrow, but below is a quick summary. WDYT?
- A few threads put 256K objects into the cache.
- The limit is 1 GB, with 0.1 overshoot.
- X is the number of objects inserted.
- Y is the increase in heap usage (after System.gc()).
- The lower line is the weight limit.
- The higher line is the "admit" limit.
- The circles represent main code.
- The dots represent this PR with the proposed change.
- There are no rejections.
Suggestion: how about calling cache.policy().eviction().ifPresent(Policy.Eviction::weightedSize) when we're about to reject?
No need to - all cache operations trigger eviction when necessary, and we always perform a getIfPresent before a put.
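For readers outside the PR, a sketch of that read-then-put pattern; the loader and the admission-checked put are stand-ins for the backend's real methods:

```java
import com.github.benmanes.caffeine.cache.Cache;
import java.util.function.Function;

final class ReadThenPutSketch {
  static <K, V> V getOrLoad(Cache<K, V> cache, K key, Function<K, V> loader) {
    // getIfPresent always precedes the put; both calls let Caffeine run pending eviction work.
    V cached = cache.getIfPresent(key);
    if (cached != null) {
      return cached;
    }
    V loaded = loader.apply(key);
    cache.put(key, loaded); // in the PR this goes through the admission-checked cachePut()
    return loaded;
  }
}
```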
This call will perform any delayed evictions, if I'm not mistaken. It will cause overhead on put, but only in case of overflow. The benefit would be that we'll get rid of stale entries rather than fresh entries. My local testing shows that with this approach the cache size is contained within the "overshoot" cap. I can show more details tomorrow, but below is a quick summary. WDYT?
Yea - I see the same: no rejections at all. The argument that fresh entries wouldn't be shredded is key IMO. Caffeine's eviction algorithm helps with not evicting hot entries / evicts "one off objects" for us. That's IMO more important than paying the eviction-lock price twice.
So the put-function now looks like this:
void cachePut(CacheKeyValue key, CacheKeyValue value) {
  var w = weigher(key, value);
  if (weightSupplier.getAsLong() + w < admitWeight) {
    cache.put(key, value);
  } else {
    rejections.incrementAndGet();
    rejectionsWeight.accept(w);
  }
}

where weightSupplier yields policy().eviction().weightedSize().
The interesting part now is that I didn't manage to run into the else branch, which means that the cache weight stays below the permitted "overshoot" of 10%. It does exceed the configured capacity though.
I think that it's still possible that shredding happens, but it seems very very unlikely. So I'd like to keep the shredding-safety-net.
Okay, and now I already revert my statement above: I can manage to run into the else branch. With 16 threads (no yield() in the loop) putting objects into the cache it shreds. That's however a super-stress situation that's IMHO in no way realistic. With the yield() it doesn't shred - even with 16 threads (== number of cores of my CPU).
Hm - but I couldn't reproduce the repro again - mysteries of concurrency testing...
Anyway, I think we're good now.
Two further resolved review threads (collapsed, outdated) on ...rage/cache/src/main/java/org/projectnessie/versioned/storage/cache/CaffeineCacheBackend.java.
this.cache = cacheBuilder.build();

var eviction = cache.policy().eviction().orElseThrow();
weightSupplier = () -> eviction.weightedSize().orElse(0L);
I believe this might be overkill... IIRC, in one of the previous revisions, we tracked the current weight separately, as a simple long value.
My idea for calling eviction.weightedSize() was for cases when the incoming object was going to exceed the admitWeight. That would cause inline eviction only when we're about to go over the hard limit (admittedly, the overall behaviour would rely on the internals of the particular cache impl.).
In this particular state of the code, we're calling eviction.weightedSize() on every put (and so force eviction on every put).
Is that intended? If so, would you mind adding a comment to clarify that?
In this particular state of the code, we're calling eviction.weightedSize() on every put (and so force eviction on every put).
That's not true. It only evicts when it's required to evict. See com.github.benmanes.caffeine.cache.BoundedLocalCache.BoundedPolicy.BoundedEviction#weightedSize
When the cache is full, it'll always be required to evict, right? It will evict either inside put or here... as far as I can tell.
The branch was force-pushed from 39895d7 to 5f9cb16, and then from 5f9cb16 to 94f65bd.
  executor.awaitTermination(10, TimeUnit.MINUTES);
}

soft.assertThat(cache.currentWeightReported()).isLessThanOrEqualTo(admitWeight);
Minor: I believe after all insertions are done, current weight should not exceed the lower limit (no overshoot), because the executor performs eviction tasks immediately.
I've seen it "overshoot" ;)
Cross-referenced from an apache/polaris commit that adopts this change: "adopt cache to Nessie projectnessie/nessie#10629 change".
Outstanding changes from projectnessie/nessie#10629 (other parts are already there)
FYI, another approach to consider is making an explicit call to cleanUp() after the write. This would be similar to not having a write buffer at all. The write buffer is meant to absorb a burst of modifications without serializing writers against an eviction lock, assuming the burst can be handled as a batch. The ConcurrentHashMap is always written first and then the policy, and its work is very cheap (but we can't control the cost of callbacks or whether the removal blocks on a colliding compute). So if you prefer a stricter bound by blocking writers from exceeding the eviction rate, calling cleanUp should also do the trick.
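A minimal sketch of that alternative - forcing the pending maintenance right after the write so writers effectively wait for eviction (types are illustrative):

```java
import com.github.benmanes.caffeine.cache.Cache;

final class StrictBoundPutSketch {
  static <K, V> void putStrict(Cache<K, V> cache, K key, V value) {
    cache.put(key, value);
    // cleanUp() drains the buffers and runs any pending eviction work inline,
    // trading write throughput for a tighter bound on the retained weight.
    cache.cleanUp();
  }
}
```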

Implements a hard limit to prevent the cache's retained heap size from growing "infinitely" in case cache eviction cannot keep up with newly added entries. The default is to allow 10% on top of the cache capacity.
The "overshoot" is necessary to eventually trigger the cache eviction.
Also adds the recommended Scheduler.systemScheduler.
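To make the relationship between capacity and overshoot concrete, a small sketch; the field and method names are illustrative, not the PR's exact code:

```java
final class AdmitWeightSketch {
  // capacityBytes: configured maximum cache weight; overshoot: e.g. 0.1 for 10%.
  static long admitWeight(long capacityBytes, double overshoot) {
    // New entries are admitted only while the current weight stays below this bound,
    // so the cache may exceed the configured capacity by at most the overshoot fraction.
    return (long) (capacityBytes * (1.0d + overshoot));
  }

  static boolean admit(long currentWeightBytes, long entryWeightBytes, long admitWeightBytes) {
    return currentWeightBytes + entryWeightBytes < admitWeightBytes;
  }
}
```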