Skip to content

Commit 39c7acb

Browse files
author
David Roberts
authored
[ML] Report index unavailable instead of waiting for lazy node (elastic#38444)
If a job cannot be assigned to a node because an index it requires is unavailable, and there are lazy ML nodes, then "index unavailable" should be reported as the assignment explanation rather than "waiting for a lazy ML node".
1 parent 6e0624a commit 39c7acb

File tree

2 files changed

+87
-46
lines changed

2 files changed

+87
-46
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,15 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
152152
int fallbackMaxNumberOfOpenJobs,
153153
int maxMachineMemoryPercent,
154154
MlMemoryTracker memoryTracker,
155+
boolean isMemoryTrackerRecentlyRefreshed,
155156
Logger logger) {
156-
String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
157-
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
158-
if (unavailableIndices.size() != 0) {
159-
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
160-
String.join(",", unavailableIndices) + "]";
161-
logger.debug(reason);
162-
return new PersistentTasksCustomMetaData.Assignment(null, reason);
163-
}
164157

165158
// Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe
166159
// because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs
167-
boolean allocateByMemory = true;
168-
169-
if (memoryTracker.isRecentlyRefreshed() == false) {
170-
171-
boolean scheduledRefresh = memoryTracker.asyncRefresh();
172-
if (scheduledRefresh) {
173-
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
174-
logger.debug(reason);
175-
return new PersistentTasksCustomMetaData.Assignment(null, reason);
176-
} else {
177-
allocateByMemory = false;
178-
logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
179-
jobId);
180-
}
160+
boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed;
161+
if (isMemoryTrackerRecentlyRefreshed == false) {
162+
logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled",
163+
jobId);
181164
}
182165

183166
List<String> reasons = new LinkedList<>();
@@ -722,13 +705,34 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(OpenJobAction.JobP
722705
return AWAITING_UPGRADE;
723706
}
724707

725-
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(params.getJobId(),
708+
String jobId = params.getJobId();
709+
String resultsWriteAlias = AnomalyDetectorsIndex.resultsWriteAlias(jobId);
710+
List<String> unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState);
711+
if (unavailableIndices.size() != 0) {
712+
String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" +
713+
String.join(",", unavailableIndices) + "]";
714+
logger.debug(reason);
715+
return new PersistentTasksCustomMetaData.Assignment(null, reason);
716+
}
717+
718+
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
719+
if (isMemoryTrackerRecentlyRefreshed == false) {
720+
boolean scheduledRefresh = memoryTracker.asyncRefresh();
721+
if (scheduledRefresh) {
722+
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
723+
logger.debug(reason);
724+
return new PersistentTasksCustomMetaData.Assignment(null, reason);
725+
}
726+
}
727+
728+
PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId,
726729
params.getJob(),
727730
clusterState,
728731
maxConcurrentJobAllocations,
729732
fallbackMaxNumberOfOpenJobs,
730733
maxMachineMemoryPercent,
731734
memoryTracker,
735+
isMemoryTrackerRecentlyRefreshed,
732736
logger);
733737
if (assignment.getExecutorNode() == null) {
734738
int numMlNodes = 0;

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.transport.TransportAddress;
3131
import org.elasticsearch.common.unit.ByteSizeUnit;
3232
import org.elasticsearch.common.unit.ByteSizeValue;
33+
import org.elasticsearch.common.util.set.Sets;
3334
import org.elasticsearch.index.Index;
3435
import org.elasticsearch.index.shard.ShardId;
3536
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -60,11 +61,9 @@
6061

6162
import java.net.InetAddress;
6263
import java.util.ArrayList;
63-
import java.util.Arrays;
6464
import java.util.Collections;
6565
import java.util.Date;
6666
import java.util.HashMap;
67-
import java.util.HashSet;
6867
import java.util.List;
6968
import java.util.Map;
7069
import java.util.SortedMap;
@@ -80,11 +79,13 @@
8079
public class TransportOpenJobActionTests extends ESTestCase {
8180

8281
private MlMemoryTracker memoryTracker;
82+
private boolean isMemoryTrackerRecentlyRefreshed;
8383

8484
@Before
8585
public void setup() {
8686
memoryTracker = mock(MlMemoryTracker.class);
87-
when(memoryTracker.isRecentlyRefreshed()).thenReturn(true);
87+
isMemoryTrackerRecentlyRefreshed = true;
88+
when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed);
8889
}
8990

9091
public void testValidate_jobMissing() {
@@ -141,7 +142,7 @@ public void testSelectLeastLoadedMlNode_byCount() {
141142
jobBuilder.setJobVersion(Version.CURRENT);
142143

143144
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(),
144-
cs.build(), 2, 10, 30, memoryTracker, logger);
145+
cs.build(), 2, 10, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
145146
assertEquals("", result.getExplanation());
146147
assertEquals("_node_id3", result.getExecutorNode());
147148
}
@@ -177,7 +178,7 @@ public void testSelectLeastLoadedMlNode_maxCapacity() {
177178
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date());
178179

179180
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2,
180-
maxRunningJobsPerNode, 30, memoryTracker, logger);
181+
maxRunningJobsPerNode, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
181182
assertNull(result.getExecutorNode());
182183
assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode
183184
+ "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]"));
@@ -203,7 +204,8 @@ public void testSelectLeastLoadedMlNode_noMlNodes() {
203204

204205
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
205206

206-
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker, logger);
207+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 10, 30, memoryTracker,
208+
isMemoryTrackerRecentlyRefreshed, logger);
207209
assertTrue(result.getExplanation().contains("because this node isn't a ml node"));
208210
assertNull(result.getExecutorNode());
209211
}
@@ -237,7 +239,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
237239
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
238240

