diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json b/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json new file mode 100644 index 0000000000000..edc2501817164 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json @@ -0,0 +1,41 @@ +{ + "rollup.downsample":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-rollup.html", + "description":"Downsample an index" + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url": { + "paths": [ + { + "path": "/{index}/_downsample/{downsample_index}", + "methods": [ + "POST" + ], + "parts": { + "index": { + "type": "string", + "description": "The index to down sample", + "required": true + }, + "downsample_index": { + "type": "string", + "description": "The name of the down sample index to create", + "required": true + } + } + } + ] + }, + "params":{}, + "body":{ + "description":"The downsample configuration", + "required":true + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java new file mode 100644 index 0000000000000..3f98d6b7210da --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.downsample; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser.ValueType; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.Objects; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class DownsampleDateHistogramConfig implements Writeable, ToXContentObject { + static final String NAME = "date_histogram"; + public static final String FIXED_INTERVAL = "fixed_interval"; + public static final String CALENDAR_INTERVAL = "calendar_interval"; + public static final String TIME_ZONE = "time_zone"; + public static final String DEFAULT_TIMEZONE = ZoneId.of("UTC").getId(); + + private static final ConstructingObjectParser PARSER; + static { + PARSER = new ConstructingObjectParser<>( + NAME, + a -> new DownsampleDateHistogramConfig((DateHistogramInterval) a[0], (DateHistogramInterval) a[1], (String) a[2]) + ); + PARSER.declareField( + optionalConstructorArg(), + p -> new DateHistogramInterval(p.text()), + new ParseField(CALENDAR_INTERVAL), + ValueType.STRING + ); + PARSER.declareField( + optionalConstructorArg(), + p -> new DateHistogramInterval(p.text()), + new ParseField(FIXED_INTERVAL), + ValueType.STRING + ); + PARSER.declareString(optionalConstructorArg(), new ParseField(TIME_ZONE)); + } + + private final DateHistogramInterval calendarInterval; + private final DateHistogramInterval fixedInterval; + private final String timeZone; + + public DownsampleDateHistogramConfig( + final @Nullable DateHistogramInterval calendarInterval, + final @Nullable DateHistogramInterval fixedInterval, + final @Nullable String timeZone + ) { + this.calendarInterval = calendarInterval; + this.fixedInterval = fixedInterval; + this.timeZone = (timeZone != null && timeZone.isEmpty() == false) ? timeZone : DEFAULT_TIMEZONE; + } + + public DownsampleDateHistogramConfig(final StreamInput in) throws IOException { + this.calendarInterval = new DateHistogramInterval(in); + this.fixedInterval = new DateHistogramInterval(in); + this.timeZone = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalWriteable(calendarInterval); + out.writeOptionalWriteable(fixedInterval); + out.writeString(timeZone); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field(CALENDAR_INTERVAL, calendarInterval); + builder.field(FIXED_INTERVAL, fixedInterval); + builder.field(TIME_ZONE, timeZone); + } + return builder.endObject(); + } + + public static DownsampleDateHistogramConfig fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public DateHistogramInterval getCalendarInterval() { + return calendarInterval; + } + + public DateHistogramInterval getFixedInterval() { + return fixedInterval; + } + + public String getTimeZone() { + return timeZone; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DownsampleDateHistogramConfig that = (DownsampleDateHistogramConfig) o; + return Objects.equals(calendarInterval, that.calendarInterval) + && Objects.equals(fixedInterval, that.fixedInterval) + && Objects.equals(timeZone, that.timeZone); + } + + @Override + public int hashCode() { + return Objects.hash(calendarInterval, fixedInterval, timeZone); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java new file mode 100644 index 0000000000000..528da4e171792 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.downsample.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; + +import java.io.IOException; +import java.util.Objects; + +public class DownSampleAction extends ActionType { + public static final DownSampleAction INSTANCE = new DownSampleAction(); + public static final String NAME = "indices:admin/xpack/downsample"; + + private DownSampleAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + public static class Request extends MasterNodeRequest implements IndicesRequest, ToXContentObject { + private String sourceIndex; + private String downsampleIndex; + private DownsampleDateHistogramConfig downsampleConfig; + + public Request(String sourceIndex, String downsampleIndex, DownsampleDateHistogramConfig downsampleConfig) { + this.sourceIndex = sourceIndex; + this.downsampleIndex = downsampleIndex; + this.downsampleConfig = downsampleConfig; + } + + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + sourceIndex = in.readString(); + downsampleIndex = in.readString(); + downsampleConfig = new DownsampleDateHistogramConfig(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("source_index", sourceIndex); + builder.field("downsample_index", downsampleIndex); + downsampleConfig.toXContent(builder, params); + builder.endObject(); + return builder; + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED; + } + + public String getSourceIndex() { + return sourceIndex; + } + + public String getDownsampleIndex() { + return downsampleIndex; + } + + public DownsampleDateHistogramConfig getDownsampleConfig() { + return downsampleConfig; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex) + && Objects.equals(downsampleIndex, request.downsampleIndex) + && Objects.equals(downsampleConfig, request.downsampleConfig); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex, downsampleIndex, downsampleConfig); + } + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 0039f994fb749..7b7ad43f4e42f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -37,6 +37,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.GetRollupCapsAction; @@ -66,7 +67,9 @@ import org.elasticsearch.xpack.rollup.rest.RestRollupSearchAction; import org.elasticsearch.xpack.rollup.rest.RestStartRollupJobAction; import org.elasticsearch.xpack.rollup.rest.RestStopRollupJobAction; +import org.elasticsearch.xpack.rollup.v2.RestDownSampleAction; import org.elasticsearch.xpack.rollup.v2.RestRollupAction; +import org.elasticsearch.xpack.rollup.v2.TransportDownSampleAction; import org.elasticsearch.xpack.rollup.v2.TransportRollupAction; import org.elasticsearch.xpack.rollup.v2.TransportRollupIndexerAction; @@ -142,6 +145,7 @@ public List getRestHandlers( if (RollupV2.isEnabled()) { handlers.add(new RestRollupAction()); + handlers.add(new RestDownSampleAction()); } return handlers; @@ -167,6 +171,7 @@ public List getRestHandlers( if (RollupV2.isEnabled()) { actions.add(new ActionHandler<>(RollupIndexerAction.INSTANCE, TransportRollupIndexerAction.class)); actions.add(new ActionHandler<>(RollupAction.INSTANCE, TransportRollupAction.class)); + actions.add(new ActionHandler<>(DownSampleAction.INSTANCE, TransportDownSampleAction.class)); } return actions; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java new file mode 100644 index 0000000000000..d0cfd6d61ed1a --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestDownSampleAction extends BaseRestHandler { + @Override + public List routes() { + return List.of(new Route(POST, "/{index}/_downsample/{downsample_index}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String index = restRequest.param("index"); + String downsampleIndex = restRequest.param("downsample_index"); + DownsampleDateHistogramConfig config = DownsampleDateHistogramConfig.fromXContent(restRequest.contentParser()); + DownSampleAction.Request request = new DownSampleAction.Request(index, downsampleIndex, config); + return channel -> client.execute(DownSampleAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "downsample_action"; + } + +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java new file mode 100644 index 0000000000000..5d4570fad742d --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; +import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction.Request; +import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig.CalendarInterval; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig.FixedInterval; +import org.elasticsearch.xpack.core.rollup.RollupActionGroupConfig; +import org.elasticsearch.xpack.core.rollup.action.RollupAction; +import org.elasticsearch.xpack.core.rollup.job.MetricConfig; +import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; + +public class TransportDownSampleAction extends AcknowledgedTransportMasterNodeAction { + private final Client client; + + @Inject + public TransportDownSampleAction( + Client client, + ClusterService clusterService, + TransportService transportService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + RollupAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + DownSampleAction.Request::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + IndexMetadata originalIndexMetadata = state.getMetadata().index(request.getSourceIndex()); + if (originalIndexMetadata == null) { + throw new ResourceNotFoundException("source index [{" + request.getSourceIndex() + "}] not found"); + } + + if (IndexSettings.MODE.get(originalIndexMetadata.getSettings()) != IndexMode.TIME_SERIES) { + throw new UnsupportedOperationException("downsample is support for time_series index"); + } + + RollupActionConfig rollupActionConfig = buildRollupActionConfig(request, originalIndexMetadata); + RollupAction.Request rollupRequest = new RollupAction.Request( + request.getSourceIndex(), + request.getDownsampleIndex(), + rollupActionConfig + ); + client.execute(RollupAction.INSTANCE, rollupRequest, listener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private RollupActionConfig buildRollupActionConfig(Request request, IndexMetadata indexMetadata) { + Map mapping = indexMetadata.mapping().sourceAsMap(); + @SuppressWarnings("unchecked") + Map properties = (Map) mapping.get("properties"); + if (properties == null) { + throw new IllegalArgumentException("no dimension and metrics fields exception"); + } + + Map fields = flattenFields(properties); + List dimensions = new ArrayList<>(); + List metricsConfig = new ArrayList<>(); + fields.forEach((key, value) -> { + @SuppressWarnings("unchecked") + Map fieldProperty = (Map) value; + if (fieldProperty.containsKey(TIME_SERIES_DIMENSION_PARAM) + && String.valueOf(fieldProperty.get(TIME_SERIES_DIMENSION_PARAM)).equals("true")) { + dimensions.add(key); + } else if (fieldProperty.containsKey(TIME_SERIES_METRIC_PARAM)) { + @SuppressWarnings("unchecked") + MetricType metricType = MetricType.valueOf((String) fieldProperty.get(TIME_SERIES_DIMENSION_PARAM)); + // TODO make sure the right metrics list settings + switch (metricType) { + case counter -> metricsConfig.add(new MetricConfig(key, List.of("max"))); + case gauge -> metricsConfig.add(new MetricConfig(key, List.of("max", "min", "sum", "value_count"))); + case summary -> metricsConfig.add(new MetricConfig(key, List.of("max", "min", "sum", "value_count"))); + case histogram -> { + // TODO to support histogram downsample + throw new IllegalArgumentException("downsample does not support histogram type"); + } + } + } + }); + + DownsampleDateHistogramConfig downsampleConfig = request.getDownsampleConfig(); + RollupActionDateHistogramGroupConfig dateHistogramGroupConfig; + if (request.getDownsampleConfig().getCalendarInterval() != null) { + dateHistogramGroupConfig = new CalendarInterval( + DataStreamTimestampFieldMapper.DEFAULT_PATH, + downsampleConfig.getFixedInterval(), + downsampleConfig.getTimeZone() + ); + } else { + dateHistogramGroupConfig = new FixedInterval( + DataStreamTimestampFieldMapper.DEFAULT_PATH, + downsampleConfig.getFixedInterval(), + downsampleConfig.getTimeZone() + ); + } + + return new RollupActionConfig( + new RollupActionGroupConfig(dateHistogramGroupConfig, null, new TermsGroupConfig(dimensions.toArray(Strings.EMPTY_ARRAY))), + metricsConfig + ); + } + + static Map flattenFields(Map properties) { + return flattenFields(properties, null); + + } + + private static Map flattenFields(Map map, String parentPath) { + Map flatMap = new HashMap<>(); + String prefix = parentPath != null ? parentPath + "." : ""; + for (Map.Entry entry : map.entrySet()) { + if (entry.getValue() instanceof Map) { + @SuppressWarnings("unchecked") + Map value = (Map) entry.getValue(); + String type = (String) value.get("type"); + if (type == null || type.equals("object")) { + if (value.containsKey("properties")) { + @SuppressWarnings("unchecked") + Map properties = (Map) value.get("properties"); + flatMap.putAll(flattenFields(properties, prefix + entry.getKey())); + } + } else { + flatMap.put(prefix + entry.getKey(), entry.getValue()); + } + } + } + return flatMap; + } +}