Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
Expand All @@ -30,12 +32,14 @@
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand All @@ -47,7 +51,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_HIDDEN;

Expand Down Expand Up @@ -106,20 +109,12 @@ public TransportAction(
this.autoCreateIndex = autoCreateIndex;
executor = (currentState, taskContexts) -> {
ClusterState state = currentState;
final Map<CreateIndexRequest, CreateIndexTask> successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size());
final Map<CreateIndexRequest, String> successfulRequests = Maps.newMapWithExpectedSize(taskContexts.size());
for (final var taskContext : taskContexts) {
final var task = taskContext.getTask();
try {
final CreateIndexTask successfulBefore = successfulRequests.putIfAbsent(task.request, task);
if (successfulBefore == null) {
state = task.execute(state);
} else {
// TODO: clean this up to just deduplicate the task listener instead of setting the generated name from
// duplicate tasks here and then waiting for shards to become available multiple times in parallel for
// each duplicate task
task.indexNameRef.set(successfulBefore.indexNameRef.get());
}
taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState), task);
state = task.execute(state, successfulRequests, taskContext);
assert successfulRequests.containsKey(task.request);
} catch (Exception e) {
taskContext.onFailure(e);
}
Expand All @@ -136,52 +131,102 @@ protected void masterOperation(
Task task,
CreateIndexRequest request,
ClusterState state,
ActionListener<CreateIndexResponse> finalListener
ActionListener<CreateIndexResponse> listener
) {
AtomicReference<String> indexNameRef = new AtomicReference<>();
ActionListener<AcknowledgedResponse> listener = ActionListener.wrap(response -> {
String indexName = indexNameRef.get();
assert indexName != null;
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(
new String[] { indexName },
ActiveShardCount.DEFAULT,
request.timeout(),
shardsAcked -> finalListener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)),
finalListener::onFailure
);
} else {
finalListener.onResponse(new CreateIndexResponse(false, false, indexName));
}
}, finalListener::onFailure);
CreateIndexTask clusterTask = new CreateIndexTask(request, listener, indexNameRef);
clusterService.submitStateUpdateTask("auto create [" + request.index() + "]", clusterTask, clusterTask, executor);
clusterService.submitStateUpdateTask(
"auto create [" + request.index() + "]",
new CreateIndexTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
executor
);
}

@Override
protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
}

// TODO: split the listner out of this task and use AckedClusterStateTaskListener directly to avoid the complicated listener
// construction upstream when instantiating these
private final class CreateIndexTask extends AckedClusterStateUpdateTask {

final CreateIndexRequest request;
final AtomicReference<String> indexNameRef;
private final class CreateIndexTask implements ClusterStateTaskListener {
private final CreateIndexRequest request;
private final ActionListener<CreateIndexResponse> listener;

CreateIndexTask(
CreateIndexRequest request,
ActionListener<AcknowledgedResponse> listener,
AtomicReference<String> indexNameRef
) {
super(Priority.URGENT, request, listener);
private CreateIndexTask(CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
this.request = request;
this.indexNameRef = indexNameRef;
this.listener = listener;
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "should not be called";
}

private ClusterStateAckListener getAckListener(String indexName) {
return new ClusterStateAckListener() {
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

@Override
public void onAllNodesAcked(Exception e) {
if (e == null) {
activeShardsObserver.waitForActiveShards(
new String[] { indexName },
ActiveShardCount.DEFAULT,
request.timeout(),
shardsAcked -> listener.onResponse(new CreateIndexResponse(true, shardsAcked, indexName)),
listener::onFailure
);
} else {
listener.onResponse(new CreateIndexResponse(false, false, indexName));
}
}

@Override
public void onAckTimeout() {
listener.onResponse(new CreateIndexResponse(false, false, indexName));
}

@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
};
}

/**
* @param successfulRequests Cache of successful requests executed by this batch, to avoid failing duplicate requests with a
* {@link ResourceAlreadyExistsException}. If this method executes a request it should update this
* map.
*/
ClusterState execute(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth a comment on the implied contract w.r.t. returning a clusterState, calling success/failure on the taskContext, and updating successfulRequests? It looks to me like you're doing all the juggling just fine, but some future developer might be happy to add a new return case and think that just satisfying the compiler here is sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point. I added a comment in 6be0514 and also an assertion to enforce it.

ClusterState currentState,
Map<CreateIndexRequest, String> successfulRequests,
ClusterStateTaskExecutor.TaskContext<CreateIndexTask> taskContext
) throws Exception {
final ActionListener<ClusterState> publishListener = new ActionListener<>() {
@Override
public void onResponse(ClusterState clusterState) {
// nothing to do here, listener is completed at the end of acking
}

@Override
public void onFailure(Exception e) {
CreateIndexTask.this.onFailure(e);
}
};

final var previousIndexName = successfulRequests.get(request);
if (previousIndexName != null) {
taskContext.success(publishListener, getAckListener(previousIndexName));
return currentState;
}

final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices.validateDataStreamAccess(
request.index(),
threadPool.getThreadContext()
Expand All @@ -208,11 +253,13 @@ public ClusterState execute(ClusterState currentState) throws Exception {
false
);
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());

final var indexName = clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName();
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return clusterState;
} else {
String indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
indexNameRef.set(indexName);
final var indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
throw new IllegalStateException("system indices do not support date math expressions");
Expand All @@ -223,6 +270,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {

if (shouldAutoCreate == false) {
// The index already exists.
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return currentState;
}
}
Expand Down Expand Up @@ -259,7 +308,10 @@ public ClusterState execute(ClusterState currentState) throws Exception {
updateRequest = buildUpdateRequest(indexName);
}

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
final var clusterState = createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
taskContext.success(publishListener, getAckListener(indexName));
successfulRequests.put(request, indexName);
return clusterState;
}
}

Expand Down