Skip to content

Commit ee09a1b

Browse files
committed
Revert "Improving statsByShard performance when the number of shards is very large (elastic#130857)" (elastic#137973)
This reverts commit 22c15bc.
(cherry picked from commit e138f5f)
Conflicts: server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java
1 parent a662521 commit ee09a1b

File tree

11 files changed

+93
-231
lines changed

11 files changed

+93
-231
lines changed

docs/changelog/130857.yaml

Lines changed: 0 additions & 6 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@
4646
import org.elasticsearch.index.seqno.RetentionLeaseStats;
4747
import org.elasticsearch.index.seqno.SeqNoStats;
4848
import org.elasticsearch.index.shard.IndexShard;
49-
import org.elasticsearch.index.shard.ShardId;
50-
import org.elasticsearch.indices.IndicesQueryCache;
5149
import org.elasticsearch.indices.IndicesService;
5250
import org.elasticsearch.injection.guice.Inject;
5351
import org.elasticsearch.node.NodeService;
@@ -254,12 +252,9 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
254252
false,
255253
false
256254
);
257-
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
258255
List<ShardStats> shardsStats = new ArrayList<>();
259256
for (IndexService indexService : indicesService) {
260257
for (IndexShard indexShard : indexService) {
261-
// get the shared ram for this shard id (or zero if there's nothing in the map)
262-
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
263258
cancellableTask.ensureNotCancelled();
264259
if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) {
265260
// only report on fully started shards
@@ -280,7 +275,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
280275
new ShardStats(
281276
indexShard.routingEntry(),
282277
indexShard.shardPath(),
283-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS, sharedRam),
278+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS),
284279
commitStats,
285280
seqNoStats,
286281
retentionLeaseStats,
@@ -308,7 +303,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
308303
clusterStatus,
309304
nodeInfo,
310305
nodeStats,
311-
shardsStats.toArray(new ShardStats[0]),
306+
shardsStats.toArray(new ShardStats[shardsStats.size()]),
312307
searchUsageStats,
313308
repositoryUsageStats,
314309
ccsTelemetry,
@@ -477,7 +472,7 @@ protected void sendItemRequest(String clusterAlias, ActionListener<RemoteCluster
477472
@Override
478473
protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) {
479474
if (response != null) {
480-
remoteClustersStats.computeIfPresent(clusterAlias, (ignored, v) -> v.acceptResponse(response));
475+
remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response));
481476
}
482477
}
483478

