Skip to content

Commit bb23d3b

Browse files
authored
Remove allocation id from replica replication response (#25488)
The replica replication response object has an extra allocationId field that contains the allocation id of the replica on which the request was executed. As we are sending the allocation id with the actual replica replication request, and check when executing the replica replication action that the allocation id of the replica shard is what we expect, there is no need to communicate back the allocation id as part of the response object.
1 parent 6ae4497 commit bb23d3b

File tree

8 files changed

+12
-34
lines changed

8 files changed

+12
-34
lines changed

core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ protected void sendReplicaRequest(
9393
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
9494
super.sendReplicaRequest(replicaRequest, node, listener);
9595
} else {
96-
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
96+
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
9797
}
9898
}
9999

core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
187187
public void onResponse(ReplicaResponse response) {
188188
successfulShards.incrementAndGet();
189189
try {
190-
primary.updateLocalCheckpointForShard(response.allocationId(), response.localCheckpoint());
190+
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
191191
} catch (final AlreadyClosedException e) {
192192
// okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
193193
} catch (final Exception e) {
@@ -429,9 +429,6 @@ public interface ReplicaResponse {
429429

430430
/** the local check point for the shard. see {@link org.elasticsearch.index.seqno.SequenceNumbersService#getLocalCheckpoint()} */
431431
long localCheckpoint();
432-
433-
/** the allocation id of the replica shard */
434-
String allocationId();
435432
}
436433

437434
public static class RetryOnPrimaryException extends ElasticsearchException {

core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5454
import org.elasticsearch.index.IndexNotFoundException;
5555
import org.elasticsearch.index.IndexService;
56+
import org.elasticsearch.index.seqno.SequenceNumbersService;
5657
import org.elasticsearch.index.shard.IndexShard;
5758
import org.elasticsearch.index.shard.IndexShardState;
5859
import org.elasticsearch.index.shard.ShardId;
@@ -523,8 +524,7 @@ public void onResponse(Releasable releasable) {
523524
try {
524525
final ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
525526
releasable.close(); // release shard operation lock before responding to caller
526-
final TransportReplicationAction.ReplicaResponse response =
527-
new ReplicaResponse(replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint());
527+
final TransportReplicationAction.ReplicaResponse response = new ReplicaResponse(replica.getLocalCheckpoint());
528528
replicaResult.respond(new ResponseListener(response));
529529
} catch (final Exception e) {
530530
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
@@ -1011,14 +1011,12 @@ public long globalCheckpoint() {
10111011

10121012
public static class ReplicaResponse extends ActionResponse implements ReplicationOperation.ReplicaResponse {
10131013
private long localCheckpoint;
1014-
private String allocationId;
10151014

10161015
ReplicaResponse() {
10171016

10181017
}
10191018

1020-
public ReplicaResponse(String allocationId, long localCheckpoint) {
1021-
this.allocationId = allocationId;
1019+
public ReplicaResponse(long localCheckpoint) {
10221020
this.localCheckpoint = localCheckpoint;
10231021
}
10241022

@@ -1027,9 +1025,9 @@ public void readFrom(StreamInput in) throws IOException {
10271025
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
10281026
super.readFrom(in);
10291027
localCheckpoint = in.readZLong();
1030-
allocationId = in.readString();
10311028
} else {
10321029
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
1030+
localCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
10331031
}
10341032
}
10351033

@@ -1038,7 +1036,6 @@ public void writeTo(StreamOutput out) throws IOException {
10381036
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
10391037
super.writeTo(out);
10401038
out.writeZLong(localCheckpoint);
1041-
out.writeString(allocationId);
10421039
} else {
10431040
// we use to write empty responses
10441041
Empty.INSTANCE.writeTo(out);
@@ -1049,11 +1046,6 @@ public void writeTo(StreamOutput out) throws IOException {
10491046
public long localCheckpoint() {
10501047
return localCheckpoint;
10511048
}
1052-
1053-
@Override
1054-
public String allocationId() {
1055-
return allocationId;
1056-
}
10571049
}
10581050

10591051
/**

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected void sendReplicaRequest(
8989
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
9090
super.sendReplicaRequest(replicaRequest, node, listener);
9191
} else {
92-
listener.onResponse(new ReplicaResponse(replicaRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO));
92+
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
9393
}
9494
}
9595

core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -464,23 +464,16 @@ public long globalCheckpoint() {
464464
}
465465

466466
static class ReplicaResponse implements ReplicationOperation.ReplicaResponse {
467-
final String allocationId;
468467
final long localCheckpoint;
469468

470-
ReplicaResponse(String allocationId, long localCheckpoint) {
471-
this.allocationId = allocationId;
469+
ReplicaResponse(long localCheckpoint) {
472470
this.localCheckpoint = localCheckpoint;
473471
}
474472

475473
@Override
476474
public long localCheckpoint() {
477475
return localCheckpoint;
478476
}
479-
480-
@Override
481-
public String allocationId() {
482-
return allocationId;
483-
}
484477
}
485478

486479
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
@@ -515,7 +508,7 @@ public void performOn(
515508
final String allocationId = replica.allocationId().getId();
516509
Long existing = generatedLocalCheckpoints.put(allocationId, checkpoint);
517510
assertNull(existing);
518-
listener.onResponse(new ReplicaResponse(allocationId, checkpoint));
511+
listener.onResponse(new ReplicaResponse(checkpoint));
519512
}
520513
}
521514

core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -636,8 +636,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
636636
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
637637
assertThat(captures, arrayWithSize(1));
638638
if (randomBoolean()) {
639-
final TransportReplicationAction.ReplicaResponse response =
640-
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
639+
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
641640
transport.handleResponse(captures[0].requestId, response);
642641
assertTrue(listener.isDone());
643642
assertThat(listener.get(), equalTo(response));

core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,7 @@ public void testReplicaProxy() throws InterruptedException, ExecutionException {
286286
CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear();
287287
assertThat(captures, arrayWithSize(1));
288288
if (randomBoolean()) {
289-
final TransportReplicationAction.ReplicaResponse response =
290-
new TransportReplicationAction.ReplicaResponse(randomAlphaOfLength(10), randomLong());
289+
final TransportReplicationAction.ReplicaResponse response = new TransportReplicationAction.ReplicaResponse(randomLong());
291290
transport.handleResponse(captures[0].requestId, response);
292291
assertTrue(listener.isDone());
293292
assertThat(listener.get(), equalTo(response));

core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -537,9 +537,7 @@ public void onResponse(Releasable releasable) {
537537
try {
538538
performOnReplica(request, replica);
539539
releasable.close();
540-
listener.onResponse(
541-
new ReplicaResponse(
542-
replica.routingEntry().allocationId().getId(), replica.getLocalCheckpoint()));
540+
listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint()));
543541
} catch (final Exception e) {
544542
Releasables.closeWhileHandlingException(releasable);
545543
listener.onFailure(e);

0 commit comments

Comments
 (0)