core/src/main/java/org/apache/gravitino/cache/EntityCacheWeigher.java
@@ -32,27 +32,34 @@
import org.slf4j.LoggerFactory;

/**
* A {@link Weigher} implementation that calculates the weight of an entity based on its type. The
* weight is calculated as follows:
* A {@link Weigher} implementation that calculates the weight of an entity based on its type. In
* Caffeine's weight-based eviction, higher weights make entities MORE likely to be evicted (all
* else being equal), as the cache prefers to retain lighter entries within the maximum weight
* limit.
*
* <p>Weight assignments (lower weight = higher retention priority):
*
* <ul>
* <li>Metalake: 0, which means that it will never be evicted from the cache unless timeout occurs
* or manually cleared.
* <li>Catalog: 0, which means that it will never be evicted from the cache unless timeout occurs
* or manually cleared.
* <li>Schema: 500
* <li>Tag: 100
* <li>Policy: 100
* <li>Other: 200
* <li>Schema: 100 (lowest non-zero weight, highest retention priority among weighted entries)
* <li>Other (e.g., Fileset): 200 (medium weight, medium retention priority)
* <li>Tag: 500 (highest weight, lowest retention priority)
* <li>Policy: 500 (highest weight, lowest retention priority)
* </ul>
*
* <p>Note: Caffeine's W-TinyLFU algorithm considers both access frequency and weight. Frequently
* accessed heavier entries may still be retained over infrequently accessed lighter entries.
*/
public class EntityCacheWeigher implements Weigher<EntityCacheKey, List<Entity>> {
public static final int METALAKE_WEIGHT = 0; // 0 means never evict
public static final int CATALOG_WEIGHT = 0;
public static final int SCHEMA_WEIGHT = 500; // higher weight means it will less likely be evicted
public static final int SCHEMA_WEIGHT = 100; // Lower weight = higher retention priority
public static final int OTHER_WEIGHT = 200;
public static final int TAG_WEIGHT = 100;
public static final int POLICY_WEIGHT = 100;
public static final int TAG_WEIGHT = 500;
public static final int POLICY_WEIGHT = 500;
private static final Logger LOG = LoggerFactory.getLogger(EntityCacheWeigher.class.getName());
private static final EntityCacheWeigher INSTANCE = new EntityCacheWeigher();
private static final Map<Entity.EntityType, Integer> ENTITY_WEIGHTS =
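As a rough illustration of the weigher semantics described in the Javadoc above, here is a minimal, self-contained sketch (not part of this change) of how Caffeine combines a Weigher with maximumWeight. It uses plain String keys and hypothetical per-type weights rather than Gravitino's entity classes; the point is only that heavier entries consume more of the weight budget and are therefore evicted sooner, all else being equal.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

public class WeigherSemanticsDemo {
  public static void main(String[] args) {
    // The weigher returns a per-entry weight; eviction keeps the total weighted size
    // under maximumWeight, so heavier entries consume more of the budget and are
    // evicted sooner than lighter entries with comparable access frequency.
    Weigher<String, String> byType =
        (key, value) -> key.startsWith("tag:") ? 500 : key.startsWith("schema:") ? 100 : 200;

    Cache<String, String> cache =
        Caffeine.newBuilder()
            .maximumWeight(1_000) // a cap on total weight, not on the number of entries
            .weigher(byType)
            .build();

    // Two tags (2 x 500 = 1000) already reach the cap; the schemas (100 each) push the
    // weighted size over it, so Caffeine must evict entries to get back under the limit.
    cache.put("tag:t1", "v");
    cache.put("tag:t2", "v");
    for (int i = 0; i < 5; i++) {
      cache.put("schema:s" + i, "v");
    }
    cache.cleanUp(); // run pending maintenance so evictions are applied

    System.out.println("entries retained: " + cache.asMap().size());
  }
}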
75 changes: 57 additions & 18 deletions core/src/test/java/org/apache/gravitino/cache/TestCacheConfig.java
@@ -53,14 +53,14 @@ void testDefaultCacheConfig() {
Assertions.assertTrue(config.get(Configs.CACHE_WEIGHER_ENABLED));
Assertions.assertEquals(10_000, config.get(Configs.CACHE_MAX_ENTRIES));
Assertions.assertEquals(3_600_000L, config.get(Configs.CACHE_EXPIRATION_TIME));
Assertions.assertEquals(9_000_000L, EntityCacheWeigher.getMaxWeight());
Assertions.assertEquals(24_200_000L, EntityCacheWeigher.getMaxWeight());
Assertions.assertEquals("caffeine", config.get(Configs.CACHE_IMPLEMENTATION));
}

@Test
void testPolicyAndTagCacheWeigher() throws InterruptedException {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
builder.maximumWeight(2000);
builder.maximumWeight(5000);
builder.weigher(EntityCacheWeigher.getInstance());
Cache<EntityCacheRelationKey, List<Entity>> cache = builder.build();

@@ -152,22 +152,60 @@ void testPolicyAndTagCacheWeigher() throws InterruptedException {
List.of(fileset));
}

// Access filesets 5-14 four times to increase their frequency to 5 (insert + 4 gets)
for (int access = 0; access < 4; access++) {
for (int i = 5; i < 15; i++) {
String filesetName = "fileset" + i;
cache.getIfPresent(
EntityCacheRelationKey.of(
NameIdentifier.of(new String[] {"metalake1", "catalog1", "schema1", filesetName}),
Entity.EntityType.FILESET));
}
}

Thread.sleep(1000);

// There should no tag entities in the cache, because the weight of each tag entity is 100 that
// is higher than the maximum weight of the fileset entity which is 200.
Awaitility.await()
.atMost(Duration.ofSeconds(20))
.pollInterval(Duration.ofMillis(10))
.until(
() ->
IntStream.of(0, 1, 2, 3)
.mapToObj(i -> NameIdentifierUtil.ofTag("metalake", "tag" + i))
.allMatch(
tagNameIdent ->
cache.getIfPresent(
EntityCacheRelationKey.of(tagNameIdent, Entity.EntityType.TAG))
== null));
// Count how many filesets are still in cache
// Weight calculation: base(100) + filesets(15×200=3000) + tags(10×500=5000) = 8100 > 5000 limit
// Filesets 5-14 have freq=5, tags have freq=1. With frequency advantage + lighter weight,
// filesets should be strongly prioritized by Caffeine's W-TinyLFU
long remainingFilesets =
IntStream.range(5, 15)
.mapToObj(i -> "fileset" + i)
.filter(
filesetName ->
cache.getIfPresent(
EntityCacheRelationKey.of(
NameIdentifier.of(
new String[] {"metalake1", "catalog1", "schema1", filesetName}),
Entity.EntityType.FILESET))
!= null)
.count();

// Count how many tags are still in cache
long remainingTags =
IntStream.range(0, 10)
.mapToObj(i -> NameIdentifierUtil.ofTag("metalake", "tag" + i))
.filter(
tagNameIdent ->
cache.getIfPresent(
EntityCacheRelationKey.of(tagNameIdent, Entity.EntityType.TAG))
!= null)
.count();

// Verify weight-based eviction: filesets (weight=200, freq=5) should be strongly
// prioritized over tags (weight=500, freq=1) due to both higher frequency and lighter weight
Assertions.assertTrue(
remainingFilesets + remainingTags < 20,
String.format(
"Expected significant eviction due to weight limit (max=5000). Found filesets=%d, tags=%d (total=%d/20)",
remainingFilesets, remainingTags, remainingFilesets + remainingTags));

Assertions.assertTrue(
remainingFilesets > remainingTags,
String.format(
"Expected filesets (weight=200, freq=5) to be prioritized over tags (weight=500, freq=1). Found filesets=%d, tags=%d",
remainingFilesets, remainingTags));
}
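The counting-based assertions above are deliberately loose because W-TinyLFU's admission decisions depend on its frequency sketch, not on weight alone. As a hedged, standalone sketch (hypothetical keys and weights, not Gravitino code), the total weighted size can also be inspected through Caffeine's eviction policy to confirm the cache stays within its configured maximum:

import java.util.OptionalLong;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Policy;

public class WeightedSizeInspection {
  // Returns the current total weighted size, if weigher-based eviction is enabled.
  static OptionalLong currentWeight(Cache<?, ?> cache) {
    cache.cleanUp(); // apply pending evictions before reading
    return cache.policy().eviction()
        .map(Policy.Eviction::weightedSize)
        .orElse(OptionalLong.empty());
  }

  public static void main(String[] args) {
    Cache<String, String> cache =
        Caffeine.newBuilder()
            .maximumWeight(5_000)
            .weigher((String key, String value) -> key.startsWith("tag:") ? 500 : 200)
            .build();

    cache.put("tag:t0", "v");
    cache.put("fileset:f0", "v");

    // The weighted size never exceeds the configured maximum once maintenance runs,
    // but which entries survive depends on W-TinyLFU's frequency sketch, which is
    // why the test above asserts on counts rather than exact membership.
    System.out.println("weighted size: " + currentWeight(cache).orElse(-1));
  }
}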

@Test
@@ -240,11 +278,12 @@ void testCaffeineCacheWithWeight() throws Exception {
NameIdentifier.of("metalake1.catalog" + i), Entity.EntityType.CATALOG)));
}

// Only some of the 100 schemas are still in the cache, to be exact, 5000 / 500 = 10 schemas.
// Only some of the 100 schemas are still in the cache.
// With new weights: schema=100, so approximately 5000 / 100 = 50 schemas fit.
Awaitility.await()
.atMost(Duration.ofSeconds(5))
.pollInterval(Duration.ofMillis(10))
.until(() -> cache.asMap().size() == 10 + 3 + 5000 / 500);
.until(() -> cache.asMap().size() == 10 + 3 + 5000 / 100);
}
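A back-of-envelope check of the expected size asserted above, assuming (as the assertion suggests) that the fixed 10 + 3 entries are zero-weight and only the schemas, at weight 100 each, count against the 5,000 limit:

public class ExpectedCacheSize {
  public static void main(String[] args) {
    // Hypothetical capacity arithmetic mirroring the assertion above; the 10 + 3 term
    // is assumed to be zero-weight entries that never count toward the weight limit.
    long maxWeight = 5_000;        // maximumWeight configured in the test
    int schemaWeight = 100;        // new SCHEMA_WEIGHT
    int zeroWeightEntries = 10 + 3;

    long schemasThatFit = maxWeight / schemaWeight;          // 5000 / 100 = 50
    long expectedSize = zeroWeightEntries + schemasThatFit;  // 13 + 50 = 63

    System.out.println("expected cache size: " + expectedSize);
  }
}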

@Test