Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.elasticsearch.search.aggregations.AggregationExecutionException;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
Expand All @@ -39,37 +37,60 @@
*/
public enum ValuesSourceRegistry {
INSTANCE {
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = new HashMap<>();
Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry = Map.of();
// We use a List of Entries here to approximate an ordered map
Map<String, List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>>> resolverRegistry
= new HashMap<>();
= Map.of();

/**
* Threading behavior notes: This call is both synchronized and expensive. It copies the entire existing mapping structure each
* time it is invoked. We expect that register will be called a small number of times during startup only (as plugins are being
* registered) and we can tolerate the cost at that time. Once all plugins are registered, we should never need to call register
* again. Comparatively, we expect to do many reads from the registry data structures, and those reads may be interleaved on
* different worker threads. Thus we want to optimize the read case to be thread safe and fast, which the immutable
* collections do well. Using immutable collections requires a copy on write mechanic, thus the somewhat non-intuitive
* implementation of this method.
*
* @param aggregationName The name of the family of aggregations, typically found via ValuesSourceAggregationBuilder.getType()
* @param valuesSourceType The ValuesSourceType this mapping applies to.
* @param aggregatorSupplier An Aggregation-specific specialization of AggregatorSupplier which will construct the mapped aggregator
* from the aggregation standard set of parameters
* @param resolveValuesSourceType A predicate operating on MappedFieldType and IndexFieldData instances which decides if the mapped
*/
@Override
public void register(String aggregationName, ValuesSourceType valuesSourceType,AggregatorSupplier aggregatorSupplier,
public synchronized void register(String aggregationName, ValuesSourceType valuesSourceType, AggregatorSupplier aggregatorSupplier,
BiFunction<MappedFieldType, IndexFieldData, Boolean> resolveValuesSourceType) {
if (resolverRegistry.containsKey(aggregationName) == false) {
resolverRegistry.put(aggregationName, new ArrayList<>());
// Aggregator registry block - do this first in case we need to throw on duplicate registration
Map<ValuesSourceType, AggregatorSupplier> innerMap;
if (aggregatorRegistry.containsKey(aggregationName)) {
if (aggregatorRegistry.get(aggregationName).containsKey(valuesSourceType)) {
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
+ valuesSourceType.toString() + "]");
}
innerMap = copyAndAdd(aggregatorRegistry.get(aggregationName),
new AbstractMap.SimpleEntry<>(valuesSourceType, aggregatorSupplier));
} else {
innerMap = Map.of(valuesSourceType, aggregatorSupplier);
}
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
= resolverRegistry.get(aggregationName);
resolverList.add(new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType));
aggregatorRegistry = copyAndAdd(aggregatorRegistry, new AbstractMap.SimpleEntry<>(aggregationName, innerMap));

if (aggregatorRegistry.containsKey(aggregationName) == false) {
aggregatorRegistry.put(aggregationName, new HashMap<>());
}
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
if (innerMap.containsKey(valuesSourceType)) {
throw new IllegalStateException("Attempted to register already registered pair [" + aggregationName + ", "
+ valuesSourceType.toString() + "]");
// Resolver registry block
AbstractMap.SimpleEntry[] mappings;
if (resolverRegistry.containsKey(aggregationName)) {
List currentMappings = resolverRegistry.get(aggregationName);
mappings = (AbstractMap.SimpleEntry[]) currentMappings.toArray(new AbstractMap.SimpleEntry[currentMappings.size() + 1]);
} else {
mappings = new AbstractMap.SimpleEntry[1];
}
innerMap.put(valuesSourceType, aggregatorSupplier);
mappings[mappings.length - 1] = new AbstractMap.SimpleEntry<>(resolveValuesSourceType, valuesSourceType);
resolverRegistry = copyAndAdd(resolverRegistry,new AbstractMap.SimpleEntry<>(aggregationName, List.of(mappings)));
}

@Override
public AggregatorSupplier getAggregator(ValuesSourceType valuesSourceType, String aggregationName) {
if (aggregatorRegistry.containsKey(aggregationName)) {
if (aggregationName != null && aggregatorRegistry.containsKey(aggregationName)) {
Map<ValuesSourceType, AggregatorSupplier> innerMap = aggregatorRegistry.get(aggregationName);
if (innerMap.containsKey(valuesSourceType)) {
if (valuesSourceType != null && innerMap.containsKey(valuesSourceType)) {
return innerMap.get(valuesSourceType);
}
}
Expand All @@ -81,7 +102,7 @@ public AggregatorSupplier getAggregator(ValuesSourceType valuesSourceType, Strin
@Override
public ValuesSourceType getValuesSourceType(MappedFieldType fieldType, IndexFieldData indexFieldData, String aggregationName,
ValueType valueType) {
if (resolverRegistry.containsKey(aggregationName)) {
if (aggregationName != null && resolverRegistry.containsKey(aggregationName)) {
List<AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType>> resolverList
= resolverRegistry.get(aggregationName);
for (AbstractMap.SimpleEntry<BiFunction<MappedFieldType, IndexFieldData, Boolean>, ValuesSourceType> entry : resolverList) {
Expand Down Expand Up @@ -133,4 +154,26 @@ public abstract ValuesSourceType getValuesSourceType(MappedFieldType fieldType,
ValueType valueType);

public static ValuesSourceRegistry getInstance() {return INSTANCE;}

private static <K, V> Map copyAndAdd(Map<K, V> source, Map.Entry<K, V> newValue) {
Map.Entry[] entries;
if (source.containsKey(newValue.getKey())) {
// Replace with new value
entries = new Map.Entry[source.size()];
int i = 0;
for (Map.Entry entry : source.entrySet()) {
if (entry.getKey() == newValue.getKey()) {
entries[i] = newValue;
} else {
entries[i] = entry;
}
i++;
}
} else {
entries = source.entrySet().toArray(new Map.Entry[source.size() + 1]);
entries[entries.length - 1] = newValue;
}
return Map.ofEntries(entries);
}

}