Skip to content

Commit

Permalink
Merge (#116407)
Browse files Browse the repository at this point in the history
  • Loading branch information
n1v0lg authored Nov 7, 2024
1 parent 567429e commit 544f8f9
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,39 @@
import org.elasticsearch.xpack.core.security.action.user.PutUserResponse;
import org.elasticsearch.xpack.security.authz.store.NativePrivilegeStore;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.security.support.SecuritySystemIndices.SECURITY_MAIN_ALIAS;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class SecurityIndexManagerIntegTests extends SecurityIntegTestCase {

private final int concurrentCallsToOnAvailable = 6;
private final ExecutorService executor = Executors.newFixedThreadPool(concurrentCallsToOnAvailable);

@After
public void shutdownExecutor() {
executor.shutdown();
}

public void testConcurrentOperationsTryingToCreateSecurityIndexAndAlias() throws Exception {
final int processors = Runtime.getRuntime().availableProcessors();
final int numThreads = Math.min(50, scaledRandomIntBetween((processors + 1) / 2, 4 * processors)); // up to 50 threads
Expand Down Expand Up @@ -110,6 +124,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex
// pick longer wait than in the assertBusy that waits for below to ensure index has had enough time to initialize
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueSeconds(40));

// check listener added
assertThat(
securityIndexManager.getStateChangeListeners(),
hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class))
);

createSecurityIndexWithWaitForActiveShards();

assertBusy(
Expand All @@ -121,6 +141,12 @@ public void testOnIndexAvailableForSearchIndexCompletesWithinTimeout() throws Ex
// security index creation is complete and index is available for search; therefore whenIndexAvailableForSearch should report
// success in time
future.actionGet();

// check no remaining listeners
assertThat(
securityIndexManager.getStateChangeListeners(),
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -152,6 +178,69 @@ public void testOnIndexAvailableForSearchIndexAlreadyAvailable() throws Exceptio
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueSeconds(10));
future.actionGet();
}

// check no remaining listeners
assertThat(
securityIndexManager.getStateChangeListeners(),
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
);
}

@SuppressWarnings("unchecked")
public void testOnIndexAvailableForSearchIndexUnderConcurrentLoad() throws Exception {
final SecurityIndexManager securityIndexManager = internalCluster().getInstances(NativePrivilegeStore.class)
.iterator()
.next()
.getSecurityIndexManager();
// Long time out calls should all succeed
final List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) {
final Future<Void> future = executor.submit(() -> {
try {
final ActionFuture<Void> f = new PlainActionFuture<>();
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) f, TimeValue.timeValueSeconds(40));
f.actionGet();
} catch (Exception ex) {
fail(ex, "should not have encountered exception");
}
return null;
});
futures.add(future);
}

// short time-out tasks should all time out
for (int i = 0; i < concurrentCallsToOnAvailable / 2; i++) {
final Future<Void> future = executor.submit(() -> {
expectThrows(ElasticsearchTimeoutException.class, () -> {
final ActionFuture<Void> f = new PlainActionFuture<>();
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) f, TimeValue.timeValueMillis(10));
f.actionGet();
});
return null;
});
futures.add(future);
}

// Sleep a second for short-running calls to timeout
Thread.sleep(1000);

createSecurityIndexWithWaitForActiveShards();
// ensure security index manager state is fully in the expected precondition state for this test (ready for search)
assertBusy(
() -> assertThat(securityIndexManager.isAvailable(SecurityIndexManager.Availability.SEARCH_SHARDS), is(true)),
30,
TimeUnit.SECONDS
);

for (var future : futures) {
future.get(10, TimeUnit.SECONDS);
}

// check no remaining listeners
assertThat(
securityIndexManager.getStateChangeListeners(),
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
);
}

@SuppressWarnings("unchecked")
Expand All @@ -163,9 +252,24 @@ public void testOnIndexAvailableForSearchIndexWaitTimeOut() {
.next()
.getSecurityIndexManager();

final ActionFuture<Void> future = new PlainActionFuture<>();
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(100));
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
{
final ActionFuture<Void> future = new PlainActionFuture<>();
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(100));
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
}

// Also works with 0 timeout
{
final ActionFuture<Void> future = new PlainActionFuture<>();
securityIndexManager.onIndexAvailableForSearch((ActionListener<Void>) future, TimeValue.timeValueMillis(0));
expectThrows(ElasticsearchTimeoutException.class, future::actionGet);
}

