Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
Expand Down Expand Up @@ -166,7 +167,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {

AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch = asyncFetchStarted.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction, clusterService, allocationService);
fetch = new InternalAsyncFetch<>(logger, "shard_started", shard.shardId(), startedAction);
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
Expand Down Expand Up @@ -403,7 +404,7 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {

AsyncShardFetch<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetch = asyncFetchStore.get(shard.shardId());
if (fetch == null) {
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction, clusterService, allocationService);
fetch = new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), storeAction);
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores = fetch.fetchData(nodes, metaData, allocation.getIgnoreNodes(shard.shardId()));
Expand Down Expand Up @@ -513,23 +514,24 @@ public int compare(DiscoveryNode o1, DiscoveryNode o2) {
return changed;
}

static class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {
private final AtomicBoolean rerouting = new AtomicBoolean();

private final ClusterService clusterService;
private final AllocationService allocationService;
class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends BaseNodesResponse<T>, T> action,
ClusterService clusterService, AllocationService allocationService) {
public InternalAsyncFetch(ESLogger logger, String type, ShardId shardId, List<? extends BaseNodesResponse<T>, T> action) {
super(logger, type, shardId, action);
this.clusterService = clusterService;
this.allocationService = allocationService;
}

@Override
protected void reroute(ShardId shardId, String reason) {
clusterService.submitStateUpdateTask("async_shard_fetch(" + type + ") " + shardId + ", reasons (" + reason + ")", Priority.HIGH, new ClusterStateUpdateTask() {
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("{} already has pending reroute, ignoring {}", shardId, reason);
return;
}
clusterService.submitStateUpdateTask("async_shard_fetch", Priority.HIGH, new ClusterStateUpdateTask() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be valuable to have the original type, shardId, and reason in the message, did you remove it on purpose?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my thought that it becomes less relevant, since a single reroute actually represents a few events now, and it can be misleading seeing in the pending task information about a shard id, where it might be ones for multiple ones

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, thanks!

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
rerouting.set(false);
if (currentState.nodes().masterNode() == null) {
return currentState;
}
Expand Down