-
Notifications
You must be signed in to change notification settings - Fork 25.6k
[Transform] avoid mapping problems with index templates #51368
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.transform.integration; | ||
|
|
||
| import org.elasticsearch.client.Request; | ||
| import org.elasticsearch.common.xcontent.support.XContentMapValues; | ||
| import org.junit.Before; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
|
|
||
| import static org.hamcrest.Matchers.equalTo; | ||
|
|
||
| public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase { | ||
| private static boolean indicesCreated = false; | ||
|
|
||
| // preserve indices in order to reuse source indices in several test cases | ||
| @Override | ||
| protected boolean preserveIndicesUponCompletion() { | ||
| return true; | ||
| } | ||
|
|
||
| @Before | ||
| public void createIndexes() throws IOException { | ||
|
|
||
| // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack | ||
| if (indicesCreated) { | ||
| return; | ||
| } | ||
|
|
||
| createReviewsIndex(); | ||
| indicesCreated = true; | ||
| } | ||
|
|
||
| public void testIndexTemplateMappingClash() throws Exception { | ||
| String transformId = "special_pivot_template_mappings_clash"; | ||
| String transformIndex = "special_pivot_template_mappings_clash"; | ||
|
|
||
| // create a template that defines a field "rating" with a type "float" which will clash later with | ||
| // output field "rating.avg" in the pivot config | ||
| final Request createIndexTemplateRequest = new Request("PUT", "_template/special_pivot_template"); | ||
|
|
||
| String template = "{" | ||
| + "\"index_patterns\" : [\"special_pivot_template*\"]," | ||
| + " \"mappings\" : {" | ||
| + " \"properties\": {" | ||
| + " \"rating\":{" | ||
| + " \"type\": \"float\"\n" | ||
| + " }" | ||
| + " }" | ||
| + " }" | ||
| + "}"; | ||
|
|
||
| createIndexTemplateRequest.setJsonEntity(template); | ||
| Map<String, Object> createIndexTemplateResponse = entityAsMap(client().performRequest(createIndexTemplateRequest)); | ||
| assertThat(createIndexTemplateResponse.get("acknowledged"), equalTo(Boolean.TRUE)); | ||
|
|
||
| final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId); | ||
|
|
||
| String config = "{" | ||
| + " \"source\": {\"index\":\"" | ||
| + REVIEWS_INDEX_NAME | ||
| + "\"}," | ||
| + " \"dest\": {\"index\":\"" | ||
| + transformIndex | ||
| + "\"},"; | ||
|
|
||
| config += " \"pivot\": {" | ||
| + " \"group_by\": {" | ||
| + " \"reviewer\": {" | ||
| + " \"terms\": {" | ||
| + " \"field\": \"user_id\"" | ||
| + " } } }," | ||
| + " \"aggregations\": {" | ||
| + " \"rating.avg\": {" | ||
| + " \"avg\": {" | ||
| + " \"field\": \"stars\"" | ||
| + " } }" | ||
| + " } }" | ||
| + "}"; | ||
|
|
||
| createTransformRequest.setJsonEntity(config); | ||
| Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); | ||
| assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); | ||
|
|
||
| startAndWaitForTransform(transformId, transformIndex); | ||
| assertTrue(indexExists(transformIndex)); | ||
|
|
||
| // we expect 27 documents as there shall be 27 user_id's | ||
| Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats"); | ||
| assertEquals(27, XContentMapValues.extractValue("_all.total.docs.count", indexStats)); | ||
|
|
||
| // get and check some users | ||
| Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=reviewer:user_4"); | ||
|
|
||
| assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult)); | ||
| Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0); | ||
| assertEquals(3.878048780, actual.doubleValue(), 0.000001); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,8 +42,7 @@ public final class SchemaUtil { | |
| NUMERIC_FIELD_MAPPER_TYPES = types; | ||
| } | ||
|
|
||
| private SchemaUtil() { | ||
| } | ||
| private SchemaUtil() {} | ||
|
|
||
| public static boolean isNumericType(String type) { | ||
| return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type); | ||
|
|
@@ -59,27 +58,29 @@ public static boolean isNumericType(String type) { | |
| * @param source Source index that contains the data to pivot | ||
| * @param listener Listener to alert on success or failure. | ||
| */ | ||
| public static void deduceMappings(final Client client, | ||
| final PivotConfig config, | ||
| final String[] source, | ||
| final ActionListener<Map<String, String>> listener) { | ||
| public static void deduceMappings( | ||
| final Client client, | ||
| final PivotConfig config, | ||
| final String[] source, | ||
| final ActionListener<Map<String, String>> listener | ||
| ) { | ||
| // collects the fieldnames used as source for aggregations | ||
| Map<String, String> aggregationSourceFieldNames = new HashMap<>(); | ||
| // collects the aggregation types by source name | ||
| Map<String, String> aggregationTypes = new HashMap<>(); | ||
| // collects the fieldnames and target fieldnames used for grouping | ||
| Map<String, String> fieldNamesForGrouping = new HashMap<>(); | ||
|
|
||
| config.getGroupConfig().getGroups().forEach((destinationFieldName, group) -> { | ||
| fieldNamesForGrouping.put(destinationFieldName, group.getField()); | ||
| }); | ||
| config.getGroupConfig() | ||
| .getGroups() | ||
| .forEach((destinationFieldName, group) -> { fieldNamesForGrouping.put(destinationFieldName, group.getField()); }); | ||
|
|
||
| for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { | ||
| if (agg instanceof ValuesSourceAggregationBuilder) { | ||
| ValuesSourceAggregationBuilder<?, ?> valueSourceAggregation = (ValuesSourceAggregationBuilder<?, ?>) agg; | ||
| aggregationSourceFieldNames.put(valueSourceAggregation.getName(), valueSourceAggregation.field()); | ||
| aggregationTypes.put(valueSourceAggregation.getName(), valueSourceAggregation.getType()); | ||
| } else if(agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) { | ||
| } else if (agg instanceof ScriptedMetricAggregationBuilder || agg instanceof MultiValuesSourceAggregationBuilder) { | ||
| aggregationTypes.put(agg.getName(), agg.getType()); | ||
| } else { | ||
| // execution should not reach this point | ||
|
|
@@ -98,13 +99,17 @@ public static void deduceMappings(final Client client, | |
| allFieldNames.putAll(aggregationSourceFieldNames); | ||
| allFieldNames.putAll(fieldNamesForGrouping); | ||
|
|
||
| getSourceFieldMappings(client, source, allFieldNames.values().toArray(new String[0]), | ||
| getSourceFieldMappings( | ||
| client, | ||
| source, | ||
| allFieldNames.values().toArray(new String[0]), | ||
| ActionListener.wrap( | ||
| sourceMappings -> listener.onResponse(resolveMappings(aggregationSourceFieldNames, | ||
| aggregationTypes, | ||
| fieldNamesForGrouping, | ||
| sourceMappings)), | ||
| listener::onFailure)); | ||
| sourceMappings -> listener.onResponse( | ||
| resolveMappings(aggregationSourceFieldNames, aggregationTypes, fieldNamesForGrouping, sourceMappings) | ||
| ), | ||
| listener::onFailure | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -115,36 +120,37 @@ public static void deduceMappings(final Client client, | |
| * @param index The index, or index pattern, from which to gather all the field mappings | ||
| * @param listener The listener to be alerted on success or failure. | ||
| */ | ||
| public static void getDestinationFieldMappings(final Client client, | ||
| final String index, | ||
| final ActionListener<Map<String, String>> listener) { | ||
| FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() | ||
| .indices(index) | ||
| public static void getDestinationFieldMappings( | ||
| final Client client, | ||
| final String index, | ||
| final ActionListener<Map<String, String>> listener | ||
| ) { | ||
| FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index) | ||
| .fields("*") | ||
| .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); | ||
| ClientHelper.executeAsyncWithOrigin(client, | ||
| ClientHelper.executeAsyncWithOrigin( | ||
| client, | ||
| ClientHelper.TRANSFORM_ORIGIN, | ||
| FieldCapabilitiesAction.INSTANCE, | ||
| fieldCapabilitiesRequest, | ||
| ActionListener.wrap( | ||
| r -> listener.onResponse(extractFieldMappings(r)), | ||
| listener::onFailure | ||
| )); | ||
| ActionListener.wrap(r -> listener.onResponse(extractFieldMappings(r)), listener::onFailure) | ||
| ); | ||
| } | ||
|
|
||
| private static Map<String, String> resolveMappings(Map<String, String> aggregationSourceFieldNames, | ||
| Map<String, String> aggregationTypes, | ||
| Map<String, String> fieldNamesForGrouping, | ||
| Map<String, String> sourceMappings) { | ||
| private static Map<String, String> resolveMappings( | ||
| Map<String, String> aggregationSourceFieldNames, | ||
| Map<String, String> aggregationTypes, | ||
| Map<String, String> fieldNamesForGrouping, | ||
| Map<String, String> sourceMappings | ||
| ) { | ||
| Map<String, String> targetMapping = new HashMap<>(); | ||
|
|
||
| aggregationTypes.forEach((targetFieldName, aggregationName) -> { | ||
| String sourceFieldName = aggregationSourceFieldNames.get(targetFieldName); | ||
| String sourceMapping = sourceFieldName == null ? null : sourceMappings.get(sourceFieldName); | ||
| String destinationMapping = Aggregations.resolveTargetMapping(aggregationName, sourceMapping); | ||
|
|
||
| logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", | ||
| targetFieldName, aggregationName, destinationMapping); | ||
| logger.debug("Deduced mapping for: [{}], agg type [{}] to [{}]", targetFieldName, aggregationName, destinationMapping); | ||
|
|
||
| if (Aggregations.isDynamicMapping(destinationMapping)) { | ||
| logger.debug("Dynamic target mapping set for field [{}] and aggregation [{}]", targetFieldName, aggregationName); | ||
|
|
@@ -165,34 +171,75 @@ private static Map<String, String> resolveMappings(Map<String, String> aggregati | |
| targetMapping.put(targetFieldName, "keyword"); | ||
| } | ||
| }); | ||
|
|
||
| // insert object mappings for nested fields | ||
| insertNestedObjectMappings(targetMapping); | ||
|
|
||
| return targetMapping; | ||
| } | ||
|
|
||
| /* | ||
| * Very "magic" helper method to extract the source mappings | ||
| */ | ||
| private static void getSourceFieldMappings(Client client, String[] index, String[] fields, | ||
| ActionListener<Map<String, String>> listener) { | ||
| FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest() | ||
| .indices(index) | ||
| private static void getSourceFieldMappings( | ||
| Client client, | ||
| String[] index, | ||
| String[] fields, | ||
| ActionListener<Map<String, String>> listener | ||
| ) { | ||
| FieldCapabilitiesRequest fieldCapabilitiesRequest = new FieldCapabilitiesRequest().indices(index) | ||
| .fields(fields) | ||
| .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); | ||
| client.execute(FieldCapabilitiesAction.INSTANCE, fieldCapabilitiesRequest, ActionListener.wrap( | ||
| response -> listener.onResponse(extractFieldMappings(response)), | ||
| listener::onFailure)); | ||
| client.execute( | ||
| FieldCapabilitiesAction.INSTANCE, | ||
| fieldCapabilitiesRequest, | ||
| ActionListener.wrap(response -> listener.onResponse(extractFieldMappings(response)), listener::onFailure) | ||
| ); | ||
| } | ||
|
|
||
| private static Map<String, String> extractFieldMappings(FieldCapabilitiesResponse response) { | ||
| Map<String, String> extractedTypes = new HashMap<>(); | ||
|
|
||
| response.get().forEach((fieldName, capabilitiesMap) -> { | ||
| // TODO: overwrites types, requires resolve if | ||
| // types are mixed | ||
| capabilitiesMap.forEach((name, capability) -> { | ||
| logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); | ||
| extractedTypes.put(fieldName, capability.getType()); | ||
| }); | ||
| }); | ||
| response.get() | ||
| .forEach( | ||
| (fieldName, capabilitiesMap) -> { | ||
| // TODO: overwrites types, requires resolve if | ||
| // types are mixed | ||
| capabilitiesMap.forEach((name, capability) -> { | ||
| logger.trace("Extracted type for [{}] : [{}]", fieldName, capability.getType()); | ||
| extractedTypes.put(fieldName, capability.getType()); | ||
| }); | ||
| } | ||
| ); | ||
| return extractedTypes; | ||
| } | ||
|
|
||
    /**
     * Insert object mappings for fields like:
     *
     * a.b.c : some_type
     *
     * in which case it creates additional mappings:
     *
     * a.b : object
     * a : object
     *
     * avoids snafu with index templates injecting incompatible mappings
     * (a template may map the parent, e.g. "a", to a scalar type, which would
     * clash with the dotted output field unless the object mapping is created
     * explicitly up front).
     *
     * @param fieldMappings field mappings to inject to; synthesized parent
     *                      "object" mappings are added in place, never
     *                      overwriting an entry that is already present
     */
    static void insertNestedObjectMappings(Map<String, String> fieldMappings) {
        // collect parents separately to avoid mutating fieldMappings while streaming over its key set
        Map<String, String> additionalMappings = new HashMap<>();
        fieldMappings.keySet().stream().filter(key -> key.contains(".")).forEach(key -> {
            int pos;
            String objectKey = key;
            // lastIndexOf returns -1 both on no match and for the empty string; requiring > 0 additionally
            // skips a leading '.' (e.g. ".x"), so no empty parent name is ever produced. Field names starting
            // or ending with '.' are invalid anyway -- this check is defense in depth; the stricter
            // validation was added separately in elastic/elasticsearch#51369 (per PR review discussion).
            while ((pos = objectKey.lastIndexOf(".")) > 0) {
                objectKey = objectKey.substring(0, pos);
                additionalMappings.putIfAbsent(objectKey, "object");
            }
        });

        // only add synthesized parents the caller has not already mapped explicitly
        additionalMappings.forEach(fieldMappings::putIfAbsent);
    }
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License; | ||
| * you may not use this file except in compliance with the Elastic License. | ||
| */ | ||
|
|
||
| package org.elasticsearch.xpack.transform.transforms.pivot; | ||
|
|
||
| import org.elasticsearch.test.ESTestCase; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
|
|
||
| public class SchemaUtilTests extends ESTestCase { | ||
|
|
||
| public void testInsertNestedObjectMappings() { | ||
| Map<String, String> fieldMappings = new HashMap<>() { | ||
| { | ||
| // creates: a.b, a | ||
| put("a.b.c", "long"); | ||
| put("a.b.d", "double"); | ||
| // creates: c.b, c | ||
| put("c.b.a", "double"); | ||
| // creates: c.d | ||
| put("c.d.e", "object"); | ||
| put("d", "long"); | ||
| put("e.f.g", "long"); | ||
| // cc: already there | ||
| put("e.f", "object"); | ||
| // cc: already there but different type (should not be possible) | ||
| put("e", "long"); | ||
| // cc: start with . (should not be possible) | ||
| put(".x", "long"); | ||
| // cc: start and ends with . (should not be possible), creates: .y | ||
| put(".y.", "long"); | ||
| // cc: ends with . (should not be possible), creates: .z | ||
| put(".z.", "long"); | ||
| } | ||
| }; | ||
|
|
||
| SchemaUtil.insertNestedObjectMappings(fieldMappings); | ||
|
|
||
| assertEquals(18, fieldMappings.size()); | ||
| assertEquals("long", fieldMappings.get("a.b.c")); | ||
| assertEquals("object", fieldMappings.get("a.b")); | ||
| assertEquals("double", fieldMappings.get("a.b.d")); | ||
| assertEquals("object", fieldMappings.get("a")); | ||
| assertEquals("object", fieldMappings.get("c.d")); | ||
| assertEquals("object", fieldMappings.get("e.f")); | ||
| assertEquals("long", fieldMappings.get("e")); | ||
| assertEquals("object", fieldMappings.get(".y")); | ||
| assertEquals("object", fieldMappings.get(".z")); | ||
| assertFalse(fieldMappings.containsKey(".")); | ||
| assertFalse(fieldMappings.containsKey("")); | ||
| } | ||
|
|
||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
essential change in this file, called on line 176
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 thanks for differentiating the code changes from the formatting changes