diff --git a/CHANGELOG.md b/CHANGELOG.md index 856b8c7067ebf..d099a2f8f1c9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607)) - Fix issue with updating core with a patch number other than 0 ([#19377](https://github.com/opensearch-project/OpenSearch/pull/19377)) - [Java Agent] Allow JRT protocol URLs in protection domain extraction ([#19683](https://github.com/opensearch-project/OpenSearch/pull/19683)) +- Fix potential concurrent modification exception when updating allocation filters ([#19701])(https://github.com/opensearch-project/OpenSearch/pull/19701)) ### Dependencies - Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575)) diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeFilters.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeFilters.java index b27e1aa49a803..fa63180fb1547 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeFilters.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodeFilters.java @@ -97,7 +97,7 @@ public static DiscoveryNodeFilters buildOrUpdateFromKeyValue( updated = new DiscoveryNodeFilters(opType, new HashMap<>()); } else { assert opType == original.opType : "operation type should match with node filter parameter"; - updated = new DiscoveryNodeFilters(original.opType, original.filters); + updated = new DiscoveryNodeFilters(original.opType, new HashMap<>(original.filters)); } for (Map.Entry entry : filters.entrySet()) { String[] values = Strings.tokenizeToStringArray(entry.getValue(), ","); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java index d3200c1bc9d75..b99d773099a3d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDecider.java @@ -193,7 +193,7 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod @Override public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) { Decision decision = shouldClusterFilter(node.node(), allocation); - return decision != null && decision == Decision.NO ? decision : Decision.ALWAYS; + return decision == Decision.NO ? decision : Decision.ALWAYS; } private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { @@ -258,6 +258,13 @@ private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, Ro } private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) { + // Copy values to local variables so we're not null-checking on volatile fields. + // The value of a volatile field could change from non-null to null between the + // check and its usage. + DiscoveryNodeFilters clusterRequireFilters = this.clusterRequireFilters; + DiscoveryNodeFilters clusterIncludeFilters = this.clusterIncludeFilters; + DiscoveryNodeFilters clusterExcludeFilters = this.clusterExcludeFilters; + if (clusterRequireFilters != null) { if (clusterRequireFilters.match(node) == false) { return allocation.decision( diff --git a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeFiltersTests.java b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeFiltersTests.java index 691a3cb418f94..5fc097eff0eac 100644 --- a/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeFiltersTests.java +++ b/server/src/test/java/org/opensearch/cluster/node/DiscoveryNodeFiltersTests.java @@ -47,6 +47,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -354,6 +360,67 @@ public void testOpTypeMismatch() { } } + public void testConcurrentModification() { + AtomicReference filters = new AtomicReference<>(); + AtomicBoolean keepRunning = new AtomicBoolean(true); + int count = 200; // I can pretty reliably reproduce failures with 200 iterations, but not 150 + try (ExecutorService executor = Executors.newFixedThreadPool(2)) { + // Thread 1: repeatedly update the filters + Future t1 = executor.submit(() -> { + try { + for (int i = 0; i < count && keepRunning.get(); i++) { + Settings.Builder settingsBuilder = Settings.builder().put("xxx._id", "id" + i); + if (i % 2 == 0) { + settingsBuilder.put("xxx._name", ""); + } else { + settingsBuilder.put("xxx._name", "name" + i); + } + DiscoveryNodeFilters newFilters = buildOrUpdateFromSettings(filters.get(), OR, "xxx.", settingsBuilder.build()); + filters.set(newFilters); + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } finally { + keepRunning.set(false); + } + }); + + // Thread 2: repeatedly read and match nodes against the filters + Future t2 = executor.submit(() -> { + try { + for (int i = 0; i < count && keepRunning.get(); i++) { + DiscoveryNodeFilters currentFilters = filters.get(); + if (currentFilters != null) { + DiscoveryNode node = new DiscoveryNode( + "name" + i, + "id" + i, + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); + currentFilters.match(node); + } + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } finally { + keepRunning.set(false); + } + }); + t2.get(); + t1.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + private Settings shuffleSettings(Settings source) { Settings.Builder settings = Settings.builder(); List keys = new ArrayList<>(source.keySet());