Skip to content

Commit d4b4817

Browse files
author
David Roberts
authored
[ML] Adapt to periodic persistent task refresh (#36633)
* [ML] Adapt to periodic persistent task refresh If https://github.com/elastic/elasticsearch/pull/36069/files is merged then the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified. This change begins the simplification process. * Remove AwaitsFix and implement TODO
1 parent d3ead91 commit d4b4817

File tree

7 files changed

+40
-172
lines changed

7 files changed

+40
-172
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,28 +57,24 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
5757
public static final String TYPE = "ml";
5858
private static final ParseField JOBS_FIELD = new ParseField("jobs");
5959
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
60-
private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version");
6160

62-
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null);
61+
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
6362
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
6463
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);
6564

6665
static {
6766
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
6867
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
6968
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
70-
LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD);
7169
}
7270

7371
private final SortedMap<String, Job> jobs;
7472
private final SortedMap<String, DatafeedConfig> datafeeds;
75-
private final Long lastMemoryRefreshVersion;
7673
private final GroupOrJobLookup groupOrJobLookup;
7774

78-
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, Long lastMemoryRefreshVersion) {
75+
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
7976
this.jobs = Collections.unmodifiableSortedMap(jobs);
8077
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
81-
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
8278
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
8379
}
8480

@@ -116,10 +112,6 @@ public Set<String> expandDatafeedIds(String expression, boolean allowNoDatafeeds
116112
.expand(expression, allowNoDatafeeds);
117113
}
118114

119-
public Long getLastMemoryRefreshVersion() {
120-
return lastMemoryRefreshVersion;
121-
}
122-
123115
@Override
124116
public Version getMinimalSupportedVersion() {
125117
return Version.V_6_0_0_alpha1;
@@ -153,21 +145,13 @@ public MlMetadata(StreamInput in) throws IOException {
153145
datafeeds.put(in.readString(), new DatafeedConfig(in));
154146
}
155147
this.datafeeds = datafeeds;
156-
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
157-
lastMemoryRefreshVersion = in.readOptionalLong();
158-
} else {
159-
lastMemoryRefreshVersion = null;
160-
}
161148
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
162149
}
163150

164151
@Override
165152
public void writeTo(StreamOutput out) throws IOException {
166153
writeMap(jobs, out);
167154
writeMap(datafeeds, out);
168-
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
169-
out.writeOptionalLong(lastMemoryRefreshVersion);
170-
}
171155
}
172156

173157
private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
@@ -184,9 +168,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
184168
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
185169
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
186170
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
187-
if (lastMemoryRefreshVersion != null) {
188-
builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion);
189-
}
190171
return builder;
191172
}
192173

@@ -203,24 +184,17 @@ public static class MlMetadataDiff implements NamedDiff<MetaData.Custom> {
203184

204185
final Diff<Map<String, Job>> jobs;
205186
final Diff<Map<String, DatafeedConfig>> datafeeds;
206-
final Long lastMemoryRefreshVersion;
207187

208188
MlMetadataDiff(MlMetadata before, MlMetadata after) {
209189
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
210190
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
211-
this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion;
212191
}
213192

214193
public MlMetadataDiff(StreamInput in) throws IOException {
215194
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
216195
MlMetadataDiff::readJobDiffFrom);
217196
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
218197
MlMetadataDiff::readDatafeedDiffFrom);
219-
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
220-
lastMemoryRefreshVersion = in.readOptionalLong();
221-
} else {
222-
lastMemoryRefreshVersion = null;
223-
}
224198
}
225199

