diff --git a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java index b48f1cbcc3f1c..b34e597767b4c 100644 --- a/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/SearchPlugin.java @@ -41,6 +41,7 @@ import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.highlight.Highlighter; import org.elasticsearch.search.rescore.Rescorer; @@ -54,6 +55,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.function.BiFunction; +import java.util.function.Consumer; import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; @@ -250,6 +252,7 @@ public QuerySpec(String name, Writeable.Reader reader, QueryParser parser) */ class AggregationSpec extends SearchExtensionSpec { private final Map> resultReaders = new TreeMap<>(); + private Consumer aggregatorRegistrar; /** * Specification for an {@link Aggregation}. @@ -300,6 +303,23 @@ public AggregationSpec addResultReader(String writeableName, Writeable.Reader> getResultReaders() { return resultReaders; } + + /** + * Get the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for + * this aggregation + */ + public Consumer getAggregatorRegistrar() { + return aggregatorRegistrar; + } + + /** + * Set the function to register the {@link org.elasticsearch.search.aggregations.support.ValuesSource} to aggregator mappings for + * this aggregation + */ + public AggregationSpec setAggregatorRegistrar(Consumer aggregatorRegistrar) { + this.aggregatorRegistrar = aggregatorRegistrar; + return this; + } } /** diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java index 089132a6ef744..c34f2de984938 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java @@ -228,6 +228,7 @@ import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.SumBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.FetchSubPhase; import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase; @@ -400,7 +401,8 @@ private void registerAggregations(List plugins) { registerAggregation(new AggregationSpec(IpRangeAggregationBuilder.NAME, IpRangeAggregationBuilder::new, IpRangeAggregationBuilder::parse).addResultReader(InternalBinaryRange::new)); registerAggregation(new AggregationSpec(HistogramAggregationBuilder.NAME, HistogramAggregationBuilder::new, - HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new)); + HistogramAggregationBuilder::parse).addResultReader(InternalHistogram::new) + .setAggregatorRegistrar(HistogramAggregationBuilder::registerAggregators)); registerAggregation(new AggregationSpec(DateHistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder::new, DateHistogramAggregationBuilder::parse).addResultReader(InternalDateHistogram::new)); registerAggregation(new AggregationSpec(AutoDateHistogramAggregationBuilder.NAME, AutoDateHistogramAggregationBuilder::new, @@ -440,6 +442,10 @@ private void registerAggregation(AggregationSpec spec) { Writeable.Reader internalReader = t.getValue(); namedWriteables.add(new NamedWriteableRegistry.Entry(InternalAggregation.class, writeableName, internalReader)); } + Consumer register = spec.getAggregatorRegistrar(); + if (register != null) { + register.accept(ValuesSourceRegistry.getInstance()); + } } private void registerPipelineAggregations(List plugins) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregationBuilder.java index cdbfe925bc34e..a6ded979a9212 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregationBuilder.java @@ -40,12 +40,14 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper; +import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry; import org.elasticsearch.search.aggregations.support.ValuesSourceType; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; /** * A builder for histograms on numeric fields. This builder can operate on either base numeric fields, or numeric range fields. IP range @@ -88,6 +90,13 @@ public static HistogramAggregationBuilder parse(String aggregationName, XContent return PARSER.parse(parser, new HistogramAggregationBuilder(aggregationName), null); } + private static AtomicBoolean wasRegistered = new AtomicBoolean(false); + public static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) { + if (wasRegistered.compareAndSet(false, true) == true) { + HistogramAggregatorFactory.registerAggregators(valuesSourceRegistry); + } + } + private double interval; private double offset = 0; private double minBound = Double.POSITIVE_INFINITY; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java index 46ea27c5e938c..2171dc0c8ae0b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/HistogramAggregatorFactory.java @@ -55,8 +55,8 @@ public final class HistogramAggregatorFactory extends ValuesSourceAggregatorFact private final double minBound, maxBound; // TODO: Registration should happen on the actual aggregator classes, but I don't want to set up the whole dynamic loading thing yet - static { - ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE, + static void registerAggregators(ValuesSourceRegistry valuesSourceRegistry) { + valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.RANGE, new HistogramAggregatorSupplier() { @Override public Aggregator build(String name, AggregatorFactories factories, double interval, double offset, @@ -76,7 +76,7 @@ public Aggregator build(String name, AggregatorFactories factories, double inter (fieldType, indexFieldData) -> fieldType instanceof RangeFieldMapper.RangeFieldType ); - ValuesSourceRegistry.getInstance().register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC, + valuesSourceRegistry.register(HistogramAggregationBuilder.NAME, CoreValuesSourceType.NUMERIC, new HistogramAggregatorSupplier() { @Override public Aggregator build(String name, AggregatorFactories factories, double interval, double offset, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceRegistry.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceRegistry.java index 5b6599fd004ae..b24edafba7468 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceRegistry.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/ValuesSourceRegistry.java @@ -27,10 +27,9 @@ import org.elasticsearch.search.aggregations.AggregationExecutionException; import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.StringJoiner; import java.util.function.BiFunction; /* @@ -39,49 +38,76 @@ */ public enum ValuesSourceRegistry { INSTANCE { - Map> aggregatorRegistry = new HashMap<>(); + Map> aggregatorRegistry = Map.of(); // We use a List of Entries here to approximate an ordered map Map, ValuesSourceType>>> resolverRegistry - = new HashMap<>(); + = Map.of(); + /** + * Threading behavior notes: This call is both synchronized and expensive. It copies the entire existing mapping structure each + * time it is invoked. We expect that register will be called a small number of times during startup only (as plugins are being + * registered) and we can tolerate the cost at that time. Once all plugins are registered, we should never need to call register + * again. Comparatively, we expect to do many reads from the registry data structures, and those reads may be interleaved on + * different worker threads. Thus we want to optimize the read case to be thread safe and fast, which the immutable + * collections do well. Using immutable collections requires a copy on write mechanic, thus the somewhat non-intuitive + * implementation of this method. + * + * @param aggregationName The name of the family of aggregations, typically found via ValuesSourceAggregationBuilder.getType() + * @param valuesSourceType The ValuesSourceType this mapping applies to. + * @param aggregatorSupplier An Aggregation-specific specialization of AggregatorSupplier which will construct the mapped aggregator + * from the aggregation standard set of parameters + * @param resolveValuesSourceType A predicate operating on MappedFieldType and IndexFieldData instances which decides if the mapped + */ @Override - public void register(String aggregationName, ValuesSourceType valuesSourceType,AggregatorSupplier aggregatorSupplier, + public synchronized void register(String aggregationName, ValuesSourceType valuesSourceType, AggregatorSupplier aggregatorSupplier, BiFunction resolveValuesSourceType) { - if (resolverRegistry.containsKey(aggregationName) == false) { - resolverRegistry.put(aggregationName, new ArrayList<>()); + // Aggregator registry block - do this first in case we need to throw on duplicate registration + Map innerMap; + if (aggregatorRegistry.containsKey(aggregationName)) { + if (aggregatorRegistry.get(aggregationName).containsKey(valuesSourceType)) { + throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", " + + valuesSourceType.toString() + "]"); + } + innerMap = copyAndAdd(aggregatorRegistry.get(aggregationName), + new AbstractMap.SimpleEntry<>(valuesSourceType, aggregatorSupplier)); + } else { + innerMap = Map.of(valuesSourceType, aggregatorSupplier); } - List, ValuesSourceType>> resolverList - = resolverRegistry.get(aggregationName); - resolverList.add(new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType)); + aggregatorRegistry = copyAndAdd(aggregatorRegistry, new AbstractMap.SimpleEntry<>(aggregationName, innerMap)); - if (aggregatorRegistry.containsKey(aggregationName) == false) { - aggregatorRegistry.put(aggregationName, new HashMap<>()); - } - Map innerMap = aggregatorRegistry.get(aggregationName); - if (innerMap.containsKey(valuesSourceType)) { - throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", " - + valuesSourceType.toString() + "]"); + // Resolver registry block + AbstractMap.SimpleEntry[] mappings; + if (resolverRegistry.containsKey(aggregationName)) { + List currentMappings = resolverRegistry.get(aggregationName); + mappings = (AbstractMap.SimpleEntry[]) currentMappings.toArray(new AbstractMap.SimpleEntry[currentMappings.size() + 1]); + } else { + mappings = new AbstractMap.SimpleEntry[1]; } - innerMap.put(valuesSourceType, aggregatorSupplier); + mappings[mappings.length - 1] = new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType); + resolverRegistry = copyAndAdd(resolverRegistry,new AbstractMap.SimpleEntry<>(aggregationName, List.of(mappings))); } @Override public AggregatorSupplier getAggregator(ValuesSourceType valuesSourceType, String aggregationName) { - if (aggregatorRegistry.containsKey(aggregationName)) { + StringJoiner validSourceTypes = new StringJoiner(",", "[", "]"); + if (aggregationName != null && aggregatorRegistry.containsKey(aggregationName)) { Map innerMap = aggregatorRegistry.get(aggregationName); - if (innerMap.containsKey(valuesSourceType)) { + if (valuesSourceType != null && innerMap.containsKey(valuesSourceType)) { return innerMap.get(valuesSourceType); } + for (ValuesSourceType validVST : innerMap.keySet()) { + validSourceTypes.add(validVST.toString()); + } + throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() + + " is not supported for aggregation" + aggregationName + ". Valid choices are " + validSourceTypes.toString()); } - // TODO: Error message should list valid ValuesSource types - throw new AggregationExecutionException("ValuesSource type " + valuesSourceType.toString() + - " is not supported for aggregation" + aggregationName); + throw new AggregationExecutionException("Unregistered Aggregation [" + aggregationName + "]"); } @Override public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFieldData indexFieldData, String aggregationName, ValueType valueType) { - if (resolverRegistry.containsKey(aggregationName)) { + if (aggregationName != null && resolverRegistry.containsKey(aggregationName)) { List, ValuesSourceType>> resolverList = resolverRegistry.get(aggregationName); for (AbstractMap.SimpleEntry, ValuesSourceType> entry : resolverList) { @@ -90,8 +116,9 @@ public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFiel return entry.getValue(); } } - // TODO: Error message should list valid field types; not sure fieldType.toString() is the best choice. - throw new IllegalArgumentException("Field type " + fieldType.toString() + " is not supported for aggregation " + // TODO: Error message should list valid field types + String fieldDescription = fieldType.name() + "(" + fieldType.toString() + ")"; + throw new IllegalArgumentException("Field type " + fieldDescription + " is not supported for aggregation " + aggregationName); } else { // TODO: Legacy resolve logic; remove this after converting all aggregations to the new system @@ -133,4 +160,26 @@ public abstract ValuesSourceType getValuesSourceType(MappedFieldType fieldType, ValueType valueType); public static ValuesSourceRegistry getInstance() {return INSTANCE;} + + private static Map copyAndAdd(Map source, Map.Entry newValue) { + Map.Entry[] entries; + if (source.containsKey(newValue.getKey())) { + // Replace with new value + entries = new Map.Entry[source.size()]; + int i = 0; + for (Map.Entry entry : source.entrySet()) { + if (entry.getKey() == newValue.getKey()) { + entries[i] = newValue; + } else { + entries[i] = entry; + } + i++; + } + } else { + entries = source.entrySet().toArray(new Map.Entry[source.size() + 1]); + entries[entries.length - 1] = newValue; + } + return Map.ofEntries(entries); + } + } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 7c02fb2481f4c..2a214aee3b80d 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -65,6 +65,7 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.fetch.FetchPhase; @@ -76,6 +77,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.junit.After; +import org.junit.BeforeClass; import java.io.IOException; import java.util.ArrayList; @@ -123,8 +125,11 @@ private static void registerFieldTypes(SearchContext searchContext, MapperServic when(mapperService.fullName(fieldName)).thenReturn(fieldType); when(searchContext.smartNameFieldType(fieldName)).thenReturn(fieldType); } + } - + @BeforeClass + public static void initValuesSourceRegistry() { + new SearchModule(Settings.EMPTY, List.of()); } protected A createAggregator(AggregationBuilder aggregationBuilder,