@@ -111,7 +111,6 @@ public class AwarenessAllocationDecider extends AllocationDecider {
111
111
);
112
112
113
113
private volatile List <String > awarenessAttributes ;
114
-
115
114
private volatile Map <String , List <String >> forcedAwarenessAttributes ;
116
115
117
116
public AwarenessAllocationDecider (Settings settings , ClusterSettings clusterSettings ) {
@@ -163,8 +162,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
163
162
IndexMetadata indexMetadata = allocation .metadata ().getIndexSafe (shardRouting .index ());
164
163
int shardCount = indexMetadata .getNumberOfReplicas () + 1 ; // 1 for primary
165
164
for (String awarenessAttribute : awarenessAttributes ) {
166
- // the node the shard exists on must be associated with an awareness attribute
167
- if (node . node (). getAttributes (). containsKey ( awarenessAttribute ) == false ) {
165
+ // the node the shard exists on must be associated with an awareness attribute.
166
+ if (isAwarenessAttributeAssociatedWithNode ( node , awarenessAttribute ) == false ) {
168
167
return allocation .decision (
169
168
Decision .NO ,
170
169
NAME ,
@@ -175,36 +174,10 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
175
174
);
176
175
}
177
176
177
+ int currentNodeCount = getCurrentNodeCountForAttribute (shardRouting , node , allocation , moveToNode , awarenessAttribute );
178
+
178
179
// build attr_value -> nodes map
179
180
Set <String > nodesPerAttribute = allocation .routingNodes ().nodesPerAttributesCounts (awarenessAttribute );
180
-
181
- // build the count of shards per attribute value
182
- Map <String , Integer > shardPerAttribute = new HashMap <>();
183
- for (ShardRouting assignedShard : allocation .routingNodes ().assignedShards (shardRouting .shardId ())) {
184
- if (assignedShard .started () || assignedShard .initializing ()) {
185
- // Note: this also counts relocation targets as that will be the new location of the shard.
186
- // Relocation sources should not be counted as the shard is moving away
187
- RoutingNode routingNode = allocation .routingNodes ().node (assignedShard .currentNodeId ());
188
- shardPerAttribute .merge (routingNode .node ().getAttributes ().get (awarenessAttribute ), 1 , Integer ::sum );
189
- }
190
- }
191
-
192
- if (moveToNode ) {
193
- if (shardRouting .assignedToNode ()) {
194
- String nodeId = shardRouting .relocating () ? shardRouting .relocatingNodeId () : shardRouting .currentNodeId ();
195
- if (node .nodeId ().equals (nodeId ) == false ) {
196
- // we work on different nodes, move counts around
197
- shardPerAttribute .compute (
198
- allocation .routingNodes ().node (nodeId ).node ().getAttributes ().get (awarenessAttribute ),
199
- (k , v ) -> (v == null ) ? 0 : v - 1
200
- );
201
- shardPerAttribute .merge (node .node ().getAttributes ().get (awarenessAttribute ), 1 , Integer ::sum );
202
- }
203
- } else {
204
- shardPerAttribute .merge (node .node ().getAttributes ().get (awarenessAttribute ), 1 , Integer ::sum );
205
- }
206
- }
207
-
208
181
int numberOfAttributes = nodesPerAttribute .size ();
209
182
List <String > fullValues = forcedAwarenessAttributes .get (awarenessAttribute );
210
183
@@ -216,9 +189,8 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
216
189
}
217
190
numberOfAttributes = attributesSet .size ();
218
191
}
219
- // TODO should we remove ones that are not part of full list?
220
192
221
- final int currentNodeCount = shardPerAttribute . get ( node . node (). getAttributes (). get ( awarenessAttribute ));
193
+ // TODO should we remove ones that are not part of full list?
222
194
final int maximumNodeCount = (shardCount + numberOfAttributes - 1 ) / numberOfAttributes ; // ceil(shardCount/numberOfAttributes)
223
195
if (currentNodeCount > maximumNodeCount ) {
224
196
return allocation .decision (
@@ -238,4 +210,57 @@ private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, Rout
238
210
239
211
return allocation .decision (Decision .YES , NAME , "node meets all awareness attribute requirements" );
240
212
}
213
+
214
+ private int getCurrentNodeCountForAttribute (
215
+ ShardRouting shardRouting ,
216
+ RoutingNode node ,
217
+ RoutingAllocation allocation ,
218
+ boolean moveToNode ,
219
+ String awarenessAttribute
220
+ ) {
221
+ // build the count of shards per attribute value
222
+ final String shardAttributeForNode = getAttributeValueForNode (node , awarenessAttribute );
223
+ int currentNodeCount = 0 ;
224
+ final List <ShardRouting > assignedShards = allocation .routingNodes ().assignedShards (shardRouting .shardId ());
225
+
226
+ for (ShardRouting assignedShard : assignedShards ) {
227
+ if (assignedShard .started () || assignedShard .initializing ()) {
228
+ // Note: this also counts relocation targets as that will be the new location of the shard.
229
+ // Relocation sources should not be counted as the shard is moving away
230
+ RoutingNode routingNode = allocation .routingNodes ().node (assignedShard .currentNodeId ());
231
+ // Increase node count when
232
+ if (getAttributeValueForNode (routingNode , awarenessAttribute ).equals (shardAttributeForNode )) {
233
+ ++currentNodeCount ;
234
+ }
235
+ }
236
+ }
237
+
238
+ if (moveToNode ) {
239
+ if (shardRouting .assignedToNode ()) {
240
+ String nodeId = shardRouting .relocating () ? shardRouting .relocatingNodeId () : shardRouting .currentNodeId ();
241
+ if (node .nodeId ().equals (nodeId ) == false ) {
242
+ // we work on different nodes, move counts around
243
+ if (getAttributeValueForNode (allocation .routingNodes ().node (nodeId ), awarenessAttribute ).equals (shardAttributeForNode )
244
+ && currentNodeCount > 0 ) {
245
+ --currentNodeCount ;
246
+ }
247
+
248
+ ++currentNodeCount ;
249
+ }
250
+ } else {
251
+ ++currentNodeCount ;
252
+ }
253
+ }
254
+
255
+ return currentNodeCount ;
256
+ }
257
+
258
+ private boolean isAwarenessAttributeAssociatedWithNode (RoutingNode node , String awarenessAttribute ) {
259
+ return node .node ().getAttributes ().containsKey (awarenessAttribute );
260
+ }
261
+
262
+ private String getAttributeValueForNode (final RoutingNode node , final String awarenessAttribute ) {
263
+ return node .node ().getAttributes ().get (awarenessAttribute );
264
+ }
265
+
241
266
}
0 commit comments