server/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,7 @@ public CommonStats(CommonStatsFlags flags) {
154154
/**
155155
* Filters the given flags for {@link CommonStatsFlags#SHARD_LEVEL} flags and calculates the corresponding statistics.
156156
*/
157-
public static CommonStats getShardLevelStats(
158-
IndicesQueryCache indicesQueryCache,
159-
IndexShard indexShard,
160-
CommonStatsFlags flags,
161-
long precomputedSharedRam
162-
) {
157+
public static CommonStats getShardLevelStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, CommonStatsFlags flags) {
163158
// Filter shard level flags
164159
CommonStatsFlags filteredFlags = flags.clone();
165160
for (CommonStatsFlags.Flag flag : filteredFlags.getFlags()) {
@@ -179,7 +174,7 @@ public static CommonStats getShardLevelStats(
179174
case Refresh -> stats.refresh = indexShard.refreshStats();
180175
case Flush -> stats.flush = indexShard.flushStats();
181176
case Warmer -> stats.warmer = indexShard.warmerStats();
182-
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId(), precomputedSharedRam);
177+
case QueryCache -> stats.queryCache = indicesQueryCache.getStats(indexShard.shardId());
183178
case FieldData -> stats.fieldData = indexShard.fieldDataStats(flags.fieldDataFields());
184179
case Completion -> stats.completion = indexShard.completionStats(flags.completionDataFields());
185180
case Segments -> stats.segments = indexShard.segmentStats(

server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.index.seqno.RetentionLeaseStats;
2727
import org.elasticsearch.index.seqno.SeqNoStats;
2828
import org.elasticsearch.index.shard.IndexShard;
29-
import org.elasticsearch.indices.IndicesQueryCache;
3029
import org.elasticsearch.indices.IndicesService;
3130
import org.elasticsearch.injection.guice.Inject;
3231
import org.elasticsearch.tasks.CancellableTask;
@@ -111,13 +110,7 @@ protected void shardOperation(IndicesStatsRequest request, ShardRouting shardRou
111110
assert task instanceof CancellableTask;
112111
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
113112
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
114-
long sharedRam = IndicesQueryCache.getSharedRamSizeForShard(indicesService, indexShard.shardId());
115-
CommonStats commonStats = CommonStats.getShardLevelStats(
116-
indicesService.getIndicesQueryCache(),
117-
indexShard,
118-
request.flags(),
119-
sharedRam
120-
);
113+
CommonStats commonStats = CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, request.flags());
121114
CommitStats commitStats;
122115
SeqNoStats seqNoStats;
123116
RetentionLeaseStats retentionLeaseStats;

server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java

Lines changed: 33 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,12 @@
2828
import org.elasticsearch.common.unit.ByteSizeValue;
2929
import org.elasticsearch.core.Nullable;
3030
import org.elasticsearch.core.Predicates;
31-
import org.elasticsearch.index.IndexService;
3231
import org.elasticsearch.index.cache.query.QueryCacheStats;
33-
import org.elasticsearch.index.shard.IndexShard;
3432
import org.elasticsearch.index.shard.ShardId;
3533

3634
import java.io.Closeable;
3735
import java.io.IOException;
3836
import java.util.Collections;
39-
import java.util.HashMap;
4037
import java.util.IdentityHashMap;
4138
import java.util.Map;
4239
import java.util.Set;
@@ -72,38 +69,6 @@ public class IndicesQueryCache implements QueryCache, Closeable {
7269
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
7370
private volatile long sharedRamBytesUsed;
7471

75-
/**
76-
* Calculates a map of {@link ShardId} to {@link Long} which contains the calculated share of the {@link IndicesQueryCache} shared ram
77-
* size for a given shard (that is, the sum of all the longs is the size of the indices query cache). Since many shards will not
78-
* participate in the cache, shards whose calculated share is zero will not be contained in the map at all. As a consequence, the
79-
* correct pattern for using the returned map will be via {@link Map#getOrDefault(Object, Object)} with a {@code defaultValue} of
80-
* {@code 0L}.
81-
*/
82-
public static Map<ShardId, Long> getSharedRamSizeForAllShards(IndicesService indicesService) {
83-
Map<ShardId, Long> shardIdToSharedRam = new HashMap<>();
84-
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
85-
for (IndexService indexService : indicesService) {
86-
for (IndexShard indexShard : indexService) {
87-
final var queryCache = indicesService.getIndicesQueryCache();
88-
long sharedRam = (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(indexShard.shardId(), cacheTotals);
89-
// as a size optimization, only store non-zero values in the map
90-
if (sharedRam > 0L) {
91-
shardIdToSharedRam.put(indexShard.shardId(), sharedRam);
92-
}
93-
}
94-
}
95-
return Collections.unmodifiableMap(shardIdToSharedRam);
96-
}
97-
98-
public long getCacheSizeForShard(ShardId shardId) {
99-
Stats stats = shardStats.get(shardId);
100-
return stats != null ? stats.cacheSize : 0L;
101-
}
102-
103-
public long getSharedRamBytesUsed() {
104-
return sharedRamBytesUsed;
105-
}
106-
10772
// This is a hack for the fact that the close listener for the
10873
// ShardCoreKeyMap will be called before onDocIdSetEviction
10974
// See onDocIdSetEviction for more info
@@ -126,58 +91,40 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
12691
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
12792
}
12893

129-
/**
130-
* This computes the total cache size in bytes, and the total shard count in the cache for all shards.
131-
* @param indicesService
132-
* @return A CacheTotals object containing the computed total number of items in the cache and the number of shards seen in the cache
133-
*/
134-
private static CacheTotals getCacheTotalsForAllShards(IndicesService indicesService) {
135-
IndicesQueryCache queryCache = indicesService.getIndicesQueryCache();
136-
boolean hasQueryCache = queryCache != null;
94+
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
95+
if (sharedRamBytesUsed == 0L) {
96+
return 0L;
97+
}
98+
99+
/*
100+
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
101+
* shard.
102+
*/
103+
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
137104
long totalItemsInCache = 0L;
138105
int shardCount = 0;
139-
for (final IndexService indexService : indicesService) {
140-
for (final IndexShard indexShard : indexService) {
141-
final var shardId = indexShard.shardId();
142-
long cacheSize = hasQueryCache ? queryCache.getCacheSizeForShard(shardId) : 0L;
143-
shardCount++;
144-
assert cacheSize >= 0 : "Unexpected cache size of " + cacheSize + " for shard " + shardId;
145-
totalItemsInCache += cacheSize;
106+
if (itemsInCacheForShard == 0L) {
107+
for (final var stats : shardStats.values()) {
108+
shardCount += 1;
109+
if (stats.cacheSize > 0L) {
110+
// some shard has nonzero cache footprint, so we apportion the shared size by cache footprint, and this shard has none
111+
return 0L;
112+
}
113+
}
114+
} else {
115+
// branchless loop for the common case
116+
for (final var stats : shardStats.values()) {
117+
shardCount += 1;
118+
totalItemsInCache += stats.cacheSize;
146119
}
147-
}
148-
return new CacheTotals(totalItemsInCache, shardCount);
149-
}
150-
151-
public static long getSharedRamSizeForShard(IndicesService indicesService, ShardId shardId) {
152-
IndicesQueryCache.CacheTotals cacheTotals = IndicesQueryCache.getCacheTotalsForAllShards(indicesService);
153-
final var queryCache = indicesService.getIndicesQueryCache();
154-
return (queryCache == null) ? 0L : queryCache.getSharedRamSizeForShard(shardId, cacheTotals);
155-
}
156-
157-
/**
158-
* This method computes the shared RAM size in bytes for the given indexShard.
159-
* @param shardId The shard to compute the shared RAM size for
160-
* @param cacheTotals Shard totals computed in getCacheTotalsForAllShards()
161-
* @return the shared RAM size in bytes allocated to the given shard, or 0 if unavailable
162-
*/
163-
private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals) {
164-
long sharedRamBytesUsed = getSharedRamBytesUsed();
165-
if (sharedRamBytesUsed == 0L) {
166-
return 0L;
167120
}
168121

169-
int shardCount = cacheTotals.shardCount();
170122
if (shardCount == 0) {
171123
// Sometimes it's not possible to do this when there are no shard entries at all, which can happen as the shared ram usage can
172124
// extend beyond the closing of all shards.
173125
return 0L;
174126
}
175-
/*
176-
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
177-
* shard.
178-
*/
179-
long totalItemsInCache = cacheTotals.totalItemsInCache();
180-
long itemsInCacheForShard = getCacheSizeForShard(shardId);
127+
181128
final long additionalRamBytesUsed;
182129
if (totalItemsInCache == 0) {
183130
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
@@ -198,12 +145,10 @@ private long getSharedRamSizeForShard(ShardId shardId, CacheTotals cacheTotals)
198145
return additionalRamBytesUsed;
199146
}
200147

201-
private record CacheTotals(long totalItemsInCache, int shardCount) {}
202-
203148
/** Get usage statistics for the given shard. */
204-
public QueryCacheStats getStats(ShardId shard, long precomputedSharedRamBytesUsed) {
149+
public QueryCacheStats getStats(ShardId shard) {
205150
final QueryCacheStats queryCacheStats = toQueryCacheStatsSafe(shardStats.get(shard));
206-
queryCacheStats.addRamBytesUsed(precomputedSharedRamBytesUsed);
151+
queryCacheStats.addRamBytesUsed(getShareOfAdditionalRamBytesUsed(queryCacheStats.getCacheSize()));
207152
return queryCacheStats;
208153
}
209154

@@ -312,7 +257,7 @@ QueryCacheStats toQueryCacheStats() {
312257
public String toString() {
313258
return "{shardId="
314259
+ shardId
315-
+ ", ramBytesUsed="
260+
+ ", ramBytedUsed="
316261
+ ramBytesUsed
317262
+ ", hitCount="
318263
+ hitCount
@@ -409,7 +354,11 @@ protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
409354
shardStats.cacheCount += 1;
410355
shardStats.ramBytesUsed += ramBytesUsed;
411356

412-
StatsAndCount statsAndCount = stats2.computeIfAbsent(readerCoreKey, ignored -> new StatsAndCount(shardStats));
357+
StatsAndCount statsAndCount = stats2.get(readerCoreKey);
358+
if (statsAndCount == null) {
359+
statsAndCount = new StatsAndCount(shardStats);
360+
stats2.put(readerCoreKey, statsAndCount);
361+
}
413362
statsAndCount.count += 1;
414363
}
415364

@@ -422,7 +371,7 @@ protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sum
422371
if (numEntries > 0) {
423372
// We can't use ShardCoreKeyMap here because its core closed
424373
// listener is called before the listener of the cache which
425-
// triggers this eviction. So instead we use stats2 that
374+
// triggers this eviction. So instead we use stats2 that
426375
// we only evict when nothing is cached anymore on the segment
427376
// instead of relying on close listeners
428377
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -497,36 +497,33 @@ static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService,
497497
}
498498

499499
static Map<Index, List<IndexShardStats>> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) {
500-
Map<ShardId, Long> shardIdToSharedRam = IndicesQueryCache.getSharedRamSizeForAllShards(indicesService);
501500
final Map<Index, List<IndexShardStats>> statsByShard = new HashMap<>();
501+
502502
for (final IndexService indexService : indicesService) {
503503
for (final IndexShard indexShard : indexService) {
504-
// get the shared ram for this shard id (or zero if there's nothing in the map)
505-
long sharedRam = shardIdToSharedRam.getOrDefault(indexShard.shardId(), 0L);
506504
try {
507-
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags, sharedRam);
505+
final IndexShardStats indexShardStats = indicesService.indexShardStats(indicesService, indexShard, flags);
506+
508507
if (indexShardStats == null) {
509508
continue;
510509
}
510+
511511
if (statsByShard.containsKey(indexService.index()) == false) {
512512
statsByShard.put(indexService.index(), arrayAsArrayList(indexShardStats));
513513
} else {
514514
statsByShard.get(indexService.index()).add(indexShardStats);
515515
}
516516
} catch (IllegalIndexShardStateException | AlreadyClosedException e) {
517+
// we can safely ignore illegal state on ones that are closing for example
517518
logger.trace(() -> format("%s ignoring shard stats", indexShard.shardId()), e);
518519
}
519520
}
520521
}
522+
521523
return statsByShard;
522524
}
523525

524-
IndexShardStats indexShardStats(
525-
final IndicesService indicesService,
526-
final IndexShard indexShard,
527-
final CommonStatsFlags flags,
528-
final long precomputedSharedRam
529-
) {
526+
IndexShardStats indexShardStats(final IndicesService indicesService, final IndexShard indexShard, final CommonStatsFlags flags) {
530527
if (indexShard.routingEntry() == null) {
531528
return null;
532529
}
@@ -551,7 +548,7 @@ IndexShardStats indexShardStats(
551548
new ShardStats(
552549
indexShard.routingEntry(),
553550
indexShard.shardPath(),
554-
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags, precomputedSharedRam),
551+
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, flags),
555552
commitStats,
556553
seqNoStats,
557554
retentionLeaseStats,

server/src/test/java/org/elasticsearch/action/admin/cluster/stats/VersionStatsTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void testCreation() {
115115
ShardStats shardStats = new ShardStats(
116116
shardRouting,
117117
new ShardPath(false, path, path, shardRouting.shardId()),
118-
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store), 0L),
118+
CommonStats.getShardLevelStats(null, indexShard, new CommonStatsFlags(CommonStatsFlags.Flag.Store)),
119119
null,
120120
null,
121121
null,

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1620,7 +1620,7 @@ public void testShardStats() throws IOException {
16201620
ShardStats stats = new ShardStats(
16211621
shard.routingEntry(),
16221622
shard.shardPath(),
1623-
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags(), 0L),
1623+
CommonStats.getShardLevelStats(new IndicesQueryCache(Settings.EMPTY), shard, new CommonStatsFlags()),
16241624
shard.commitStats(),
16251625
shard.seqNoStats(),
16261626
shard.getRetentionLeaseStats(),

0 commit comments

Comments
 (0)