Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -557,15 +557,15 @@ public void testContinuousPivotHistogram() throws Exception {

Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));

searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));

searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));

final StringBuilder bulk = new StringBuilder();

Expand Down Expand Up @@ -609,15 +609,15 @@ public void testContinuousPivotHistogram() throws Exception {

searchResult = getAsMap(transformIndex + "/_search?q=every_2:0.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(19));

searchResult = getAsMap(transformIndex + "/_search?q=every_2:2.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(30));

searchResult = getAsMap(transformIndex + "/_search?q=every_2:4.0");
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27.0));
assertThat(((List<?>) XContentMapValues.extractValue("hits.hits._source.user_dc", searchResult)).get(0), equalTo(27));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public void testIteration(int iteration) throws IOException {
);
assertThat(
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
XContentMapValues.extractValue("count", source),
equalTo(Double.valueOf(bucket.getDocCount()))
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
equalTo(bucket.getDocCount())
);

// transform should only rewrite documents that require it
Expand All @@ -146,9 +146,11 @@ public void testIteration(int iteration) throws IOException {
// we use a fixed_interval of `1s`, the transform runs every `1s`, so the bucket might be recalculated at the next run
// but
// should NOT be recalculated for the 2nd/3rd/... run
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
.extractValue(MAX_RUN_FIELD, source),
is(lessThanOrEqualTo(1.0))
(Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source) - (Integer) XContentMapValues.extractValue(
MAX_RUN_FIELD,
source
),
is(lessThanOrEqualTo(1))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ private void assertResultsGroupByDateHistogram(int iteration, SearchResponse res
);
assertThat(
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
XContentMapValues.extractValue("count", source),
equalTo(Double.valueOf(bucket.getDocCount()))
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
equalTo(bucket.getDocCount())
);

// transform should only rewrite documents that require it
Expand All @@ -152,9 +152,11 @@ private void assertResultsGroupByDateHistogram(int iteration, SearchResponse res
+ iteration,
// we use a fixed_interval of `1s`, the transform runs every `1s`, a bucket might be recalculated at the next run
// but should NOT be recalculated for the 2nd/3rd/... run
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)) - (Double) XContentMapValues
.extractValue(MAX_RUN_FIELD, source),
is(lessThanOrEqualTo(1.0))
(Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source) - (Integer) XContentMapValues.extractValue(
MAX_RUN_FIELD,
source
),
is(lessThanOrEqualTo(1))
);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public TransformConfig createConfig() {

AggregatorFactories.Builder aggregations = new AggregatorFactories.Builder();
addCommonAggregations(aggregations);
aggregations.addAggregator(AggregationBuilders.max("metric.avg").field("metric"));
aggregations.addAggregator(AggregationBuilders.avg("metric.avg").field("metric"));

pivotConfigBuilder.setAggregations(aggregations);
transformConfigBuilder.setPivotConfig(pivotConfigBuilder.build());
Expand All @@ -83,7 +83,7 @@ public void testIteration(int iteration) throws IOException {
// missing_bucket produces `null`, we can't use `null` in aggs, so we have to use a magic value, see gh#60043
terms.missing(MISSING_BUCKET_KEY);
}
terms.subAggregation(AggregationBuilders.max("metric.avg").field("metric"));
terms.subAggregation(AggregationBuilders.avg("metric.avg").field("metric"));
sourceBuilderSource.aggregation(terms);
searchRequestSource.source(sourceBuilderSource);
SearchResponse responseSource = search(searchRequestSource);
Expand Down Expand Up @@ -129,8 +129,8 @@ public void testIteration(int iteration) throws IOException {
);
assertThat(
"Doc count did not match, source: " + source + ", expected: " + bucket.getDocCount() + ", iteration: " + iteration,
XContentMapValues.extractValue("count", source),
equalTo(Double.valueOf(bucket.getDocCount()))
((Integer) XContentMapValues.extractValue("count", source)).longValue(),
equalTo(bucket.getDocCount())
);

SingleValue avgAgg = (SingleValue) bucket.getAggregations().get("metric.avg");
Expand All @@ -154,8 +154,7 @@ public void testIteration(int iteration) throws IOException {
+ iteration
+ " full source: "
+ source,
// TODO: aggs return double for MAX_RUN_FIELD, although it is an integer
Double.valueOf((Integer) XContentMapValues.extractValue(INGEST_RUN_FIELD, source)),
XContentMapValues.extractValue(INGEST_RUN_FIELD, source),
equalTo(XContentMapValues.extractValue(MAX_RUN_FIELD, source))
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ private void putIndex(String indexName, String dateType, boolean isDataStream) t
.field("type", "keyword")
.endObject()
.startObject("metric")
.field("type", "integer")
.field("type", randomFrom("integer", "long", "unsigned_long"))
.endObject()
.startObject("location")
.field("type", "geo_point")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.convertToIntegerTypeIfNeeded;
import static org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil.isNumericType;

public final class AggregationResultUtils {
Expand Down Expand Up @@ -198,7 +199,7 @@ public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lo
// If the type is numeric or if the formatted string is the same as simply making the value a string,
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
if (isNumericType(fieldType) || aggregation.getValueAsString().equals(String.valueOf(aggregation.value()))) {
return aggregation.value();
return convertToIntegerTypeIfNeeded(fieldType, aggregation.value());
} else {
return aggregation.getValueAsString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,59 @@
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;

import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class SchemaUtil {
private static final Logger logger = LogManager.getLogger(SchemaUtil.class);

// Full collection of numeric field type strings
private static final Set<String> NUMERIC_FIELD_MAPPER_TYPES;
// Full collection of numeric field type strings and whether they are floating point or not
private static final Map<String, Boolean> NUMERIC_FIELD_MAPPER_TYPES;
static {
Set<String> types = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toSet());
types.add("scaled_float"); // have to add manually since scaled_float is in a module
Map<String, Boolean> types = Stream.of(NumberFieldMapper.NumberType.values())
.collect(Collectors.toMap(t -> t.typeName(), t -> t.numericType().isFloatingPoint()));

// have to add manually since they are in a module
types.put("scaled_float", true);
types.put("unsigned_long", false);
NUMERIC_FIELD_MAPPER_TYPES = types;
}

private SchemaUtil() {}

public static boolean isNumericType(String type) {
return type != null && NUMERIC_FIELD_MAPPER_TYPES.contains(type);
return type != null && NUMERIC_FIELD_MAPPER_TYPES.containsKey(type);
}

/**
* Convert a numeric value to an integer if it's not a floating point number.
*
* Implementation decision: We do not care about the concrete type, but only whether it is floating point or not.
* Further checks (e.g. range) are done at indexing.
*
* If type is floating point and value could be an integer (ends with `.0`), we still preserve `.0` in case
* the destination index uses dynamic mappings as well as being json friendly.
*
* @param type the type of the value according to the schema we know
* @param value the value as double (aggs return double for everything)
* @return the value unchanged if the type is floating point, otherwise an integer
*/
public static Object convertToIntegerTypeIfNeeded(String type, double value) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static Object convertToIntegerTypeIfNeeded(String type, double value) {
public static Object convertToDiscreteNumeralTypeIfNeeded(String type, double value) {

Or some better name, as this is not an integer but instead some possibly large discrete numeral (a big int or a long).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's referring to the mathematical definition of integer, that's why integer type, not integer. I can try to find a better name, maybe dropFloatingPointComponentIfNeeded, but let's not over-complicate it, the doc string defines what it does.

if (NUMERIC_FIELD_MAPPER_TYPES.getOrDefault(type, true) == false) {
assert value % 1 == 0;
if (value < Long.MAX_VALUE) {
return (long) value;
}

// special case for unsigned long
return BigDecimal.valueOf(value).toBigInteger();
Comment on lines +68 to +75
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird to me that there is not a blanket map doc value -> indexable value :(

This works in a pinch, I wonder if there is anyway to use DocValueFormat.UNSIGNED_LONG_SHIFTED.

But looking at it, there isn't a clean cut solution :/

}

return value;
}

/**
Expand Down Expand Up @@ -188,8 +217,11 @@ private static Map<String, String> resolveMappings(
} else if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping);
} else {
logger.warn("Failed to deduce mapping for [{}], fall back to dynamic mapping. " +
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
logger.warn(
"Failed to deduce mapping for [{}], fall back to dynamic mapping. "
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
targetFieldName
);
}
});

Expand All @@ -199,8 +231,11 @@ private static Map<String, String> resolveMappings(
if (destinationMapping != null) {
targetMapping.put(targetFieldName, destinationMapping);
} else {
logger.warn("Failed to deduce mapping for [{}], fall back to keyword. " +
"Create the destination index with complete mappings first to avoid deducing the mappings", targetFieldName);
logger.warn(
"Failed to deduce mapping for [{}], fall back to keyword. "
+ "Create the destination index with complete mappings first to avoid deducing the mappings",
targetFieldName
);
targetMapping.put(targetFieldName, KeywordFieldMapper.CONTENT_TYPE);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,12 @@ public void testSingleValueAggExtractor() {
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "string"), ""),
equalTo("one_hundred")
);

agg = createSingleMetricAgg("metric", 100.0, "one_hundred");
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, Collections.singletonMap("metric", "unsigned_long"), ""),
equalTo(100L)
);
}

private ScriptedMetric createScriptedMetric(Object returnValue) {
Expand Down Expand Up @@ -836,7 +842,7 @@ public void testSingleBucketAggExtractor() {
);
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba2.sub1", "long", "sba2.sub2", "float"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33))
equalTo(asMap("sub1", 100L, "sub2", 33.33))
);

agg = createSingleBucketAgg(
Expand All @@ -848,7 +854,7 @@ public void testSingleBucketAggExtractor() {
);
assertThat(
AggregationResultUtils.getExtractor(agg).value(agg, asStringMap("sba3.sub1", "long", "sba3.sub2", "double"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", 42L))
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", 42L))
);

agg = createSingleBucketAgg(
Expand All @@ -861,7 +867,7 @@ public void testSingleBucketAggExtractor() {
assertThat(
AggregationResultUtils.getExtractor(agg)
.value(agg, asStringMap("sba4.sub3.subsub1", "double", "sba4.sub2", "float", "sba4.sub1", "long"), ""),
equalTo(asMap("sub1", 100.0, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
equalTo(asMap("sub1", 100L, "sub2", 33.33, "sub3", asMap("subsub1", 11.1)))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,11 @@ public void testBasic() throws InterruptedException {
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
long numGroupsWithoutScripts = groupConfig.getGroups()
.values()
.stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
.count();

this.<Map<String, String>>assertAsync(
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
Expand Down Expand Up @@ -191,8 +194,11 @@ public void testNested() throws InterruptedException {
AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
GroupConfig groupConfig = GroupConfigTests.randomGroupConfig();
PivotConfig pivotConfig = new PivotConfig(groupConfig, aggregationConfig, null);
long numGroupsWithoutScripts = groupConfig.getGroups().values().stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null).count();
long numGroupsWithoutScripts = groupConfig.getGroups()
.values()
.stream()
.filter(singleGroupSource -> singleGroupSource.getScriptConfig() == null)
.count();

this.<Map<String, String>>assertAsync(
listener -> SchemaUtil.deduceMappings(client, pivotConfig, new String[] { "source-index" }, listener),
Expand All @@ -219,7 +225,7 @@ public void testNested() throws InterruptedException {
23144,
AggregationResultUtilsTests.createSingleMetricAgg("max_drinks_2", 45.0, "forty_five")
);
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45.0)));
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, mappings, ""), equalTo(asMap("max_drinks_2", 45L)));

agg = AggregationResultUtilsTests.createSingleBucketAgg(
"filter_3",
Expand Down