1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607))
- Fix issue with updating core with a patch number other than 0 ([#19377](https://github.com/opensearch-project/OpenSearch/pull/19377))
- [Java Agent] Allow JRT protocol URLs in protection domain extraction ([#19683](https://github.com/opensearch-project/OpenSearch/pull/19683))
- Fix potential concurrent modification exception when updating allocation filters ([#19701](https://github.com/opensearch-project/OpenSearch/pull/19701))

### Dependencies
- Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575))
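For context on the changelog entry above: `java.util.HashMap` iterators are fail-fast, so a structural modification made while an iteration is in progress surfaces as a `ConcurrentModificationException`. A minimal, generic illustration of that behavior (plain Java, not code from this PR):

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// HashMap iterators are fail-fast: a structural modification made while an
// iteration is in progress throws ConcurrentModificationException. The same
// failure can occur across threads when a writer mutates a map that a reader
// is iterating, which is the scenario the changelog entry refers to.
class FailFastSketch {
    public static void main(String[] args) {
        Map<String, String> filters = new HashMap<>();
        filters.put("_id", "id1");
        filters.put("_name", "name1");
        try {
            for (String key : filters.keySet()) {
                filters.put(key + "-copy", "x"); // structural modification mid-iteration
            }
        } catch (ConcurrentModificationException e) {
            System.out.println("fail-fast iterator detected the modification: " + e);
        }
    }
}
```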
@@ -97,7 +97,7 @@ public static DiscoveryNodeFilters buildOrUpdateFromKeyValue(
updated = new DiscoveryNodeFilters(opType, new HashMap<>());
} else {
assert opType == original.opType : "operation type should match with node filter parameter";
updated = new DiscoveryNodeFilters(original.opType, original.filters);
updated = new DiscoveryNodeFilters(original.opType, new HashMap<>(original.filters));
}
for (Map.Entry<String, String> entry : filters.entrySet()) {
String[] values = Strings.tokenizeToStringArray(entry.getValue(), ",");
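The one-line change above is the heart of the fix: previously the updated DiscoveryNodeFilters shared its backing map with the original, and the loop that follows this hunk writes tokenized values into that map, so a reader still iterating the original's filters could hit a ConcurrentModificationException. A minimal sketch of the copy-then-mutate-then-publish pattern, using hypothetical names rather than the real class:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DiscoveryNodeFilters: an effectively immutable
// snapshot that is replaced, not mutated, when settings change.
final class FiltersSnapshot {
    final Map<String, String[]> filters;

    FiltersSnapshot(Map<String, String[]> filters) {
        this.filters = filters;
    }

    // Buggy variant: reuses the original's map, so the loop below mutates a map
    // that readers of the *original* snapshot may be iterating at the same time.
    static FiltersSnapshot updateSharing(FiltersSnapshot original, Map<String, String> changes) {
        FiltersSnapshot updated = new FiltersSnapshot(original.filters);
        for (Map.Entry<String, String> entry : changes.entrySet()) {
            updated.filters.put(entry.getKey(), entry.getValue().split(","));
        }
        return updated;
    }

    // Fixed variant: copy first, mutate only the private copy, then publish it.
    static FiltersSnapshot updateCopying(FiltersSnapshot original, Map<String, String> changes) {
        FiltersSnapshot updated = new FiltersSnapshot(new HashMap<>(original.filters));
        for (Map.Entry<String, String> entry : changes.entrySet()) {
            updated.filters.put(entry.getKey(), entry.getValue().split(","));
        }
        return updated;
    }

    public static void main(String[] args) {
        FiltersSnapshot original = new FiltersSnapshot(new HashMap<>());
        FiltersSnapshot updated = updateCopying(original, Map.of("_name", "node1,node2"));
        System.out.println(original.filters.containsKey("_name")); // false: original untouched
        System.out.println(updated.filters.containsKey("_name"));  // true
    }
}
```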
@@ -193,7 +193,7 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node.node(), allocation);
return decision != null && decision == Decision.NO ? decision : Decision.ALWAYS;
return decision == Decision.NO ? decision : Decision.ALWAYS;
}

private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
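The simplification in canAllocateAnyShardToNode works because comparing a null reference with the Decision.NO constant is already false, so the explicit null check was redundant. A tiny stand-in sketch (the real Decision class is not part of this diff):

```java
// Stand-in sketch for the redundant null check removed above: comparing a null
// reference with an enum constant is already false, so the null check added nothing.
class DecisionNullCheckSketch {

    enum Decision { NO, ALWAYS }

    static Decision resolve(Decision decision) {
        // Equivalent to: decision != null && decision == Decision.NO ? decision : Decision.ALWAYS
        return decision == Decision.NO ? decision : Decision.ALWAYS;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));        // ALWAYS
        System.out.println(resolve(Decision.NO)); // NO
    }
}
```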
@@ -258,6 +258,13 @@ private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, Ro
}

private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
// Copy values to local variables so we're not null-checking on volatile fields.
// The value of a volatile field could change from non-null to null between the
// check and its usage.
DiscoveryNodeFilters clusterRequireFilters = this.clusterRequireFilters;
DiscoveryNodeFilters clusterIncludeFilters = this.clusterIncludeFilters;
DiscoveryNodeFilters clusterExcludeFilters = this.clusterExcludeFilters;

if (clusterRequireFilters != null) {
if (clusterRequireFilters.match(node) == false) {
return allocation.decision(
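The local copies introduced above are the standard defensive pattern for volatile fields: read the field once into a local, then null-check and use only the local, so a concurrent settings update cannot null the reference between the check and the use. A self-contained sketch of the same pattern (hypothetical class, mirroring the field names in the diff):

```java
import java.util.Map;

// Hypothetical holder with a volatile filter reference that another thread
// (e.g. a settings-update listener) may set back to null at any time.
class VolatileSnapshotSketch {
    private volatile Map<String, String> clusterRequireFilters;

    // Racy: the field could become null between the check and the use.
    boolean matchesRacy(String nodeId) {
        if (clusterRequireFilters != null) {
            return clusterRequireFilters.containsKey(nodeId); // may throw NullPointerException
        }
        return true;
    }

    // Safe: snapshot the volatile field once; the local cannot change afterwards.
    boolean matchesSafe(String nodeId) {
        Map<String, String> requireFilters = this.clusterRequireFilters;
        if (requireFilters != null) {
            return requireFilters.containsKey(nodeId);
        }
        return true;
    }

    void updateFilters(Map<String, String> newFilters) {
        this.clusterRequireFilters = newFilters; // may be null to clear the filters
    }

    public static void main(String[] args) {
        VolatileSnapshotSketch sketch = new VolatileSnapshotSketch();
        sketch.updateFilters(Map.of("node-1", "attr"));
        System.out.println(sketch.matchesSafe("node-1")); // true
        sketch.updateFilters(null);                       // filters cleared concurrently
        System.out.println(sketch.matchesSafe("node-1")); // true: no filters means no restriction
    }
}
```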
@@ -47,6 +47,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
@@ -354,6 +360,67 @@ public void testOpTypeMismatch() {
}
}

public void testConcurrentModification() {
AtomicReference<DiscoveryNodeFilters> filters = new AtomicReference<>();
AtomicBoolean keepRunning = new AtomicBoolean(true);
int count = 200; // I can pretty reliably reproduce failures with 200 iterations, but not 150
try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
// Thread 1: repeatedly update the filters
Future<?> t1 = executor.submit(() -> {
try {
for (int i = 0; i < count && keepRunning.get(); i++) {
Settings.Builder settingsBuilder = Settings.builder().put("xxx._id", "id" + i);
if (i % 2 == 0) {
settingsBuilder.put("xxx._name", "");
} else {
settingsBuilder.put("xxx._name", "name" + i);
}
DiscoveryNodeFilters newFilters = buildOrUpdateFromSettings(filters.get(), OR, "xxx.", settingsBuilder.build());
filters.set(newFilters);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} finally {
keepRunning.set(false);
}
});

// Thread 2: repeatedly read and match nodes against the filters
Future<?> t2 = executor.submit(() -> {
try {
for (int i = 0; i < count && keepRunning.get(); i++) {
DiscoveryNodeFilters currentFilters = filters.get();
if (currentFilters != null) {
DiscoveryNode node = new DiscoveryNode(
"name" + i,
"id" + i,
buildNewFakeTransportAddress(),
emptyMap(),
emptySet(),
Version.CURRENT
);
currentFilters.match(node);
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
} finally {
keepRunning.set(false);
}
});
t2.get();
t1.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}

private Settings shuffleSettings(Settings source) {
Settings.Builder settings = Settings.builder();
List<String> keys = new ArrayList<>(source.keySet());
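One detail worth noting about how the new test fails when the bug is present: the `t1.get()` and `t2.get()` calls at the end propagate any exception thrown on the worker threads, wrapped in an ExecutionException, so a ConcurrentModificationException escaping `currentFilters.match(node)` turns into a test failure. A stripped-down sketch of that propagation (class and names here are illustrative, not from the PR):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// An exception thrown inside a submitted task is captured by its Future and
// rethrown from get(), wrapped in an ExecutionException. That is the path by
// which a ConcurrentModificationException on a worker thread reaches the test.
class FuturePropagationSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<Void> failingWorker = () -> {
            throw new ConcurrentModificationException("map mutated during iteration");
        };
        try {
            Future<Void> task = executor.submit(failingWorker);
            task.get(); // rethrows the worker's exception wrapped in ExecutionException
        } catch (ExecutionException e) {
            System.out.println("worker thread failed with: " + e.getCause());
        } finally {
            executor.shutdown();
        }
    }
}
```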