Description
If you configure the .watches index with a certain primary/replica configuration, the distributed execution of watches only happens on one shard copy.
Note: This is not the default configuration and requires some steps to actually achieve (for example, the index template cannot easily be overwritten).
The basic premise is that the number of replicas is equal to the number of shards (primaries) minus one. For example:
- 2 primaries, 1 replica: watches get executed on two instead of four shard copies
- 3 primaries, 2 replicas: watches get executed on three instead of nine shard copies
- 4 primaries, 3 replicas: watches get executed on four instead of 16 shard copies
Explanation
Explaining this requires understanding how watcher decides whether a watch should be executed on a replica or on the primary shard. It reuses the logic that decides which shard a document should be indexed on (which is not watcher specific).
By hashing the id and taking the result modulo the number of primary shards, we decide which primary shard a document is indexed on; in this example, shard zero.
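That routing step can be sketched in isolation. This is a simplified stand-in: Elasticsearch actually hashes the routing id with Murmur3 inside OperationRouting, and CRC32 is used here only as a placeholder hash:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Simplified sketch of document routing: hash the id, then take the
// result modulo the number of primary shards. Elasticsearch really uses
// Murmur3 (see OperationRouting); CRC32 is only a stand-in here.
public class RoutingSketch {
    static int shardFor(String id, int numPrimaries) {
        CRC32 crc = new CRC32();
        crc.update(id.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the bucket non-negative for negative hash values
        return Math.floorMod((int) crc.getValue(), numPrimaries);
    }

    public static void main(String[] args) {
        // the same id always routes to the same shard
        System.out.println(RoutingSketch.shardFor("watch_0", 2));
    }
}
```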
If the number of copies per shard (the primary plus its replicas) now equals the number of primary shards, we use the same hashing function to find out where a watch should be executed. This results in the exact same calculation, which returns the same value for any document (otherwise it would not have ended up in that shard bucket).
We cannot tell in advance which copy (primary or replica) will be used for execution (the order is derived from the allocation ids), but in this particular case it is only ever going to be one of them.
An easy workaround for now is to avoid configurations where the number of shards equals the number of replicas + 1.
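The effect can be simulated in a self-contained way. This is a deliberately simplified model: CRC32 replaces the real Murmur3 routing hash, and copy selection is reduced to a plain modulo, which glosses over what ShardAllocationConfiguration actually derives from the allocation ids:

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.zip.CRC32;

// Simulates the degenerate case: with 3 primaries and 2 replicas, each
// shard group has 3 copies, i.e. as many copies as there are primaries.
// Routing picks shard hash(id) % numPrimaries; copy selection is modeled
// as hash(id) % numCopies. Since both modulos use the same hash and the
// same divisor, they always agree, so each shard group only ever
// executes its watches on one of its copies.
public class DistributionSketch {

    static int hash(String id) {
        CRC32 crc = new CRC32(); // stand-in for the real Murmur3 routing hash
        crc.update(id.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod((int) crc.getValue(), Integer.MAX_VALUE);
    }

    // per shard group, record which copy indices execute at least one watch
    static BitSet[] simulate(int numPrimaries, int numCopies, int numWatches) {
        BitSet[] copiesUsed = new BitSet[numPrimaries];
        for (int s = 0; s < numPrimaries; s++) {
            copiesUsed[s] = new BitSet(numCopies);
        }
        for (int i = 0; i < numWatches; i++) {
            int h = hash("watch_" + i);
            int shard = h % numPrimaries; // where the watch document lands
            int copy = h % numCopies;     // which copy would execute it
            copiesUsed[shard].set(copy);
        }
        return copiesUsed;
    }

    public static void main(String[] args) {
        BitSet[] used = simulate(3, 3, 1000);
        for (int s = 0; s < used.length; s++) {
            // each shard group ends up using exactly one of its three copies
            System.out.println("shard " + s + " executes on copies " + used[s]);
        }
    }
}
```

With `numCopies != numPrimaries` (e.g. `simulate(3, 2, 1000)`, i.e. one replica) the two modulos decorrelate and all copies of each shard group receive executions, which matches the workaround above.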
This is a unit test to reproduce the issue on 6.x/master (added to WatcherIndexingListenerTests):
```java
public void testDistributionIssue() throws Exception {
    int numberOfShards = 2;
    int numberOfDocuments = 100;
    Map<ShardId, BitSet> perShardBitSet = new HashMap<>();
    IndexMetaData indexMetaData = createIndexBuilder(Watch.INDEX, numberOfShards, numberOfShards - 1).build();

    for (int currentShardId = 0; currentShardId < numberOfShards; currentShardId++) {
        ShardAllocationConfiguration sac = new ShardAllocationConfiguration(currentShardId,
            numberOfShards, Collections.emptyList());
        ShardId shardId = new ShardId(new Index(Watch.INDEX, "uuid"), currentShardId);
        perShardBitSet.computeIfAbsent(shardId, (s) -> new BitSet(numberOfDocuments));

        for (int i = 0; i < numberOfDocuments; i++) {
            String id = "watch_" + i;
            // only care for shard 0, as we have the configuration for primaries
            // and replicas in there, which we are interested in
            if (OperationRouting.generateShardId(indexMetaData, id, null) != 0) {
                continue;
            }
            boolean shouldBeTriggered = sac.shouldBeTriggered(id);
            if (shouldBeTriggered) {
                perShardBitSet.get(shardId).set(i);
            }
        }
    }

    String message = String.format(Locale.ROOT, "cardinalities [%s]", perShardBitSet);
    for (BitSet shardBitSet : perShardBitSet.values()) {
        assertThat(message, shardBitSet.cardinality(), greaterThan(0));
    }
}
```

A possible idea for fixing this might be to change the decision logic and use something more than the id of the watch (just adding a prefix or reversing the string might be enough for the hash function to return a different result). In order to remain BWC we could check whether all nodes in the cluster have surpassed a certain version and only then use the new logic.
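That idea can be sketched with the same simplified model as above: derive the execution slot from a transformed id so the execution hash no longer coincides with the routing hash. The "watch-exec-" prefix is purely hypothetical, and CRC32 again stands in for the real routing hash:

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.zip.CRC32;

// Sketch of the proposed fix: compute the execution slot from a
// transformed watch id (here a hypothetical "watch-exec-" prefix) so the
// execution hash is decorrelated from the routing hash. CRC32 is only a
// stand-in for the real Murmur3 routing hash.
public class DecorrelatedSketch {

    static int hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod((int) crc.getValue(), Integer.MAX_VALUE);
    }

    // copies that would execute at least one of the watches routed to shard 0
    static BitSet copiesUsedOnShardZero(int numPrimaries, int numCopies, int numWatches) {
        BitSet used = new BitSet(numCopies);
        for (int i = 0; i < numWatches; i++) {
            String id = "watch_" + i;
            if (hash(id) % numPrimaries != 0) {
                continue; // only look at watches routed to shard 0
            }
            // execution slot derived from the transformed id, not the raw id
            used.set(hash("watch-exec-" + id) % numCopies);
        }
        return used;
    }

    public static void main(String[] args) {
        // 3 primaries, 3 copies per shard group: the degenerate case above,
        // but now more than one copy of the group executes watches
        System.out.println("copies used on shard 0: " + copiesUsedOnShardZero(3, 3, 1000));
    }
}
```

Under this model the watches routed to shard 0 spread over several copies of that shard group instead of just one, which is the behavior the fix is after.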

