Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
Expand All @@ -25,8 +27,11 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;

public class TransportDeleteDesiredNodesAction extends TransportMasterNodeAction<DeleteDesiredNodesAction.Request, ActionResponse.Empty> {
private final DesiredNodesClusterStateTaskExecutor taskExecutor;

private final ClusterStateTaskExecutor<DeleteDesiredNodesTask> taskExecutor = new DeleteDesiredNodesExecutor();

@Inject
public TransportDeleteDesiredNodesAction(
Expand All @@ -47,7 +52,6 @@ public TransportDeleteDesiredNodesAction(
in -> ActionResponse.Empty.INSTANCE,
ThreadPool.Names.SAME
);
this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
}

@Override
Expand All @@ -57,26 +61,35 @@ protected void masterOperation(
ClusterState state,
ActionListener<ActionResponse.Empty> listener
) throws Exception {
clusterService.submitStateUpdateTask("delete-desired-nodes", new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE));
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}, taskExecutor);
clusterService.submitStateUpdateTask(
"delete-desired-nodes",
new DeleteDesiredNodesTask(listener),
ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
taskExecutor
);
}

@Override
protected ClusterBlockException checkBlock(DeleteDesiredNodesAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

private record DeleteDesiredNodesTask(ActionListener<ActionResponse.Empty> listener) implements ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

private static class DeleteDesiredNodesExecutor implements ClusterStateTaskExecutor<DeleteDesiredNodesTask> {
@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<DeleteDesiredNodesTask>> taskContexts) throws Exception {
for (final var taskContext : taskContexts) {
taskContext.success(
taskContext.getTask().listener().delegateFailure((l, s) -> l.onResponse(ActionResponse.Empty.INSTANCE))
);
}
return currentState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.desirednodes.DesiredNodesSettingsValidator;
Expand All @@ -28,13 +29,17 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.List;
import java.util.Locale;
import java.util.function.BiConsumer;

import static java.lang.String.format;

public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction<UpdateDesiredNodesRequest, UpdateDesiredNodesResponse> {

private final DesiredNodesSettingsValidator settingsValidator;
private final ClusterStateTaskExecutor<ClusterStateUpdateTask> taskExecutor;

private final ClusterStateTaskExecutor<UpdateDesiredNodesTask> taskExecutor = new UpdateDesiredNodesExecutor();

@Inject
public TransportUpdateDesiredNodesAction(
Expand All @@ -58,7 +63,6 @@ public TransportUpdateDesiredNodesAction(
ThreadPool.Names.SAME
);
this.settingsValidator = settingsValidator;
this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
}

@Override
Expand All @@ -75,47 +79,29 @@ protected void masterOperation(
) throws Exception {
try {
settingsValidator.validate(request.getNodes());

clusterService.submitStateUpdateTask(
"update-desired-nodes",
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
volatile boolean replacedExistingHistoryId = false;

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = updateDesiredNodes(currentState, request);
final DesiredNodes previousDesiredNodes = DesiredNodes.latestFromClusterState(currentState);
final DesiredNodes latestDesiredNodes = DesiredNodes.latestFromClusterState(updatedState);
replacedExistingHistoryId = previousDesiredNodes != null
&& previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
return updatedState;
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId));
}
},
new UpdateDesiredNodesTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
taskExecutor
);
} catch (Exception e) {
listener.onFailure(e);
}
}

