Skip to content

Commit e0a58c9

Browse files
author
David Roberts
authored
[ML] Adapt to periodic persistent task refresh (#36494)
After #36069, the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified.
1 parent 2e2495c commit e0a58c9

File tree

7 files changed

+40
-171
lines changed

7 files changed

+40
-171
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -57,28 +57,24 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom {
5757
public static final String TYPE = "ml";
5858
private static final ParseField JOBS_FIELD = new ParseField("jobs");
5959
private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds");
60-
private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version");
6160

62-
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null);
61+
public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap());
6362
// This parser follows the pattern that metadata is parsed leniently (to allow for enhancements)
6463
public static final ObjectParser<Builder, Void> LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new);
6564

6665
static {
6766
LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD);
6867
LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds,
6968
(p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD);
70-
LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD);
7169
}
7270

7371
private final SortedMap<String, Job> jobs;
7472
private final SortedMap<String, DatafeedConfig> datafeeds;
75-
private final Long lastMemoryRefreshVersion;
7673
private final GroupOrJobLookup groupOrJobLookup;
7774

78-
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds, Long lastMemoryRefreshVersion) {
75+
private MlMetadata(SortedMap<String, Job> jobs, SortedMap<String, DatafeedConfig> datafeeds) {
7976
this.jobs = Collections.unmodifiableSortedMap(jobs);
8077
this.datafeeds = Collections.unmodifiableSortedMap(datafeeds);
81-
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
8278
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
8379
}
8480

@@ -121,10 +117,6 @@ public Set<String> expandDatafeedIds(String expression) {
121117
.expand(expression);
122118
}
123119

124-
public Long getLastMemoryRefreshVersion() {
125-
return lastMemoryRefreshVersion;
126-
}
127-
128120
@Override
129121
public Version getMinimalSupportedVersion() {
130122
return Version.V_5_4_0;
@@ -158,21 +150,13 @@ public MlMetadata(StreamInput in) throws IOException {
158150
datafeeds.put(in.readString(), new DatafeedConfig(in));
159151
}
160152
this.datafeeds = datafeeds;
161-
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
162-
lastMemoryRefreshVersion = in.readOptionalLong();
163-
} else {
164-
lastMemoryRefreshVersion = null;
165-
}
166153
this.groupOrJobLookup = new GroupOrJobLookup(jobs.values());
167154
}
168155

169156
@Override
170157
public void writeTo(StreamOutput out) throws IOException {
171158
writeMap(jobs, out);
172159
writeMap(datafeeds, out);
173-
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
174-
out.writeOptionalLong(lastMemoryRefreshVersion);
175-
}
176160
}
177161

178162
private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOutput out) throws IOException {
@@ -189,9 +173,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
189173
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
190174
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
191175
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
192-
if (lastMemoryRefreshVersion != null) {
193-
builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion);
194-
}
195176
return builder;
196177
}
197178

@@ -208,24 +189,17 @@ public static class MlMetadataDiff implements NamedDiff<MetaData.Custom> {
208189

209190
final Diff<Map<String, Job>> jobs;
210191
final Diff<Map<String, DatafeedConfig>> datafeeds;
211-
final Long lastMemoryRefreshVersion;
212192

213193
MlMetadataDiff(MlMetadata before, MlMetadata after) {
214194
this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer());
215195
this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer());
216-
this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion;
217196
}
218197

219198
public MlMetadataDiff(StreamInput in) throws IOException {
220199
this.jobs = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), Job::new,
221200
MlMetadataDiff::readJobDiffFrom);
222201
this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new,
223202
MlMetadataDiff::readDatafeedDiffFrom);
224-
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
225-
lastMemoryRefreshVersion = in.readOptionalLong();
226-
} else {
227-
lastMemoryRefreshVersion = null;
228-
}
229203
}
230204

