Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ClientTransformIndexer extends TransformIndexer {
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Settings destIndexSettings;
private final boolean crossProjectEnabled;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndexHolder;
Expand Down Expand Up @@ -140,6 +141,7 @@ class ClientTransformIndexer extends TransformIndexer {
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);

disablePit = TransformEffectiveSettings.isPitDisabled(transformConfig.getSettings());
crossProjectEnabled = transformServices.crossProjectModeDecider().crossProjectEnabled();
}

@Override
Expand Down Expand Up @@ -536,7 +538,10 @@ private void injectPointInTimeIfNeeded(
SearchRequest searchRequest = namedSearchRequest.v2();
// We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
// We should not re-enable until this is resolved: https://github.com/elastic/elasticsearch/issues/80187
if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
if (disablePit
|| searchRequest.indices().length == 0
|| transformConfig.getSource().requiresRemoteCluster()
|| crossProjectEnabled) {
listener.onResponse(namedSearchRequest);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -272,44 +273,7 @@ public void testDisablePit() throws InterruptedException {

try (var threadPool = createThreadPool()) {
final var client = new PitMockClient(threadPool, true);
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
mock(TransformNode.class),
mock(CrossProjectModeDecider.class)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);
var indexer = createIndexer(client, config);

this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
if (pitEnabled) {
Expand Down Expand Up @@ -337,63 +301,87 @@ public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedExc
// Remote index is configured within source
.setSource(new SourceConfig("remote-cluster:remote-index"))
.build();
boolean pitEnabled = TransformEffectiveSettings.isPitDisabled(config.getSettings()) == false;

try (var threadPool = createThreadPool()) {
final var client = new PitMockClient(threadPool, true);
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
MockClientTransformIndexer indexer = createIndexer(client, config);
assertPitIsNeverUsed(indexer);
}
}

private MockClientTransformIndexer createIndexer(Client client, TransformConfig config) {
return createIndexer(
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
mock(TransformNode.class),
mock(CrossProjectModeDecider.class)
),
client,
config
);
}

private MockClientTransformIndexer createIndexer(TransformServices services, Client client, TransformConfig config) {
return new MockClientTransformIndexer(
mock(ThreadPool.class),
mock(ClusterService.class),
mock(IndexNameExpressionResolver.class),
mock(TransformExtension.class),
services,
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);
}

private void assertPitIsNeverUsed(MockClientTransformIndexer indexer) throws InterruptedException {
boolean pitEnabled = TransformEffectiveSettings.isPitDisabled(indexer.getConfig().getSettings()) == false;
// we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> assertNull(response.pointInTimeId()));

// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> assertNull(response.pointInTimeId()));
}

public void testDisablePitWhenCrossProjectSearchIsEnabled() throws InterruptedException {
var crossProjectModeDecider = mock(CrossProjectModeDecider.class);
when(crossProjectModeDecider.crossProjectEnabled()).thenReturn(true);

TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()).setSource(
new SourceConfig("remote-index")
).build();

try (var threadPool = createThreadPool()) {
final var client = new PitMockClient(threadPool, true);
MockClientTransformIndexer indexer = createIndexer(
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO),
mock(TransformNode.class),
mock(CrossProjectModeDecider.class)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
crossProjectModeDecider
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
client,
config
);

// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);
assertPitIsNeverUsed(indexer);
}
}

Expand Down