|
43 | 43 | import org.elasticsearch.plugins.Plugin; |
44 | 44 | import org.elasticsearch.snapshots.RestoreInfo; |
45 | 45 | import org.elasticsearch.snapshots.RestoreService; |
46 | | -import org.elasticsearch.test.junit.annotations.TestLogging; |
47 | 46 | import org.elasticsearch.test.transport.MockTransportService; |
48 | 47 | import org.elasticsearch.transport.ConnectTransportException; |
49 | 48 | import org.elasticsearch.transport.RemoteTransportException; |
|
87 | 86 | import static org.hamcrest.Matchers.greaterThan; |
88 | 87 | import static org.hamcrest.Matchers.hasSize; |
89 | 88 |
|
90 | | -@TestLogging( |
91 | | - // issue: https://github.com/elastic/elasticsearch/issues/45192 |
92 | | - value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug") |
93 | 89 | public class CcrRetentionLeaseIT extends CcrIntegTestCase { |
94 | 90 |
|
95 | 91 | public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin { |
@@ -782,40 +778,42 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep |
782 | 778 | (connection, requestId, action, request, options) -> { |
783 | 779 | if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) |
784 | 780 | || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { |
785 | | - senderTransportService.clearAllRules(); |
786 | 781 | final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; |
787 | 782 | final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); |
788 | | - assertThat(retentionLeaseId, equalTo(renewRequest.getId())); |
789 | | - logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId); |
790 | | - final String primaryShardNodeId = |
791 | | - getLeaderCluster() |
792 | | - .clusterService() |
793 | | - .state() |
794 | | - .routingTable() |
795 | | - .index(leaderIndex) |
796 | | - .shard(renewRequest.getShardId().id()) |
797 | | - .primaryShard() |
798 | | - .currentNodeId(); |
799 | | - final String primaryShardNodeName = |
800 | | - getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); |
801 | | - final IndexShard primary = |
802 | | - getLeaderCluster() |
803 | | - .getInstance(IndicesService.class, primaryShardNodeName) |
804 | | - .getShardOrNull(renewRequest.getShardId()); |
805 | | - final CountDownLatch innerLatch = new CountDownLatch(1); |
806 | | - // this forces the background renewal from following to face a retention lease not found exception |
807 | | - logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId); |
808 | | - primary.removeRetentionLease(retentionLeaseId, |
809 | | - ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); |
810 | | - logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader", retentionLeaseId); |
811 | | - try { |
812 | | - innerLatch.await(); |
813 | | - } catch (final InterruptedException e) { |
814 | | - Thread.currentThread().interrupt(); |
815 | | - fail(e.toString()); |
| 783 | + if (retentionLeaseId.equals(renewRequest.getId())) { |
| 784 | + logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId); |
| 785 | + senderTransportService.clearAllRules(); |
| 786 | + final String primaryShardNodeId = |
| 787 | + getLeaderCluster() |
| 788 | + .clusterService() |
| 789 | + .state() |
| 790 | + .routingTable() |
| 791 | + .index(leaderIndex) |
| 792 | + .shard(renewRequest.getShardId().id()) |
| 793 | + .primaryShard() |
| 794 | + .currentNodeId(); |
| 795 | + final String primaryShardNodeName = |
| 796 | + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); |
| 797 | + final IndexShard primary = |
| 798 | + getLeaderCluster() |
| 799 | + .getInstance(IndicesService.class, primaryShardNodeName) |
| 800 | + .getShardOrNull(renewRequest.getShardId()); |
| 801 | + final CountDownLatch innerLatch = new CountDownLatch(1); |
| 802 | + try { |
| 803 | + // this forces the background renewal from following to face a retention lease not found exception |
| 804 | + logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId); |
| 805 | + primary.removeRetentionLease(retentionLeaseId, |
| 806 | + ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); |
| 807 | + logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader", |
| 808 | + retentionLeaseId); |
| 809 | + innerLatch.await(); |
| 810 | + logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId); |
| 811 | + } catch (final Exception e) { |
| 812 | + throw new AssertionError("failed to remove retention lease [" + retentionLeaseId + "] on the leader"); |
| 813 | + } finally { |
| 814 | + latch.countDown(); |
| 815 | + } |
816 | 816 | } |
817 | | - logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId); |
818 | | - latch.countDown(); |
819 | 817 | } |
820 | 818 | connection.sendRequest(requestId, action, request, options); |
821 | 819 | }); |
|
0 commit comments