Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions docs/plugins/ingest.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
[[ingest]]
== Ingest Plugin

TODO

=== Put pipeline API

The put pipeline API adds new pipelines and updates existing pipelines in the cluster.

[source,js]
--------------------------------------------------
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"processors" : [
{
"simple" : {
// settings
}
},
// other processors
]
}
--------------------------------------------------
// AUTOSENSE

NOTE: Each ingest node updates its processors asynchronously in the background, so it may take a few seconds for all
nodes to have the latest version of the pipeline.

=== Get pipeline API

The get pipeline API returns pipelines based on id. This API always returns a local reference of the pipeline.

[source,js]
--------------------------------------------------
GET _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE

Example response:

[source,js]
--------------------------------------------------
{
"my-pipeline-id": {
"_source" : {
"description": "describe pipeline",
"processors": [
{
"simple" : {
// settings
}
},
// other processors
]
},
"_version" : 0
}
}
--------------------------------------------------

For each returned pipeline, the source and the version are returned.
The version is useful for knowing which version of the pipeline the node has.
Multiple ids can be provided at the same time, and wildcards are also supported.

=== Delete pipeline API

The delete pipeline API deletes pipelines by id.

[source,js]
--------------------------------------------------
DELETE _ingest/pipeline/my-pipeline-id
--------------------------------------------------
// AUTOSENSE
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,15 @@ public List<Processor> getProcessors() {

public final static class Builder {

private final String name;
private final String id;
private String description;
private List<Processor> processors = new ArrayList<>();

public Builder(String name) {
this.name = name;
public Builder(String id) {
this.id = id;
}

public Builder(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) {
name = (String) config.get("name");
public void fromMap(Map<String, Object> config, Map<String, Processor.Builder.Factory> processorRegistry) {
description = (String) config.get("description");
@SuppressWarnings("unchecked")
List<Map<String, Map<String, Object>>> processors = (List<Map<String, Map<String, Object>>>) config.get("processors");
Expand Down Expand Up @@ -111,7 +110,7 @@ public void addProcessors(Processor.Builder... processors) {
}

public Pipeline build() {
return new Pipeline(name, description, Collections.unmodifiableList(processors));
return new Pipeline(id, description, Collections.unmodifiableList(processors));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.SimpleProcessor;
import org.elasticsearch.plugin.ingest.rest.IngestRestFilter;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -39,7 +37,7 @@ protected void configure() {
binder().bind(IngestRestFilter.class).asEagerSingleton();
binder().bind(PipelineExecutionService.class).asEagerSingleton();
binder().bind(PipelineStore.class).asEagerSingleton();
binder().bind(PipelineConfigDocReader.class).asEagerSingleton();
binder().bind(PipelineStoreClient.class).asEagerSingleton();

registerProcessor(SimpleProcessor.TYPE, SimpleProcessor.Builder.Factory.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@
package org.elasticsearch.plugin.ingest;

import org.elasticsearch.action.ActionModule;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.ingest.rest.RestDeletePipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestGetPipelineAction;
import org.elasticsearch.plugin.ingest.rest.RestPutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineAction;
import org.elasticsearch.plugin.ingest.transport.get.GetPipelineTransportAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineAction;
import org.elasticsearch.plugin.ingest.transport.put.PutPipelineTransportAction;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.action.RestActionModule;
import org.elasticsearch.rest.RestModule;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,9 +52,11 @@ public class IngestPlugin extends Plugin {
public static final String NAME = "ingest";

private final Settings nodeSettings;
private final boolean transportClient;

public IngestPlugin(Settings nodeSettings) {
this.nodeSettings = nodeSettings;
transportClient = "transport".equals(nodeSettings.get(Client.CLIENT_TYPE_SETTING));
}

@Override
Expand All @@ -59,12 +71,20 @@ public String description() {

@Override
public Collection<Module> nodeModules() {
return Collections.singletonList(new IngestModule());
if (transportClient) {
return Collections.emptyList();
} else {
return Collections.singletonList(new IngestModule());
}
}

@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class);
if (transportClient) {
return Collections.emptyList();
} else {
return Arrays.asList(PipelineStore.class, PipelineStoreClient.class);
}
}

@Override
Expand All @@ -75,7 +95,18 @@ public Settings additionalSettings() {
}

public void onModule(ActionModule module) {
module.registerFilter(IngestActionFilter.class);
if (!transportClient) {
module.registerFilter(IngestActionFilter.class);
}
module.registerAction(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class);
module.registerAction(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class);
module.registerAction(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class);
}

// Registers the REST endpoints for pipeline management (PUT, GET and DELETE
// of _ingest/pipeline/...) with the REST layer.
// NOTE(review): presumably invoked reflectively by the plugin infrastructure
// when the RestModule is created — confirm against the plugin loading code.
public void onModule(RestModule restModule) {
restModule.addRestAction(RestPutPipelineAction.class);
restModule.addRestAction(RestGetPipelineAction.class);
restModule.addRestAction(RestDeletePipelineAction.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -34,10 +35,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;

public class PipelineStore extends AbstractLifecycleComponent {

Expand All @@ -47,13 +45,13 @@ public class PipelineStore extends AbstractLifecycleComponent {
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final TimeValue pipelineUpdateInterval;
private final PipelineConfigDocReader configDocReader;
private final PipelineStoreClient configDocReader;
private final Map<String, Processor.Builder.Factory> processorFactoryRegistry;

private volatile Map<String, PipelineReference> pipelines = new HashMap<>();

@Inject
public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineConfigDocReader configDocReader, Map<String, Processor.Builder.Factory> processors) {
public PipelineStore(Settings settings, ThreadPool threadPool, ClusterService clusterService, PipelineStoreClient configDocReader, Map<String, Processor.Builder.Factory> processors) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
Expand Down Expand Up @@ -84,10 +82,32 @@ public Pipeline get(String id) {
}
}

/**
 * Resolves the given pipeline ids against the locally cached pipelines.
 * Each id may be an exact pipeline id or a simple wildcard pattern
 * (as understood by {@link Regex#isSimpleMatchPattern}); wildcard ids are
 * matched against every cached pipeline id. Ids that match nothing are
 * silently skipped — the result may therefore contain fewer entries than
 * ids were supplied, and an exact id plus an overlapping wildcard can
 * yield the same reference twice.
 *
 * @param ids exact pipeline ids and/or simple wildcard patterns
 * @return the matching {@code PipelineReference}s, in input-id order
 */
public List<PipelineReference> getReference(String... ids) {
// Pre-size to the id count; wildcard matches may grow the list beyond this.
List<PipelineReference> result = new ArrayList<>(ids.length);
for (String id : ids) {
// Wildcard id (contains '*'): scan all cached pipelines for matches.
if (Regex.isSimpleMatchPattern(id)) {
for (Map.Entry<String, PipelineReference> entry : pipelines.entrySet()) {
if (Regex.simpleMatch(id, entry.getKey())) {
result.add(entry.getValue());
}
}
} else {
// Exact id: direct lookup; a missing pipeline is not an error here.
PipelineReference reference = pipelines.get(id);
if (reference != null) {
result.add(reference);
}
}
}
return result;
}

void updatePipelines() {
// note: this process isn't fast or smart, but the idea is that there will not be many pipelines,
// so for that reason the goal is to keep the update logic simple.

int changed = 0;
Map<String, PipelineReference> newPipelines = new HashMap<>(pipelines);
for (SearchHit hit : configDocReader.readAll()) {
for (SearchHit hit : configDocReader.readAllPipelines()) {
String pipelineId = hit.getId();
BytesReference pipelineSource = hit.getSourceRef();
PipelineReference previous = newPipelines.get(pipelineId);
Expand All @@ -98,15 +118,24 @@ void updatePipelines() {
}

changed++;
Pipeline.Builder builder = new Pipeline.Builder(hit.sourceAsMap(), processorFactoryRegistry);
Pipeline.Builder builder = new Pipeline.Builder(hit.getId());
builder.fromMap(hit.sourceAsMap(), processorFactoryRegistry);
newPipelines.put(pipelineId, new PipelineReference(builder.build(), hit.getVersion(), pipelineSource));
}

if (changed != 0) {
logger.debug("adding or updating [{}] pipelines", changed);
int removed = 0;
for (String existingPipelineId : pipelines.keySet()) {
if (!configDocReader.existPipeline(existingPipelineId)) {
newPipelines.remove(existingPipelineId);
removed++;
}
}

if (changed != 0 || removed != 0) {
logger.debug("adding or updating [{}] pipelines and [{}] pipelines removed", changed, removed);
pipelines = newPipelines;
} else {
logger.debug("adding no new pipelines");
logger.debug("no pipelines changes detected");
}
}

Expand Down Expand Up @@ -142,7 +171,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}
}

static class PipelineReference {
public static class PipelineReference {

private final Pipeline pipeline;
private final long version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.plugin.ingest;

import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
Expand All @@ -33,14 +34,14 @@
import java.util.Collections;
import java.util.Iterator;

public class PipelineConfigDocReader extends AbstractLifecycleComponent {
public class PipelineStoreClient extends AbstractLifecycleComponent {

private volatile Client client;
private final Injector injector;
private final TimeValue scrollTimeout;

@Inject
public PipelineConfigDocReader(Settings settings, Injector injector) {
public PipelineStoreClient(Settings settings, Injector injector) {
super(settings);
this.injector = injector;
this.scrollTimeout = settings.getAsTime("ingest.pipeline.store.scroll.timeout", TimeValue.timeValueSeconds(30));
Expand All @@ -60,7 +61,7 @@ protected void doStop() {
protected void doClose() {
}

public Iterable<SearchHit> readAll() {
public Iterable<SearchHit> readAllPipelines() {
// TODO: the search should be replaced with an ingest API when it is available
SearchResponse searchResponse = client.prepareSearch(PipelineStore.INDEX)
.setVersion(true)
Expand All @@ -81,6 +82,11 @@ public Iterator<SearchHit> iterator() {
};
}

/**
 * Checks whether a pipeline configuration document with the given id exists
 * in the pipeline store index ({@code PipelineStore.INDEX}/{@code PipelineStore.TYPE}).
 * NOTE(review): this issues a blocking GET per id; the caller iterates all
 * cached pipelines, so this is one request per known pipeline per update
 * cycle — presumably acceptable for a small number of pipelines, confirm.
 *
 * @param pipelineId the document id of the pipeline to look up
 * @return {@code true} if the document exists, {@code false} otherwise
 */
public boolean existPipeline(String pipelineId) {
GetResponse response = client.prepareGet(PipelineStore.INDEX, PipelineStore.TYPE, pipelineId).get();
return response.isExists();
}

class SearchScrollIterator implements Iterator<SearchHit> {

private SearchResponse searchResponse;
Expand Down
Loading