Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/143262.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 143262
summary: Skip time series field type merge for non-TS agg queries
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedExternalRelation;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.promql.PromqlCommand;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -33,9 +35,10 @@ public record PreAnalysis(
List<IndexPattern> lookupIndices,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
boolean hasTimeSeriesAggregation,
List<String> icebergPaths
) {
public static final PreAnalysis EMPTY = new PreAnalysis(Map.of(), List.of(), List.of(), false, false, List.of());
public static final PreAnalysis EMPTY = new PreAnalysis(Map.of(), List.of(), List.of(), false, false, false, List.of());
}

public PreAnalysis preAnalyze(LogicalPlan plan) {
Expand Down Expand Up @@ -113,6 +116,10 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
}
}));

Holder<Boolean> hasTimeSeriesAggregation = new Holder<>(false);
plan.forEachUp(TimeSeriesAggregate.class, p -> hasTimeSeriesAggregation.set(true));
plan.forEachUp(PromqlCommand.class, p -> hasTimeSeriesAggregation.set(true));

// mark plan as preAnalyzed (if it were marked, there would be no analysis)
plan.forEachUp(LogicalPlan::setPreAnalyzed);

Expand All @@ -122,6 +129,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
lookupIndices,
useAggregateMetricDoubleWhenNotSupported.get(),
useDenseVectorWhenNotSupported.get(),
hasTimeSeriesAggregation.get(),
icebergPaths
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,7 @@ private void preAnalyzeMainIndices(
result.minimumTransportVersion(),
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
preAnalysis.hasTimeSeriesAggregation(),
indicesExpressionGrouper,
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
Expand Down Expand Up @@ -1188,6 +1189,7 @@ private void preAnalyzeFlatMainIndices(
result.minimumTransportVersion(),
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
preAnalysis.hasTimeSeriesAggregation(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.initCrossClusterState(indexResolution.inner(), executionInfo);
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void resolveLookupIndices(
minimumVersion,
false,
false,
false,
DO_NOT_GROUP,
listener.map(Versioned::inner)
);
Expand Down Expand Up @@ -147,6 +148,7 @@ public void resolveMainIndicesVersioned(
TransportVersion minimumVersion,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
boolean hasTimeSeriesAggregation,
IndicesExpressionGrouper indicesExpressionGrouper,
ActionListener<Versioned<IndexResolution>> listener
) {
Expand All @@ -157,6 +159,7 @@ public void resolveMainIndicesVersioned(
minimumVersion,
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported,
hasTimeSeriesAggregation,
(indexPattern1, fieldCapabilitiesResponse) -> Maps.transformValues(
indicesExpressionGrouper.groupIndices(IndicesOptions.DEFAULT, Strings.splitStringByCommaToArray(indexPattern1), false),
v -> List.of(v.indices())
Expand All @@ -182,6 +185,7 @@ public void resolveMainFlatWorldIndicesVersioned(
boolean useAggregateMetricDoubleWhenNotSupported,
// Same as above
boolean useDenseVectorWhenNotSupported,
boolean hasTimeSeriesAggregation,
ActionListener<Versioned<IndexResolution>> listener
) {
doResolveIndices(
Expand All @@ -191,6 +195,7 @@ public void resolveMainFlatWorldIndicesVersioned(
minimumVersion,
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported,
hasTimeSeriesAggregation,
(indexPattern1, fieldCapabilitiesResponse) -> Maps.transformValues(
EsqlResolvedIndexExpression.from(fieldCapabilitiesResponse),
v -> List.copyOf(v.expression())
Expand All @@ -206,6 +211,7 @@ private void doResolveIndices(
TransportVersion minimumVersion,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported,
boolean hasTimeSeriesAggregation,
OriginalIndexExtractor originalIndexExtractor,
ActionListener<Versioned<IndexResolution>> listener
) {
Expand All @@ -222,7 +228,8 @@ private void doResolveIndices(
overallMinimumVersion,
Build.current().isSnapshot(),
useAggregateMetricDoubleWhenNotSupported,
useDenseVectorWhenNotSupported
useDenseVectorWhenNotSupported,
hasTimeSeriesAggregation
);
LOGGER.debug(
"previously assumed minimum transport version [{}] updated to effective version [{}]"
Expand Down Expand Up @@ -274,13 +281,18 @@ private void doResolveIndices(
* report that they support the type? This exists because some remotes <strong>do</strong>
* support {@code dense_vector} without reporting that they do. And, for a while, we used the
* query itself to opt into reading these fields.
* @param hasTimeSeriesAggregation whether the query contains a time series aggregation (a {@code TS} source followed by
* {@code STATS}). When {@code false}, time series field type consistency checks
* (dimension vs metric conflicts across indices) are skipped, avoiding spurious
* {@link InvalidMappedField} errors for queries that don't aggregate time series data.
*/
public record FieldsInfo(
FieldCapabilitiesResponse caps,
@Nullable TransportVersion minTransportVersion,
boolean currentBuildIsSnapshot,
boolean useAggregateMetricDoubleWhenNotSupported,
boolean useDenseVectorWhenNotSupported
boolean useDenseVectorWhenNotSupported,
boolean hasTimeSeriesAggregation
) {}

// public for testing only
Expand Down Expand Up @@ -443,13 +455,17 @@ private static EsField createField(
type = UNSUPPORTED;
}
boolean aggregatable = first.isAggregatable();
EsField.TimeSeriesFieldType timeSeriesFieldType = EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first);
EsField.TimeSeriesFieldType timeSeriesFieldType = fieldsInfo.hasTimeSeriesAggregation()
? EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(first)
: EsField.TimeSeriesFieldType.NONE;
if (rest.isEmpty() == false) {
for (IndexFieldCapabilities fc : rest) {
try {
timeSeriesFieldType = timeSeriesFieldType.merge(EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(fc));
} catch (IllegalArgumentException e) {
return new InvalidMappedField(name, e.getMessage());
if (fieldsInfo.hasTimeSeriesAggregation()) {
for (IndexFieldCapabilities fc : rest) {
try {
timeSeriesFieldType = timeSeriesFieldType.merge(EsField.TimeSeriesFieldType.fromIndexFieldCapabilities(fc));
} catch (IllegalArgumentException e) {
return new InvalidMappedField(name, e.getMessage());
}
}
}
for (IndexFieldCapabilities fc : rest) {
Expand Down Expand Up @@ -503,17 +519,6 @@ private static EsField conflictingTypes(String name, String fullName, FieldCapab
return new InvalidMappedField(name, typesToIndices);
}

// NOTE(review): removed by this PR — previously used to report fields mapped with
// conflicting metric types across indices; the merge path now raises the error itself.
private static EsField conflictingMetricTypes(String name, String fullName, FieldCapabilitiesResponse fieldCapsResponse) {
    // Collect the (sorted, de-duplicated) names of every index that maps this field.
    TreeSet<String> indices = new TreeSet<>();
    for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
        IndexFieldCapabilities fc = ir.get().get(fullName);
        if (fc != null) {
            indices.add(ir.getIndexName());
        }
    }
    // Surface the conflict as an InvalidMappedField naming all offending indices.
    return new InvalidMappedField(name, "mapped as different metric types in indices: " + indices);
}

private static FieldCapabilitiesRequest createFieldCapsRequest(
IndicesOptions options,
String index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
Expand Down Expand Up @@ -3476,7 +3477,7 @@ public void testResolveDenseVector() {
IndexResolution resolution = IndexResolver.mergedMappings(
"foo",
false,
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, true),
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, true, false),
IndexResolver.DO_NOT_GROUP
);
var plan = analyze("FROM foo", analyzer(resolution, TEST_VERIFIER));
Expand All @@ -3487,7 +3488,7 @@ public void testResolveDenseVector() {
IndexResolution resolution = IndexResolver.mergedMappings(
"foo",
false,
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, false),
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, false, false),
IndexResolver.DO_NOT_GROUP
);
var plan = analyze("FROM foo", analyzer(resolution, TEST_VERIFIER));
Expand All @@ -3511,7 +3512,7 @@ public void testResolveAggregateMetricDouble() {
IndexResolution resolution = IndexResolver.mergedMappings(
"foo",
false,
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, true),
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, true, true, false),
IndexResolver.DO_NOT_GROUP
);
var plan = analyze("FROM foo", analyzer(resolution, TEST_VERIFIER));
Expand All @@ -3525,7 +3526,7 @@ public void testResolveAggregateMetricDouble() {
IndexResolution resolution = IndexResolver.mergedMappings(
"foo",
false,
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, false, true),
new IndexResolver.FieldsInfo(caps, TransportVersion.minimumCompatible(), false, false, true, false),
IndexResolver.DO_NOT_GROUP
);
var plan = analyze("FROM foo", analyzer(resolution, TEST_VERIFIER));
Expand All @@ -3534,6 +3535,101 @@ public void testResolveAggregateMetricDouble() {
}
}

/**
* A field that is a dimension in one index and a metric in another does not prevent a FROM query from succeeding,
* because the time series merge is only enforced when a time series aggregation (TS + STATS) is present.
*/
public void testFromQueryWithConflictingTsTypesSucceeds() {
    // "status" is a dimension in one index and a gauge metric in the other (see helper).
    FieldCapabilitiesResponse caps = buildCapsWithConflictingTsTypes();
    IndexResolution resolution = IndexResolver.mergedMappings(
        "test",
        false,
        // hasTimeSeriesAggregation = false: time series type merging is skipped entirely.
        fieldsInfoOnCurrentVersion(caps, false),
        (p, r) -> Map.of()
    );
    var plan = analyze("FROM test | KEEP status", analyzer(resolution, TEST_VERIFIER));
    // The conflicting field still resolves cleanly as KEYWORD for a plain FROM query.
    assertThat(plan.output(), hasSize(1));
    assertThat(plan.output().getFirst().name(), equalTo("status"));
    assertThat(plan.output().getFirst().dataType(), equalTo(KEYWORD));
}

/**
* When a TS source is followed by STATS, the time series merge is enforced and conflicting
* dimension/metric types across indices produce an {@link InvalidMappedField}. The field
* resolves as {@link DataType#UNSUPPORTED} rather than the original KEYWORD type.
*/
public void testTsStatsQueryWithConflictingTsTypesMarksFieldUnsupported() {
    // "status" is a dimension in one index and a gauge metric in the other (see helper).
    FieldCapabilitiesResponse caps = buildCapsWithConflictingTsTypes();
    IndexResolution resolution = IndexResolver.mergedMappings(
        "test",
        false,
        // hasTimeSeriesAggregation = true: time series type merging is enforced.
        fieldsInfoOnCurrentVersion(caps, true),
        (p, r) -> Map.of()
    );
    // The dimension-vs-metric conflict is surfaced as an InvalidMappedField at resolution time.
    assertThat(resolution.get().mapping().get("status"), instanceOf(InvalidMappedField.class));
    var plan = analyze("TS test | STATS avg(rate(bytes_in)) BY status", analyzer(resolution, TEST_VERIFIER));
    // In the analyzed plan, the invalid field shows up with the UNSUPPORTED data type.
    var statusAttr = plan.output().stream().filter(a -> a.name().equals("status")).findFirst().orElseThrow();
    assertThat(statusAttr.dataType(), equalTo(UNSUPPORTED));
}

/**
* TS without STATS does not produce a TimeSeriesAggregate, so conflicting
* dimension/metric types are ignored and the field resolves as KEYWORD.
*/
public void testTsWithoutStatsAndConflictingTsTypesSucceeds() {
    // "status" is a dimension in one index and a gauge metric in the other (see helper).
    FieldCapabilitiesResponse caps = buildCapsWithConflictingTsTypes();
    IndexResolution resolution = IndexResolver.mergedMappings(
        "test",
        false,
        // A TS source with no STATS yields no TimeSeriesAggregate, so the flag stays false.
        fieldsInfoOnCurrentVersion(caps, false),
        (p, r) -> Map.of()
    );
    var plan = analyze("TS test | KEEP status", analyzer(resolution, TEST_VERIFIER));
    // With the merge skipped, the conflicting field resolves as plain KEYWORD.
    assertThat(plan.output(), hasSize(1));
    assertThat(plan.output().getFirst().name(), equalTo("status"));
    assertThat(plan.output().getFirst().dataType(), equalTo(KEYWORD));
}

/**
* PROMQL queries operate on time series data and should enforce time series field type merging,
* just like TS + STATS.
*/
public void testPromqlQueryWithConflictingTsTypesMarksFieldUnsupported() {
    // "status" is a dimension in one index and a gauge metric in the other (see helper).
    FieldCapabilitiesResponse caps = buildCapsWithConflictingTsTypes();
    IndexResolution resolution = IndexResolver.mergedMappings(
        "test",
        false,
        // PROMQL implies a time series aggregation, so the merge is enforced.
        fieldsInfoOnCurrentVersion(caps, true),
        (p, r) -> Map.of()
    );
    // The dimension-vs-metric conflict is surfaced as an InvalidMappedField at resolution time.
    assertThat(resolution.get().mapping().get("status"), instanceOf(InvalidMappedField.class));
    // Analysis must still succeed despite the invalid field; the returned plan itself is not
    // inspected (the PROMQL output does not project "status"), so the result is discarded.
    analyze("""
        PROMQL index=test
        step=5m start="2024-05-10T00:20:00.000Z" end="2024-05-10T00:25:00.000Z"
        avg(rate(bytes_in[5m]))""", analyzer(resolution, TEST_VERIFIER));
    assertThat(resolution.get().mapping().get("status").getDataType(), equalTo(UNSUPPORTED));
}

/**
 * Builds a field-caps response covering two indices where {@code status} is a keyword
 * dimension in the TIME_SERIES index but a keyword gauge metric in the STANDARD index,
 * alongside a shared {@code @timestamp} date field and a {@code bytes_in} counter.
 */
private static FieldCapabilitiesResponse buildCapsWithConflictingTsTypes() {
    IndexFieldCapabilities ts = new IndexFieldCapabilitiesBuilder("@timestamp", "date").build();
    IndexFieldCapabilities bytesIn = new IndexFieldCapabilitiesBuilder("bytes_in", "long").metricType(
        TimeSeriesParams.MetricType.COUNTER
    ).build();
    // Same field name, conflicting time series roles across the two indices.
    IndexFieldCapabilities statusAsDimension = new IndexFieldCapabilitiesBuilder("status", "keyword").isDimension(true).build();
    IndexFieldCapabilities statusAsMetric = new IndexFieldCapabilitiesBuilder("status", "keyword").metricType(
        TimeSeriesParams.MetricType.GAUGE
    ).build();
    FieldCapabilitiesIndexResponse tsIndex = new FieldCapabilitiesIndexResponse(
        "ts_index",
        "hash_a",
        Map.of("@timestamp", ts, "status", statusAsDimension, "bytes_in", bytesIn),
        false,
        IndexMode.TIME_SERIES
    );
    FieldCapabilitiesIndexResponse stdIndex = new FieldCapabilitiesIndexResponse(
        "std_index",
        "hash_b",
        Map.of("@timestamp", ts, "status", statusAsMetric, "bytes_in", bytesIn),
        false,
        IndexMode.STANDARD
    );
    return new FieldCapabilitiesResponse(List.of(tsIndex, stdIndex), List.of());
}

public void testBasicFork() {
LogicalPlan plan = analyze("""
from test
Expand Down Expand Up @@ -6306,7 +6402,11 @@ static Literal literal(int value) {
}

static IndexResolver.FieldsInfo fieldsInfoOnCurrentVersion(FieldCapabilitiesResponse caps) {
return new IndexResolver.FieldsInfo(caps, TransportVersion.current(), false, false, false);
return fieldsInfoOnCurrentVersion(caps, false);
}

// Builds a FieldsInfo on the current transport version with all opt-in flags off except
// the given hasTimeSeriesAggregation, which gates time series field type merging.
static IndexResolver.FieldsInfo fieldsInfoOnCurrentVersion(FieldCapabilitiesResponse caps, boolean hasTimeSeriesAggregation) {
    return new IndexResolver.FieldsInfo(caps, TransportVersion.current(), false, false, false, hasTimeSeriesAggregation);
}

// ===== ResolveExternalRelations + FileSet tests =====
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private void resolve(String esTypeName, TimeSeriesParams.MetricType metricType,
IndexResolution resolution = IndexResolver.mergedMappings(
"idx-*",
false,
new IndexResolver.FieldsInfo(caps, TransportVersion.current(), false, false, false),
new IndexResolver.FieldsInfo(caps, TransportVersion.current(), false, false, false, false),
IndexResolver.DO_NOT_GROUP
);
EsField f = resolution.get().mapping().get(field);
Expand Down