Skip to content

Commit f597c11

Browse files
authored
Add a BWC layer to BulkItemRequest so 6.0 won't need to mutate it. (#25512)
This is a companion PR to #25511 . See there for more explanation and background.
1 parent 6a0e64c commit f597c11

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.DocWriteRequest;
24+
import org.elasticsearch.action.DocWriteResponse;
2325
import org.elasticsearch.action.IndicesRequest;
24-
import org.elasticsearch.action.delete.DeleteRequest;
25-
import org.elasticsearch.action.index.IndexRequest;
26-
import org.elasticsearch.action.update.UpdateRequest;
2726
import org.elasticsearch.common.io.stream.StreamInput;
2827
import org.elasticsearch.common.io.stream.StreamOutput;
2928
import org.elasticsearch.common.io.stream.Streamable;
@@ -38,7 +37,6 @@ public class BulkItemRequest implements Streamable {
3837
private int id;
3938
private DocWriteRequest request;
4039
private volatile BulkItemResponse primaryResponse;
41-
private volatile boolean ignoreOnReplica;
4240

4341
BulkItemRequest() {
4442

@@ -71,15 +69,9 @@ void setPrimaryResponse(BulkItemResponse primaryResponse) {
7169
this.primaryResponse = primaryResponse;
7270
}
7371

74-
/**
75-
* Marks this request to be ignored and *not* execute on a replica.
76-
*/
77-
void setIgnoreOnReplica() {
78-
this.ignoreOnReplica = true;
79-
}
80-
8172
boolean isIgnoreOnReplica() {
82-
return ignoreOnReplica;
73+
return primaryResponse != null &&
74+
(primaryResponse.isFailed() || primaryResponse.getResponse().getResult() == DocWriteResponse.Result.NOOP);
8375
}
8476

8577
public static BulkItemRequest readBulkItem(StreamInput in) throws IOException {
@@ -94,15 +86,27 @@ public void readFrom(StreamInput in) throws IOException {
9486
request = DocWriteRequest.readDocumentRequest(in);
9587
if (in.readBoolean()) {
9688
primaryResponse = BulkItemResponse.readBulkItem(in);
89+
// This is a bwc layer for 6.0 which no longer mutates the requests with these
90+
// Since 5.x still requires it we do it here. Note that these are harmless
91+
// as both operations are idempotent. This is something we rely on and assert on
92+
// in InternalEngine.planIndexingAsNonPrimary()
93+
request.version(primaryResponse.getVersion());
94+
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
95+
}
96+
if (in.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
97+
boolean ignoreOnReplica = in.readBoolean();
98+
assert ignoreOnReplica == isIgnoreOnReplica() :
99+
"ignoreOnReplica mismatch. wire [" + ignoreOnReplica + "], ours [" + isIgnoreOnReplica() + "]";
97100
}
98-
ignoreOnReplica = in.readBoolean();
99101
}
100102

101103
@Override
102104
public void writeTo(StreamOutput out) throws IOException {
103105
out.writeVInt(id);
104106
DocWriteRequest.writeDocumentRequest(out, request);
105107
out.writeOptionalStreamable(primaryResponse);
106-
out.writeBoolean(ignoreOnReplica);
108+
if (out.getVersion().before(Version.V_5_6_0_UNRELEASED)) {
109+
out.writeBoolean(isIgnoreOnReplica());
110+
}
107111
}
108112
}

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.elasticsearch.action.index.IndexRequest;
3030
import org.elasticsearch.action.index.IndexResponse;
3131
import org.elasticsearch.action.support.ActionFilters;
32-
import org.elasticsearch.action.support.replication.ReplicationOperation;
3332
import org.elasticsearch.action.support.TransportActions;
33+
import org.elasticsearch.action.support.replication.ReplicationOperation;
3434
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
3535
import org.elasticsearch.action.support.replication.TransportWriteAction;
3636
import org.elasticsearch.action.update.UpdateHelper;
@@ -186,15 +186,16 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
186186
if (operationResult == null) { // in case of noop update operation
187187
assert response.getResult() == DocWriteResponse.Result.NOOP
188188
: "only noop update can have null operation";
189-
replicaRequest.setIgnoreOnReplica();
190189
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), opType, response));
190+
assert replicaRequest.isIgnoreOnReplica();
191191
} else if (operationResult.hasFailure() == false) {
192192
location = locationToSync(location, operationResult.getTranslogLocation());
193193
BulkItemResponse primaryResponse = new BulkItemResponse(replicaRequest.id(), opType, response);
194194
replicaRequest.setPrimaryResponse(primaryResponse);
195195
// set an empty ShardInfo to indicate no shards participated in the request execution
196196
// so we can safely send it to the replicas. We won't use it in the real response though.
197197
primaryResponse.getResponse().setShardInfo(new ShardInfo());
198+
assert replicaRequest.isIgnoreOnReplica() == false;
198199
} else {
199200
DocWriteRequest docWriteRequest = replicaRequest.request();
200201
Exception failure = operationResult.getFailure();
@@ -209,9 +210,9 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
209210
// again, due to primary relocation and only processing up to N bulk items when the shard gets closed)
210211
// then just use the response we got from the successful execution
211212
if (replicaRequest.getPrimaryResponse() == null || isConflictException(failure) == false) {
212-
replicaRequest.setIgnoreOnReplica();
213213
replicaRequest.setPrimaryResponse(new BulkItemResponse(replicaRequest.id(), docWriteRequest.opType(),
214214
new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)));
215+
assert replicaRequest.isIgnoreOnReplica();
215216
}
216217
}
217218
assert replicaRequest.getPrimaryResponse() != null;

0 commit comments

Comments
 (0)