239241
ClusterState cs = csBuilder.build();
240-
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker, logger);
242+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 10, 30, memoryTracker,
243+
isMemoryTrackerRecentlyRefreshed, logger);
241244
assertEquals("_node_id3", result.getExecutorNode());
242245

243246
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@@ -247,7 +250,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
247250
csBuilder = ClusterState.builder(cs);
248251
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
249252
cs = csBuilder.build();
250-
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
253+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker,
254+
isMemoryTrackerRecentlyRefreshed, logger);
251255
assertNull("no node selected, because OPENING state", result.getExecutorNode());
252256
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
253257

@@ -258,7 +262,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
258262
csBuilder = ClusterState.builder(cs);
259263
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
260264
cs = csBuilder.build();
261-
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
265+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker,
266+
isMemoryTrackerRecentlyRefreshed, logger);
262267
assertNull("no node selected, because stale task", result.getExecutorNode());
263268
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
264269

@@ -269,7 +274,8 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
269274
csBuilder = ClusterState.builder(cs);
270275
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
271276
cs = csBuilder.build();
272-
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
277+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker,
278+
isMemoryTrackerRecentlyRefreshed, logger);
273279
assertNull("no node selected, because null state", result.getExecutorNode());
274280
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
275281
}
@@ -307,7 +313,8 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
307313
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
308314

309315
// Allocation won't be possible if the stale failed job is treated as opening
310-
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker, logger);
316+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 10, 30, memoryTracker,
317+
isMemoryTrackerRecentlyRefreshed, logger);
311318
assertEquals("_node_id1", result.getExecutorNode());
312319

313320
tasksBuilder = PersistentTasksCustomMetaData.builder(tasks);
@@ -317,7 +324,8 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
317324
csBuilder = ClusterState.builder(cs);
318325
csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks));
319326
cs = csBuilder.build();
320-
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker, logger);
327+
result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 10, 30, memoryTracker,
328+
isMemoryTrackerRecentlyRefreshed, logger);
321329
assertNull("no node selected, because OPENING state", result.getExecutorNode());
322330
assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state"));
323331
}
@@ -349,7 +357,7 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
349357
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks);
350358
cs.metaData(metaData);
351359
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30,
352-
memoryTracker, logger);
360+
memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
353361
assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]"));
354362
assertNull(result.getExecutorNode());
355363
}
@@ -377,7 +385,7 @@ public void testSelectLeastLoadedMlNode_noNodesPriorTo_V_5_5() {
377385
Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date());
378386

379387
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 10, 30,
380-
memoryTracker, logger);
388+
memoryTracker, isMemoryTrackerRecentlyRefreshed, logger);
381389
assertThat(result.getExplanation(), containsString("because this node does not support machine learning jobs"));
382390
assertNull(result.getExecutorNode());
383391
}
@@ -404,7 +412,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesButNoNodeMeetsRequiredVersio
404412