231205
/**
@@ -237,17 +211,13 @@ public MlMetadataDiff(StreamInput in) throws IOException {
237211
public MetaData.Custom apply(MetaData.Custom part) {
238212
TreeMap<String, Job> newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs));
239213
TreeMap<String, DatafeedConfig> newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds));
240-
// lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value
241-
return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion);
214+
return new MlMetadata(newJobs, newDatafeeds);
242215
}
243216

244217
@Override
245218
public void writeTo(StreamOutput out) throws IOException {
246219
jobs.writeTo(out);
247220
datafeeds.writeTo(out);
248-
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
249-
out.writeOptionalLong(lastMemoryRefreshVersion);
250-
}
251221
}
252222

253223
@Override
@@ -272,8 +242,7 @@ public boolean equals(Object o) {
272242
return false;
273243
MlMetadata that = (MlMetadata) o;
274244
return Objects.equals(jobs, that.jobs) &&
275-
Objects.equals(datafeeds, that.datafeeds) &&
276-
Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion);
245+
Objects.equals(datafeeds, that.datafeeds);
277246
}
278247

279248
@Override
@@ -283,14 +252,13 @@ public final String toString() {
283252

284253
@Override
285254
public int hashCode() {
286-
return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion);
255+
return Objects.hash(jobs, datafeeds);
287256
}
288257

289258
public static class Builder {
290259

291260
private TreeMap<String, Job> jobs;
292261
private TreeMap<String, DatafeedConfig> datafeeds;
293-
private Long lastMemoryRefreshVersion;
294262

295263
public Builder() {
296264
jobs = new TreeMap<>();
@@ -304,7 +272,6 @@ public Builder(@Nullable MlMetadata previous) {
304272
} else {
305273
jobs = new TreeMap<>(previous.jobs);
306274
datafeeds = new TreeMap<>(previous.datafeeds);
307-
lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion;
308275
}
309276
}
310277

@@ -424,13 +391,8 @@ public Builder putDatafeeds(Collection<DatafeedConfig> datafeeds) {
424391
return this;
425392
}
426393

427-
public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) {
428-
this.lastMemoryRefreshVersion = lastMemoryRefreshVersion;
429-
return this;
430-
}
431-
432394
public MlMetadata build() {
433-
return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion);
395+
return new MlMetadata(jobs, datafeeds);
434396
}
435397

436398
public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
424424
this.datafeedManager.set(datafeedManager);
425425
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
426426
autodetectProcessManager);
427-
MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider);
427+
MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider);
428428
this.memoryTracker.set(memoryTracker);
429429

430430
// This object's constructor attaches to the license state, so there's no need to retain another reference to it

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,7 @@ static RemovalResult removeJobsAndDatafeeds(List<String> jobsToRemove, List<Stri
257257
}
258258

259259
MlMetadata.Builder builder = new MlMetadata.Builder();
260-
builder.setLastMemoryRefreshVersion(mlMetadata.getLastMemoryRefreshVersion())
261-
.putJobs(currentJobs.values())
260+
builder.putJobs(currentJobs.values())
262261
.putDatafeeds(currentDatafeeds.values());
263262

264263
return new RemovalResult(builder.build(), removedJobIds, removedDatafeedIds);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -179,16 +179,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j
179179

180180
if (memoryTracker.isRecentlyRefreshed() == false) {
181181

182-
boolean scheduledRefresh = memoryTracker.asyncRefresh(ActionListener.wrap(
183-
acknowledged -> {
184-
if (acknowledged) {
185-
logger.trace("Job memory requirement refresh request completed successfully");
186-
} else {
187-
logger.warn("Job memory requirement refresh request completed but did not set time in cluster state");
188-
}
189-
},
190-
e -> logger.error("Failed to refresh job memory requirements", e)
191-
));
182+
boolean scheduledRefresh = memoryTracker.asyncRefresh();
192183
if (scheduledRefresh) {
193184
String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested";
194185
logger.debug(reason);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java

Lines changed: 20 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,13 @@
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ResourceNotFoundException;
1111
import org.elasticsearch.action.ActionListener;
12-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
13-
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
14-
import org.elasticsearch.cluster.ClusterState;
1512
import org.elasticsearch.cluster.LocalNodeMasterListener;
16-
import org.elasticsearch.cluster.ack.AckedRequest;
17-
import org.elasticsearch.cluster.metadata.MetaData;
1813
import org.elasticsearch.cluster.service.ClusterService;
14+
import org.elasticsearch.common.settings.Settings;
1915
import org.elasticsearch.common.unit.ByteSizeUnit;
2016
import org.elasticsearch.common.unit.TimeValue;
2117
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
18+
import org.elasticsearch.persistent.PersistentTasksClusterService;
2219
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2320
import org.elasticsearch.threadpool.ThreadPool;
2421
import org.elasticsearch.xpack.core.ml.MlMetadata;
@@ -44,22 +41,10 @@
4441
* 1. For all open ML jobs (via {@link #asyncRefresh})
4542
* 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers})
4643
* 3. For one named ML job (via {@link #refreshJobMemory})
47-
* In all cases a listener informs the caller when the requested updates are complete.
44+
* In cases 2 and 3 a listener informs the caller when the requested updates are complete.
4845
*/
4946
public class MlMemoryTracker implements LocalNodeMasterListener {
5047

51-
private static final AckedRequest ACKED_REQUEST = new AckedRequest() {
52-
@Override
53-
public TimeValue ackTimeout() {
54-
return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
55-
}
56-
57-
@Override
58-
public TimeValue masterNodeTimeout() {
59-
return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT;
60-
}
61-
};
62-
6348
private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1);
6449

