From 043679b697babd9380978ac3af3a87cb207e409b Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 10 Feb 2022 10:43:04 +0800 Subject: [PATCH 1/3] wip --- .../DownsampleDateHistogramConfig.java | 11 ++++ .../downsample/action/DownSampleAction.java | 62 +++++++++++++++++++ .../xpack/rollup/v2/RestDownsampleAction.java | 11 ++++ .../rollup/v2/TransportDownsampleAction.java | 11 ++++ 4 files changed, 95 insertions(+) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java create mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java create mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java new file mode 100644 index 0000000000000..d369d4a00dcdb --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.downsample; + +public class DownsampleDateHistogramConfig { +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java new file mode 100644 index 0000000000000..dd81168b7f947 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.downsample.action; + +import java.io.IOException; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.action.RollupAction; + +public class DownSampleAction extends ActionType { + public static final DownSampleAction INSTANCE = new DownSampleAction(); + public static final String NAME = "indices:admin/xpack/downsample"; + + private DownSampleAction() { + super(NAME, AcknowledgedResponse::readFrom); + } + + //public static class Request extends MasterNodeRequest implements IndicesRequest, ToXContentObject { + // private String sourceIndex; + // private String rollupIndex; + // private DownsampleDateHistogramConfig downsampleConfig; + // + // public Request(String sourceIndex, String rollupIndex, DownsampleDateHistogramConfig downsampleConfig) { + // this.sourceIndex = sourceIndex; + // this.rollupIndex = rollupIndex; + // this.downsampleConfig = downsampleConfig; + // } + // + // public Request() {} + // + // public Request(StreamInput in) throws IOException { + // super(in); + // sourceIndex = in.readString(); + // rollupIndex = in.readString(); + // downsampleConfig = new RollupActionConfig(in); + // } + // + // @Override + // public String[] indices() { + // return new String[] {sourceIndex}; + // } + // + // @Override + // public IndicesOptions indicesOptions() { + // return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED; + // } + // + //} +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java new file mode 100644 index 0000000000000..6536070353e6c --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +public class RestDownsampleAction { +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java new file mode 100644 index 0000000000000..5c7060e354e15 --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +public class TransportDownsampleAction { +} From 72bc786d2fea604033767c82e7b644728625e9e5 Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 10 Feb 2022 16:29:32 +0800 Subject: [PATCH 2/3] add DownSampleAction --- .../DownsampleDateHistogramConfig.java | 125 +++++++++++- .../downsample/action/DownSampleAction.java | 116 +++++++---- .../elasticsearch/xpack/rollup/Rollup.java | 5 + .../xpack/rollup/v2/RestDownSampleAction.java | 42 ++++ .../xpack/rollup/v2/RestDownsampleAction.java | 11 -- .../rollup/v2/TransportDownSampleAction.java | 184 ++++++++++++++++++ .../rollup/v2/TransportDownsampleAction.java | 11 -- 7 files changed, 436 insertions(+), 58 deletions(-) create mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java delete mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java create mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java delete mode 100644 x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java index d369d4a00dcdb..3f98d6b7210da 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/DownsampleDateHistogramConfig.java @@ -7,5 +7,128 @@ package org.elasticsearch.xpack.core.downsample; -public class DownsampleDateHistogramConfig { +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ObjectParser.ValueType; +import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.time.ZoneId; +import java.util.Objects; + +import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg; + +public class DownsampleDateHistogramConfig implements Writeable, ToXContentObject { + static final String NAME = "date_histogram"; + public static final String FIXED_INTERVAL = "fixed_interval"; + public static final String CALENDAR_INTERVAL = "calendar_interval"; + public static final String TIME_ZONE = "time_zone"; + public static final String DEFAULT_TIMEZONE = ZoneId.of("UTC").getId(); + + private static final ConstructingObjectParser PARSER; + static { + PARSER = new ConstructingObjectParser<>( + NAME, + a -> new DownsampleDateHistogramConfig((DateHistogramInterval) a[0], (DateHistogramInterval) a[1], (String) a[2]) + ); + PARSER.declareField( + optionalConstructorArg(), + p -> new DateHistogramInterval(p.text()), + new ParseField(CALENDAR_INTERVAL), + ValueType.STRING + ); + PARSER.declareField( + optionalConstructorArg(), + p -> new DateHistogramInterval(p.text()), + new ParseField(FIXED_INTERVAL), + ValueType.STRING + ); + PARSER.declareString(optionalConstructorArg(), new ParseField(TIME_ZONE)); + } + + private final DateHistogramInterval calendarInterval; + private final DateHistogramInterval fixedInterval; + private final String timeZone; + + public DownsampleDateHistogramConfig( + final @Nullable DateHistogramInterval calendarInterval, + final @Nullable DateHistogramInterval fixedInterval, + final @Nullable String timeZone + ) { + this.calendarInterval = calendarInterval; + this.fixedInterval = fixedInterval; + this.timeZone = (timeZone != null && timeZone.isEmpty() == false) ? timeZone : DEFAULT_TIMEZONE; + } + + public DownsampleDateHistogramConfig(final StreamInput in) throws IOException { + this.calendarInterval = new DateHistogramInterval(in); + this.fixedInterval = new DateHistogramInterval(in); + this.timeZone = in.readString(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalWriteable(calendarInterval); + out.writeOptionalWriteable(fixedInterval); + out.writeString(timeZone); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field(CALENDAR_INTERVAL, calendarInterval); + builder.field(FIXED_INTERVAL, fixedInterval); + builder.field(TIME_ZONE, timeZone); + } + return builder.endObject(); + } + + public static DownsampleDateHistogramConfig fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public DateHistogramInterval getCalendarInterval() { + return calendarInterval; + } + + public DateHistogramInterval getFixedInterval() { + return fixedInterval; + } + + public String getTimeZone() { + return timeZone; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DownsampleDateHistogramConfig that = (DownsampleDateHistogramConfig) o; + return Objects.equals(calendarInterval, that.calendarInterval) + && Objects.equals(fixedInterval, that.fixedInterval) + && Objects.equals(timeZone, that.timeZone); + } + + @Override + public int hashCode() { + return Objects.hash(calendarInterval, fixedInterval, timeZone); + } + + @Override + public String toString() { + return Strings.toString(this, true, true); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java index dd81168b7f947..528da4e171792 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/downsample/action/DownSampleAction.java @@ -7,8 +7,7 @@ package org.elasticsearch.xpack.core.downsample.action; -import java.io.IOException; - +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.support.IndicesOptions; @@ -16,9 +15,11 @@ import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; -import org.elasticsearch.xpack.core.rollup.RollupActionConfig; -import org.elasticsearch.xpack.core.rollup.action.RollupAction; + +import java.io.IOException; +import java.util.Objects; public class DownSampleAction extends ActionType { public static final DownSampleAction INSTANCE = new DownSampleAction(); @@ -28,35 +29,80 @@ private DownSampleAction() { super(NAME, AcknowledgedResponse::readFrom); } - //public static class Request extends MasterNodeRequest implements IndicesRequest, ToXContentObject { - // private String sourceIndex; - // private String rollupIndex; - // private DownsampleDateHistogramConfig downsampleConfig; - // - // public Request(String sourceIndex, String rollupIndex, DownsampleDateHistogramConfig downsampleConfig) { - // this.sourceIndex = sourceIndex; - // this.rollupIndex = rollupIndex; - // this.downsampleConfig = downsampleConfig; - // } - // - // public Request() {} - // - // public Request(StreamInput in) throws IOException { - // super(in); - // sourceIndex = in.readString(); - // rollupIndex = in.readString(); - // downsampleConfig = new RollupActionConfig(in); - // } - // - // @Override - // public String[] indices() { - // return new String[] {sourceIndex}; - // } - // - // @Override - // public IndicesOptions indicesOptions() { - // return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED; - // } - // - //} + public static class Request extends MasterNodeRequest implements IndicesRequest, ToXContentObject { + private String sourceIndex; + private String downsampleIndex; + private DownsampleDateHistogramConfig downsampleConfig; + + public Request(String sourceIndex, String downsampleIndex, DownsampleDateHistogramConfig downsampleConfig) { + this.sourceIndex = sourceIndex; + this.downsampleIndex = downsampleIndex; + this.downsampleConfig = downsampleConfig; + } + + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); + sourceIndex = in.readString(); + downsampleIndex = in.readString(); + downsampleConfig = new DownsampleDateHistogramConfig(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("source_index", sourceIndex); + builder.field("downsample_index", downsampleIndex); + downsampleConfig.toXContent(builder, params); + builder.endObject(); + return builder; + } + + @Override + public String[] indices() { + return new String[] { sourceIndex }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED; + } + + public String getSourceIndex() { + return sourceIndex; + } + + public String getDownsampleIndex() { + return downsampleIndex; + } + + public DownsampleDateHistogramConfig getDownsampleConfig() { + return downsampleConfig; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Request request = (Request) o; + return Objects.equals(sourceIndex, request.sourceIndex) + && Objects.equals(downsampleIndex, request.downsampleIndex) + && Objects.equals(downsampleConfig, request.downsampleConfig); + } + + @Override + public int hashCode() { + return Objects.hash(sourceIndex, downsampleIndex, downsampleConfig); + } + } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 0039f994fb749..7b7ad43f4e42f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -37,6 +37,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction; import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; import org.elasticsearch.xpack.core.rollup.action.GetRollupCapsAction; @@ -66,7 +67,9 @@ import org.elasticsearch.xpack.rollup.rest.RestRollupSearchAction; import org.elasticsearch.xpack.rollup.rest.RestStartRollupJobAction; import org.elasticsearch.xpack.rollup.rest.RestStopRollupJobAction; +import org.elasticsearch.xpack.rollup.v2.RestDownSampleAction; import org.elasticsearch.xpack.rollup.v2.RestRollupAction; +import org.elasticsearch.xpack.rollup.v2.TransportDownSampleAction; import org.elasticsearch.xpack.rollup.v2.TransportRollupAction; import org.elasticsearch.xpack.rollup.v2.TransportRollupIndexerAction; @@ -142,6 +145,7 @@ public List getRestHandlers( if (RollupV2.isEnabled()) { handlers.add(new RestRollupAction()); + handlers.add(new RestDownSampleAction()); } return handlers; @@ -167,6 +171,7 @@ public List getRestHandlers( if (RollupV2.isEnabled()) { actions.add(new ActionHandler<>(RollupIndexerAction.INSTANCE, TransportRollupIndexerAction.class)); actions.add(new ActionHandler<>(RollupAction.INSTANCE, TransportRollupAction.class)); + actions.add(new ActionHandler<>(DownSampleAction.INSTANCE, TransportDownSampleAction.class)); } return actions; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java new file mode 100644 index 0000000000000..d0cfd6d61ed1a --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownSampleAction.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.POST; + +public class RestDownSampleAction extends BaseRestHandler { + @Override + public List routes() { + return List.of(new Route(POST, "/{index}/_downsample/{downsample_index}")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String index = restRequest.param("index"); + String downsampleIndex = restRequest.param("downsample_index"); + DownsampleDateHistogramConfig config = DownsampleDateHistogramConfig.fromXContent(restRequest.contentParser()); + DownSampleAction.Request request = new DownSampleAction.Request(index, downsampleIndex, config); + return channel -> client.execute(DownSampleAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "downsample_action"; + } + +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java deleted file mode 100644 index 6536070353e6c..0000000000000 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RestDownsampleAction.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.rollup.v2; - -public class RestDownsampleAction { -} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java new file mode 100644 index 0000000000000..5d4570fad742d --- /dev/null +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownSampleAction.java @@ -0,0 +1,184 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.rollup.v2; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; +import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction; +import org.elasticsearch.xpack.core.downsample.action.DownSampleAction.Request; +import org.elasticsearch.xpack.core.rollup.RollupActionConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig.CalendarInterval; +import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig.FixedInterval; +import org.elasticsearch.xpack.core.rollup.RollupActionGroupConfig; +import org.elasticsearch.xpack.core.rollup.action.RollupAction; +import org.elasticsearch.xpack.core.rollup.job.MetricConfig; +import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_DIMENSION_PARAM; +import static org.elasticsearch.index.mapper.TimeSeriesParams.TIME_SERIES_METRIC_PARAM; + +public class TransportDownSampleAction extends AcknowledgedTransportMasterNodeAction { + private final Client client; + + @Inject + public TransportDownSampleAction( + Client client, + ClusterService clusterService, + TransportService transportService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + RollupAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + DownSampleAction.Request::new, + indexNameExpressionResolver, + ThreadPool.Names.SAME + ); + this.client = new OriginSettingClient(client, ClientHelper.ROLLUP_ORIGIN); + } + + @Override + protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + IndexMetadata originalIndexMetadata = state.getMetadata().index(request.getSourceIndex()); + if (originalIndexMetadata == null) { + throw new ResourceNotFoundException("source index [{" + request.getSourceIndex() + "}] not found"); + } + + if (IndexSettings.MODE.get(originalIndexMetadata.getSettings()) != IndexMode.TIME_SERIES) { + throw new UnsupportedOperationException("downsample is support for time_series index"); + } + + RollupActionConfig rollupActionConfig = buildRollupActionConfig(request, originalIndexMetadata); + RollupAction.Request rollupRequest = new RollupAction.Request( + request.getSourceIndex(), + request.getDownsampleIndex(), + rollupActionConfig + ); + client.execute(RollupAction.INSTANCE, rollupRequest, listener); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + private RollupActionConfig buildRollupActionConfig(Request request, IndexMetadata indexMetadata) { + Map mapping = indexMetadata.mapping().sourceAsMap(); + @SuppressWarnings("unchecked") + Map properties = (Map) mapping.get("properties"); + if (properties == null) { + throw new IllegalArgumentException("no dimension and metrics fields exception"); + } + + Map fields = flattenFields(properties); + List dimensions = new ArrayList<>(); + List metricsConfig = new ArrayList<>(); + fields.forEach((key, value) -> { + @SuppressWarnings("unchecked") + Map fieldProperty = (Map) value; + if (fieldProperty.containsKey(TIME_SERIES_DIMENSION_PARAM) + && String.valueOf(fieldProperty.get(TIME_SERIES_DIMENSION_PARAM)).equals("true")) { + dimensions.add(key); + } else if (fieldProperty.containsKey(TIME_SERIES_METRIC_PARAM)) { + @SuppressWarnings("unchecked") + MetricType metricType = MetricType.valueOf((String) fieldProperty.get(TIME_SERIES_DIMENSION_PARAM)); + // TODO make sure the right metrics list settings + switch (metricType) { + case counter -> metricsConfig.add(new MetricConfig(key, List.of("max"))); + case gauge -> metricsConfig.add(new MetricConfig(key, List.of("max", "min", "sum", "value_count"))); + case summary -> metricsConfig.add(new MetricConfig(key, List.of("max", "min", "sum", "value_count"))); + case histogram -> { + // TODO to support histogram downsample + throw new IllegalArgumentException("downsample does not support histogram type"); + } + } + } + }); + + DownsampleDateHistogramConfig downsampleConfig = request.getDownsampleConfig(); + RollupActionDateHistogramGroupConfig dateHistogramGroupConfig; + if (request.getDownsampleConfig().getCalendarInterval() != null) { + dateHistogramGroupConfig = new CalendarInterval( + DataStreamTimestampFieldMapper.DEFAULT_PATH, + downsampleConfig.getFixedInterval(), + downsampleConfig.getTimeZone() + ); + } else { + dateHistogramGroupConfig = new FixedInterval( + DataStreamTimestampFieldMapper.DEFAULT_PATH, + downsampleConfig.getFixedInterval(), + downsampleConfig.getTimeZone() + ); + } + + return new RollupActionConfig( + new RollupActionGroupConfig(dateHistogramGroupConfig, null, new TermsGroupConfig(dimensions.toArray(Strings.EMPTY_ARRAY))), + metricsConfig + ); + } + + static Map flattenFields(Map properties) { + return flattenFields(properties, null); + + } + + private static Map flattenFields(Map map, String parentPath) { + Map flatMap = new HashMap<>(); + String prefix = parentPath != null ? parentPath + "." : ""; + for (Map.Entry entry : map.entrySet()) { + if (entry.getValue() instanceof Map) { + @SuppressWarnings("unchecked") + Map value = (Map) entry.getValue(); + String type = (String) value.get("type"); + if (type == null || type.equals("object")) { + if (value.containsKey("properties")) { + @SuppressWarnings("unchecked") + Map properties = (Map) value.get("properties"); + flatMap.putAll(flattenFields(properties, prefix + entry.getKey())); + } + } else { + flatMap.put(prefix + entry.getKey(), entry.getValue()); + } + } + } + return flatMap; + } +} diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java deleted file mode 100644 index 5c7060e354e15..0000000000000 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/TransportDownsampleAction.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.rollup.v2; - -public class TransportDownsampleAction { -} From 2115c7ee506b8abe15af739439af53fcf1803bef Mon Sep 17 00:00:00 2001 From: weizijun Date: Thu, 10 Feb 2022 17:14:33 +0800 Subject: [PATCH 3/3] add downsample api description --- .../rest-api-spec/api/rollup.downsample.json | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json b/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json new file mode 100644 index 0000000000000..edc2501817164 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/rollup.downsample.json @@ -0,0 +1,41 @@ +{ + "rollup.downsample":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-rollup.html", + "description":"Downsample an index" + }, + "stability":"experimental", + "visibility":"public", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url": { + "paths": [ + { + "path": "/{index}/_downsample/{downsample_index}", + "methods": [ + "POST" + ], + "parts": { + "index": { + "type": "string", + "description": "The index to down sample", + "required": true + }, + "downsample_index": { + "type": "string", + "description": "The name of the down sample index to create", + "required": true + } + } + } + ] + }, + "params":{}, + "body":{ + "description":"The downsample configuration", + "required":true + } + } +}