405413
Job job = jobWithRules("job_with_rules");
406414
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker,
407-
logger);
415+
isMemoryTrackerRecentlyRefreshed, logger);
408416
assertThat(result.getExplanation(), containsString(
409417
"because jobs using custom_rules require a node of version [6.4.0] or higher"));
410418
assertNull(result.getExecutorNode());
@@ -432,7 +440,7 @@ public void testSelectLeastLoadedMlNode_jobWithRulesAndNodeMeetsRequiredVersion(
432440

433441
Job job = jobWithRules("job_with_rules");
434442
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 10, 30, memoryTracker,
435-
logger);
443+
isMemoryTrackerRecentlyRefreshed, logger);
436444
assertNotNull(result.getExecutorNode());
437445
}
438446

@@ -448,8 +456,8 @@ public void testSelectLeastLoadedMlNode_indexJobsCannotBeAssignedToPre660Node()
448456
cs.nodes(nodes);
449457

450458
Job job = jobWithRules("post-v650-job");
451-
Assignment result =
452-
TransportOpenJobAction.selectLeastLoadedMlNode("post-v650-job", job, cs.build(), 2, 10, 30, memoryTracker, logger);
459+
Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("post-v650-job", job, cs.build(), 2, 10, 30, memoryTracker,
460+
isMemoryTrackerRecentlyRefreshed, logger);
453461
assertNull(result.getExecutorNode());
454462
assertThat(result.getExplanation(), containsString("Not opening job [post-v650-job] on node [_node_name1] version [6.5.0], " +
455463
"because this node does not support jobs of version "));
@@ -460,7 +468,8 @@ public void testSelectLeastLoadedMlNode_indexJobsCannotBeAssignedToPre660Node()
460468
.add(new DiscoveryNode("_node_name2", "_node_id2", new TransportAddress(InetAddress.getLoopbackAddress(), 9301),
461469
nodeAttr, Collections.emptySet(), Version.V_6_6_0));
462470
cs.nodes(nodes);
463-
result = TransportOpenJobAction.selectLeastLoadedMlNode("post-v650-job", job, cs.build(), 2, 10, 30, memoryTracker, logger);
471+
result = TransportOpenJobAction.selectLeastLoadedMlNode("post-v650-job", job, cs.build(), 2, 10, 30, memoryTracker,
472+
isMemoryTrackerRecentlyRefreshed, logger);
464473
assertThat(result.getExplanation(), isEmptyOrNullString());
465474
assertEquals("_node_id2", result.getExecutorNode());
466475
}
@@ -550,10 +559,10 @@ public void testJobTaskMatcherMatch() {
550559

551560
public void testGetAssignment_GivenJobThatRequiresMigration() {
552561
ClusterService clusterService = mock(ClusterService.class);
553-
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>(
554-
Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
555-
MachineLearning.MAX_LAZY_ML_NODES)
556-
));
562+
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY,
563+
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
564+
MachineLearning.MAX_LAZY_ML_NODES)
565+
);
557566
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
558567

559568
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
@@ -563,6 +572,34 @@ public void testGetAssignment_GivenJobThatRequiresMigration() {
563572
assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class)));
564573
}
565574

575+
// An index being unavailable should take precedence over waiting for a lazy node
576+
public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() {
577+
Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 1).build();
578+
ClusterService clusterService = mock(ClusterService.class);
579+
ClusterSettings clusterSettings = new ClusterSettings(settings,
580+
Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT,
581+
MachineLearning.MAX_LAZY_ML_NODES)
582+
);
583+
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
584+
585+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
586+
MetaData.Builder metaData = MetaData.builder();
587+
RoutingTable.Builder routingTable = RoutingTable.builder();
588+
addIndices(metaData, routingTable);
589+
routingTable.remove(".ml-state");
590+
csBuilder.metaData(metaData);
591+
csBuilder.routingTable(routingTable.build());
592+
593+
TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor(
594+
settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class));
595+
596+
OpenJobAction.JobParams params = new OpenJobAction.JobParams("unavailable_index_with_lazy_node");
597+
params.setJob(mock(Job.class));
598+
assertEquals("Not opening job [unavailable_index_with_lazy_node], " +
599+
"because not all primary shards are active for the following indices [.ml-state]",
600+
executor.getAssignment(params, csBuilder.build()).getExplanation());
601+
}
602+
566603
public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) {
567604
addJobTask(jobId, nodeId, jobState, builder, false);
568605
}

0 commit comments

Comments
 (0)