Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ protected void masterOperation(
request.getStartTime(),
systemDataStreamDescriptor,
request.masterNodeTimeout(),
request.ackTimeout(),
true
request.ackTimeout()
);
metadataCreateDataStreamService.createDataStream(updateRequest, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RerouteBehavior;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
Expand Down Expand Up @@ -332,10 +333,9 @@ private ClusterState createDataStream(ClusterState state, String name, Instant t
time.toEpochMilli(),
null,
TimeValue.ZERO,
TimeValue.ZERO,
false
TimeValue.ZERO
);
return createDataStreamService.createDataStream(request, state, ActionListener.noop(), false);
return createDataStreamService.createDataStream(request, state, RerouteBehavior.SKIP_REROUTE, ActionListener.noop(), false);
}

private MetadataRolloverService.RolloverResult rolloverOver(ClusterState state, String name, Instant time) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RerouteBehavior;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -289,14 +290,12 @@ ClusterState execute(
request.index(),
dataStreamDescriptor,
request.masterNodeTimeout(),
request.ackTimeout(),
false
request.ackTimeout()
);
assert createRequest.performReroute() == false
: "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(
createRequest,
currentState,
RerouteBehavior.SKIP_REROUTE,
rerouteCompletionIsNotRequired(),
request.isInitializeFailureStore()
);
Expand Down Expand Up @@ -372,12 +371,11 @@ ClusterState execute(
updateRequest = buildUpdateRequest(projectId, indexName);
}

assert updateRequest.performReroute() == false
: "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";
final var clusterState = createIndexService.applyCreateIndexRequest(
currentState,
updateRequest,
false,
RerouteBehavior.SKIP_REROUTE,
rerouteCompletionIsNotRequired()
);
taskContext.success(getAckListener(indexName, allocationActionMultiListener));
Expand All @@ -392,7 +390,7 @@ private CreateIndexClusterStateUpdateRequest buildUpdateRequest(ProjectId projec
projectId,
indexName,
request.index()
).performReroute(false);
);
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}
Expand All @@ -414,7 +412,7 @@ private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
projectId,
concreteIndexName,
request.index()
).performReroute(false);
);

updateRequest.waitForActiveShards(ActiveShardCount.ALL);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class CreateIndexClusterStateUpdateRequest {

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

private boolean performReroute = true;

private ComposableIndexTemplate matchingTemplate;

private boolean settingsSystemProvided = false;
Expand Down Expand Up @@ -202,15 +200,6 @@ public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName
return this;
}

public boolean performReroute() {
return performReroute;
}

public CreateIndexClusterStateUpdateRequest performReroute(boolean performReroute) {
this.performReroute = performReroute;
return this;
}

