Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand All @@ -18,6 +20,7 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -69,13 +72,27 @@ private MlIndexAndAlias() {}
* Adds an {@code alias} to that index if it was created,
* or to the index with the highest suffix if the index did not have to be created.
* The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
* If the index is created, the listener is not called until the index is ready to use via the supplied alias,
* so that a method that receives a success response from this method can safely use the index immediately.
*/
public static void createIndexAndAliasIfNecessary(Client client,
ClusterState clusterState,
IndexNameExpressionResolver resolver,
String indexPatternPrefix,
String alias,
ActionListener<Boolean> listener) {
ActionListener<Boolean> finalListener) {

// If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready
ActionListener<Boolean> indexCreatedListener = ActionListener.wrap(
created -> {
if (created) {
waitForShardsReady(client, alias, finalListener);
} else {
finalListener.onResponse(false);
}
},
finalListener::onFailure
);

String legacyIndexWithoutSuffix = indexPatternPrefix;
String indexPattern = indexPatternPrefix + "*";
Expand All @@ -89,15 +106,15 @@ public static void createIndexAndAliasIfNecessary(Client client,

if (concreteIndexNames.length == 0) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
return;
}
logger.error(
"There are no indices matching '{}' pattern but '{}' alias points at [{}]. This should never happen.",
indexPattern, alias, indexPointedByCurrentWriteAlias.get());
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
if (indexPointedByCurrentWriteAlias.isEmpty()) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, listener);
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, indexCreatedListener);
return;
}
if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) {
Expand All @@ -107,8 +124,8 @@ public static void createIndexAndAliasIfNecessary(Client client,
alias,
false,
ActionListener.wrap(
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, listener),
listener::onFailure)
unused -> updateWriteAlias(client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, indexCreatedListener),
finalListener::onFailure)
);
return;
}
Expand All @@ -119,12 +136,28 @@ public static void createIndexAndAliasIfNecessary(Client client,
if (indexPointedByCurrentWriteAlias.isEmpty()) {
assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
updateWriteAlias(client, alias, null, latestConcreteIndexName, listener);
updateWriteAlias(client, alias, null, latestConcreteIndexName, finalListener);
return;
}
}
// If the alias is set, there is nothing more to do.
listener.onResponse(false);
finalListener.onResponse(false);
}

private static void waitForShardsReady(Client client, String index, ActionListener<Boolean> listener) {
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(index)
.waitForYellowStatus()
.waitForNoRelocatingShards(true)
.waitForNoInitializingShards(true);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
healthRequest,
ActionListener.<ClusterHealthResponse>wrap(
response -> listener.onResponse(response.isTimedOut() == false),
listener::onFailure),
client.admin().cluster()::health
);
}

private static void createFirstConcreteIndex(Client client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
Expand All @@ -19,6 +21,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -66,6 +69,7 @@ public class MlIndexAndAliasTests extends ESTestCase {

private ThreadPool threadPool;
private IndicesAdminClient indicesAdminClient;
private ClusterAdminClient clusterAdminClient;
private AdminClient adminClient;
private Client client;
private ActionListener<Boolean> listener;
Expand All @@ -85,8 +89,17 @@ public void setUpMocks() {
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());

clusterAdminClient = mock(ClusterAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new ClusterHealthResponse());
return null;
}).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class));

adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);

client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102")
public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
testExpiredDeletion(-1.0f, 100);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ public void testStopAndRestart() throws Exception {
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807")
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
indexData(sourceIndex, 100, 0);
Expand Down