Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Locale;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -121,13 +122,13 @@ public void writeTo(StreamOutput out) throws IOException {
protected final Result result;

public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.shardId = Objects.requireNonNull(shardId);
this.type = Objects.requireNonNull(type);
this.id = Objects.requireNonNull(id);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.result = result;
this.result = Objects.requireNonNull(result);
}

// needed for deserialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,12 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
void markCurrentItemAsDropped() {
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
failedSlots.set(currentSlot);
final String id = indexRequest.id() == null ? "auto-generated" : indexRequest.id();
itemResponses.add(
new BulkItemResponse(currentSlot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetaData.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), indexRequest.id(), SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
indexRequest.version(), DocWriteResponse.Result.NOOP
)
)
Expand Down
21 changes: 21 additions & 0 deletions server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,25 @@ public void testPutWithPipelineFactoryError() throws Exception {
GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id2").get();
assertFalse(response.isFound());
}

public void testWithDedicatedMaster() throws Exception {
String masterOnlyNode = internalCluster().startMasterOnlyNode();
BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject());
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, XContentType.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkItemResponse item = client(masterOnlyNode).prepareBulk().add(
client().prepareIndex("test", "type").setSource("field", "value2", "drop", true).setPipeline("_id")).get()
.getItems()[0];
assertFalse(item.isFailed());
assertEquals("auto-generated", item.getResponse().getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
if (doc.hasField("fail") && doc.getFieldValue("fail", Boolean.class)) {
throw new IllegalArgumentException("test processor failed");
}
if (doc.hasField("drop") && doc.getFieldValue("drop", Boolean.class)) {
return null;
}
return doc;
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Processor used for testing, keeps track of how many times it is invoked and
Expand All @@ -31,24 +32,30 @@ public class TestProcessor implements Processor {

private final String type;
private final String tag;
private final Consumer<IngestDocument> ingestDocumentConsumer;
private final Function<IngestDocument, IngestDocument> ingestDocumentMapper;
private final AtomicInteger invokedCounter = new AtomicInteger();

public TestProcessor(Consumer<IngestDocument> ingestDocumentConsumer) {
this(null, "test-processor", ingestDocumentConsumer);
}

public TestProcessor(String tag, String type, Consumer<IngestDocument> ingestDocumentConsumer) {
this.ingestDocumentConsumer = ingestDocumentConsumer;
this(tag, type, id -> {
ingestDocumentConsumer.accept(id);
return id;
});
}

public TestProcessor(String tag, String type, Function<IngestDocument, IngestDocument> ingestDocumentMapper) {
this.ingestDocumentMapper = ingestDocumentMapper;
this.type = type;
this.tag = tag;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
invokedCounter.incrementAndGet();
ingestDocumentConsumer.accept(ingestDocument);
return ingestDocument;
return ingestDocumentMapper.apply(ingestDocument);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2002,6 +2002,7 @@ public String startMasterOnlyNode(Settings settings) {
.put(settings)
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_INGEST_SETTING.getKey(), false)
.build();
return startNode(settings1);
}
Expand Down