@@ -202,6 +202,10 @@ public SimpleIdsMatcher(String[] tokens) {
.collect(Collectors.toList());
}

public SimpleIdsMatcher(String expression) {
this(tokenizeExpression(expression));
}

/**
* Do any of the matchers match {@code id}?
*
@@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Objects;

public class CancelJobModelSnapshotUpgradeAction extends ActionType<CancelJobModelSnapshotUpgradeAction.Response> {

public static final CancelJobModelSnapshotUpgradeAction INSTANCE = new CancelJobModelSnapshotUpgradeAction();

// Even though at the time of writing this action doesn't have a REST endpoint the action name is
// still "admin" rather than "internal". This is because there's no conceptual reason why this
// action couldn't have a REST endpoint in the future, and it's painful to change these action
// names after release. The only difference is that in 7.17 the last remaining transport client
// users will be able to call this endpoint. In 8.x there is no transport client, so in 8.x there
// is no difference between having "admin" and "internal" here in the period before a REST endpoint
// exists. Using "admin" just makes life easier if we ever decide to add a REST endpoint in the
// future.
public static final String NAME = "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";
Member

Suggested change
- public static final String NAME = "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";
+ public static final String NAME = "cluster:internal/xpack/ml/job/model_snapshots/upgrade/cancel";

Since there is no rest layer for this, it seems prudent to be internal.

Author

I intentionally made this admin instead of internal. Although there's no REST layer today it's possible that we might need to add one in the future, say if we ever want the UI to be able to clean up a failed upgrade.

The reason we've used internal instead of admin in the past is to make it hard for users of the transport client to call the API, even though it's easy for them to write the code to call it. Since there's nothing inherently wrong with calling this endpoint if you want to clean up failed upgrades, I don't think that's a problem in this case. (Plus, the transport client will only exist in 7.17; after that, the lack of a REST layer will make this impossible to call.) I will add a comment to say this.

Part of the reason I didn't add a REST layer in this PR was because it's being added so late in the 7.x development cycle and it's not strictly essential at the moment.

Member

internal conveys a message even if just a convention. If this was to become a public API with a REST interface we would create a new admin wrapper action that calls this one and have this action internal. We have many internal APIs called this way.

I don't consider this a blocker on the PR, but there is a path for making internal APIs public. If you think it likely there will be a REST API then keep it admin; otherwise it makes more sense for it to be internal.

Author

I think it's 50/50 whether this will be made public in the future.

> If this was to become a public API with a REST interface we would create a new admin wrapper action that calls this one and have this action internal.

This seems like needless baggage to me. The wrapper action would literally call the internal action and do nothing else. It would just be extra boilerplate code.

> internal conveys a message even if just a convention.

It's only the ML team that has ever done this. All the internal actions of core Elasticsearch start with either admin or monitor. (For example look at the persistent tasks actions.) The original reason why the ML team did this was to make transport client users think twice before calling an API that was intended to be internal. Of course this only had an effect if security was enabled, but it was in the days of X-Pack as a separate plugin where ML users were likely to be using security but most users weren't using security. It was also in the days before the transport client was deprecated so we were more worried about transport client users accidentally automating things they shouldn't.

But as I said in the comment I added to the code, in this case we wouldn't really care if a transport client user did call this action. It doesn't do something low level that will mess up the cluster if called at the wrong time. It's an action that could be external; it just isn't at the moment.
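
For readers following the thread, the wrapper approach being debated might look roughly like the sketch below. It is not code from this PR: the `Action` interface and both classes are hypothetical stand-ins written against plain Java rather than the Elasticsearch transport classes, and the "internal" name is the one from the suggested change above.

```java
import java.util.function.Consumer;

// All types here are hypothetical stand-ins, not Elasticsearch classes.
final class WrapperActionSketch {

    /** Minimal stand-in for a transport action: takes a request, reports the result to a callback. */
    interface Action<Req, Resp> {
        String name();

        void execute(Req request, Consumer<Resp> listener);
    }

    /** The existing action, registered under an "internal" name. */
    static final class InternalCancelAction implements Action<String, Boolean> {
        @Override
        public String name() {
            return "cluster:internal/xpack/ml/job/model_snapshots/upgrade/cancel";
        }

        @Override
        public void execute(String jobId, Consumer<Boolean> listener) {
            // Assumption for the sketch: cancellation always succeeds.
            listener.accept(true);
        }
    }

    /** The public wrapper: a second name whose only job is to forward the call. */
    static final class AdminCancelAction implements Action<String, Boolean> {
        private final Action<String, Boolean> delegate;

        AdminCancelAction(Action<String, Boolean> delegate) {
            this.delegate = delegate;
        }

        @Override
        public String name() {
            return "cluster:admin/xpack/ml/job/model_snapshots/upgrade/cancel";
        }

        @Override
        public void execute(String jobId, Consumer<Boolean> listener) {
            // No extra logic: the request is passed straight through.
            delegate.execute(jobId, listener);
        }
    }
}
```

In this sketch the wrapper adds a second class and a second name but no behaviour, which is the boilerplate cost being weighed against the signalling value of an `internal` name.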

Contributor

I agree that as the transport client is not used directly any more we don't really gain much by calling the action internal.

Member

It's good we had this discussion then, as internal is a convention we have been using for a long time. If we choose to stop doing this, we have already established the practice of putting Internal in the class name of internal actions, e.g. InternalInferModelAction. Perhaps we can switch to that convention for actions that are purely internal.

Author

The key difference with this action is that it doesn't need to be internal. It should be possible to make it external in the future with as little extra effort as possible. Whereas InternalInferModelAction is specifically designed to be internal, and it would cause problems if people ever started calling it externally. An action that's more similar to this one is DeleteExpiredDataAction, where its primary purpose is to be called by the daily maintenance task, but it doesn't hurt to call it externally. DeleteExpiredDataAction does have a corresponding REST endpoint, but if we'd wanted to avoid that initially to avoid yet another endpoint that shows up in our list of documented endpoints then we could have deferred adding the REST endpoint to a release after the action was added.

Member

> It's only the ML team that has ever done this. All the internal actions of core Elasticsearch start with either admin or monitor.

> I agree that as the transport client is not used directly any more we don't really gain much by calling the action internal.

I was replying to these points, which state that the original reason for using internal no longer applies, and suggesting it can be discontinued with the demise of the transport client.

Author

> I was replying to these points, which state that the original reason for using internal no longer applies, and suggesting it can be discontinued with the demise of the transport client.

Yes, sure, for new actions we can discontinue it. But for the existing ones that contain internal changing it is a non-trivial piece of work, so we might as well leave them as-is.
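
The whole thread turns on a single segment of the action name, so a small sketch may help show what is actually being compared. The helper below is purely illustrative: `ActionNameSketch` and its methods are hypothetical, and it assumes action names have the shape `<scope>:<category>/<path>` seen in `NAME` above. It makes no claim about how Elasticsearch itself interprets these names.

```java
// Hypothetical helper for illustration; not an Elasticsearch class.
final class ActionNameSketch {

    /** Returns the segment between ':' and the next '/', e.g. "admin" or "internal". */
    static String category(String actionName) {
        int colon = actionName.indexOf(':');
        int slash = actionName.indexOf('/', colon + 1);
        if (colon < 0 || slash < 0) {
            throw new IllegalArgumentException("unexpected action name: " + actionName);
        }
        return actionName.substring(colon + 1, slash);
    }

    /** True when the name follows the ML team's "internal" convention discussed in the thread. */
    static boolean isInternalByConvention(String actionName) {
        return "internal".equals(category(actionName));
    }
}
```

With the name merged in this PR, `category(...)` yields `admin`; with the reviewer's suggested name it would yield `internal`. Everything after that segment is identical in both variants.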


private CancelJobModelSnapshotUpgradeAction() {
super(NAME, Response::new);
}

public static class Request extends ActionRequest implements ToXContentObject {

public static final String ALL = "_all";

public static final ParseField SNAPSHOT_ID = new ParseField("snapshot_id");
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");

static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);

static {
PARSER.declareString(Request::setJobId, Job.ID);
PARSER.declareString(Request::setSnapshotId, SNAPSHOT_ID);
PARSER.declareBoolean(Request::setAllowNoMatch, ALLOW_NO_MATCH);
}

private String jobId = ALL;
private String snapshotId = ALL;
private boolean allowNoMatch = true;

public Request() {}

public Request(String jobId, String snapshotId) {
setJobId(jobId);
setSnapshotId(snapshotId);
}

public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
snapshotId = in.readString();
allowNoMatch = in.readBoolean();
}

public final Request setJobId(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
return this;
}

public String getJobId() {
return jobId;
}

public final Request setSnapshotId(String snapshotId) {
this.snapshotId = ExceptionsHelper.requireNonNull(snapshotId, SNAPSHOT_ID);
return this;
}

public String getSnapshotId() {
return snapshotId;
}

public boolean allowNoMatch() {
return allowNoMatch;
}

public Request setAllowNoMatch(boolean allowNoMatch) {
this.allowNoMatch = allowNoMatch;
return this;
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeString(snapshotId);
out.writeBoolean(allowNoMatch);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field(Job.ID.getPreferredName(), jobId)
.field(SNAPSHOT_ID.getPreferredName(), snapshotId)
.field(ALLOW_NO_MATCH.getPreferredName(), allowNoMatch)
.endObject();
}

@Override
public int hashCode() {
return Objects.hash(jobId, snapshotId, allowNoMatch);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(jobId, other.jobId) && Objects.equals(snapshotId, other.snapshotId) && allowNoMatch == other.allowNoMatch;
}

@Override
public String toString() {
return Strings.toString(this);
}
}

public static class Response extends ActionResponse implements Writeable, ToXContentObject {

private final boolean cancelled;

public Response(boolean cancelled) {
this.cancelled = cancelled;
}

public Response(StreamInput in) throws IOException {
cancelled = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(cancelled);
}

public boolean isCancelled() {
return cancelled;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("cancelled", cancelled);
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return cancelled == response.cancelled;
}

@Override
public int hashCode() {
return Objects.hash(cancelled);
}
}
}
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Request;

public class CancelJobModelSnapshotUpgradeActionRequestTests extends AbstractSerializingTestCase<Request> {

@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLengthBetween(5, 20), randomAlphaOfLengthBetween(5, 20));
if (randomBoolean()) {
request.setAllowNoMatch(randomBoolean());
}
return request;
}

@Override
protected boolean supportsUnknownFields() {
return false;
}

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request doParseInstance(XContentParser parser) {
return Request.PARSER.apply(parser, null);
}
}
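
The wire round trip that a serialization test of this kind relies on can be pictured with plain `java.io` streams. The sketch below is an assumption-laden stand-in rather than the real test harness: `RequestRoundTripSketch` is hypothetical, and `DataOutputStream`/`DataInputStream` take the place of `StreamOutput`/`StreamInput`. What it does mirror from the `Request` above is the field order (`jobId`, `snapshotId`, `allowNoMatch`) and the requirement that reads happen in exactly the order of the writes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical stand-in for the Request's wire format; not an Elasticsearch class.
final class RequestRoundTripSketch {
    final String jobId;
    final String snapshotId;
    final boolean allowNoMatch;

    RequestRoundTripSketch(String jobId, String snapshotId, boolean allowNoMatch) {
        this.jobId = Objects.requireNonNull(jobId, "jobId");
        this.snapshotId = Objects.requireNonNull(snapshotId, "snapshotId");
        this.allowNoMatch = allowNoMatch;
    }

    /** Writes the fields in a fixed order. */
    void writeTo(DataOutputStream out) throws IOException {
        out.writeUTF(jobId);
        out.writeUTF(snapshotId);
        out.writeBoolean(allowNoMatch);
    }

    /** Reads the fields back in exactly the order writeTo emitted them. */
    static RequestRoundTripSketch readFrom(DataInputStream in) throws IOException {
        String jobId = in.readUTF();
        String snapshotId = in.readUTF();
        boolean allowNoMatch = in.readBoolean();
        return new RequestRoundTripSketch(jobId, snapshotId, allowNoMatch);
    }

    /** Serializes this instance to bytes and deserializes a fresh copy. */
    RequestRoundTripSketch roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeTo(out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readFrom(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        RequestRoundTripSketch other = (RequestRoundTripSketch) obj;
        return jobId.equals(other.jobId) && snapshotId.equals(other.snapshotId) && allowNoMatch == other.allowNoMatch;
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, snapshotId, allowNoMatch);
    }
}
```

A round-trip check then amounts to asserting that `original.equals(original.roundTrip())`, which is why the `equals` and `hashCode` implementations on `Request` matter for the tests.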
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction.Response;

public class CancelJobModelSnapshotUpgradeActionResponseTests extends AbstractWireSerializingTestCase<Response> {

@Override
protected Response createTestInstance() {
return new Response(randomBoolean());
}

@Override
protected Writeable.Reader<Response> instanceReader() {
return CancelJobModelSnapshotUpgradeAction.Response::new;
}
}
@@ -89,6 +89,7 @@
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CancelJobModelSnapshotUpgradeAction;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.CreateTrainedModelAllocationAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
@@ -183,6 +184,7 @@
import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import org.elasticsearch.xpack.ml.action.TransportCancelJobModelSnapshotUpgradeAction;
import org.elasticsearch.xpack.ml.action.TransportCloseJobAction;
import org.elasticsearch.xpack.ml.action.TransportCreateTrainedModelAllocationAction;
import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction;
@@ -1281,6 +1283,7 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetTrainedModelsStatsAction.INSTANCE, TransportGetTrainedModelsStatsAction.class),
new ActionHandler<>(PutTrainedModelAction.INSTANCE, TransportPutTrainedModelAction.class),
new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class),
new ActionHandler<>(CancelJobModelSnapshotUpgradeAction.INSTANCE, TransportCancelJobModelSnapshotUpgradeAction.class),
new ActionHandler<>(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, TransportGetJobModelSnapshotsUpgradeStatsAction.class),
new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class),
new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class),
@@ -1763,16 +1766,28 @@ public void cleanUpFeature(
}, unsetResetModeListener::onFailure);

// Stop data feeds
ActionListener<CancelJobModelSnapshotUpgradeAction.Response> cancelSnapshotUpgradesListener = ActionListener.wrap(
cancelUpgradesResponse -> {
StopDatafeedAction.Request stopDatafeedsReq = new StopDatafeedAction.Request("_all").setAllowNoMatch(true);
client.execute(
StopDatafeedAction.INSTANCE,
stopDatafeedsReq,
ActionListener.wrap(afterDataFeedsStopped::onResponse, failure -> {
logger.warn("failed stopping datafeeds for machine learning feature reset. Attempting with force=true", failure);
client.execute(StopDatafeedAction.INSTANCE, stopDatafeedsReq.setForce(true), afterDataFeedsStopped);
})
);
},
unsetResetModeListener::onFailure
);

// Cancel model snapshot upgrades
ActionListener<AcknowledgedResponse> stopDeploymentsListener = ActionListener.wrap(acknowledgedResponse -> {
CancelJobModelSnapshotUpgradeAction.Request cancelSnapshotUpgradesReq = new CancelJobModelSnapshotUpgradeAction.Request(
"_all",
"_all"
);
client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, cancelSnapshotUpgradesListener);
}, unsetResetModeListener::onFailure);

// Stop all model deployments
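
The listeners in this hunk are declared in the reverse of the order in which they run: each one has to exist before the earlier step that calls it can be written. The sketch below illustrates that pattern in plain Java. It is an assumption-based simplification, not the Elasticsearch `ActionListener` API: `Listener` and `ResetChainSketch` are hypothetical, every step succeeds synchronously, and the step order (stop deployments, then cancel snapshot upgrades, then stop datafeeds) is inferred from the listener names and comments in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the listener chain; none of these names are Elasticsearch APIs.
final class ResetChainSketch {

    /** Simplified listener: a success callback plus a failure callback. */
    static final class Listener<T> {
        final Consumer<T> onResponse;
        final Consumer<Exception> onFailure;

        Listener(Consumer<T> onResponse, Consumer<Exception> onFailure) {
            this.onResponse = onResponse;
            this.onFailure = onFailure;
        }
    }

    /** Runs the three steps and returns the order in which they executed. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Consumer<Exception> onFailure = e -> log.add("failed: " + e.getMessage());

        // Declared first, runs last: stop datafeeds once snapshot upgrades are cancelled.
        Listener<Boolean> cancelSnapshotUpgradesListener = new Listener<>(
            cancelled -> log.add("stop datafeeds"),
            onFailure
        );

        // Declared second, runs second: cancel upgrades once deployments are stopped.
        Listener<Boolean> stopDeploymentsListener = new Listener<>(acknowledged -> {
            log.add("cancel snapshot upgrades");
            cancelSnapshotUpgradesListener.onResponse.accept(true);
        }, onFailure);

        // Kick off the chain with the step that runs first.
        log.add("stop deployments");
        stopDeploymentsListener.onResponse.accept(true);
        return log;
    }
}
```

Reading such code from the bottom up gives the execution order, which is why the new cancel step is slotted in between the two existing listeners rather than appended at the end.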