Skip to content

Commit 46884f0

Browse files
authored
[Transform] Execute _refresh separately from DBQ, with system permissions. (#88005)
1 parent f562213 commit 46884f0

File tree

4 files changed

+16
-4
lines changed

4 files changed

+16
-4
lines changed

docs/changelog/88005.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 88005
2+
summary: "Execute `_refresh` separately from DBQ, with system permissions"
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 88001

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/RetentionPolicyToDeleteByQueryRequestConverter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,12 @@ static DeleteByQueryRequest buildDeleteByQueryRequest(
7272
/* other dbq options not set and why:
7373
* - timeout: we do not timeout for search, so we don't timeout for dbq
7474
* - batch size: don't use max page size search, dbq should be simple
75+
* - refresh: we call refresh separately, after DBQ is executed because refresh should be executed with system permissions
7576
*/
7677
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES)
7778
.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)
7879
// this should not happen, but still go over version conflicts and report later
7980
.setAbortOnVersionConflict(false)
80-
// refresh the index, so docs are really gone
81-
.setRefresh(true)
8281
// use transforms retry mechanics instead
8382
.setMaxRetries(0)
8483
.indices(destConfig.getIndex());

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,11 @@ private void executeRetentionPolicy(ActionListener<Void> listener) {
463463
);
464464
getStats().markStartDelete();
465465

466+
ActionListener<RefreshResponse> deleteByQueryAndRefreshDoneListener = ActionListener.wrap(
467+
refreshResponse -> finalizeCheckpoint(listener),
468+
listener::onFailure
469+
);
470+
466471
doDeleteByQuery(deleteByQuery, ActionListener.wrap(bulkByScrollResponse -> {
467472
logger.trace(() -> format("[%s] dbq response: [%s]", getJobId(), bulkByScrollResponse));
468473

@@ -493,7 +498,9 @@ private void executeRetentionPolicy(ActionListener<Void> listener) {
493498
return;
494499
}
495500

496-
finalizeCheckpoint(listener);
501+
// Since we configure DBQ request *not* to perform a refresh, we need to perform the refresh manually.
502+
// This separation ensures that the DBQ runs with user permissions and the refresh runs with system permissions.
503+
refreshDestinationIndex(deleteByQueryAndRefreshDoneListener);
497504
}, listener::onFailure));
498505
}
499506

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/RetentionPolicyConfigToDeleteByQueryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testTimeBasedRetentionPolicyConfig() {
7777
assertNotNull(deleteByQueryRequest.getSearchRequest().source());
7878
assertNotNull(deleteByQueryRequest.getSearchRequest().source().query());
7979

80-
assertTrue(deleteByQueryRequest.isRefresh());
80+
assertFalse(deleteByQueryRequest.isRefresh());
8181
assertEquals(0, deleteByQueryRequest.getMaxRetries());
8282
assertEquals(1, deleteByQueryRequest.getSearchRequest().indices().length);
8383
assertEquals(destConfig.getIndex(), deleteByQueryRequest.getSearchRequest().indices()[0]);

0 commit comments

Comments
 (0)