Skip to content

Commit 4796557

Browse files
authored
Add primary term to doc write response
This commit adds the primary term to the doc write response. Relates #24171
1 parent e82d800 commit 4796557

21 files changed

+197
-119
lines changed

core/src/main/java/org/elasticsearch/action/DocWriteResponse.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
5757
private static final String _ID = "_id";
5858
private static final String _VERSION = "_version";
5959
private static final String _SEQ_NO = "_seq_no";
60+
private static final String _PRIMARY_TERM = "_primary_term";
6061
private static final String RESULT = "result";
6162
private static final String FORCED_REFRESH = "forced_refresh";
6263

@@ -116,14 +117,16 @@ public void writeTo(StreamOutput out) throws IOException {
116117
private String type;
117118
private long version;
118119
private long seqNo;
120+
private long primaryTerm;
119121
private boolean forcedRefresh;
120122
protected Result result;
121123

122-
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long version, Result result) {
124+
public DocWriteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, Result result) {
123125
this.shardId = shardId;
124126
this.type = type;
125127
this.id = id;
126128
this.seqNo = seqNo;
129+
this.primaryTerm = primaryTerm;
127130
this.version = version;
128131
this.result = result;
129132
}
@@ -182,6 +185,15 @@ public long getSeqNo() {
182185
return seqNo;
183186
}
184187