226200
/**
@@ -232,17 +206,13 @@ public MlMetadataDiff(StreamInput in) throws IOException {
232206
public MetaData.Custom apply(MetaData.Custom part) {
233207
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
234208
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
235-
// lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value
236-
return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion);
209+
return new MlMetadata(newJobs, newDatafeeds);
237210
}
238211

239212
@Override
240213
public void writeTo(StreamOutput out) throws IOException {
241214
jobs.writeTo(out);
242215
datafeeds.writeTo(out);
243-
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
244-
out.writeOptionalLong(lastMemoryRefreshVersion);
245-
}
246216
}
247217

248218
@Override
@@ -267,8 +237,7 @@ public boolean equals(Object o) {
267237
return false;
268238
MlMetadata that = (MlMetadata) o;
269239
return Objects.equals(jobs, that.jobs) &&
270-
Objects.equals(datafeeds, that.datafeeds) &&
271-
Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion);
240+
Objects.equals(datafeeds, that.datafeeds);
272241
}
273242

274243
@Override
@@ -278,14 +247,13 @@ public final String toString() {
278247

279248
@Override
280249
public int hashCode() {
281-
return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion);
250+
return Objects.hash(jobs, datafeeds);
282251
}
283252

284253
public static class Builder {
285254

286255
private TreeMap<String, Job> jobs;
287256
private TreeMap<String, DatafeedConfig> datafeeds;
288-
private Long lastMemoryRefreshVersion;
289257

290258
public Builder() {
291259
jobs = new TreeMap<>();
@@ -299,7 +267,6 @@ public Builder(@Nullable MlMetadata previous) {
299267
} else {
300268
jobs = new TreeMap<>(previous.jobs);
301269
datafeeds = new TreeMap<>(previous.datafeeds);
302-
lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion;
303270
}
304271
}
305272

@@ -419,13 +386,8 @@ public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
419386
return this;
420387
}
421388

422-
public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) {
423-
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
424-
return this;
425-
}
426-
427389
public MlMetadata build() {
428-
return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion);
390+
return new MlMetadata(jobs, datafeeds);
429391
}
430392

431393
public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
419419
this.datafeedManager.set(datafeedManager);
420420
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
421421
autodetectProcessManager);
422-
MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider);
422+
MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider);
423423
this.memoryTracker.set(memoryTracker);
424424

425425
// This object's constructor attaches to the license state, so there's no need to retain another reference to it

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<Stri
248248
}
249249

250250
MlMetadata.Builder builder = new MlMetadata.Builder();
251-
builder.setLastMemoryRefreshVersion(mlMetadata.getLastMemoryRefreshVersion())
252-
.putJobs(currentJobs.values())
251+
builder.putJobs(currentJobs.values())
253252
.putDatafeeds(currentDatafeeds.values());
254253

255254
return new RemovalResult(builder.build(), removedJobIds, removedDatafeedIds);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -167,16 +167,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
167167

168168
if (memoryTracker.isRecentlyRefreshed() == false) {
169169

170-
boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap(
171-
acknowledged -> {
172-
if (acknowledged) {
173-
logger.trace("Job memory requirement refresh request completed successfully");
174-
} else {
175-
logger.warn("Job memory requirement refresh request completed but did not set time in cluster state");
176-
}
177-
},
178-
e -> logger.error("Failed to refresh job memory requirements", e)
179-
));
170+
boolean scheduledRefresh = memoryTracker.asyncRefresh();
180171
if (scheduledRefresh) {
181172
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
182173
logger.debug(reason);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

Lines changed: 20 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,15 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ResourceNotFoundException;
1111
import org.elasticsearch.action.ActionListener;
12-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
13-
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
14-
import org.elasticsearch.cluster.ClusterState;
1512
import org.elasticsearch.cluster.LocalNodeMasterListener;
16-
import org.elasticsearch.cluster.ack.AckedRequest;
17-
import org.elasticsearch.cluster.metadata.MetaData;
1813
import org.elasticsearch.cluster.service.ClusterService;
14+
import org.elasticsearch.common.settings.Settings;
1915
import org.elasticsearch.common.unit.ByteSizeUnit;
2016
import org.elasticsearch.common.unit.TimeValue;
2117
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
18+
import org.elasticsearch.persistent.PersistentTasksClusterService;
2219
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2320
import org.elasticsearch.threadpool.ThreadPool;
24-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2521
import org.elasticsearch.xpack.core.ml.MlTasks;
2622
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
2723
import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -44,22 +40,10 @@
4440
* 1. For all open ML jobs (via {@link #asyncRefresh})
4541
* 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers})
4642
* 3. For one named ML job (via {@link #refreshJobMemory})
47-
* In all cases a listener informs the caller when the requested updates are complete.
43+
* In cases 2 and 3 a listener informs the caller when the requested updates are complete.
4844
*/
4945
public class MlMemoryTracker implements LocalNodeMasterListener {
5046

51-
private static final AckedRequest ACKED_REQUEST = new AckedRequest() {
52-
@Override
53-
public TimeValue ackTimeout() {
54-
return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
55-
}
56-
57-
@Override
58-
public TimeValue masterNodeTimeout() {
59-
return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
60-
}
61-
};
62-
6347
private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1);
6448

