From 9d6672702af49e9d794d23026e65cff89886a6e7 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Wed, 20 Jul 2022 13:59:08 +0200 Subject: [PATCH 1/2] Use custom task instead of generic AckedClusterStateUpdateTask that is not intended to be used with batching. --- .../MetadataUpdateSettingsService.java | 398 ++++++++++-------- .../java/org/elasticsearch/node/Node.java | 3 +- .../indices/cluster/ClusterStateChanges.java | 3 +- 3 files changed, 220 insertions(+), 184 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index 8fe89831c08ca..8d0408d40ff99 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -14,11 +14,14 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; +import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; @@ -27,22 +30,19 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.ShardLimitValidator; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Locale; import java.util.Set; import java.util.function.BiFunction; -import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext; import static org.elasticsearch.index.IndexSettings.same; /** @@ -56,223 +56,261 @@ public class MetadataUpdateSettingsService { private final IndexScopedSettings indexScopedSettings; private final IndicesService indicesService; private final ShardLimitValidator shardLimitValidator; - private final ThreadPool threadPool; - private final ClusterStateTaskExecutor executor; + private final ClusterStateTaskExecutor executor; public MetadataUpdateSettingsService( ClusterService clusterService, AllocationService allocationService, IndexScopedSettings indexScopedSettings, IndicesService indicesService, - ShardLimitValidator shardLimitValidator, - ThreadPool threadPool + ShardLimitValidator shardLimitValidator ) { this.clusterService = clusterService; this.allocationService = allocationService; this.indexScopedSettings = indexScopedSettings; this.indicesService = indicesService; this.shardLimitValidator = shardLimitValidator; - this.threadPool = threadPool; - this.executor = new ClusterStateTaskExecutor() { - @Override - @SuppressForbidden(reason = "consuming published cluster state for legacy reasons") - public ClusterState execute(ClusterState currentState, List> taskContexts) { - ClusterState state = currentState; - for (final var taskContext : taskContexts) { - try { - final var task = taskContext.getTask(); - state = task.execute(state); - taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task); - } catch (Exception e) { - taskContext.onFailure(e); - } - } - if (state != currentState) { - // reroute in case things change that require it (like number of replicas) - state = allocationService.reroute(state, "settings update"); + this.executor = (currentState, taskContexts) -> { + ClusterState state = currentState; + for (final var taskContext : taskContexts) { + try { + final var task = taskContext.getTask(); + state = task.execute(state, taskContext); + taskContext.success(task.getAckListener()); + } catch (Exception e) { + taskContext.onFailure(e); } - return state; } + if (state != currentState) { + // reroute in case things change that require it (like number of replicas) + state = allocationService.reroute(state, "settings update"); + } + return state; }; } - public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener listener) { - final Settings normalizedSettings = Settings.builder() - .put(request.settings()) - .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX) - .build(); - Settings.Builder settingsForClosedIndices = Settings.builder(); - Settings.Builder settingsForOpenIndices = Settings.builder(); - final Set skippedSettings = new HashSet<>(); - - indexScopedSettings.validate( - normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false), // don't validate wildcards - false, // don't validate values here we check it below never allow to change the number of shards - true - ); // validate internal or private index settings - for (String key : normalizedSettings.keySet()) { - Setting setting = indexScopedSettings.get(key); - boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key); - assert setting != null // we already validated the normalized settings - || (isWildcard && normalizedSettings.hasValue(key) == false) - : "unknown setting: " + key + " isWildcard: " + isWildcard + " hasValue: " + normalizedSettings.hasValue(key); - settingsForClosedIndices.copy(key, normalizedSettings); - if (isWildcard || setting.isDynamic()) { - settingsForOpenIndices.copy(key, normalizedSettings); - } else { - skippedSettings.add(key); - } + private final class UpdateSettingsTask implements ClusterStateTaskListener { + private final UpdateSettingsClusterStateUpdateRequest request; + private final ActionListener listener; + + private UpdateSettingsTask(UpdateSettingsClusterStateUpdateRequest request, ActionListener listener) { + this.request = request; + this.listener = listener; } - final Settings closedSettings = settingsForClosedIndices.build(); - final Settings openSettings = settingsForOpenIndices.build(); - final boolean preserveExisting = request.isPreserveExisting(); - - // TODO: move this to custom class instead of AckedClusterStateUpdateTask - AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask( - Priority.URGENT, - request, - wrapPreservingContext(listener, threadPool.getThreadContext()) - ) { - @Override - public ClusterState execute(ClusterState currentState) { - RoutingTable.Builder routingTableBuilder = null; - Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); - - // allow to change any settings to a closed index, and only allow dynamic settings to be changed - // on an open index - Set openIndices = new HashSet<>(); - Set closedIndices = new HashSet<>(); - final String[] actualIndices = new String[request.indices().length]; - for (int i = 0; i < request.indices().length; i++) { - Index index = request.indices()[i]; - actualIndices[i] = index.getName(); - final IndexMetadata metadata = currentState.metadata().getIndexSafe(index); - if (metadata.getState() == IndexMetadata.State.OPEN) { - openIndices.add(index); - } else { - closedIndices.add(index); - } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + assert false : "should not be called"; + } + + private ClusterStateAckListener getAckListener() { + return new ClusterStateAckListener() { + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked() { + listener.onResponse(AcknowledgedResponse.of(true)); + } + + @Override + public void onAckFailure(Exception e) { + listener.onFailure(e); + } + + @Override + public void onAckTimeout() { + listener.onResponse(AcknowledgedResponse.of(false)); + } + + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } + }; + } + + ClusterState execute(ClusterState currentState, ClusterStateTaskExecutor.TaskContext taskContext) { + final Settings normalizedSettings = Settings.builder() + .put(request.settings()) + .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX) + .build(); + Settings.Builder settingsForClosedIndices = Settings.builder(); + Settings.Builder settingsForOpenIndices = Settings.builder(); + final Set skippedSettings = new HashSet<>(); + + indexScopedSettings.validate( + normalizedSettings.filter(s -> Regex.isSimpleMatchPattern(s) == false), // don't validate wildcards + false, // don't validate values here we check it below never allow to change the number of shards + true + ); // validate internal or private index settings + for (String key : normalizedSettings.keySet()) { + Setting setting = indexScopedSettings.get(key); + boolean isWildcard = setting == null && Regex.isSimpleMatchPattern(key); + assert setting != null // we already validated the normalized settings + || (isWildcard && normalizedSettings.hasValue(key) == false) + : "unknown setting: " + key + " isWildcard: " + isWildcard + " hasValue: " + normalizedSettings.hasValue(key); + settingsForClosedIndices.copy(key, normalizedSettings); + if (isWildcard || setting.isDynamic()) { + settingsForOpenIndices.copy(key, normalizedSettings); + } else { + skippedSettings.add(key); + } + } + final Settings closedSettings = settingsForClosedIndices.build(); + final Settings openSettings = settingsForOpenIndices.build(); + final boolean preserveExisting = request.isPreserveExisting(); + + RoutingTable.Builder routingTableBuilder = null; + Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + + // allow to change any settings to a closed index, and only allow dynamic settings to be changed + // on an open index + Set openIndices = new HashSet<>(); + Set closedIndices = new HashSet<>(); + final String[] actualIndices = new String[request.indices().length]; + for (int i = 0; i < request.indices().length; i++) { + Index index = request.indices()[i]; + actualIndices[i] = index.getName(); + final IndexMetadata metadata = currentState.metadata().getIndexSafe(index); + if (metadata.getState() == IndexMetadata.State.OPEN) { + openIndices.add(index); + } else { + closedIndices.add(index); } + } - if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) { - throw new IllegalArgumentException( + if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) { + taskContext.onFailure( + new IllegalArgumentException( String.format( Locale.ROOT, "Can't update non dynamic settings [%s] for open indices %s", skippedSettings, openIndices ) - ); - } + ) + ); + return currentState; + } - if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) { - final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings); - if (preserveExisting == false) { - // Verify that this won't take us over the cluster shard limit. - shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas); - - /* - * We do not update the in-sync allocation IDs as they will be removed upon the first index operation - * which makes these copies stale. - * - * TODO: should we update the in-sync allocation IDs once the data is deleted by the node? - */ - routingTableBuilder = RoutingTable.builder(currentState.routingTable()); - routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); - metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); - logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices); - } + if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) { + final int updatedNumberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(openSettings); + if (preserveExisting == false) { + // Verify that this won't take us over the cluster shard limit. + shardLimitValidator.validateShardLimitOnReplicaUpdate(currentState, request.indices(), updatedNumberOfReplicas); + + /* + * We do not update the in-sync allocation IDs as they will be removed upon the first index operation + * which makes these copies stale. + * + * TODO: should we update the in-sync allocation IDs once the data is deleted by the node? + */ + routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + routingTableBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); + metadataBuilder.updateNumberOfReplicas(updatedNumberOfReplicas, actualIndices); + logger.info("updating number_of_replicas to [{}] for indices {}", updatedNumberOfReplicas, actualIndices); } + } - updateIndexSettings( - openIndices, - metadataBuilder, - (index, indexSettings) -> indexScopedSettings.updateDynamicSettings( - openSettings, - indexSettings, - Settings.builder(), - index.getName() - ), - preserveExisting, - indexScopedSettings - ); + updateIndexSettings( + openIndices, + metadataBuilder, + (index, indexSettings) -> indexScopedSettings.updateDynamicSettings( + openSettings, + indexSettings, + Settings.builder(), + index.getName() + ), + preserveExisting, + indexScopedSettings + ); - updateIndexSettings( - closedIndices, - metadataBuilder, - (index, indexSettings) -> indexScopedSettings.updateSettings( - closedSettings, - indexSettings, - Settings.builder(), - index.getName() - ), - preserveExisting, - indexScopedSettings - ); + updateIndexSettings( + closedIndices, + metadataBuilder, + (index, indexSettings) -> indexScopedSettings.updateSettings( + closedSettings, + indexSettings, + Settings.builder(), + index.getName() + ), + preserveExisting, + indexScopedSettings + ); - if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings) - || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) { - for (String index : actualIndices) { - final Settings settings = metadataBuilder.get(index).getSettings(); - MetadataCreateIndexService.validateTranslogRetentionSettings(settings); - MetadataCreateIndexService.validateStoreTypeSetting(settings); - } + if (IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.exists(normalizedSettings) + || IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.exists(normalizedSettings)) { + for (String index : actualIndices) { + final Settings settings = metadataBuilder.get(index).getSettings(); + MetadataCreateIndexService.validateTranslogRetentionSettings(settings); + MetadataCreateIndexService.validateStoreTypeSetting(settings); } - boolean changed = false; - // increment settings versions - for (final String index : actualIndices) { - if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) { - changed = true; - final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index)); - builder.settingsVersion(1 + builder.settingsVersion()); - metadataBuilder.put(builder); - } + } + boolean changed = false; + // increment settings versions + for (final String index : actualIndices) { + if (same(currentState.metadata().index(index).getSettings(), metadataBuilder.get(index).getSettings()) == false) { + changed = true; + final IndexMetadata.Builder builder = IndexMetadata.builder(metadataBuilder.get(index)); + builder.settingsVersion(1 + builder.settingsVersion()); + metadataBuilder.put(builder); } + } - final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); - boolean changedBlocks = false; - for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { - changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); - } - changed |= changedBlocks; + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + boolean changedBlocks = false; + for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) { + changedBlocks |= maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings); + } + changed |= changedBlocks; - if (changed == false) { - return currentState; - } + if (changed == false) { + return currentState; + } - ClusterState updatedState = ClusterState.builder(currentState) - .metadata(metadataBuilder) - .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build()) - .blocks(changedBlocks ? blocks.build() : currentState.blocks()) - .build(); + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder == null ? currentState.routingTable() : routingTableBuilder.build()) + .blocks(changedBlocks ? blocks.build() : currentState.blocks()) + .build(); - try { - for (Index index : openIndices) { - final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); - final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); - indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); - } - for (Index index : closedIndices) { - final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); - final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); - // Verifies that the current index settings can be updated with the updated dynamic settings. - indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); - // Now check that we can create the index with the updated settings (dynamic and non-dynamic). - // This step is mandatory since we allow to update non-dynamic settings on closed indices. - indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata); - } - } catch (IOException ex) { - throw ExceptionsHelper.convertToElastic(ex); + try { + for (Index index : openIndices) { + final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); + final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); + indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); } - - return updatedState; + for (Index index : closedIndices) { + final IndexMetadata currentMetadata = currentState.metadata().getIndexSafe(index); + final IndexMetadata updatedMetadata = updatedState.metadata().getIndexSafe(index); + // Verifies that the current index settings can be updated with the updated dynamic settings. + indicesService.verifyIndexMetadata(currentMetadata, updatedMetadata); + // Now check that we can create the index with the updated settings (dynamic and non-dynamic). + // This step is mandatory since we allow to update non-dynamic settings on closed indices. + indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata); + } + } catch (IOException ex) { + taskContext.onFailure(ExceptionsHelper.convertToElastic(ex)); + return currentState; } - }; + return updatedState; + } + } + + public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener listener) { clusterService.submitStateUpdateTask( "update-settings " + Arrays.toString(request.indices()), - clusterTask, - clusterTask, + new UpdateSettingsTask(request, listener), + ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()), this.executor ); } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 0a1a79dcb014c..4908806f0a020 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -685,8 +685,7 @@ protected Node( clusterModule.getAllocationService(), settingsModule.getIndexScopedSettings(), indicesService, - shardLimitValidator, - threadPool + shardLimitValidator ); Collection pluginComponents = pluginsService.flatMap( diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 4990d86330f0c..08f86b7c08664 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -282,8 +282,7 @@ public IndexMetadata verifyIndexMetadata(IndexMetadata indexMetadata, Version mi allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, - shardLimitValidator, - threadPool + shardLimitValidator ); MetadataCreateIndexService createIndexService = new MetadataCreateIndexService( SETTINGS, From a88508ecc8f32fc0ae42c57dd422fef74ce78d58 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 21 Jul 2022 08:24:43 +0200 Subject: [PATCH 2/2] Fix comments --- .../MetadataUpdateSettingsService.java | 73 ++++++++----------- 1 file changed, 30 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java index 8d0408d40ff99..1ba382c725670 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -75,8 +75,8 @@ public MetadataUpdateSettingsService( for (final var taskContext : taskContexts) { try { final var task = taskContext.getTask(); - state = task.execute(state, taskContext); - taskContext.success(task.getAckListener()); + state = task.execute(state); + taskContext.success(task); } catch (Exception e) { taskContext.onFailure(e); } @@ -89,7 +89,7 @@ public MetadataUpdateSettingsService( }; } - private final class UpdateSettingsTask implements ClusterStateTaskListener { + private final class UpdateSettingsTask implements ClusterStateAckListener, ClusterStateTaskListener { private final UpdateSettingsClusterStateUpdateRequest request; private final ActionListener listener; @@ -99,45 +99,41 @@ private UpdateSettingsTask(UpdateSettingsClusterStateUpdateRequest request, Acti } @Override - public void onFailure(Exception e) { - listener.onFailure(e); + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; } @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - assert false : "should not be called"; + public void onAllNodesAcked() { + listener.onResponse(AcknowledgedResponse.of(true)); } - private ClusterStateAckListener getAckListener() { - return new ClusterStateAckListener() { - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - return true; - } + @Override + public void onAckFailure(Exception e) { + listener.onFailure(e); + } - @Override - public void onAllNodesAcked() { - listener.onResponse(AcknowledgedResponse.of(true)); - } + @Override + public void onAckTimeout() { + listener.onResponse(AcknowledgedResponse.of(false)); + } - @Override - public void onAckFailure(Exception e) { - listener.onFailure(e); - } + @Override + public TimeValue ackTimeout() { + return request.ackTimeout(); + } - @Override - public void onAckTimeout() { - listener.onResponse(AcknowledgedResponse.of(false)); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } - @Override - public TimeValue ackTimeout() { - return request.ackTimeout(); - } - }; + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + assert false : "should not be called"; } - ClusterState execute(ClusterState currentState, ClusterStateTaskExecutor.TaskContext taskContext) { + ClusterState execute(ClusterState currentState) { final Settings normalizedSettings = Settings.builder() .put(request.settings()) .normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX) @@ -188,17 +184,9 @@ ClusterState execute(ClusterState currentState, ClusterStateTaskExecutor.TaskCon } if (skippedSettings.isEmpty() == false && openIndices.isEmpty() == false) { - taskContext.onFailure( - new IllegalArgumentException( - String.format( - Locale.ROOT, - "Can't update non dynamic settings [%s] for open indices %s", - skippedSettings, - openIndices - ) - ) + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Can't update non dynamic settings [%s] for open indices %s", skippedSettings, openIndices) ); - return currentState; } if (IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.exists(openSettings)) { @@ -298,8 +286,7 @@ ClusterState execute(ClusterState currentState, ClusterStateTaskExecutor.TaskCon indicesService.verifyIndexMetadata(updatedMetadata, updatedMetadata); } } catch (IOException ex) { - taskContext.onFailure(ExceptionsHelper.convertToElastic(ex)); - return currentState; + throw ExceptionsHelper.convertToElastic(ex); } return updatedState;