@@ -49,10 +49,6 @@ public final class IngestMetadata implements Metadata.Custom {
     // IngestMetadata is registered as custom metadata.
     private final Map<String, PipelineConfiguration> pipelines;

-    private IngestMetadata() {
-        this.pipelines = Collections.emptyMap();
-    }
-
     public IngestMetadata(Map<String, PipelineConfiguration> pipelines) {
         this.pipelines = Collections.unmodifiableMap(pipelines);
     }
115 changes: 58 additions & 57 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -30,7 +30,7 @@
 import org.elasticsearch.cluster.ClusterStateApplier;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.metadata.DataStream.TimestampField;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -42,13 +42,11 @@
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
@@ -110,38 +108,37 @@ public class IngestService implements ClusterStateApplier, ReportingService<IngestInfo> {
     /**
      * Cluster state task executor for ingest pipeline operations
      */
-    private static final ClusterStateTaskExecutor<PipelineClusterStateUpdateTask> PIPELINE_TASK_EXECUTOR = (currentState, taskContexts) -> {
-        ClusterState state = currentState;
+    static final ClusterStateTaskExecutor<PipelineClusterStateUpdateTask> PIPELINE_TASK_EXECUTOR = (currentState, taskContexts) -> {
+        final var allIndexMetadata = currentState.metadata().indices().values();
+        final IngestMetadata initialIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);
+        var currentIngestMetadata = initialIngestMetadata;
         for (final var taskContext : taskContexts) {
             try {
                 final var task = taskContext.getTask();
-                state = task.execute(state);
-                taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
+                currentIngestMetadata = task.execute(currentIngestMetadata, allIndexMetadata);
+                taskContext.success(task.listener.map(ignored -> AcknowledgedResponse.TRUE));
             } catch (Exception e) {
                 taskContext.onFailure(e);
             }
         }
-        return state;
+        final var finalIngestMetadata = currentIngestMetadata;
+        return finalIngestMetadata == initialIngestMetadata
+            ? currentState
+            : currentState.copyAndUpdateMetadata(b -> b.putCustom(IngestMetadata.TYPE, finalIngestMetadata));
     };

     /**
      * Specialized cluster state update task specifically for ingest pipeline operations.
      * These operations all receive an AcknowledgedResponse.
      */
-    private abstract static class PipelineClusterStateUpdateTask extends ClusterStateUpdateTask {
-        private final ActionListener<AcknowledgedResponse> listener;
+    abstract static class PipelineClusterStateUpdateTask implements ClusterStateTaskListener {
+        final ActionListener<AcknowledgedResponse> listener;

-        PipelineClusterStateUpdateTask(TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
-            super(timeout);
+        PipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener) {
            this.listener = listener;
         }

-        public abstract ClusterState doExecute(ClusterState currentState) throws Exception;
-
-        @Override
-        public ClusterState execute(ClusterState currentState) throws Exception {
-            return doExecute(currentState);
-        }
+        public abstract IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata);

         @Override
         public void onFailure(Exception e) {
@@ -150,7 +147,7 @@ public void onFailure(Exception e) {

         @Override
         public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-            listener.onResponse(AcknowledgedResponse.TRUE);
+            assert false : "should not be called";
         }
     }

@@ -336,26 +333,19 @@ public ScriptService getScriptService() {
     public void delete(DeletePipelineRequest request, ActionListener<AcknowledgedResponse> listener) {
         clusterService.submitStateUpdateTask(
             "delete-pipeline-" + request.getId(),
-            new PipelineClusterStateUpdateTask(request.timeout(), listener) {
-                @Override
-                public ClusterState doExecute(ClusterState currentState) {
-                    return innerDelete(request, currentState);
-                }
-            },
+            new DeletePipelineClusterStateUpdateTask(listener, request),
             ClusterStateTaskConfig.build(Priority.NORMAL, request.masterNodeTimeout()),
             PIPELINE_TASK_EXECUTOR
         );
     }

-    @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
-    private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
-        clusterService.submitUnbatchedStateUpdateTask(source, task);
-    }
-
-    static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) {
-        IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);
+    static IngestMetadata innerDelete(
+        DeletePipelineRequest request,
+        IngestMetadata currentIngestMetadata,
+        Collection<IndexMetadata> allIndexMetadata
+    ) {
         if (currentIngestMetadata == null) {
-            return currentState;
+            return null;
         }
         Map<String, PipelineConfiguration> pipelines = currentIngestMetadata.getPipelines();
         Set<String> toRemove = new HashSet<>();
@@ -367,25 +357,20 @@ static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) {
         if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) {
             throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId());
         } else if (toRemove.isEmpty()) {
-            return currentState;
+            return currentIngestMetadata;
         }
         final Map<String, PipelineConfiguration> pipelinesCopy = new HashMap<>(pipelines);
-        ImmutableOpenMap<String, IndexMetadata> indices = currentState.metadata().indices();
         for (String key : toRemove) {
-            validateNotInUse(key, indices);
+            validateNotInUse(key, allIndexMetadata);
             pipelinesCopy.remove(key);
         }
-        ClusterState.Builder newState = ClusterState.builder(currentState);
-        newState.metadata(
-            Metadata.builder(currentState.getMetadata()).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy)).build()
-        );
-        return newState.build();
+        return new IngestMetadata(pipelinesCopy);
     }

-    static void validateNotInUse(String pipeline, ImmutableOpenMap<String, IndexMetadata> indices) {
+    static void validateNotInUse(String pipeline, Collection<IndexMetadata> allIndexMetadata) {
         List<String> defaultPipelineIndices = new ArrayList<>();
         List<String> finalPipelineIndices = new ArrayList<>();
-        for (IndexMetadata indexMetadata : indices.values()) {
+        for (IndexMetadata indexMetadata : allIndexMetadata) {
             String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
             String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
             if (pipeline.equals(defaultPipeline)) {
@@ -499,12 +484,7 @@ public void putPipeline(
             validatePipeline(ingestInfos, request.getId(), config);
             clusterService.submitStateUpdateTask(
                 "put-pipeline-" + request.getId(),
-                new PipelineClusterStateUpdateTask(request.timeout(), listener) {
-                    @Override
-                    public ClusterState doExecute(ClusterState currentState) {
-                        return innerPut(request, currentState);
-                    }
-                },
+                new PutPipelineClusterStateUpdateTask(listener, request),
                 ClusterStateTaskConfig.build(Priority.NORMAL, request.masterNodeTimeout()),
                 PIPELINE_TASK_EXECUTOR
             );
@@ -570,9 +550,7 @@ private static List<Tuple<Processor, IngestMetric>> getProcessorMetrics(
     }

     // visible for testing
-    static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
-        IngestMetadata currentIngestMetadata = currentState.metadata().custom(IngestMetadata.TYPE);
-
+    static IngestMetadata innerPut(PutPipelineRequest request, IngestMetadata currentIngestMetadata) {
         BytesReference pipelineSource = request.getSource();
         if (request.getVersion() != null) {
             var currentPipeline = currentIngestMetadata != null ? currentIngestMetadata.getPipelines().get(request.getId()) : null;
@@ -633,11 +611,7 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) {
         }

         pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
-        ClusterState.Builder newState = ClusterState.builder(currentState);
-        newState.metadata(
-            Metadata.builder(currentState.getMetadata()).putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)).build()
-        );
-        return newState.build();
+        return new IngestMetadata(pipelines);
     }

     void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelineId, Map<String, Object> pipelineConfig)
@@ -1164,4 +1138,31 @@ static class PipelineHolder {
         }
     }

+    static class DeletePipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
+        private final DeletePipelineRequest request;
+
+        DeletePipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, DeletePipelineRequest request) {
+            super(listener);
+            this.request = request;
+        }
+
+        @Override
+        public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
+            return innerDelete(request, currentIngestMetadata, allIndexMetadata);
+        }
+    }
+
+    static class PutPipelineClusterStateUpdateTask extends PipelineClusterStateUpdateTask {
+        private final PutPipelineRequest request;
+
+        PutPipelineClusterStateUpdateTask(ActionListener<AcknowledgedResponse> listener, PutPipelineRequest request) {
+            super(listener);
+            this.request = request;
+        }
+
+        @Override
+        public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<IndexMetadata> allIndexMetadata) {
+            return innerPut(request, currentIngestMetadata);
+        }
+    }
 }
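
For readers unfamiliar with the batched-executor pattern this change adopts, here is a minimal, self-contained sketch (plain Java, not Elasticsearch code): each queued task transforms only the ingest pipeline map, and the executor rebuilds the enclosing state object once at the end of the batch, and only if the map actually changed. All names in the sketch (SimpleClusterState, PipelineTask, runBatch) are hypothetical stand-ins, not classes from this PR.

// Illustrative sketch only: a simplified model of the batching pattern in this diff.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchedPipelineUpdateSketch {

    // Stand-in for the cluster state: an immutable holder of the pipeline map ("IngestMetadata").
    record SimpleClusterState(Map<String, String> pipelines) {}

    // Stand-in for PipelineClusterStateUpdateTask: each task transforms only the pipeline map,
    // never the whole state object.
    interface PipelineTask {
        Map<String, String> execute(Map<String, String> currentPipelines);
    }

    // Stand-in for the batched executor: run every queued task against the same starting
    // metadata, then rebuild the state once at the end, only if something changed.
    static SimpleClusterState runBatch(SimpleClusterState state, List<PipelineTask> tasks) {
        Map<String, String> initial = state.pipelines();
        Map<String, String> current = initial;
        for (PipelineTask task : tasks) {
            try {
                current = task.execute(current);                          // success: per-task ack would go here
            } catch (Exception e) {
                System.err.println("task failed: " + e.getMessage());     // failure: per-task onFailure
            }
        }
        // Single publication: unchanged metadata means the original state is returned as-is.
        return current == initial ? state : new SimpleClusterState(Map.copyOf(current));
    }

    public static void main(String[] args) {
        SimpleClusterState state = new SimpleClusterState(Map.of("logs", "{...}"));
        SimpleClusterState updated = runBatch(state, List.of(
            pipelines -> {                                                 // a "put"-style task
                Map<String, String> copy = new LinkedHashMap<>(pipelines);
                copy.put("metrics", "{...}");
                return copy;
            },
            pipelines -> {                                                 // a "delete"-style task
                Map<String, String> copy = new LinkedHashMap<>(pipelines);
                copy.remove("logs");
                return copy;
            }
        ));
        System.out.println(updated.pipelines());                          // prints {metrics={...}}
    }
}

This mirrors why the per-task execute signature in the diff takes IngestMetadata rather than ClusterState: tasks cannot touch unrelated state, and a whole batch costs at most one metadata rebuild.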