Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateAction;
import org.elasticsearch.action.admin.indices.create.AutoCreateIndexAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
Expand Down Expand Up @@ -584,6 +589,8 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
actions.register(NodesReloadSecureSettingsAction.INSTANCE, TransportNodesReloadSecureSettingsAction.class);
actions.register(AutoCreateAction.INSTANCE, AutoCreateAction.TransportAction.class);
actions.register(AutoCreateIndexAction.INSTANCE, AutoCreateIndexAction.TransportAction.class);

//Indexed scripts
actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Proxy action for auto creating an indexable resource.
* Currently only auto creates indices by redirecting to {@link AutoCreateIndexAction}.
*/
public final class AutoCreateAction extends ActionType<AutoCreateAction.Response> {

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

private AutoCreateAction() {
super(NAME, Response::new);
}

public static class Request extends MasterNodeReadRequest<Request> implements CompositeIndicesRequest {

private final Set<String> names;
private final String cause;
private final Boolean preferV2Templates;

public Request(Set<String> names, String cause, Boolean preferV2Templates) {
this.names = Objects.requireNonNull(names);
this.cause = Objects.requireNonNull(cause);
this.preferV2Templates = preferV2Templates;
assert names.size() != 0;
}

public Request(StreamInput in) throws IOException {
super(in);
this.names = in.readSet(StreamInput::readString);
this.cause = in.readString();
this.preferV2Templates = in.readOptionalBoolean();
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringCollection(names);
out.writeString(cause);
out.writeOptionalBoolean(preferV2Templates);
}

public Set<String> getNames() {
return names;
}

public String getCause() {
return cause;
}

public Boolean getPreferV2Templates() {
return preferV2Templates;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return names.equals(request.names) &&
Objects.equals(preferV2Templates, request.preferV2Templates);
}

@Override
public int hashCode() {
return Objects.hash(names, preferV2Templates);
}
}

public static class Response extends ActionResponse {

private final Map<String, Exception> failureByNames;

public Response(Map<String, Exception> failureByNames) {
this.failureByNames = failureByNames;
}

public Response(StreamInput in) throws IOException {
super(in);
failureByNames = in.readMap(StreamInput::readString, StreamInput::readException);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(failureByNames, StreamOutput::writeString, StreamOutput::writeException);
}

public Map<String, Exception> getFailureByNames() {
return failureByNames;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
/*
* Exception does not implement equals(...) so we will compute the hash code based on the key set and the
* messages.
*/
return Objects.equals(getKeys(failureByNames), getKeys(response.failureByNames)) &&
Objects.equals(getExceptionMessages(failureByNames), getExceptionMessages(response.failureByNames));
}

@Override
public int hashCode() {
/*
* Exception does not implement hash code so we will compute the hash code based on the key set and the
* messages.
*/
return Objects.hash(getKeys(failureByNames), getExceptionMessages(failureByNames));
}

private static List<String> getExceptionMessages(final Map<String, Exception> map) {
return map.values().stream().map(Throwable::getMessage).sorted(String::compareTo).collect(Collectors.toList());
}

private static List<String> getKeys(final Map<String, Exception> map) {
return map.keySet().stream().sorted(String::compareTo).collect(Collectors.toList());
}
}

public static final class TransportAction extends TransportMasterNodeAction<Request, Response> {

private final Client client;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.client = client;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected Response read(StreamInput in) throws IOException {
return new Response(in);
}

@Override
protected void masterOperation(Task task,
Request request,
ClusterState state,
ActionListener<Response> listener) {
autoCreate(request, listener, client);
}

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, request.names.toArray(new String[0]));
}
}

static void autoCreate(Request request, ActionListener<Response> listener, Client client) {
// For now always redirect to the auto create index action, because only indices get auto created.
final AtomicInteger counter = new AtomicInteger(request.getNames().size());
final Map<String, Exception> results = new HashMap<>();
for (String name : request.getNames()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(name);
createIndexRequest.cause(request.getCause());
createIndexRequest.masterNodeTimeout(request.masterNodeTimeout());
createIndexRequest.preferV2Templates(request.getPreferV2Templates());
client.execute(AutoCreateIndexAction.INSTANCE, createIndexRequest, ActionListener.wrap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By delegating to a separate transport action, we risk that the retry on no longer master happens inside that transport action, making any decision we make on cluster state here invalid (or at least potentially stale). I would much prefer to do the cluster state update here.

That way we could also create all the indices and streams in one go, reducing the number of cluster states published.

I think auto-create does not have to be specific to data stream or index. It goes together with the document level privileges which are also agnostic of whether the data ends up in a data stream or an index.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think auto-create does not have to be specific to data stream or index. It goes together with the document level privileges which are also agnostic of whether the data ends up in a data stream or an index.

Yes and this is another reason to keep all the auto creation logic in a single action.

createIndexResponse -> {
// Maybe a bit overkill to ensure visibility of results map across threads...
synchronized (results) {
results.put(name, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map is in the response named "failureByNames". I think we should either not add successful indices into it, rename the map or separate the two parts in the response.

if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(results));
}
}
},
e -> {
synchronized (results) {
results.put(name, e);
if (counter.decrementAndGet() == 0) {
listener.onResponse(new Response(results));
}
}
}
));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

/**
* Api that auto creates an index that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateIndexAction extends ActionType<CreateIndexResponse> {

public static final AutoCreateIndexAction INSTANCE = new AutoCreateIndexAction();
public static final String NAME = "indices:admin/auto_create_index";

private AutoCreateIndexAction() {
super(NAME, CreateIndexResponse::new);
}

public static final class TransportAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new,
indexNameExpressionResolver);
this.createIndexService = createIndexService;
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected CreateIndexResponse read(StreamInput in) throws IOException {
return new CreateIndexResponse(in);
}

@Override
protected void masterOperation(Task task,
CreateIndexRequest request,
ClusterState state,
ActionListener<CreateIndexResponse> listener) throws Exception {
TransportCreateIndexAction.innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}

@Override
protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.index());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,20 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
@Override
protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
cause = "api";
if (request.cause().length() == 0) {
request.cause("api");
}

innerCreateIndex(request, listener, indexNameExpressionResolver, createIndexService);
}

static void innerCreateIndex(CreateIndexRequest request,
ActionListener<CreateIndexResponse> listener,
IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService) {
final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
Expand Down
Loading