6549
private final Logger logger = LogManager.getLogger(MlMemoryTracker.class);
@@ -72,14 +56,22 @@ public TimeValue masterNodeTimeout() {
7256
private final JobResultsProvider jobResultsProvider;
7357
private volatile boolean isMaster;
7458
private volatile Instant lastUpdateTime;
59+
private volatile Duration reassignmentRecheckInterval;
7560

76-
public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager,
61+
public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadPool threadPool, JobManager jobManager,
7762
JobResultsProvider jobResultsProvider) {
7863
this.threadPool = threadPool;
7964
this.clusterService = clusterService;
8065
this.jobManager = jobManager;
8166
this.jobResultsProvider = jobResultsProvider;
67+
setReassignmentRecheckInterval(PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
8268
clusterService.addLocalNodeMasterListener(this);
69+
clusterService.getClusterSettings().addSettingsUpdateConsumer(
70+
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setReassignmentRecheckInterval);
71+
}
72+
73+
private void setReassignmentRecheckInterval(TimeValue recheckInterval) {
74+
reassignmentRecheckInterval = Duration.ofNanos(recheckInterval.getNanos());
8375
}
8476

8577
@Override
@@ -103,11 +95,12 @@ public String executorName() {
10395

10496
/**
10597
* Is the information in this object sufficiently up to date
106-
* for valid allocation decisions to be made using it?
98+
* for valid task assignment decisions to be made using it?
10799
*/
108100
public boolean isRecentlyRefreshed() {
109101
Instant localLastUpdateTime = lastUpdateTime;
110-
return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now());
102+
return localLastUpdateTime != null &&
103+
localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(reassignmentRecheckInterval).isAfter(Instant.now());
111104
}
112105

113106
/**
@@ -143,24 +136,19 @@ public void removeJob(String jobId) {
143136
/**
144137
* Uses a separate thread to refresh the memory requirement for every ML job that has
145138
* a corresponding persistent task. This method only works on the master node.
146-
* @param listener Will be called when the async refresh completes or fails. The
147-
* boolean value indicates whether the cluster state was updated
148-
* with the refresh completion time. (If it was then this will in
149-
* cause the persistent tasks framework to check if any persistent
150-
* tasks are awaiting allocation.)
151139
* @return <code>true</code> if the async refresh is scheduled, and <code>false</code>
152140
* if this is not possible for some reason.
153141
*/
154-
public boolean asyncRefresh(ActionListener<Boolean> listener) {
142+
public boolean asyncRefresh() {
155143

156144
if (isMaster) {
157145
try {
158-
ActionListener<Void> mlMetaUpdateListener = ActionListener.wrap(
159-
aVoid -> recordUpdateTimeInClusterState(listener),
160-
listener::onFailure
146+
ActionListener<Void> listener = ActionListener.wrap(
147+
aVoid -> logger.trace("Job memory requirement refresh request completed successfully"),
148+
e -> logger.error("Failed to refresh job memory requirements", e)
161149
);
162150
threadPool.executor(executorName()).execute(
163-
() -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener));
151+
() -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), listener));
164152
return true;
165153
} catch (EsRejectedExecutionException e) {
166154
logger.debug("Couldn't schedule ML memory update - node might be shutting down", e);
@@ -227,33 +215,6 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener<Void>
227215
}
228216
}
229217

230-
private void recordUpdateTimeInClusterState(ActionListener<Boolean> listener) {
231-
232-
clusterService.submitStateUpdateTask("ml-memory-last-update-time",
233-
new AckedClusterStateUpdateTask<Boolean>(ACKED_REQUEST, listener) {
234-
@Override
235-
protected Boolean newResponse(boolean acknowledged) {
236-
return acknowledged;
237-
}
238-
239-
@Override
240-
public ClusterState execute(ClusterState currentState) {
241-
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
242-
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
243-
builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1);
244-
MlMetadata newMlMetadata = builder.build();
245-
if (newMlMetadata.equals(currentMlMetadata)) {
246-
// Return same reference if nothing has changed
247-
return currentState;
248-
} else {
249-
ClusterState.Builder newState = ClusterState.builder(currentState);
250-
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build());
251-
return newState.build();
252-
}
253-
}
254-
});
255-
}
256-
257218
private void iterateMlJobTasks(Iterator<PersistentTasksCustomMetaData.PersistentTask<?>> iterator,
258219
ActionListener<Void> refreshComplete) {
259220
if (iterator.hasNext()) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ protected MlMetadata createTestInstance() {
6969
builder.putJob(job, false);
7070
}
7171
}
72-
if (randomBoolean()) {
73-
builder.setLastMemoryRefreshVersion(randomNonNegativeLong());
74-
}
7572
return builder.build();
7673
}
7774

@@ -441,9 +438,8 @@ protected MlMetadata mutateInstance(MlMetadata instance) {
441438
for (Map.Entry<String, DatafeedConfig> entry : datafeeds.entrySet()) {
442439
metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap());
443440
}
444-
metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion());
445441

446-
switch (between(0, 2)) {
442+
switch (between(0, 1)) {
447443
case 0:
448444
metadataBuilder.putJob(JobTests.createRandomizedJob(), true);
449445
break;
@@ -463,13 +459,6 @@ protected MlMetadata mutateInstance(MlMetadata instance) {
463459
metadataBuilder.putJob(randomJob, false);
464460
metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap());
465461
break;
466-
case 2:
467-
if (instance.getLastMemoryRefreshVersion() == null) {
468-
metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong());
469-
} else {
470-
metadataBuilder.setLastMemoryRefreshVersion(null);
471-
}
472-
break;
473462
default:
474463
throw new AssertionError("Illegal randomisation branch");
475464
}

0 commit comments

Comments
 (0)