diff --git a/docs/changelog/144074.yaml b/docs/changelog/144074.yaml new file mode 100644 index 0000000000000..459082a6b6c1e --- /dev/null +++ b/docs/changelog/144074.yaml @@ -0,0 +1,5 @@ +area: Distributed +issues: [] +pr: 144074 +summary: Batch index creation +type: enhancement diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceIT.java index 20826adcc1356..2505e52df1e1c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceIT.java @@ -9,27 +9,45 @@ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.test.ESIntegTestCase; +import java.util.HashSet; import java.util.Locale; +import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; +import static org.elasticsearch.action.support.ActionTestUtils.assertNoSuccessListener; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; public class MetadataCreateIndexServiceIT extends ESIntegTestCase { public void testRequestTemplateIsRespected() throws InterruptedException { /* - * This test passes a template in the CreateIndexClusterStateUpdateRequest, and makes sure that the settings from that template - * are used when creating the index. + * This test passes a template in the CreateIndexClusterStateUpdateRequest, and makes sure that the settings + * from that template are used when creating the index. */ MetadataCreateIndexService metadataCreateIndexService = internalCluster().getCurrentMasterNodeInstance( MetadataCreateIndexService.class @@ -75,4 +93,133 @@ public void onFailure(Exception e) { Settings settings = response.getSettings().get(indexName); assertThat(settings.get("index.number_of_replicas"), equalTo(Integer.toString(numberOfReplicas))); } + + public void testCreateIndexBatching() throws Exception { + final var masterClusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final int totalRequestCount = randomIntBetween(1, 20); + final var validIndexNames = new HashSet(); + final var invalidSettingsNames = new HashSet(); + final var allIndexNames = new HashSet(); + + final var preExistingIndexName = addRandomIndexNameNoCollision(allIndexNames); + createIndex(preExistingIndexName); + + final ClusterStateListener listener = event -> { + final var projectMetadata = event.state().metadata().getProject(ProjectId.DEFAULT); + if (projectMetadata == null) { + return; + } + final var createdInState = validIndexNames.stream().filter(projectMetadata::hasIndex).toList(); + assertThat( + "expected either none or all valid indices to appear atomically, but found " + createdInState.size(), + createdInState, + anyOf(hasSize(0), hasSize(validIndexNames.size())) + ); + for (final var indexName : invalidSettingsNames) { + assertFalse("invalid index [" + indexName + "] should never be created", projectMetadata.hasIndex(indexName)); + } + }; + masterClusterService.addListener(listener); + try { + final var barrier = new CyclicBarrier(2); + masterClusterService.submitUnbatchedStateUpdateTask("block", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + safeAwait(barrier); + safeAwait(barrier); + return currentState; + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("blocking task failed", e); + } + }); + safeAwait(barrier); + + final var responsesLatch = new CountDownLatch(totalRequestCount); + for (int i = 0; i < totalRequestCount; i++) { + switch (randomIntBetween(0, 3)) { + // valid request + case 0 -> { + final var indexName = addRandomIndexNameNoCollision(allIndexNames); + validIndexNames.add(indexName); + client().execute( + TransportCreateIndexAction.TYPE, + new CreateIndexRequest(indexName), + new LatchedActionListener<>( + assertNoFailureListener(response -> assertTrue(response.isAcknowledged())), + responsesLatch + ) + ); + } + // invalid index name + case 1 -> client().execute( + TransportCreateIndexAction.TYPE, + new CreateIndexRequest(randomIdentifier("INVALID_BECAUSE_UPPER_CASE_")), + new LatchedActionListener<>( + assertNoSuccessListener( + e -> assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(InvalidIndexNameException.class)) + ), + responsesLatch + ) + ); + // invalid settings + case 2 -> { + final var indexName = addRandomIndexNameNoCollision(allIndexNames); + invalidSettingsNames.add(indexName); + client().execute( + TransportCreateIndexAction.TYPE, + new CreateIndexRequest(indexName).settings(Settings.builder().put("index.version.created", 1)), + new LatchedActionListener<>( + assertNoSuccessListener( + e -> assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(IllegalArgumentException.class)) + ), + responsesLatch + ) + ); + } + // already existing index + default -> client().execute( + TransportCreateIndexAction.TYPE, + new CreateIndexRequest(preExistingIndexName), + new LatchedActionListener<>( + assertNoSuccessListener( + e -> assertThat(ExceptionsHelper.unwrapCause(e), instanceOf(ResourceAlreadyExistsException.class)) + ), + responsesLatch + ) + ); + } + } + + assertTrue( + "create-index tasks should be queued behind the blocking task", + waitUntil( + () -> masterClusterService.getMasterService() + .pendingTasks() + .stream() + .filter(pct -> pct.getSource().toString().startsWith("create-index")) + .count() == totalRequestCount + ) + ); + final var initialState = masterClusterService.state(); + safeAwait(barrier); + + assertTrue("timed out waiting for create-index responses", responsesLatch.await(30, TimeUnit.SECONDS)); + if (validIndexNames.isEmpty()) { + assertSame("cluster state should not change when all requests failed", masterClusterService.state(), initialState); + } + } finally { + masterClusterService.removeListener(listener); + } + } + + private String addRandomIndexNameNoCollision(Set existingIndexNames) { + var indexName = randomIndexName(); + while (existingIndexNames.add(indexName) == false) { + indexName = randomIndexName(); + } + return indexName; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java index cafef902c89d4..2666e54f47fe4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexService.java @@ -22,9 +22,10 @@ import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -38,9 +39,10 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.DataTier; -import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener; +import org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionMultiListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.Strings; @@ -56,7 +58,6 @@ import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.PathUtils; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.index.Index; @@ -119,6 +120,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveSettings; +import static org.elasticsearch.cluster.routing.allocation.allocator.AllocationActionListener.rerouteCompletionIsNotRequired; import static org.elasticsearch.index.IndexModule.INDEX_RECOVERY_TYPE_SETTING; import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; @@ -126,6 +128,7 @@ * Service responsible for submitting create index requests */ public class MetadataCreateIndexService { + public static TransportVersion INDEX_LIMIT_EXCEEDED_EXCEPTION_VERSION = TransportVersion.fromName("index_limit_exceeded_exception"); // Deliberately not registered so it can only be set in tests/plugins. @@ -188,7 +191,10 @@ interface ClusterBlocksTransformer { private final Set indexSettingProviders; private final ThreadPool threadPool; private final ClusterBlocksTransformer blocksTransformerUponIndexCreation; - private final Priority clusterStateUpdateTaskPriority; + + // package private for tests + final CreateIndexClusterStateUpdateTaskExecutor createIndexTaskExecutor; + private final MasterServiceTaskQueue createIndexTaskQueue; private volatile TimeValue maxMasterNodeTimeout; private volatile int maxIndicesPerProject; @@ -221,7 +227,12 @@ public MetadataCreateIndexService( this.indexSettingProviders = indexSettingProviders.getIndexSettingProviders(); this.threadPool = threadPool; this.blocksTransformerUponIndexCreation = createClusterBlocksTransformerForIndexCreation(settings); - this.clusterStateUpdateTaskPriority = CREATE_INDEX_PRIORITY_SETTING.get(settings); + this.createIndexTaskExecutor = new CreateIndexClusterStateUpdateTaskExecutor(); + this.createIndexTaskQueue = clusterService.createTaskQueue( + "create-index", + CREATE_INDEX_PRIORITY_SETTING.get(settings), + createIndexTaskExecutor + ); if (clusterService.getClusterSettings().isDynamicSetting(CREATE_INDEX_MAX_TIMEOUT_SETTING.getKey())) { // setting only registered in some tests today @@ -432,48 +443,120 @@ private void onlyCreateIndex( final CreateIndexClusterStateUpdateRequest request, final ActionListener listener ) { - try { + ActionListener.run(listener, l -> { normalizeRequestSetting(request); - } catch (Exception e) { + final var task = new CreateIndexClusterStateUpdateTask(request, ackTimeout, l); + createIndexTaskQueue.submitTask(task.toString(), task, masterNodeTimeout); + }); + } + + static class CreateIndexClusterStateUpdateTask implements ClusterStateTaskListener { + private final CreateIndexClusterStateUpdateRequest request; + private final TimeValue ackTimeout; + private final ActionListener listener; + + CreateIndexClusterStateUpdateTask( + CreateIndexClusterStateUpdateRequest request, + TimeValue ackTimeout, + ActionListener listener + ) { + this.request = request; + this.ackTimeout = ackTimeout; + this.listener = listener; + } + + @Override + public void onFailure(Exception e) { + logger.log( + e instanceof ResourceAlreadyExistsException + || e instanceof ProcessClusterEventTimeoutException + || MasterService.isPublishFailureException(e) ? Level.TRACE : Level.DEBUG, + () -> Strings.format("[%s] failed to create, in project [%s]", request.index(), request.projectId()), + e + ); listener.onFailure(e); - return; } - var delegate = new AllocationActionListener<>(listener, threadPool.getThreadContext()); - submitUnbatchedTask( - "create-index [" + request.index() + "], in project [" + request.projectId() + "], cause [" + request.cause() + "]", - new AckedClusterStateUpdateTask(clusterStateUpdateTaskPriority, masterNodeTimeout, ackTimeout, delegate.clusterStateUpdate()) { + private ClusterStateAckListener getAckListener(ActionListener responseListener) { + return new ClusterStateAckListener() { + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return applyCreateIndexRequest(currentState, request, false, RerouteBehavior.PERFORM_REROUTE, delegate.reroute()); + public void onAllNodesAcked() { + responseListener.onResponse(AcknowledgedResponse.TRUE); } @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.trace(() -> "[" + request.index() + "] failed to create, in project [" + request.projectId() + "]", e); - } else { - logger.debug(() -> "[" + request.index() + "] failed to create, in project [" + request.projectId() + "]", e); - } - super.onFailure(e); + public void onAckFailure(Exception e) { + responseListener.onResponse(AcknowledgedResponse.FALSE); } - } - ); + + @Override + public void onAckTimeout() { + responseListener.onResponse(AcknowledgedResponse.FALSE); + } + + @Override + public TimeValue ackTimeout() { + return ackTimeout; + } + }; + } + + @Override + public String toString() { + return Strings.format("create-index [%s], in project [%s], cause [%s]", request.index(), request.projectId(), request.cause()); + } } - @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here - private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { - clusterService.submitUnbatchedStateUpdateTask(source, task); + /** + * Executes a batch of {@link CreateIndexClusterStateUpdateTask}s in a single cluster state update, + * deferring allocation reroute() until all indices in the batch have been processed. + */ + private class CreateIndexClusterStateUpdateTaskExecutor implements ClusterStateTaskExecutor { + @Override + public ClusterState execute(BatchExecutionContext batchExecutionContext) { + final var allocationActionMultiListener = new AllocationActionMultiListener( + threadPool.getThreadContext() + ); + var state = batchExecutionContext.initialState(); + for (final var taskContext : batchExecutionContext.taskContexts()) { + final var task = taskContext.getTask(); + try (var ignored = taskContext.captureResponseHeaders()) { + state = applyCreateIndexRequest( + state, + task.request, + false, + // Wait until all indices in the batch have been processed before doing a reroute + RerouteBehavior.SKIP_REROUTE, + rerouteCompletionIsNotRequired() + ); + taskContext.success(task.getAckListener(allocationActionMultiListener.delay(task.listener))); + } catch (Exception e) { + taskContext.onFailure(e); + } + } + if (state != batchExecutionContext.initialState()) { + try (var ignored = batchExecutionContext.dropHeadersContext()) { + state = allocationService.reroute(state, "create-index", allocationActionMultiListener.reroute()); + } + } else { + allocationActionMultiListener.noRerouteNeeded(); + } + return state; + } } private void normalizeRequestSetting(CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest) { - Settings build = Settings.builder() + final var settings = Settings.builder() .put(createIndexClusterStateRequest.settings()) .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX) .build(); - indexScopedSettings.validate(build, true); - createIndexClusterStateRequest.settings(build); + indexScopedSettings.validate(settings, true); + createIndexClusterStateRequest.settings(settings); } /** @@ -717,7 +800,6 @@ private ClusterState applyCreateIndexWithTemporaryService( allocationService.getShardRoutingRoleStrategy() ); assert assertHasRefreshBlock(indexMetadata, updated.projectState(request.projectId())); - if (rerouteBehavior == RerouteBehavior.SKIP_REROUTE) { if (rerouteListener != null) { rerouteListener.onResponse(null); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 5b5ce2f55d849..4b5f8bc4b4d13 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.ClusterStateTaskExecutorUtils; import org.elasticsearch.cluster.version.CompatibilityVersions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; @@ -60,6 +61,7 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.query.SearchExecutionContextHelper; import org.elasticsearch.index.shard.IndexLongFieldRange; @@ -136,9 +138,14 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; +import static org.mockito.AdditionalAnswers.returnsFirstArg; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MetadataCreateIndexServiceTests extends ESTestCase { @@ -1943,6 +1950,81 @@ public void testIndexSettingProviderPrivateSetting() throws Exception { assertThat(indexMetadata.getSettings().get(IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.getKey()), equalTo("private_setting")); } + public void testBatchedIndexCreationAndReroute() { + withTemporaryClusterService((clusterService, threadPool) -> { + final var allocationService = mock(AllocationService.class); + when(allocationService.reroute(any(ClusterState.class), any(String.class), any())).thenAnswer(returnsFirstArg()); + when(allocationService.getShardRoutingRoleStrategy()).thenReturn(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); + MetadataCreateIndexService service = new MetadataCreateIndexService( + Settings.EMPTY, + clusterService, + mockIndicesService(), + allocationService, + createTestShardLimitService(randomIntBetween(1, 1000), clusterService), + newEnvironment(), + new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS), + threadPool, + null, + EmptySystemIndices.INSTANCE, + false, + IndexSettingProviders.EMPTY + ); + + final int taskCount = randomIntBetween(2, 5); + final var validIndexNames = new ArrayList(); + final var tasks = new ArrayList(taskCount); + for (int i = 0; i < taskCount; i++) { + final String indexName; + if (randomBoolean()) { + indexName = randomIndexName(); + validIndexNames.add(indexName); + } else { + // invalid index name + indexName = randomIdentifier("INVALID_BECAUSE_UPPER_CASE_"); + } + tasks.add( + new MetadataCreateIndexService.CreateIndexClusterStateUpdateTask( + new CreateIndexClusterStateUpdateRequest("test", projectId, indexName, indexName), + TimeValue.THIRTY_SECONDS, + ActionListener.noop() + ) + ); + } + final ClusterState initialState = clusterService.state(); + final ClusterState resultState; + try { + resultState = ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterService.state(), + service.createIndexTaskExecutor, + tasks, + task -> {}, + (task, e) -> assertThat(e, instanceOf(InvalidIndexNameException.class)) + ); + } catch (Exception e) { + throw new AssertionError("Failed to test run cluster state update for batch index creation request", e); + } + if (validIndexNames.isEmpty()) { + assertSame("cluster state should not change when all requests are invalid", resultState, initialState); + verify(allocationService, never()).reroute(any(), any(), any()); + } else { + final var stateIndices = resultState.metadata().getProject(projectId).indices().keySet(); + assertTrue( + "expected cluster state indices [" + stateIndices + "] to contain all [" + validIndexNames + "]", + stateIndices.containsAll(validIndexNames) + ); + verify(allocationService, times(1)).reroute(any(), eq("create-index"), any()); + } + }); + } + + private IndicesService mockIndicesService() { + try { + return DataStreamTestHelper.mockIndicesServices(MappingLookup.EMPTY); + } catch (Exception e) { + throw new AssertionError("failed to setup indicesService", e); + } + } + private IndexTemplateMetadata addMatchingTemplate(Consumer configurator) { IndexTemplateMetadata.Builder builder = templateMetadataBuilder("template1", "te*"); configurator.accept(builder);