diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 6a3df400275ea..6e8604e393f26 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -91,6 +91,7 @@ class ClientTransformIndexer extends TransformIndexer { private final ClusterService clusterService; private final IndexNameExpressionResolver indexNameExpressionResolver; private final Settings destIndexSettings; + private final boolean crossProjectEnabled; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndexHolder; @@ -140,6 +141,7 @@ class ClientTransformIndexer extends TransformIndexer { context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint); disablePit = TransformEffectiveSettings.isPitDisabled(transformConfig.getSettings()); + crossProjectEnabled = transformServices.crossProjectModeDecider().crossProjectEnabled(); } @Override @@ -536,7 +538,10 @@ private void injectPointInTimeIfNeeded( SearchRequest searchRequest = namedSearchRequest.v2(); // We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems. // We should not re-enable until this is resolved: https://github.com/elastic/elasticsearch/issues/80187 - if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) { + if (disablePit + || searchRequest.indices().length == 0 + || transformConfig.getSource().requiresRemoteCluster() + || crossProjectEnabled) { listener.onResponse(namedSearchRequest); return; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index 1da0dca970fca..0f9176433a075 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -272,44 +273,7 @@ public void testDisablePit() throws InterruptedException { try (var threadPool = createThreadPool()) { final var client = new PitMockClient(threadPool, true); - MockClientTransformIndexer indexer = new MockClientTransformIndexer( - mock(ThreadPool.class), - mock(ClusterService.class), - mock(IndexNameExpressionResolver.class), - mock(TransformExtension.class), - new TransformServices( - mock(IndexBasedTransformConfigManager.class), - mock(TransformCheckpointService.class), - mock(TransformAuditor.class), - new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), - mock(TransformNode.class), - mock(CrossProjectModeDecider.class) - ), - mock(CheckpointProvider.class), - new AtomicReference<>(IndexerState.STOPPED), - null, - new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")), - mock(TransformIndexerStats.class), - config, - null, - new TransformCheckpoint( - "transform", - Instant.now().toEpochMilli(), - 0L, - Collections.emptyMap(), - Instant.now().toEpochMilli() - ), - new TransformCheckpoint( - "transform", - Instant.now().toEpochMilli(), - 2L, - Collections.emptyMap(), - Instant.now().toEpochMilli() - ), - new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), - mock(TransformContext.class), - false - ); + var indexer = createIndexer(client, config); this.assertAsync(listener -> indexer.doNextSearch(0, listener), response -> { if (pitEnabled) { @@ -337,63 +301,87 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc // Remote index is configured within source .setSource(new SourceConfig("remote-cluster:remote-index")) .build(); - boolean pitEnabled = TransformEffectiveSettings.isPitDisabled(config.getSettings()) == false; try (var threadPool = createThreadPool()) { final var client = new PitMockClient(threadPool, true); - MockClientTransformIndexer indexer = new MockClientTransformIndexer( - mock(ThreadPool.class), - mock(ClusterService.class), - mock(IndexNameExpressionResolver.class), - mock(TransformExtension.class), + MockClientTransformIndexer indexer = createIndexer(client, config); + assertPitIsNeverUsed(indexer); + } + } + + private MockClientTransformIndexer createIndexer(Client client, TransformConfig config) { + return createIndexer( + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), + mock(TransformNode.class), + mock(CrossProjectModeDecider.class) + ), + client, + config + ); + } + + private MockClientTransformIndexer createIndexer(TransformServices services, Client client, TransformConfig config) { + return new MockClientTransformIndexer( + mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), + services, + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")), + mock(TransformIndexerStats.class), + config, + null, + new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()), + new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + } + + private void assertPitIsNeverUsed(MockClientTransformIndexer indexer) throws InterruptedException { + boolean pitEnabled = TransformEffectiveSettings.isPitDisabled(indexer.getConfig().getSettings()) == false; + // we expect PIT *not* being used regardless the transform settings + this.assertAsync(listener -> indexer.doNextSearch(0, listener), response -> assertNull(response.pointInTimeId())); + + // reverse the setting + indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build()); + + // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings + this.assertAsync(listener -> indexer.doNextSearch(0, listener), response -> assertNull(response.pointInTimeId())); + } + + public void testDisablePitWhenCrossProjectSearchIsEnabled() throws InterruptedException { + var crossProjectModeDecider = mock(CrossProjectModeDecider.class); + when(crossProjectModeDecider.crossProjectEnabled()).thenReturn(true); + + TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource( + new SourceConfig("remote-index") + ).build(); + + try (var threadPool = createThreadPool()) { + final var client = new PitMockClient(threadPool, true); + MockClientTransformIndexer indexer = createIndexer( new TransformServices( mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), mock(TransformAuditor.class), new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO), mock(TransformNode.class), - mock(CrossProjectModeDecider.class) - ), - mock(CheckpointProvider.class), - new AtomicReference<>(IndexerState.STOPPED), - null, - new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")), - mock(TransformIndexerStats.class), - config, - null, - new TransformCheckpoint( - "transform", - Instant.now().toEpochMilli(), - 0L, - Collections.emptyMap(), - Instant.now().toEpochMilli() + crossProjectModeDecider ), - new TransformCheckpoint( - "transform", - Instant.now().toEpochMilli(), - 2L, - Collections.emptyMap(), - Instant.now().toEpochMilli() - ), - new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), - mock(TransformContext.class), - false - ); - - // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings - this.assertAsync( - listener -> indexer.doNextSearch(0, listener), - response -> assertNull(response.pointInTimeId()) + client, + config ); - // reverse the setting - indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build()); - - // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings - this.assertAsync( - listener -> indexer.doNextSearch(0, listener), - response -> assertNull(response.pointInTimeId()) - ); + assertPitIsNeverUsed(indexer); } }