Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger refresh when shard becomes search active #96321

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
40a5b2a
iter
martijnvg May 24, 2023
266aa36
different approach: perform refersh forcefully on search thread and n…
martijnvg May 26, 2023
cb27754
iter
martijnvg May 30, 2023
a6e0180
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg May 30, 2023
7a3f342
also check for pending refreshes
martijnvg May 30, 2023
60d92ce
muted test
martijnvg May 31, 2023
d2f4b43
just keep using awaitShardSearchActive(...) instead
martijnvg May 31, 2023
c9a8192
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 1, 2023
055c31b
Renamed awaitShardSearchActive(...) to makeShardSearchActive(...) and
martijnvg Jun 1, 2023
14a5a44
Renamed makeShardSearchActive(...) to ensureShardSearchActive(...).
martijnvg Jun 1, 2023
3eb8075
update jdocs and use maybeRefresh() instead of scheduledRefresh()
martijnvg Jun 1, 2023
fb9ebfe
Update docs/changelog/96321.yaml
martijnvg Jun 1, 2023
329620d
fix changelog
martijnvg Jun 1, 2023
d427641
added assertion that only allows search and get threads.
martijnvg Jun 1, 2023
a8e18c3
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 6, 2023
803832e
always perform the refresh with a refresh thread.
martijnvg Jun 6, 2023
109d335
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 6, 2023
00329a6
fix spelling
martijnvg Jun 9, 2023
c405461
Merge branch 'main' into tsdb/trigger_refresh_when_becoming_search_ac…
elasticmachine Jun 9, 2023
afef5e1
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 12, 2023
410e6fb
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 13, 2023
7134f1d
Merge remote-tracking branch 'es/main' into tsdb/trigger_refresh_when…
martijnvg Jun 14, 2023
7dc8173
update docs to reflect the new idle shard refresh behaviour
martijnvg Jun 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/96321.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96321
summary: Trigger refresh when shard becomes search active
area: Engine
type: enhancement
issues:
- 95544
6 changes: 3 additions & 3 deletions docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ are ignored for this index.
refresh. If this setting is not explicitly set, shards that haven't seen
search traffic for at least `index.search.idle.after` seconds will not receive
background refreshes until they receive a search request. Searches that hit an
idle shard where a refresh is pending will wait for the next background
refresh (within `1s`). This behavior aims to automatically optimize bulk
indexing in the default case when no searches are performed. In order to opt
idle shard where a refresh is pending will trigger a refresh as part of the
search operation for that shard only. This behavior aims to automatically optimize
bulk indexing in the default case when no searches are performed. In order to opt
out of this behavior an explicit value of `1s` should set as the refresh
interval.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected void asyncShardOperation(ExplainRequest request, ShardId shardId, Acti
throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitShardSearchActive(b -> {
indexShard.ensureShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionLi
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
asyncGet(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
indexShard.ensureShardSearchActive(b -> {
try {
asyncGet(request, shardId, listener);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
asyncShardMultiGet(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
indexShard.ensureShardSearchActive(b -> {
try {
asyncShardMultiGet(request, shardId, listener);
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected void asyncShardOperation(TermVectorsRequest request, ShardId shardId,
if (request.realtime()) { // it's a realtime request which is not subject to refresh cycles
super.asyncShardOperation(request, shardId, listener);
} else {
indexShard.awaitShardSearchActive(b -> {
indexShard.ensureShardSearchActive(b -> {
try {
super.asyncShardOperation(request, shardId, listener);
} catch (Exception ex) {
Expand Down
24 changes: 20 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -3829,20 +3829,36 @@ public void afterRefresh(boolean didRefresh) {
}

/**
* Registers the given listener and invokes it once the shard is active again and all
* pending refresh translog location has been refreshed. If there is no pending refresh location registered the listener will be
* invoked immediately.
* Ensures this shard is search active before invoking the provided listener.
* <p>
* This is achieved by registering a refresh listener and invoking the provided listener from the refresh listener once the shard is
* active again and all pending refresh translog location has been refreshed. A refresh may be executed to avoid waiting for
* {@link #scheduledRefresh(ActionListener)} to be invoked. If there is no pending refresh location registered the provided listener
* will be invoked immediately.
*
* @param listener the listener to invoke once the pending refresh location is visible. The listener will be called with
* <code>true</code> if the listener was registered to wait for a refresh.
*/
public final void awaitShardSearchActive(Consumer<Boolean> listener) {
public final void ensureShardSearchActive(Consumer<Boolean> listener) {
markSearcherAccessed(); // move the shard into non-search idle
martijnvg marked this conversation as resolved.
Show resolved Hide resolved
final Translog.Location location = pendingRefreshLocation.get();
if (location != null) {
addRefreshListener(location, (result) -> {
pendingRefreshLocation.compareAndSet(location, null);
listener.accept(true);
});
// trigger a refresh to avoid waiting for scheduledRefresh(...) to be invoked from index level refresh scheduler.
// (The if statement should avoid doing an additional refresh if scheduled refresh was invoked between getting
// the current refresh location and adding a refresh listener.)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, addRefreshListener might have performed the refresh already in edge cases.

if (location == pendingRefreshLocation.get()) {
// This method may be called from many different threads including transport_worker threads and
// a refresh can be a costly operation, so we should fork to a refresh thread to be safe:
threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> {
if (location == pendingRefreshLocation.get()) {
getEngine().maybeRefresh("ensure-shard-search-active", PlainActionFuture.newFuture());
}
});
}
} else {
listener.accept(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard shard = indexService.getShard(shardId.id());
final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
shard.awaitShardSearchActive(ignored -> {
shard.ensureShardSearchActive(ignored -> {
Engine.SearcherSupplier searcherSupplier = null;
ReaderContext readerContext = null;
try {
Expand Down Expand Up @@ -1654,8 +1654,7 @@ private void rewriteAndFetchShardRequest(IndexShard shard, ShardSearchRequest re
if (request.readerId() != null) {
l.onResponse(request);
} else {
// now we need to check if there is a pending refresh and register
shard.awaitShardSearchActive(b -> l.onResponse(request));
shard.ensureShardSearchActive(b -> l.onResponse(request));
}
});
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3764,7 +3764,7 @@ public void testIsSearchIdle() throws Exception {
assertBusy(() -> assertTrue(primary.isSearchIdle()));
do {
// now loop until we are fast enough... shouldn't take long
primary.awaitShardSearchActive(aBoolean -> {});
primary.ensureShardSearchActive(aBoolean -> {});
if (primary.isSearchIdle()) {
assertTrue(primary.searchIdleTime() >= tenMillis.millis());
}
Expand All @@ -3782,6 +3782,7 @@ public void testIsSearchIdle() throws Exception {
}

public void testScheduledRefresh() throws Exception {
// Setup and make shard search idle:
Settings settings = indexSettings(Version.CURRENT, 1, 1).build();
IndexMetadata metadata = IndexMetadata.builder("test").putMapping("""
{ "properties": { "foo": { "type": "text"}}}""").settings(settings).primaryTerm(0, 1).build();
Expand All @@ -3796,43 +3797,40 @@ public void testScheduledRefresh() throws Exception {
settings = Settings.builder().put(settings).put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO).build();
scopedSettings.applySettings(settings);

// Index document and ensure refresh is needed but not performed:
assertFalse(primary.getEngine().refreshNeeded());
indexDoc(primary, "_doc", "1", "{\"foo\" : \"bar\"}");
assertTrue(primary.getEngine().refreshNeeded());
long lastSearchAccess = primary.getLastSearcherAccess();
// Now since shard is search idle scheduleRefresh(...) shouldn't refresh even if a refresh is needed:
PlainActionFuture<Boolean> future2 = PlainActionFuture.newFuture();
primary.scheduledRefresh(future2);
assertFalse(future2.actionGet());
assertEquals(lastSearchAccess, primary.getLastSearcherAccess());
// wait until the thread-pool has moved the timestamp otherwise we can't assert on this below
assertBusy(() -> assertThat(primary.getThreadPool().relativeTimeInMillis(), greaterThan(lastSearchAccess)));
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because makeShardSearchActive(...) now triggers a refresh, we can no longer invoke this method many times without a refresh to happen. Before this was possible in this test and then just invoke scheduleRefresh(...) and all the listeners were invoked.

primary.awaitShardSearchActive(refreshed -> {
assertTrue(refreshed);
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(2, searcher.getIndexReader().numDocs());
} finally {
latch.countDown();
}
});
}

// Make shard search active again and ensure previously index document is visible:
CountDownLatch latch = new CountDownLatch(1);
primary.ensureShardSearchActive(refreshed -> {
assertTrue(refreshed);
latch.countDown();
});
latch.await();
assertNotEquals(
"awaitShardSearchActive must access a searcher to remove search idle state",
lastSearchAccess,
primary.getLastSearcherAccess()
);
assertTrue(lastSearchAccess < primary.getLastSearcherAccess());
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(1, searcher.getIndexReader().numDocs());
assertEquals(2, searcher.getIndexReader().numDocs());
}
assertTrue(primary.getEngine().refreshNeeded());
PlainActionFuture<Boolean> future3 = PlainActionFuture.newFuture();
primary.scheduledRefresh(future3);
assertTrue(future3.actionGet());
latch.await();

// No documents were added and shard is search active so makeShardSearchActive(...) should behave like a noop:
assertFalse(primary.getEngine().refreshNeeded());
CountDownLatch latch1 = new CountDownLatch(1);
primary.awaitShardSearchActive(refreshed -> {
primary.ensureShardSearchActive(refreshed -> {
assertFalse(refreshed);
try (Engine.Searcher searcher = primary.acquireSearcher("test")) {
assertEquals(2, searcher.getIndexReader().numDocs());
Expand All @@ -3843,6 +3841,7 @@ public void testScheduledRefresh() throws Exception {
});
latch1.await();

// Index a document while shard is search active and ensure scheduleRefresh(...) makes documen visible:
indexDoc(primary, "_doc", "2", "{\"foo\" : \"bar\"}");
PlainActionFuture<Boolean> future4 = PlainActionFuture.newFuture();
primary.scheduledRefresh(future4);
Expand Down