diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java new file mode 100644 index 0000000000000..b42df5ed29f37 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtils.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.apache.log4j.Logger; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +final class AggregationResultUtils { + private static final Logger logger = Logger.getLogger(AggregationResultUtils.class.getName()); + + /** + * Extracts aggregation results from a composite aggregation and puts it into a map. + * + * @param agg The aggregation result + * @param sources The original sources used for querying + * @param aggregationBuilders the aggregation used for querying + * @return a map containing the results of the aggregation in a consumable way + */ + public static Stream<Map<String, Object>> extractCompositeAggregationResults(CompositeAggregation agg, + List<CompositeValuesSourceBuilder<?>> sources, Collection<AggregationBuilder> aggregationBuilders) { + return agg.getBuckets().stream().map(bucket -> { + Map<String, Object> document = new HashMap<>(); + for (CompositeValuesSourceBuilder<?> source : sources) { + String destinationFieldName = source.name(); + document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)); + } + for (AggregationBuilder aggregationBuilder : aggregationBuilders) { + String aggName = aggregationBuilder.getName(); + + // TODO: support other aggregation types + Aggregation aggResult = bucket.getAggregations().get(aggName); + + if (aggResult instanceof NumericMetricsAggregation.SingleValue) { + NumericMetricsAggregation.SingleValue aggResultSingleValue = (SingleValue) aggResult; + document.put(aggName, aggResultSingleValue.value()); + } else { + // Execution should never reach this point! + // Creating jobs with unsupported aggregations shall not be possible + logger.error("Dataframe Internal Error: unsupported aggregation ["+ aggResult.getName() +"], ignoring"); + assert false; + } + } + return document; + }); + } + +} diff --git a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java index 8718eba3d7a1c..1fc22c828042e 100644 --- a/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java +++ b/x-pack/plugin/ml-feature-index-builder/src/main/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/FeatureIndexBuilderIndexer.java @@ -17,7 +17,6 @@ import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; -import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -25,14 +24,16 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; -import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings.DOC_TYPE; public abstract class FeatureIndexBuilderIndexer extends AsyncTwoPhaseIndexer<Map<String, Object>, FeatureIndexBuilderJobStats> { @@ -58,36 +59,37 @@ protected void onStartJob(long now) { @Override protected IterationResult<Map<String, Object>> doProcess(SearchResponse searchResponse) { final CompositeAggregation agg = searchResponse.getAggregations().get("feature"); - return new IterationResult<>(processBuckets(agg), agg.afterKey(), agg.getBuckets().isEmpty()); + return new IterationResult<>(processBucketsToIndexRequests(agg).collect(Collectors.toList()), agg.afterKey(), + agg.getBuckets().isEmpty()); } /* - * Mocked demo case + * Parses the result and creates a stream of indexable documents * - * TODO: replace with proper implementation + * Implementation decisions: + * + * Extraction uses generic maps as intermediate exchange format in order to hook in ingest pipelines/processors + * in later versions, see {@link IngestDocument). */ - private List<IndexRequest> processBuckets(CompositeAggregation agg) { - // for now only 1 source supported - String destinationFieldName = job.getConfig().getSourceConfig().getSources().get(0).name(); - String aggName = job.getConfig().getAggregationConfig().getAggregatorFactories().iterator().next().getName(); + private Stream<IndexRequest> processBucketsToIndexRequests(CompositeAggregation agg) { + String indexName = job.getConfig().getDestinationIndex(); + List<CompositeValuesSourceBuilder<?>> sources = job.getConfig().getSourceConfig().getSources(); + Collection<AggregationBuilder> aggregationBuilders = job.getConfig().getAggregationConfig().getAggregatorFactories(); - return agg.getBuckets().stream().map(b -> { - NumericMetricsAggregation.SingleValue aggResult = b.getAggregations().get(aggName); + return AggregationResultUtils.extractCompositeAggregationResults(agg, sources, aggregationBuilders).map(document -> { XContentBuilder builder; try { builder = jsonBuilder(); builder.startObject(); - builder.field(destinationFieldName, b.getKey().get(destinationFieldName)); - builder.field(aggName, aggResult.value()); + builder.map(document); builder.endObject(); } catch (IOException e) { throw new UncheckedIOException(e); } - String indexName = job.getConfig().getDestinationIndex(); IndexRequest request = new IndexRequest(indexName, DOC_TYPE).source(builder); return request; - }).collect(Collectors.toList()); + }); } @Override diff --git a/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java new file mode 100644 index 0000000000000..e3ac7bf052120 --- /dev/null +++ b/x-pack/plugin/ml-feature-index-builder/src/test/java/org/elasticsearch/xpack/ml/featureindexbuilder/job/AggregationResultUtilsTests.java @@ -0,0 +1,295 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.featureindexbuilder.job; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ContextParser; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; +import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.composite.ParsedComposite; +import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.MinAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ParsedAvg; +import org.elasticsearch.search.aggregations.metrics.ParsedCardinality; +import org.elasticsearch.search.aggregations.metrics.ParsedExtendedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedMax; +import org.elasticsearch.search.aggregations.metrics.ParsedMin; +import org.elasticsearch.search.aggregations.metrics.ParsedStats; +import org.elasticsearch.search.aggregations.metrics.ParsedSum; +import org.elasticsearch.search.aggregations.metrics.ParsedValueCount; +import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.ParsedStatsBucket; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.StatsBucketPipelineAggregationBuilder; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; + +public class AggregationResultUtilsTests extends ESTestCase { + + private final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(namedXContents); + + private final String KEY = Aggregation.CommonFields.KEY.getPreferredName(); + + // aggregations potentially useful for writing tests, to be expanded as necessary + private static final List<NamedXContentRegistry.Entry> namedXContents; + static { + Map<String, ContextParser<Object, ? extends Aggregation>> map = new HashMap<>(); + map.put(CardinalityAggregationBuilder.NAME, (p, c) -> ParsedCardinality.fromXContent(p, (String) c)); + map.put(MinAggregationBuilder.NAME, (p, c) -> ParsedMin.fromXContent(p, (String) c)); + map.put(MaxAggregationBuilder.NAME, (p, c) -> ParsedMax.fromXContent(p, (String) c)); + map.put(SumAggregationBuilder.NAME, (p, c) -> ParsedSum.fromXContent(p, (String) c)); + map.put(AvgAggregationBuilder.NAME, (p, c) -> ParsedAvg.fromXContent(p, (String) c)); + map.put(ValueCountAggregationBuilder.NAME, (p, c) -> ParsedValueCount.fromXContent(p, (String) c)); + map.put(StatsAggregationBuilder.NAME, (p, c) -> ParsedStats.fromXContent(p, (String) c)); + map.put(StatsBucketPipelineAggregationBuilder.NAME, (p, c) -> ParsedStatsBucket.fromXContent(p, (String) c)); + map.put(ExtendedStatsAggregationBuilder.NAME, (p, c) -> ParsedExtendedStats.fromXContent(p, (String) c)); + map.put(StringTerms.NAME, (p, c) -> ParsedStringTerms.fromXContent(p, (String) c)); + map.put(LongTerms.NAME, (p, c) -> ParsedLongTerms.fromXContent(p, (String) c)); + map.put(DoubleTerms.NAME, (p, c) -> ParsedDoubleTerms.fromXContent(p, (String) c)); + + namedXContents = map.entrySet().stream() + .map(entry -> new NamedXContentRegistry.Entry(Aggregation.class, new ParseField(entry.getKey()), entry.getValue())) + .collect(Collectors.toList()); + } + + @Override + protected NamedXContentRegistry xContentRegistry() { + return namedXContentRegistry; + } + + public void testExtractCompositeAggregationResults() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + + List<CompositeValuesSourceBuilder<?>> sources = Collections.singletonList( + new TermsValuesSourceBuilder(targetField).field("doesn't_matter_for_this_test") + ); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + Collection<AggregationBuilder> aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName)); + + Map<String, Object> input = asMap( + "buckets", + asList( + asMap( + KEY, asMap( + targetField, "ID1"), + aggTypedName, asMap( + "value", 42.33)), + asMap( + KEY, asMap( + targetField, "ID2"), + aggTypedName, asMap( + "value", 28.99)), + asMap( + KEY, asMap( + targetField, "ID3"), + aggTypedName, asMap( + "value", 12.55)) + )); + + List<Map<String, Object>> expected = asList( + asMap( + targetField, "ID1", + aggName, 42.33 + ), + asMap( + targetField, "ID2", + aggName, 28.99 + ), + asMap( + targetField, "ID3", + aggName, 12.55 + ) + ); + + executeTest(sources, aggregationBuilders, input, expected); + } + + public void testExtractCompositeAggregationResultsMultiSources() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + + List<CompositeValuesSourceBuilder<?>> sources = asList( + new TermsValuesSourceBuilder(targetField).field("doesn't_matter_for_this_test"), + new TermsValuesSourceBuilder(targetField2).field("doesn't_matter_for_this_test_too") + ); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + Collection<AggregationBuilder> aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName)); + + Map<String, Object> input = asMap( + "buckets", + asList( + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 42.33)), + asMap( + KEY, asMap( + targetField, "ID1", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 8.4)), + asMap( + KEY, asMap( + targetField, "ID2", + targetField2, "ID1_2" + ), + aggTypedName, asMap( + "value", 28.99)), + asMap( + KEY, asMap( + targetField, "ID3", + targetField2, "ID2_2" + ), + aggTypedName, asMap( + "value", 12.55)) + )); + + List<Map<String, Object>> expected = asList( + asMap( + targetField, "ID1", + targetField2, "ID1_2", + aggName, 42.33 + ), + asMap( + targetField, "ID1", + targetField2, "ID2_2", + aggName, 8.4 + ), + asMap( + targetField, "ID2", + targetField2, "ID1_2", + aggName, 28.99 + ), + asMap( + targetField, "ID3", + targetField2, "ID2_2", + aggName, 12.55 + ) + ); + executeTest(sources, aggregationBuilders, input, expected); + } + + public void testExtractCompositeAggregationResultsMultiAggregations() throws IOException { + String targetField = randomAlphaOfLengthBetween(5, 10); + List<CompositeValuesSourceBuilder<?>> sources = Collections.singletonList( + new TermsValuesSourceBuilder(targetField).field("doesn't_matter_for_this_test") + ); + + String aggName = randomAlphaOfLengthBetween(5, 10); + String aggTypedName = "avg#" + aggName; + + String aggName2 = randomAlphaOfLengthBetween(5, 10) + "_2"; + String aggTypedName2 = "max#" + aggName2; + + Collection<AggregationBuilder> aggregationBuilders = asList(AggregationBuilders.avg(aggName), AggregationBuilders.max(aggName2)); + + Map<String, Object> input = asMap( + "buckets", + asList( + asMap( + KEY, asMap( + targetField, "ID1"), + aggTypedName, asMap( + "value", 42.33), + aggTypedName2, asMap( + "value", 9.9)), + asMap( + KEY, asMap( + targetField, "ID2"), + aggTypedName, asMap( + "value", 28.99), + aggTypedName2, asMap( + "value", 222.33)), + asMap( + KEY, asMap( + targetField, "ID3"), + aggTypedName, asMap( + "value", 12.55), + aggTypedName2, asMap( + "value", -2.44)) + )); + + List<Map<String, Object>> expected = asList( + asMap( + targetField, "ID1", + aggName, 42.33, + aggName2, 9.9 + ), + asMap( + targetField, "ID2", + aggName, 28.99, + aggName2, 222.33 + ), + asMap( + targetField, "ID3", + aggName, 12.55, + aggName2, -2.44 + ) + ); + executeTest(sources, aggregationBuilders, input, expected); + } + + private void executeTest(List<CompositeValuesSourceBuilder<?>> sources, Collection<AggregationBuilder> aggregationBuilders, + Map<String, Object> input, List<Map<String, Object>> expected) throws IOException { + XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values())); + builder.map(input); + + try (XContentParser parser = createParser(builder)) { + CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature"); + List<Map<String, Object>> result = AggregationResultUtils.extractCompositeAggregationResults(agg, sources, aggregationBuilders) + .collect(Collectors.toList()); + + assertEquals(expected, result); + } + } + + static Map<String, Object> asMap(Object... fields) { + assert fields.length % 2 == 0; + final Map<String, Object> map = new HashMap<>(); + for (int i = 0; i < fields.length; i += 2) { + String field = (String) fields[i]; + map.put(field, fields[i + 1]); + } + return map; + } +}