diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index f4385947dc873..43e1f5a5cc973 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -56,6 +56,7 @@ import java.util.*; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; /** * @@ -166,7 +167,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) { AsyncShardFetch fetch = asyncFetchStarted.get(shard.shardId()); if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService); + fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction); asyncFetchStarted.put(shard.shardId(), fetch); } AsyncShardFetch.FetchResult shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); @@ -403,7 +404,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) { AsyncShardFetch fetch = asyncFetchStore.get(shard.shardId()); if (fetch == null) { - fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService); + fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction); asyncFetchStore.put(shard.shardId(), fetch); } AsyncShardFetch.FetchResult shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId())); @@ -513,23 +514,24 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) { return changed; } - static class InternalAsyncFetch extends AsyncShardFetch { + private final AtomicBoolean rerouting = new AtomicBoolean(); - private final ClusterService clusterService; - private final AllocationService allocationService; + class InternalAsyncFetch extends AsyncShardFetch { - public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List, T> action, - ClusterService clusterService, AllocationService allocationService) { + public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List, T> action) { super(logger, type, shardId, action); - this.clusterService = clusterService; - this.allocationService = allocationService; } @Override protected void reroute(ShardId shardId, String reason) { - clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() { + if (rerouting.compareAndSet(false, true) == false) { + logger.trace("{} already has pending reroute, ignoring {}", shardId, reason); + return; + } + clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + rerouting.set(false); if (currentState.nodes().masterNode() == null) { return currentState; }