Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/142433.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
area: Security
issues: []
pr: 142433
summary: Fix built-in roles sync to retry on lock contention instead of silently discarding
pending updates
type: bug
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.stream.Collectors.toMap;
Expand Down Expand Up @@ -129,7 +128,7 @@ public void taskSucceeded(MarkRolesAsSyncedTask task, Map<String, String> value)
private final QueryableBuiltInRoles.Provider rolesProvider;
private final NativeRolesStore nativeRolesStore;
private final Executor executor;
private final AtomicBoolean synchronizationInProgress = new AtomicBoolean(false);
private final RolesSync sync = new RolesSync();

private volatile boolean securityIndexDeleted = false;

Expand Down Expand Up @@ -216,33 +215,91 @@ public void clusterChanged(ClusterChangedEvent event) {
* @return {@code true} if the synchronization of built-in roles is in progress, {@code false} otherwise
*/
public boolean isSynchronizationInProgress() {
return synchronizationInProgress.get();
return sync.inProgress();
}

private void syncBuiltInRoles(final QueryableBuiltInRoles roles) {
if (synchronizationInProgress.compareAndSet(false, true)) {
try {
final Map<String, String> indexedRolesDigests = readIndexedBuiltInRolesDigests(clusterService.state());
if (roles.rolesDigest().equals(indexedRolesDigests)) {
logger.debug("Security index already contains the latest built-in roles indexed, skipping roles synchronization");
if (sync.startSync()) {
doSyncBuiltInRoles(roles);
}
}

private void doSyncBuiltInRoles(final QueryableBuiltInRoles roles) {
try {
final Map<String, String> indexedRolesDigests = readIndexedBuiltInRolesDigests(clusterService.state());
if (roles.rolesDigest().equals(indexedRolesDigests)) {
logger.debug("Security index already contains the latest built-in roles indexed, skipping roles synchronization");
resetFailedSyncAttempts();
endSyncAndRetryIfNeeded();
} else {
executor.execute(() -> applyRoleChanges(indexedRolesDigests, roles, ActionListener.wrap(v -> {
logger.info("Successfully synced [{}] built-in roles to .security index", roles.roleDescriptors().size());
resetFailedSyncAttempts();
synchronizationInProgress.set(false);
endSyncAndRetryIfNeeded();
}, e -> {
handleException(e);
endSyncAndRetryIfNeeded();
})));
}
} catch (Exception e) {
logger.error("Failed to sync built-in roles", e);
failedSyncAttempts.incrementAndGet();
endSyncAndRetryIfNeeded();
}
}

private void endSyncAndRetryIfNeeded() {
if (sync.endSync()) {
boolean shouldRetry = false;
try {
shouldRetry = shouldSyncBuiltInRoles(clusterService.state());
} catch (Exception e) {
logger.warn("Failed to evaluate retry conditions for built-in roles synchronization", e);
}
if (shouldRetry) {
logger.debug("Retrying synchronization of built-in roles due to pending changes");
doSyncBuiltInRoles(rolesProvider.getRoles());
} else {
endSyncAndRetryIfNeeded();
}
}
}

static class RolesSync {
private static final int IDLE = 0;
private static final int RUNNING = 1;
private static final int RUNNING_PENDING = 2;

private final AtomicInteger state = new AtomicInteger(IDLE);

boolean startSync() {
while (true) {
int s = state.get();
if (s == IDLE) {
if (state.compareAndSet(IDLE, RUNNING)) return true;
} else if (s == RUNNING) {
if (state.compareAndSet(RUNNING, RUNNING_PENDING)) return false;
} else {
executor.execute(() -> doSyncBuiltinRoles(indexedRolesDigests, roles, ActionListener.wrap(v -> {
logger.info("Successfully synced [{}] built-in roles to .security index", roles.roleDescriptors().size());
resetFailedSyncAttempts();
synchronizationInProgress.set(false);
}, e -> {
handleException(e);
synchronizationInProgress.set(false);
})));
return false; // already RUNNING_PENDING
}
} catch (Exception e) {
logger.error("Failed to sync built-in roles", e);
failedSyncAttempts.incrementAndGet();
synchronizationInProgress.set(false);
}
}

boolean endSync() {
while (true) {
int s = state.get();
assert s != IDLE : "endSync should only be called when a sync is in progress";
if (s == RUNNING_PENDING) {
if (state.compareAndSet(RUNNING_PENDING, RUNNING)) return true;
} else {
if (state.compareAndSet(RUNNING, IDLE)) return false;
}
}
}

boolean inProgress() {
return state.get() != IDLE;
}
}

private void handleException(Exception e) {
Expand Down Expand Up @@ -358,7 +415,7 @@ private boolean shouldSyncBuiltInRoles(final ClusterState state) {
return true;
}

private void doSyncBuiltinRoles(
private void applyRoleChanges(
final Map<String, String> indexedRolesDigests,
final QueryableBuiltInRoles roles,
final ActionListener<Void> listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.elasticsearch.test;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules;
Expand Down Expand Up @@ -53,6 +55,7 @@
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;

/**
* Base class to run tests against a cluster with X-Pack installed and security enabled.
Expand Down Expand Up @@ -446,7 +449,12 @@ protected void createSecurityIndexWithWaitForActiveShards() {
try {
client.admin().indices().create(createIndexRequest).actionGet();
} catch (ResourceAlreadyExistsException e) {
logger.info("Security index already exists, ignoring.", e);
logger.info("Security index already exists, waiting for it to become available.", e);
ClusterHealthRequest healthRequest = new ClusterHealthRequest(TEST_REQUEST_TIMEOUT, SECURITY_MAIN_ALIAS).waitForActiveShards(
ActiveShardCount.ALL
);
ClusterHealthResponse healthResponse = client.admin().cluster().health(healthRequest).actionGet();
assertThat(healthResponse.isTimedOut(), is(false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,76 @@ private void mockNativeRolesStoreWithFailure(
.deleteRoles(eq(rolesToDelete), eq(WriteRequest.RefreshPolicy.IMMEDIATE), eq(false), any(ActionListener.class));
}

public void testRolesSyncBasicAcquireRelease() {
var sync = new QueryableBuiltInRolesSynchronizer.RolesSync();
assertFalse(sync.inProgress());

assertTrue(sync.startSync());
assertTrue(sync.inProgress());

assertFalse(sync.endSync());
assertFalse(sync.inProgress());
}

public void testRolesSyncContentionSetsPending() {
var sync = new QueryableBuiltInRolesSynchronizer.RolesSync();
assertTrue(sync.startSync());

// Second caller is rejected but marks pending
assertFalse(sync.startSync());
assertTrue(sync.inProgress());

// endSync sees pending, clears it, keeps lock held
assertTrue(sync.endSync());
assertTrue(sync.inProgress());

// No more pending — release
assertFalse(sync.endSync());
assertFalse(sync.inProgress());
}

public void testRolesSyncMultiplePendingCoalesce() {
var sync = new QueryableBuiltInRolesSynchronizer.RolesSync();
assertTrue(sync.startSync());

// Multiple concurrent arrivals all set pending, but only one retry results
assertFalse(sync.startSync());
assertFalse(sync.startSync());
assertFalse(sync.startSync());

assertTrue(sync.endSync()); // one retry
assertFalse(sync.endSync()); // done
assertFalse(sync.inProgress());
}

public void testRolesSyncPendingDuringRetry() {
var sync = new QueryableBuiltInRolesSynchronizer.RolesSync();
assertTrue(sync.startSync());
assertFalse(sync.startSync()); // pending

assertTrue(sync.endSync()); // retry (lock still held)

// New arrival during retry
assertFalse(sync.startSync());

assertTrue(sync.endSync()); // second retry
assertFalse(sync.endSync()); // done
assertFalse(sync.inProgress());
}

public void testRolesSyncReacquireAfterFullRelease() {
var sync = new QueryableBuiltInRolesSynchronizer.RolesSync();
assertTrue(sync.startSync());
assertFalse(sync.endSync());
assertFalse(sync.inProgress());

// Can acquire again after full release
assertTrue(sync.startSync());
assertTrue(sync.inProgress());
assertFalse(sync.endSync());
assertFalse(sync.inProgress());
}

private static ClusterState.Builder createClusterStateWithOpenSecurityIndex() {
return createClusterState(
TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7,
Expand Down