/**
* @return The composable index template that matches with the index that will be created by this request.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RerouteBehavior;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -265,12 +266,11 @@ private RolloverResult rolloverAlias(
rolloverIndexName,
createIndexRequest
);
assert createIndexClusterStateRequest.performReroute() == false
: "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";
ClusterState newState = createIndexService.applyCreateIndexRequest(
projectState.cluster(),
createIndexClusterStateRequest,
silent,
RerouteBehavior.SKIP_REROUTE,
rerouteCompletionIsNotRequired()
);

Expand Down Expand Up @@ -419,13 +419,11 @@ yield new DataStreamAutoShardingEvent(
now
);
createIndexClusterStateRequest.setMatchingTemplate(templateV2);
assert createIndexClusterStateRequest.performReroute() == false
: "rerouteCompletionIsNotRequired() assumes reroute is not called by underlying service";

newState = createIndexService.applyCreateIndexRequest(
projectState.cluster(),
createIndexClusterStateRequest,
silent,
RerouteBehavior.SKIP_REROUTE,
(builder, indexMetadata) -> {
downgradeBrokenTsdbBackingIndices(dataStream, builder);
builder.put(
Expand Down Expand Up @@ -581,8 +579,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
return new CreateIndexClusterStateUpdateRequest(cause, projectId, targetIndexName, providedIndexName).settings(b.build())
.aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings())
.performReroute(false);
.mappings(createIndexRequest.mappings());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ public void createDataStream(CreateDataStreamClusterStateUpdateRequest request,
public ClusterState execute(ClusterState currentState) throws Exception {
// When we're manually creating a data stream (i.e. not an auto creation), we don't need to initialize the failure store
// because we don't need to redirect any failures in the same request.
ClusterState clusterState = createDataStream(request, currentState, delegate.reroute(), false);
ClusterState clusterState = createDataStream(
request,
currentState,
RerouteBehavior.PERFORM_REROUTE,
delegate.reroute(),
false
);
DataStream createdDataStream = clusterState.metadata().getProject(request.projectId()).dataStreams().get(request.name);
firstBackingIndexRef.set(createdDataStream.getIndices().get(0).getName());
if (createdDataStream.getFailureIndices().isEmpty() == false) {
Expand All @@ -129,6 +135,7 @@ private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String
public ClusterState createDataStream(
CreateDataStreamClusterStateUpdateRequest request,
ClusterState current,
RerouteBehavior rerouteBehavior,
ActionListener<Void> rerouteListener,
boolean initializeFailureStore
) throws Exception {
Expand All @@ -138,6 +145,7 @@ public ClusterState createDataStream(
current,
isDslOnlyMode,
request,
rerouteBehavior,
rerouteListener,
initializeFailureStore
);
Expand All @@ -149,8 +157,7 @@ public record CreateDataStreamClusterStateUpdateRequest(
long startTime,
@Nullable SystemDataStreamDescriptor systemDataStreamDescriptor,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
boolean performReroute
TimeValue ackTimeout
) {
public CreateDataStreamClusterStateUpdateRequest {
Objects.requireNonNull(name);
Expand All @@ -159,18 +166,17 @@ public record CreateDataStreamClusterStateUpdateRequest(
}

public CreateDataStreamClusterStateUpdateRequest(final ProjectId projectId, String name) {
this(projectId, name, null, TimeValue.ZERO, TimeValue.ZERO, true);
this(projectId, name, null, TimeValue.ZERO, TimeValue.ZERO);
}

public CreateDataStreamClusterStateUpdateRequest(
ProjectId projectId,
String name,
SystemDataStreamDescriptor systemDataStreamDescriptor,
TimeValue masterNodeTimeout,
TimeValue ackTimeout,
boolean performReroute
TimeValue ackTimeout
) {
this(projectId, name, System.currentTimeMillis(), systemDataStreamDescriptor, masterNodeTimeout, ackTimeout, performReroute);
this(projectId, name, System.currentTimeMillis(), systemDataStreamDescriptor, masterNodeTimeout, ackTimeout);
}

public boolean isSystem() {
Expand All @@ -184,6 +190,7 @@ static ClusterState createDataStream(
ClusterState currentState,
boolean isDslOnlyMode,
CreateDataStreamClusterStateUpdateRequest request,
RerouteBehavior rerouteBehavior,
ActionListener<Void> rerouteListener,
boolean initializeFailureStore
) throws Exception {
Expand All @@ -195,6 +202,7 @@ static ClusterState createDataStream(
request,
List.of(),
null,
rerouteBehavior,
rerouteListener,
initializeFailureStore
);
Expand All @@ -220,6 +228,7 @@ static ClusterState createDataStream(
CreateDataStreamClusterStateUpdateRequest request,
List<IndexMetadata> backingIndices,
IndexMetadata writeIndex,
RerouteBehavior rerouteBehavior,
ActionListener<Void> rerouteListener,
boolean initializeFailureStore
) throws Exception {
Expand Down Expand Up @@ -304,6 +313,7 @@ static ClusterState createDataStream(
metadataCreateIndexService,
currentState,
request,
rerouteBehavior,
rerouteListener,
dataStreamName,
systemDataStreamDescriptor,
Expand Down Expand Up @@ -374,6 +384,7 @@ private static ClusterState createBackingIndex(
MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
CreateDataStreamClusterStateUpdateRequest request,
RerouteBehavior rerouteBehavior,
ActionListener<Void> rerouteListener,
String dataStreamName,
SystemDataStreamDescriptor systemDataStreamDescriptor,
Expand All @@ -389,7 +400,6 @@ private static ClusterState createBackingIndex(
).dataStreamName(dataStreamName)
.systemDataStreamDescriptor(systemDataStreamDescriptor)
.nameResolvedInstant(request.startTime())
.performReroute(request.performReroute())
.setMatchingTemplate(template);

if (isSystem) {
Expand All @@ -399,7 +409,13 @@ private static ClusterState createBackingIndex(
}

try {
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false, rerouteListener);
currentState = metadataCreateIndexService.applyCreateIndexRequest(
currentState,
createIndexRequest,
false,
rerouteBehavior,
rerouteListener
);
} catch (ResourceAlreadyExistsException e) {
// Rethrow as ElasticsearchStatusException, so that bulk transport action doesn't ignore it during
// auto index/data stream creation.
Expand Down Expand Up @@ -436,7 +452,6 @@ public static ClusterState createFailureStoreIndex(
failureStoreIndexName
).dataStreamName(dataStreamName)
.nameResolvedInstant(nameResolvedInstant)
.performReroute(false)
.setMatchingTemplate(template)
.settings(indexSettings)
.isFailureIndex(true)
Expand All @@ -447,6 +462,7 @@ public static ClusterState createFailureStoreIndex(
currentState,
createIndexRequest,
false,
RerouteBehavior.SKIP_REROUTE,
metadataTransformer,
AllocationActionListener.rerouteCompletionIsNotRequired()
);
Expand Down
Loading
Loading