static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredNodesRequest request) {
final DesiredNodesMetadata desiredNodesMetadata = DesiredNodesMetadata.fromClusterState(currentState);
final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();
static ClusterState replaceDesiredNodes(ClusterState clusterState, DesiredNodes newDesiredNodes) {
return clusterState.copyAndUpdateMetadata(
metadata -> metadata.putCustom(DesiredNodesMetadata.TYPE, new DesiredNodesMetadata(newDesiredNodes))
);
}

static DesiredNodes updateDesiredNodes(DesiredNodes latestDesiredNodes, UpdateDesiredNodesRequest request) {
final DesiredNodes proposedDesiredNodes = new DesiredNodes(request.getHistoryID(), request.getVersion(), request.getNodes());

if (latestDesiredNodes != null) {
if (latestDesiredNodes.equals(proposedDesiredNodes)) {
return currentState;
return latestDesiredNodes;
}

if (latestDesiredNodes.hasSameVersion(proposedDesiredNodes)) {
Expand All @@ -139,8 +125,46 @@ static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredN
}
}

return currentState.copyAndUpdateMetadata(
metadata -> metadata.putCustom(DesiredNodesMetadata.TYPE, new DesiredNodesMetadata(proposedDesiredNodes))
);
return proposedDesiredNodes;
}

private record UpdateDesiredNodesTask(UpdateDesiredNodesRequest request, ActionListener<UpdateDesiredNodesResponse> listener)
implements
ClusterStateTaskListener {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}

private static class UpdateDesiredNodesExecutor implements ClusterStateTaskExecutor<UpdateDesiredNodesTask> {

private static final BiConsumer<ActionListener<UpdateDesiredNodesResponse>, ClusterState> SUCCESS_SAME_HISTORY_ID = (l, s) -> l
.onResponse(new UpdateDesiredNodesResponse(false));
private static final BiConsumer<ActionListener<UpdateDesiredNodesResponse>, ClusterState> SUCCESS_NEW_HISTORY_ID = (l, s) -> l
.onResponse(new UpdateDesiredNodesResponse(true));

@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<UpdateDesiredNodesTask>> taskContexts) throws Exception {
final var initialDesiredNodes = DesiredNodesMetadata.fromClusterState(currentState).getLatestDesiredNodes();
var desiredNodes = initialDesiredNodes;
for (final var taskContext : taskContexts) {
final var previousDesiredNodes = desiredNodes;
try {
desiredNodes = updateDesiredNodes(desiredNodes, taskContext.getTask().request());
} catch (Exception e) {
taskContext.onFailure(e);
continue;
}
final var replacedExistingHistoryId = previousDesiredNodes != null
&& previousDesiredNodes.hasSameHistoryId(desiredNodes) == false;
taskContext.success(
taskContext.getTask()
.listener()
.delegateFailure(replacedExistingHistoryId ? SUCCESS_NEW_HISTORY_ID : SUCCESS_SAME_HISTORY_ID)
);
}
return desiredNodes == initialDesiredNodes ? currentState : replaceDesiredNodes(currentState, desiredNodes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ public void testUpdateDesiredNodes() {
request = new UpdateDesiredNodesRequest(desiredNodes.historyID(), desiredNodes.version() + 1, updatedNodes);
}

final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request);
final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.replaceDesiredNodes(
currentClusterState,
TransportUpdateDesiredNodesAction.updateDesiredNodes(DesiredNodes.latestFromClusterState(currentClusterState), request)
);
final DesiredNodesMetadata desiredNodesMetadata = updatedClusterState.metadata().custom(DesiredNodesMetadata.TYPE);
assertThat(desiredNodesMetadata, is(notNullValue()));

Expand All @@ -157,13 +160,7 @@ public void testUpdateDesiredNodes() {
}

public void testUpdatesAreIdempotent() {
final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata();
final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10)))
.metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build())
.build();

final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();

final DesiredNodes latestDesiredNodes = randomDesiredNodesMetadata().getLatestDesiredNodes();
final List<DesiredNode> equivalentDesiredNodesList = new ArrayList<>(latestDesiredNodes.nodes());
if (randomBoolean()) {
Collections.shuffle(equivalentDesiredNodesList, random());
Expand All @@ -174,19 +171,11 @@ public void testUpdatesAreIdempotent() {
equivalentDesiredNodesList
);

final ClusterState updatedClusterState = TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request);
final DesiredNodesMetadata updatedDesiredNodesMetadata = updatedClusterState.metadata().custom(DesiredNodesMetadata.TYPE);
assertThat(updatedDesiredNodesMetadata, is(notNullValue()));
assertThat(updatedDesiredNodesMetadata.getLatestDesiredNodes(), is(notNullValue()));
assertThat(updatedDesiredNodesMetadata.getLatestDesiredNodes(), is(equalTo(latestDesiredNodes)));
assertSame(latestDesiredNodes, TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request));
}

public void testUpdateSameHistoryAndVersionWithDifferentContentsFails() {
final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata();
final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10)))
.metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build())
.build();

final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();
final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest(
latestDesiredNodes.historyID(),
Expand All @@ -196,17 +185,13 @@ public void testUpdateSameHistoryAndVersionWithDifferentContentsFails() {

IllegalArgumentException exception = expectThrows(
IllegalArgumentException.class,
() -> TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request)
() -> TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request)
);
assertThat(exception.getMessage(), containsString("already exists with a different definition"));
}

public void testBackwardUpdatesFails() {
final DesiredNodesMetadata desiredNodesMetadata = randomDesiredNodesMetadata();
final ClusterState currentClusterState = ClusterState.builder(new ClusterName(randomAlphaOfLength(10)))
.metadata(Metadata.builder().putCustom(DesiredNodesMetadata.TYPE, desiredNodesMetadata).build())
.build();

final DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();
final UpdateDesiredNodesRequest request = new UpdateDesiredNodesRequest(
latestDesiredNodes.historyID(),
Expand All @@ -216,7 +201,7 @@ public void testBackwardUpdatesFails() {

VersionConflictException exception = expectThrows(
VersionConflictException.class,
() -> TransportUpdateDesiredNodesAction.updateDesiredNodes(currentClusterState, request)
() -> TransportUpdateDesiredNodesAction.updateDesiredNodes(latestDesiredNodes, request)
);
assertThat(exception.getMessage(), containsString("has been superseded by version"));
}
Expand Down