6550
private final Logger logger = LogManager.getLogger(MlMemoryTracker.class);
@@ -72,14 +57,22 @@ public TimeValue masterNodeTimeout() {
7257
private final JobResultsProvider jobResultsProvider;
7358
private volatile boolean isMaster;
7459
private volatile Instant lastUpdateTime;
60+
private volatile Duration reassignmentRecheckInterval;
7561

76-
public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager,
62+
public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadPool threadPool, JobManager jobManager,
7763
JobResultsProvider jobResultsProvider) {
7864
this.threadPool = threadPool;
7965
this.clusterService = clusterService;
8066
this.jobManager = jobManager;
8167
this.jobResultsProvider = jobResultsProvider;
68+
setReassignmentRecheckInterval(PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
8269
clusterService.addLocalNodeMasterListener(this);
70+
clusterService.getClusterSettings().addSettingsUpdateConsumer(
71+
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setReassignmentRecheckInterval);
72+
}
73+
74+
private void setReassignmentRecheckInterval(TimeValue recheckInterval) {
75+
reassignmentRecheckInterval = Duration.ofNanos(recheckInterval.getNanos());
8376
}
8477

8578
@Override
@@ -103,11 +96,12 @@ public String executorName() {
10396

10497
/**
10598
* Is the information in this object sufficiently up to date
106-
* for valid allocation decisions to be made using it?
99+
* for valid task assignment decisions to be made using it?
107100
*/
108101
public boolean isRecentlyRefreshed() {
109102
Instant localLastUpdateTime = lastUpdateTime;
110-
return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now());
103+
return localLastUpdateTime != null &&
104+
localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(reassignmentRecheckInterval).isAfter(Instant.now());
111105
}
112106

113107
/**
@@ -149,24 +143,19 @@ public void removeJob(String jobId) {
149143
/**
150144
* Uses a separate thread to refresh the memory requirement for every ML job that has
151145
* a corresponding persistent task. This method only works on the master node.
152-
* @param listener Will be called when the async refresh completes or fails. The
153-
* boolean value indicates whether the cluster state was updated
154-
* with the refresh completion time. (If it was then this will in
155-
* cause the persistent tasks framework to check if any persistent
156-
* tasks are awaiting allocation.)
157146
* @return <code>true</code> if the async refresh is scheduled, and <code>false</code>
158147
* if this is not possible for some reason.
159148
*/
160-
public boolean asyncRefresh(ActionListener<Boolean> listener) {
149+
public boolean asyncRefresh() {
161150

162151
if (isMaster) {
163152
try {
164-
ActionListener<Void> mlMetaUpdateListener = ActionListener.wrap(
165-
aVoid -> recordUpdateTimeInClusterState(listener),
166-
listener::onFailure
153+
ActionListener<Void> listener = ActionListener.wrap(
154+
aVoid -> logger.trace("Job memory requirement refresh request completed successfully"),
155+
e -> logger.error("Failed to refresh job memory requirements", e)
167156
);
168157
threadPool.executor(executorName()).execute(
169-
() -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener));
158+
() -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), listener));
170159
return true;
171160
} catch (EsRejectedExecutionException e) {
172161
logger.debug("Couldn't schedule ML memory update - node might be shutting down", e);
@@ -233,33 +222,6 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener<Void>
233222
}
234223
}
235224

236-
private void recordUpdateTimeInClusterState(ActionListener<Boolean> listener) {
237-
238-
clusterService.submitStateUpdateTask("ml-memory-last-update-time",
239-
new AckedClusterStateUpdateTask<Boolean>(ACKED_REQUEST, listener) {
240-
@Override
241-
protected Boolean newResponse(boolean acknowledged) {
242-
return acknowledged;
243-
}
244-
245-
@Override
246-
public ClusterState execute(ClusterState currentState) {
247-
MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState);
248-
MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata);
249-
builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1);
250-
MlMetadata newMlMetadata = builder.build();
251-
if (newMlMetadata.equals(currentMlMetadata)) {
252-
// Return same reference if nothing has changed
253-
return currentState;
254-
} else {
255-
ClusterState.Builder newState = ClusterState.builder(currentState);
256-
newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build());
257-
return newState.build();
258-
}
259-
}
260-
});
261-
}
262-
263225
private void iterateMlJobTasks(Iterator<PersistentTasksCustomMetaData.PersistentTask<?>> iterator,
264226
ActionListener<Void> refreshComplete) {
265227
if (iterator.hasNext()) {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ protected MlMetadata createTestInstance() {
6969
builder.putJob(job, false);
7070
}
7171
}
72-
if (randomBoolean()) {
73-
builder.setLastMemoryRefreshVersion(randomNonNegativeLong());
74-
}
7572
return builder.build();
7673
}
7774

@@ -441,9 +438,8 @@ protected MlMetadata mutateInstance(MlMetadata instance) {
441438
for (Map.Entry<String, DatafeedConfig> entry : datafeeds.entrySet()) {
442439
metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap());
443440
}
444-
metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion());
445441

446-
switch (between(0, 2)) {
442+
switch (between(0, 1)) {
447443
case 0:
448444
metadataBuilder.putJob(JobTests.createRandomizedJob(), true);
449445
break;
@@ -463,13 +459,6 @@ protected MlMetadata mutateInstance(MlMetadata instance) {
463459
metadataBuilder.putJob(randomJob, false);
464460
metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap());
465461
break;
466-
case 2:
467-
if (instance.getLastMemoryRefreshVersion() == null) {
468-
metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong());
469-
} else {
470-
metadataBuilder.setLastMemoryRefreshVersion(null);
471-
}
472-
break;
473462
default:
474463
throw new AssertionError("Illegal randomisation branch");
475464
}

0 commit comments

Comments
 (0)