-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Watcher: Allow to execute actions for each element in array #41997
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 14 commits
54efef5
6f26c7b
11a96e1
c3cb96d
d58818e
3d0ce81
8a2af63
7bc27f0
46901c8
4e4ba21
db479d7
983e815
d4d0490
1d9456c
66b284c
14dcc8d
2073d46
086fe24
95606e6
8029528
9c0a0ba
edc19a8
b989cfb
c6d433e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,13 +7,17 @@ | |
|
|
||
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.apache.logging.log4j.util.Supplier; | ||
| import org.elasticsearch.ElasticsearchException; | ||
| import org.elasticsearch.ElasticsearchParseException; | ||
| import org.elasticsearch.common.Nullable; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.unit.TimeValue; | ||
| import org.elasticsearch.common.xcontent.ObjectPath; | ||
| import org.elasticsearch.common.xcontent.ToXContentObject; | ||
| import org.elasticsearch.common.xcontent.XContentBuilder; | ||
| import org.elasticsearch.common.xcontent.XContentParser; | ||
| import org.elasticsearch.license.XPackLicenseState; | ||
| import org.elasticsearch.script.JodaCompatibleZonedDateTime; | ||
| import org.elasticsearch.xpack.core.watcher.actions.throttler.ActionThrottler; | ||
| import org.elasticsearch.xpack.core.watcher.actions.throttler.Throttler; | ||
| import org.elasticsearch.xpack.core.watcher.actions.throttler.ThrottlerField; | ||
|
|
@@ -30,7 +34,14 @@ | |
| import java.time.Clock; | ||
| import java.time.ZoneOffset; | ||
| import java.time.ZonedDateTime; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; | ||
|
|
||
|
|
@@ -43,16 +54,20 @@ public class ActionWrapper implements ToXContentObject { | |
| private final ExecutableTransform<Transform, Transform.Result> transform; | ||
| private final ActionThrottler throttler; | ||
| private final ExecutableAction<? extends Action> action; | ||
| @Nullable | ||
| private String path; | ||
|
|
||
| public ActionWrapper(String id, ActionThrottler throttler, | ||
| @Nullable ExecutableCondition condition, | ||
| @Nullable ExecutableTransform<Transform, Transform.Result> transform, | ||
| ExecutableAction<? extends Action> action) { | ||
| ExecutableAction<? extends Action> action, | ||
| @Nullable String path) { | ||
| this.id = id; | ||
| this.condition = condition; | ||
| this.throttler = throttler; | ||
| this.transform = transform; | ||
| this.action = action; | ||
| this.path = path; | ||
| } | ||
|
|
||
| public String id() { | ||
|
|
@@ -140,16 +155,83 @@ public ActionWrapperResult execute(WatchExecutionContext ctx) { | |
| return new ActionWrapperResult(id, conditionResult, null, new Action.Result.FailureWithException(action.type(), e)); | ||
| } | ||
| } | ||
| try { | ||
| Action.Result actionResult = action.execute(id, ctx, payload); | ||
| return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); | ||
| } catch (Exception e) { | ||
| action.logger().error( | ||
| if (Strings.isEmpty(path)) { | ||
| try { | ||
| Action.Result actionResult = action.execute(id, ctx, payload); | ||
| return new ActionWrapperResult(id, conditionResult, transformResult, actionResult); | ||
| } catch (Exception e) { | ||
| action.logger().error( | ||
| (Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); | ||
| return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); | ||
| } | ||
| } else { | ||
| try { | ||
| List<Action.Result> results = new ArrayList<>(); | ||
| Object object = ObjectPath.eval(path, toMap(ctx)); | ||
| if (object instanceof Collection) { | ||
| Collection collection = Collection.class.cast(object); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add an upper limit here ? I am fine with a hard coded There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set it hundred, also included the number of executed actions in the JSON that gets written to watch history There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 100 seems quite arbitrary from my perspective. Any chance to make this an option? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please open a new issue to make this configurable plus the rationale. The main goal here was to not let the watch run extremely long and block other watch executions, as well as its own executions. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed as #45169 |
||
| if (collection.isEmpty()) { | ||
| throw new ElasticsearchException("foreach object [{}] was an empty list, could not run any action", path); | ||
| } else { | ||
| for (Object o : collection) { | ||
| if (o instanceof Map) { | ||
| results.add(action.execute(id, ctx, new Payload.Simple((Map<String, Object>) o))); | ||
| } else { | ||
| throw new ElasticsearchException("item in foreach [{}] object was not a map", path); | ||
|
||
| } | ||
| } | ||
| } | ||
| } else if (object == null) { | ||
| throw new ElasticsearchException("specified foreach object was null: [{}]", path); | ||
| } else { | ||
| throw new ElasticsearchException("specified foreach object was not a an array/collection: [{}]", path); | ||
| } | ||
|
|
||
| // check if we have mixed results, then set to partial failure | ||
| final Set<Action.Result.Status> statuses = results.stream().map(Action.Result::status).collect(Collectors.toSet()); | ||
| Action.Result.Status status; | ||
| if (statuses.size() == 1) { | ||
| status = statuses.iterator().next(); | ||
| } else { | ||
| status = Action.Result.Status.PARTIAL_FAILURE; | ||
| } | ||
|
|
||
| return new ActionWrapperResult(id, conditionResult, transformResult, | ||
| new Action.Result(action.type(), status) { | ||
| @Override | ||
| public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
| builder.startArray(WatchField.FOREACH.getPreferredName()); | ||
| for (Action.Result result : results) { | ||
| builder.startObject(); | ||
| result.toXContent(builder, params); | ||
| builder.endObject(); | ||
| } | ||
| builder.endArray(); | ||
| return builder; | ||
| } | ||
| }); | ||
| } catch (Exception e) { | ||
| action.logger().error( | ||
| (Supplier<?>) () -> new ParameterizedMessage("failed to execute action [{}/{}]", ctx.watch().id(), id), e); | ||
| return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); | ||
| return new ActionWrapperResult(id, new Action.Result.FailureWithException(action.type(), e)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private Map<String, Object> toMap(WatchExecutionContext ctx) { | ||
| Map<String, Object> model = new HashMap<>(); | ||
| model.put("id", ctx.id().value()); | ||
| model.put("watch_id", ctx.id().watchId()); | ||
| model.put("execution_time", new JodaCompatibleZonedDateTime(ctx.executionTime().toInstant(), ZoneOffset.UTC)); | ||
| model.put("trigger", ctx.triggerEvent().data()); | ||
| model.put("metadata", ctx.watch().metadata()); | ||
| model.put("vars", ctx.vars()); | ||
| if (ctx.payload().data() != null) { | ||
| model.put("payload", ctx.payload().data()); | ||
| } | ||
| return Map.of("ctx", model); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) return true; | ||
|
|
@@ -186,6 +268,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws | |
| .field(transform.type(), transform, params) | ||
| .endObject(); | ||
| } | ||
| if (Strings.isEmpty(path) == false) { | ||
| builder.field(WatchField.FOREACH.getPreferredName(), path); | ||
| } | ||
| builder.field(action.type(), action, params); | ||
| return builder.endObject(); | ||
| } | ||
|
|
@@ -198,6 +283,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse | |
| ExecutableCondition condition = null; | ||
| ExecutableTransform<Transform, Transform.Result> transform = null; | ||
| TimeValue throttlePeriod = null; | ||
| String path = null; | ||
| ExecutableAction<? extends Action> action = null; | ||
|
|
||
| String currentFieldName = null; | ||
|
|
@@ -208,6 +294,8 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse | |
| } else { | ||
| if (WatchField.CONDITION.match(currentFieldName, parser.getDeprecationHandler())) { | ||
| condition = actionRegistry.getConditionRegistry().parseExecutable(watchId, parser); | ||
| } else if (WatchField.FOREACH.match(currentFieldName, parser.getDeprecationHandler())) { | ||
| path = parser.text(); | ||
| } else if (Transform.TRANSFORM.match(currentFieldName, parser.getDeprecationHandler())) { | ||
| transform = actionRegistry.getTransformRegistry().parse(watchId, parser); | ||
| } else if (ThrottlerField.THROTTLE_PERIOD.match(currentFieldName, parser.getDeprecationHandler())) { | ||
|
|
@@ -235,7 +323,7 @@ static ActionWrapper parse(String watchId, String actionId, XContentParser parse | |
| } | ||
|
|
||
| ActionThrottler throttler = new ActionThrottler(clock, throttlePeriod, licenseState); | ||
| return new ActionWrapper(actionId, throttler, condition, transform, action); | ||
| return new ActionWrapper(actionId, throttler, condition, transform, action, path); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| --- | ||
| setup: | ||
| - do: | ||
| cluster.health: | ||
| wait_for_status: yellow | ||
|
|
||
| --- | ||
| "Test execute watch api with foreach action": | ||
| - do: | ||
| watcher.execute_watch: | ||
| body: > | ||
| { | ||
| "watch" : { | ||
| "trigger": { | ||
| "schedule" : { "cron" : "0 0 0 1 * ? 2099" } | ||
| }, | ||
| "input": { | ||
| "simple" : { | ||
| "hits" : { | ||
| "hits" : [ | ||
| { "key" : "first" }, | ||
| { "key" : "second" }, | ||
| { "key" : "third" } | ||
| ] | ||
| } | ||
| } | ||
| }, | ||
| "actions": { | ||
| "log_hits" : { | ||
| "foreach" : "ctx.payload.hits.hits", | ||
| "logging" : { | ||
| "text" : "Logging {{ctx.payload.key}}" | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| - match: { watch_record.trigger_event.type: "manual" } | ||
| - match: { watch_record.state: "executed" } | ||
| - match: { watch_record.status.execution_state: "executed" } | ||
| - match: { watch_record.result.actions.0.foreach.0.logging.logged_text: "Logging first" } | ||
| - match: { watch_record.result.actions.0.foreach.1.logging.logged_text: "Logging second" } | ||
| - match: { watch_record.result.actions.0.foreach.2.logging.logged_text: "Logging third" } |
Uh oh!
There was an error while loading. Please reload this page.