Merged
@@ -28,6 +28,8 @@
 import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.PlainShardIterator;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
@@ -42,6 +44,7 @@
 import org.elasticsearch.transport.TransportService;

 import java.io.IOException;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.function.Supplier;

@@ -84,10 +87,14 @@ abstract static class TransportRetentionLeaseAction<T extends Request<T>> extend

         @Override
         protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
-            return state
+            final IndexShardRoutingTable shardRoutingTable = state
                     .routingTable()
-                    .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
-                    .primaryShardIt();
+                    .shardRoutingTable(request.concreteIndex(), request.request().getShardId().id());
+            if (shardRoutingTable.primaryShard().active()) {
Contributor

TransportSingleShardAction doesn't do our usual "chase the shard" pattern where we re-resolve shards on each node until we find a place where the shard is locally available. This means that if the coordinating node thinks the shard is active but the node with the shard didn't yet process the shard activation cluster state, I think this still goes wrong (i.e., the primary would not be in primary mode, which is activated when the cluster state is processed). I hope I'm wrong and please let me know what I'm missing.
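
To make the race described above concrete, here is a toy model rather than Elasticsearch code. It assumes each node is characterised only by the last cluster state version it has applied, and that the primary is marked active in one particular version. Every class, method, and constant name in it is hypothetical.

```java
/** Toy model of the stale-state race; every name here is hypothetical, not Elasticsearch API. */
final class StaleStateSketch {
    /** Assumed cluster state version in which the primary is first marked as active. */
    static final long ACTIVATION_VERSION = 5;

    /** A node is described only by the last cluster state version it has applied. */
    static final class Node {
        final long appliedVersion;

        Node(long appliedVersion) {
            this.appliedVersion = appliedVersion;
        }

        /** What this node's own copy of the routing table says about the primary. */
        boolean seesPrimaryActive() {
            return appliedVersion >= ACTIVATION_VERSION;
        }
    }

    /** Coordinator-side routing check, analogous to testing primaryShard().active() in shards(). */
    static boolean coordinatorRoutes(Node coordinator) {
        return coordinator.seesPrimaryActive();
    }

    /** In this model the shard enters primary mode only once its own node applied the activation. */
    static boolean shardInPrimaryMode(Node shardHolder) {
        return shardHolder.seesPrimaryActive();
    }

    /** True when the request is routed but would arrive before the shard is in primary mode. */
    static boolean raceWindowOpen(Node coordinator, Node shardHolder) {
        return coordinatorRoutes(coordinator) && !shardInPrimaryMode(shardHolder);
    }
}
```

With a coordinator at version 5 and a shard-holding node still at version 4, `raceWindowOpen` returns true: the coordinator-side check passes even though the target has not yet processed the activation.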

Member Author

Good catch. In this case, I think that we should use the other approach that I considered. I cannot think of any situation where we would want to acquire a permit on a non-active primary.

Contributor

That was actually why I went looking: my instinct was that this should be done under a permit, and that a permit shouldn't be granted on a primary that has not finished initializing. We currently don't enforce that, so it would require a much bigger change/vision. Alternatively, we can add a targeted check in asyncShardOperation that the shard is active before performing the operation. @ywelsch any thoughts?
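
A minimal sketch of what such a targeted check could look like, assuming a simplified shard lifecycle and a callback for failures. `ShardState`, `ShardNotActiveException`, and `runIfActive` are hypothetical stand-ins, not the real asyncShardOperation signature.

```java
import java.util.function.Consumer;

/** Sketch of a targeted "is the shard active?" guard; all names are hypothetical. */
final class ActiveShardGuardSketch {
    /** Simplified shard lifecycle states (an assumption for illustration). */
    enum ShardState { INITIALIZING, STARTED, RELOCATED }

    /** Reported instead of running the operation when the local shard is not active. */
    static final class ShardNotActiveException extends RuntimeException {
        ShardNotActiveException(ShardState state) {
            super("shard is not active, current state: " + state);
        }
    }

    /**
     * Runs the operation only if the local shard is STARTED; otherwise hands a failure
     * to the caller without touching the shard.
     */
    static void runIfActive(ShardState localState, Runnable operation, Consumer<Exception> onFailure) {
        if (localState != ShardState.STARTED) {
            onFailure.accept(new ShardNotActiveException(localState));
            return;
        }
        operation.run();
    }
}
```

Because the guard is evaluated on the node that holds the shard, it does not depend on the coordinating node's view. The check and the operation are not atomic in this sketch, which is the gap that performing the check under a permit would close.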

Contributor

RetentionLeaseActions and TransportForgetFollowerAction can also possibly violate the assertion in acquirePrimaryOperationPermit that the shard is actually a primary (by the time the request arrives, the primary could have failed and a replica allocated instead). Ensuring this was previously left to the caller of this method.

We can explore changing acquirePrimaryOperationPermit and acquireAllPrimaryOperationsPermits to throw appropriate exceptions if the shard is not in primary mode (e.g. replica or initializing/relocated primary). TRA can then react to an IndexShardRelocatedException to delegate to the relocation target.
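
The idea can be sketched roughly as follows, under simplifying assumptions: the permit itself is not modelled (nothing is acquired or released), and `Mode`, `NotInPrimaryModeException`, `RelocatedException`, and `executeOnPrimary` are hypothetical stand-ins rather than the real IndexShard API.

```java
/** Sketch of permit acquisition that rejects shards not in primary mode; all names are hypothetical. */
final class PrimaryPermitSketch {
    enum Mode { REPLICA, INITIALIZING_PRIMARY, ACTIVE_PRIMARY, RELOCATED_PRIMARY }

    static class NotInPrimaryModeException extends RuntimeException {
        NotInPrimaryModeException(String message) {
            super(message);
        }
    }

    /** Separate type so callers can tell "primary moved" apart from "not a usable primary". */
    static final class RelocatedException extends NotInPrimaryModeException {
        RelocatedException() {
            super("primary has relocated");
        }
    }

    static final class Shard {
        final String node;
        final Mode mode;
        final Shard relocationTarget; // null unless mode is RELOCATED_PRIMARY

        Shard(String node, Mode mode, Shard relocationTarget) {
            this.node = node;
            this.mode = mode;
            this.relocationTarget = relocationTarget;
        }

        /** Throws instead of relying on the caller to have verified primary mode. */
        void acquirePrimaryPermit() {
            switch (mode) {
                case ACTIVE_PRIMARY:
                    return; // a real implementation would hand out a releasable permit here
                case RELOCATED_PRIMARY:
                    throw new RelocatedException();
                default:
                    throw new NotInPrimaryModeException("shard on " + node + " is " + mode);
            }
        }
    }

    /** Caller side: on relocation, retry once on the relocation target. Returns the node used. */
    static String executeOnPrimary(Shard shard) {
        try {
            shard.acquirePrimaryPermit();
            return shard.node;
        } catch (RelocatedException e) {
            shard.relocationTarget.acquirePrimaryPermit();
            return shard.relocationTarget.node;
        }
    }
}
```

A replica or an initializing primary surfaces as `NotInPrimaryModeException` to the caller, while a relocated primary is transparently retried on its target.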

Contributor

That takes it one level higher - ensuring that the primary is actually an active primary (rather than just an active shard). SGTM.

+                return shardRoutingTable.primaryShardIt();
+            } else {
+                return new PlainShardIterator(request.request().getShardId(), Collections.emptyList());
+            }
         }

         @Override
@@ -43,7 +43,6 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.RestoreService;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.RemoteTransportException;
@@ -266,7 +265,6 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception {

     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40089")
     public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws Exception {
         final String leaderIndex = "leader";
         final int numberOfShards = randomIntBetween(1, 3);
@@ -463,7 +461,6 @@ public void testUnfollowRemovesRetentionLeases() throws Exception {
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40089")
     public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
         final String leaderIndex = "leader";
         final String followerIndex = "follower";
@@ -534,7 +531,6 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception {
         }
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40089")
     public void testRetentionLeaseRenewedWhileFollowing() throws Exception {
         final String leaderIndex = "leader";
         final String followerIndex = "follower";
@@ -618,7 +614,6 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
     }

     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/39509")
-    @TestLogging(value = "org.elasticsearch.xpack.ccr:trace")
     public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception {
         final String leaderIndex = "leader";
         final String followerIndex = "follower";
@@ -748,7 +743,6 @@ public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Ex
         assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/40089")
     public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Exception {
         final String leaderIndex = "leader";
         final String followerIndex = "follower";