// check no remaining listeners
assertThat(
securityIndexManager.getStateChangeListeners(),
not(hasItem(instanceOf(SecurityIndexManager.StateConsumerWithCancellable.class)))
);
}

public void testSecurityIndexSettingsCannotBeChanged() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -420,45 +421,80 @@ public void accept(State previousState, State nextState) {
* Notifies {@code listener} once the security index is available, or calls {@code onFailure} on {@code timeout}.
*/
public void onIndexAvailableForSearch(ActionListener<Void> listener, TimeValue timeout) {
logger.info("Will wait for security index [{}] to become available for search", getConcreteIndexName());
logger.info("Will wait for security index [{}] for [{}] to become available for search", getConcreteIndexName(), timeout);

final ActionListener<Void> notifyOnceListener = ActionListener.notifyOnce(listener);
if (state.indexAvailableForSearch) {
logger.debug("Security index [{}] is already available", getConcreteIndexName());
listener.onResponse(null);
return;
}

final AtomicBoolean isDone = new AtomicBoolean(false);
final var indexAvailableForSearchListener = new StateConsumerWithCancellable() {
@Override
public void accept(SecurityIndexManager.State previousState, SecurityIndexManager.State nextState) {
if (nextState.indexAvailableForSearch) {
assert cancellable != null;
// cancel and removeStateListener are idempotent
cancellable.cancel();
removeStateListener(this);
notifyOnceListener.onResponse(null);
if (isDone.compareAndSet(false, true)) {
cancel();
removeStateListener(this);
listener.onResponse(null);
}
}
}
};
// add listener _before_ registering timeout -- this way we are guaranteed it gets removed (either by timeout below, or successful
// completion above)
addStateListener(indexAvailableForSearchListener);

// schedule failure handling on timeout -- keep reference to cancellable so a successful completion can cancel the timeout
indexAvailableForSearchListener.cancellable = client.threadPool().schedule(() -> {
removeStateListener(indexAvailableForSearchListener);
notifyOnceListener.onFailure(
new ElasticsearchTimeoutException(
"timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search"
)
);
}, timeout, client.threadPool().generic());
indexAvailableForSearchListener.setCancellable(client.threadPool().schedule(() -> {
if (isDone.compareAndSet(false, true)) {
removeStateListener(indexAvailableForSearchListener);
listener.onFailure(
new ElasticsearchTimeoutException(
"timed out waiting for security index [" + getConcreteIndexName() + "] to become available for search"
)
);
}
}, timeout, client.threadPool().generic()));
}

// in case the state has meanwhile changed to available, return immediately
if (state.indexAvailableForSearch) {
indexAvailableForSearchListener.cancellable.cancel();
notifyOnceListener.onResponse(null);
} else {
addStateListener(indexAvailableForSearchListener);
}
// pkg-private for testing
List<BiConsumer<State, State>> getStateChangeListeners() {
return stateChangeListeners;
}

private abstract static class StateConsumerWithCancellable
/**
* This class ensures that if cancel() is called _before_ setCancellable(), the passed-in cancellable is still correctly cancelled on
* a subsequent setCancellable() call.
*/
// pkg-private for testing
abstract static class StateConsumerWithCancellable
implements
BiConsumer<SecurityIndexManager.State, SecurityIndexManager.State> {
volatile Scheduler.ScheduledCancellable cancellable;
BiConsumer<SecurityIndexManager.State, SecurityIndexManager.State>,
Scheduler.Cancellable {
private volatile Scheduler.ScheduledCancellable cancellable;
private volatile boolean cancelled = false;

void setCancellable(Scheduler.ScheduledCancellable cancellable) {
this.cancellable = cancellable;
if (cancelled) {
cancel();
}
}

public boolean cancel() {
cancelled = true;
if (cancellable != null) {
// cancellable is idempotent, so it's fine to potentially call it multiple times
return cancellable.cancel();
}
return isCancelled();
}

public boolean isCancelled() {
return cancelled;
}
}

private Tuple<Boolean, Boolean> checkIndexAvailable(ClusterState state) {
Expand Down

0 comments on commit 544f8f9

Please sign in to comment.