188+
/**
189+
* The primary term for this change.
190+
*
191+
* @return the primary term
192+
*/
193+
public long getPrimaryTerm() {
194+
return primaryTerm;
195+
}
196+
185197
/**
186198
* Did this request force a refresh? Requests that set {@link WriteRequest#setRefreshPolicy(RefreshPolicy)} to
187199
* {@link RefreshPolicy#IMMEDIATE} will always return true for this. Requests that set it to {@link RefreshPolicy#WAIT_UNTIL} will
@@ -251,8 +263,10 @@ public void readFrom(StreamInput in) throws IOException {
251263
version = in.readZLong();
252264
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
253265
seqNo = in.readZLong();
266+
primaryTerm = in.readVLong();
254267
} else {
255268
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
269+
primaryTerm = 0;
256270
}
257271
forcedRefresh = in.readBoolean();
258272
result = Result.readFrom(in);
@@ -267,6 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
267281
out.writeZLong(version);
268282
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
269283
out.writeZLong(seqNo);
284+
out.writeVLong(primaryTerm);
270285
}
271286
out.writeBoolean(forcedRefresh);
272287
result.writeTo(out);
@@ -293,6 +308,7 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
293308
builder.field(_SHARDS, shardInfo);
294309
if (getSeqNo() >= 0) {
295310
builder.field(_SEQ_NO, getSeqNo());
311+
builder.field(_PRIMARY_TERM, getPrimaryTerm());
296312
}
297313
return builder;
298314
}
@@ -333,6 +349,8 @@ protected static void parseInnerToXContent(XContentParser parser, Builder contex
333349
context.setForcedRefresh(parser.booleanValue());
334350
} else if (_SEQ_NO.equals(currentFieldName)) {
335351
context.setSeqNo(parser.longValue());
352+
} else if (_PRIMARY_TERM.equals(currentFieldName)) {
353+
context.setPrimaryTerm(parser.longValue());
336354
} else {
337355
throwUnknownField(currentFieldName, parser.getTokenLocation());
338356
}
@@ -362,6 +380,7 @@ public abstract static class Builder {
362380
protected boolean forcedRefresh;
363381
protected ShardInfo shardInfo = null;
364382
protected Long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
383+
protected Long primaryTerm = 0L;
365384

366385
public ShardId getShardId() {
367386
return shardId;
@@ -407,6 +426,10 @@ public void setSeqNo(Long seqNo) {
407426
this.seqNo = seqNo;
408427
}
409428

429+
public void setPrimaryTerm(Long primaryTerm) {
430+
this.primaryTerm = primaryTerm;
431+
}
432+
410433
public abstract DocWriteResponse build();
411434
}
412435
}

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.index.VersionType;
5555
import org.elasticsearch.index.engine.Engine;
5656
import org.elasticsearch.index.engine.VersionConflictEngineException;
57+
import org.elasticsearch.index.get.GetResult;
5758
import org.elasticsearch.index.mapper.MapperParsingException;
5859
import org.elasticsearch.index.mapper.Mapping;
5960
import org.elasticsearch.index.mapper.SourceToParse;
@@ -142,7 +143,7 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
142143
return new BulkItemResultHolder(null, indexResult, bulkItemRequest);
143144
} else {
144145
IndexResponse response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(),
145-
indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated());
146+
indexResult.getSeqNo(), primary.getPrimaryTerm(), indexResult.getVersion(), indexResult.isCreated());
146147
return new BulkItemResultHolder(response, indexResult, bulkItemRequest);
147148
}
148149
}
@@ -155,7 +156,7 @@ private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest del
155156
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
156157
} else {
157158
DeleteResponse response = new DeleteResponse(primary.shardId(), deleteRequest.type(), deleteRequest.id(),
158-
deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound());
159+
deleteResult.getSeqNo(), primary.getPrimaryTerm(), deleteResult.getVersion(), deleteResult.isFound());
159160
return new BulkItemResultHolder(response, deleteResult, bulkItemRequest);
160161
}
161162
}
@@ -276,7 +277,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
276277
int requestIndex, UpdateHelper updateHelper,
277278
LongSupplier nowInMillis,
278279
final MappingUpdatePerformer mappingUpdater) throws Exception {
279-
Engine.Result updateOperationResult = null;
280+
Engine.Result result = null;
280281
UpdateResponse updateResponse = null;
281282
BulkItemRequest replicaRequest = request.items()[requestIndex];
282283
int maxAttempts = updateRequest.retryOnConflict();
@@ -288,7 +289,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
288289
} catch (Exception failure) {
289290
// we may fail translating a update to index or delete operation
290291
// we use index result to communicate failure while translating update request
291-
updateOperationResult = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
292+
result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO);
292293
break; // out of retry loop
293294
}
294295
// execute translated update request
@@ -298,34 +299,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
298299
IndexRequest indexRequest = translate.action();
299300
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
300301
indexRequest.process(mappingMd, request.index());
301-
updateOperationResult = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
302+
result = executeIndexRequestOnPrimary(indexRequest, primary, mappingUpdater);
302303
break;
303304
case DELETED:
304305
DeleteRequest deleteRequest = translate.action();
305-
updateOperationResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
306+
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
306307
break;
307308
case NOOP:
308309
primary.noopUpdate(updateRequest.type());
309310
break;
310311
default: throw new IllegalStateException("Illegal update operation " + translate.getResponseResult());
311312
}
312-
if (updateOperationResult == null) {
313+
if (result == null) {
313314
// this is a noop operation
314315
updateResponse = translate.action();
315316
break; // out of retry loop
316-
} else if (updateOperationResult.hasFailure() == false) {
317+
} else if (result.hasFailure() == false) {
317318
// enrich update response and
318319
// set translated update (index/delete) request for replica execution in bulk items
319-
switch (updateOperationResult.getOperationType()) {
320+
switch (result.getOperationType()) {
320321
case INDEX:
322+
assert result instanceof Engine.IndexResult : result.getClass();
321323
IndexRequest updateIndexRequest = translate.action();
322-
final IndexResponse indexResponse = new IndexResponse(primary.shardId(),
323-
updateIndexRequest.type(), updateIndexRequest.id(), updateOperationResult.getSeqNo(),
324-
updateOperationResult.getVersion(), ((Engine.IndexResult) updateOperationResult).isCreated());
324+
final IndexResponse indexResponse = new IndexResponse(
325+
primary.shardId(),
326+
updateIndexRequest.type(),
327+
updateIndexRequest.id(),
328+
result.getSeqNo(),
329+
primary.getPrimaryTerm(),
330+
result.getVersion(),
331+
((Engine.IndexResult) result).isCreated());
325332
BytesReference indexSourceAsBytes = updateIndexRequest.source();
326-
updateResponse = new UpdateResponse(indexResponse.getShardInfo(),
327-
indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getSeqNo(),
328-
indexResponse.getVersion(), indexResponse.getResult());
333+
updateResponse = new UpdateResponse(
334+
indexResponse.getShardInfo(),
335+
indexResponse.getShardId(),
336+
indexResponse.getType(),
337+
indexResponse.getId(),
338+
indexResponse.getSeqNo(),
339+
indexResponse.getPrimaryTerm(),
340+
indexResponse.getVersion(),
341+
indexResponse.getResult());
329342
if ((updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) ||
330343
(updateRequest.fields() != null && updateRequest.fields().length > 0)) {
331344
Tuple<XContentType, Map<String, Object>> sourceAndContent =
@@ -337,29 +350,46 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
337350
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateIndexRequest);
338351
break;
339352
case DELETE:
353+
assert result instanceof Engine.DeleteResult : result.getClass();
340354
DeleteRequest updateDeleteRequest = translate.action();
341-
DeleteResponse deleteResponse = new DeleteResponse(primary.shardId(),
342-
updateDeleteRequest.type(), updateDeleteRequest.id(), updateOperationResult.getSeqNo(),
343-
updateOperationResult.getVersion(), ((Engine.DeleteResult) updateOperationResult).isFound());
344-
updateResponse = new UpdateResponse(deleteResponse.getShardInfo(),
345-
deleteResponse.getShardId(), deleteResponse.getType(), deleteResponse.getId(), deleteResponse.getSeqNo(),
346-
deleteResponse.getVersion(), deleteResponse.getResult());
347-
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest,
348-
request.index(), deleteResponse.getVersion(), translate.updatedSourceAsMap(),
349-
translate.updateSourceContentType(), null));
355+
DeleteResponse deleteResponse = new DeleteResponse(
356+
primary.shardId(),
357+
updateDeleteRequest.type(),
358+
updateDeleteRequest.id(),
359+
result.getSeqNo(),
360+
primary.getPrimaryTerm(),
361+
result.getVersion(),
362+
((Engine.DeleteResult) result).isFound());
363+
updateResponse = new UpdateResponse(
364+
deleteResponse.getShardInfo(),
365+
deleteResponse.getShardId(),
366+
deleteResponse.getType(),
367+
deleteResponse.getId(),
368+
deleteResponse.getSeqNo(),
369+
deleteResponse.getPrimaryTerm(),
370+
deleteResponse.getVersion(),
371+
deleteResponse.getResult());
372+
final GetResult getResult = updateHelper.extractGetResult(
373+
updateRequest,
374+
request.index(),
375+
deleteResponse.getVersion(),
376+
translate.updatedSourceAsMap(),
377+
translate.updateSourceContentType(),
378+
null);
379+
updateResponse.setGetResult(getResult);
350380
// set translated request as replica request
351381
replicaRequest = new BulkItemRequest(request.items()[requestIndex].id(), updateDeleteRequest);
352382
break;
353383
}
354-
assert updateOperationResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
384+
assert result.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO;
355385
// successful operation
356386
break; // out of retry loop
357-
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
387+
} else if (result.getFailure() instanceof VersionConflictEngineException == false) {
358388
// not a version conflict exception
359389
break; // out of retry loop
360390
}
361391
}
362-
return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest);
392+
return new BulkItemResultHolder(updateResponse, result, replicaRequest);
363393
}
364394

365395
/** Modes for executing item request on replica depending on corresponding primary execution result */
@@ -513,8 +543,7 @@ private static Engine.IndexResult executeIndexRequestOnReplica(
513543
try {
514544
operation = prepareIndexOperationOnReplica(primaryResponse, request, replica);
515545
} catch (MapperParsingException e) {
516-
return new Engine.IndexResult(e, primaryResponse.getVersion(),
517-
primaryResponse.getSeqNo());
546+
return new Engine.IndexResult(e, primaryResponse.getVersion(), primaryResponse.getSeqNo());
518547
}
519548

520549
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();

core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class DeleteResponse extends DocWriteResponse {
4242
public DeleteResponse() {
4343
}
4444

45-
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean found) {
46-
super(shardId, type, id, seqNo, version, found ? Result.DELETED : Result.NOT_FOUND);
45+
public DeleteResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean found) {
46+
super(shardId, type, id, seqNo, primaryTerm, version, found ? Result.DELETED : Result.NOT_FOUND);
4747
}
4848

4949
@Override
@@ -112,7 +112,7 @@ public void setFound(boolean found) {
112112

113113
@Override
114114
public DeleteResponse build() {
115-
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, version, found);
115+
DeleteResponse deleteResponse = new DeleteResponse(shardId, type, id, seqNo, primaryTerm, version, found);
116116
deleteResponse.setForcedRefresh(forcedRefresh);
117117
if (shardInfo != null) {
118118
deleteResponse.setShardInfo(shardInfo);

core/src/main/java/org/elasticsearch/action/index/IndexResponse.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public class IndexResponse extends DocWriteResponse {
4343
public IndexResponse() {
4444
}
4545

46-
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long version, boolean created) {
47-
super(shardId, type, id, seqNo, version, created ? Result.CREATED : Result.UPDATED);
46+
public IndexResponse(ShardId shardId, String type, String id, long seqNo, long primaryTerm, long version, boolean created) {
47+
super(shardId, type, id, seqNo, primaryTerm, version, created ? Result.CREATED : Result.UPDATED);
4848
}
4949

5050
@Override
@@ -62,6 +62,7 @@ public String toString() {
6262
builder.append(",version=").append(getVersion());
6363
builder.append(",result=").append(getResult().getLowercase());
6464
builder.append(",seqNo=").append(getSeqNo());
65+
builder.append(",primaryTerm=").append(getPrimaryTerm());
6566
builder.append(",shards=").append(Strings.toString(getShardInfo()));
6667
return builder.append("]").toString();
6768
}
@@ -114,7 +115,7 @@ public void setCreated(boolean created) {
114115

115116
@Override
116117
public IndexResponse build() {
117-
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, version, created);
118+
IndexResponse indexResponse = new IndexResponse(shardId, type, id, seqNo, primaryTerm, version, created);
118119
indexResponse.setForcedRefresh(forcedRefresh);
119120
if (shardInfo != null) {
120121
indexResponse.setShardInfo(shardInfo);

0 commit comments

Comments
 (0)