Skip to content

Commit 694bfea

Browse files
committed
Add search runtime_mappings to datafeed configuration (elastic#65606)
Adds the runtime_mappings section to DatafeedConfig to define runtime fields. The runtime fields are then passed through to datafeed searches
1 parent 0ff1963 commit 694bfea

File tree

29 files changed

+449
-35
lines changed

29 files changed

+449
-35
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
3030
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
3131
import org.elasticsearch.search.builder.SearchSourceBuilder;
32+
import org.elasticsearch.xpack.core.common.time.TimeUtils;
3233
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
3334
import org.elasticsearch.xpack.core.ml.job.config.Job;
3435
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -37,7 +38,6 @@
3738
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
3839
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
3940
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
40-
import org.elasticsearch.xpack.core.common.time.TimeUtils;
4141

4242
import java.io.IOException;
4343
import java.util.ArrayList;
@@ -166,6 +166,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
166166
parser.declareObject(Builder::setIndicesOptions,
167167
(p, c) -> IndicesOptions.fromMap(p.map(), SearchRequest.DEFAULT_INDICES_OPTIONS),
168168
INDICES_OPTIONS);
169+
parser.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
169170
return parser;
170171
}
171172

@@ -192,11 +193,13 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
192193
private final DelayedDataCheckConfig delayedDataCheckConfig;
193194
private final Integer maxEmptySearches;
194195
private final IndicesOptions indicesOptions;
196+
private final Map<String, Object> runtimeMappings;
195197

196198
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
197199
QueryProvider queryProvider, AggProvider aggProvider, List<SearchSourceBuilder.ScriptField> scriptFields,
198200
Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
199-
DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions) {
201+
DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches, IndicesOptions indicesOptions,
202+
Map<String, Object> runtimeMappings) {
200203
this.id = id;
201204
this.jobId = jobId;
202205
this.queryDelay = queryDelay;
@@ -211,6 +214,7 @@ private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue
211214
this.delayedDataCheckConfig = delayedDataCheckConfig;
212215
this.maxEmptySearches = maxEmptySearches;
213216
this.indicesOptions = ExceptionsHelper.requireNonNull(indicesOptions, INDICES_OPTIONS);
217+
this.runtimeMappings = Collections.unmodifiableMap(runtimeMappings);
214218
}
215219

216220
public DatafeedConfig(StreamInput in) throws IOException {
@@ -261,6 +265,11 @@ public DatafeedConfig(StreamInput in) throws IOException {
261265
} else {
262266
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
263267
}
268+
if (in.getVersion().onOrAfter(Version.V_7_11_0)) {
269+
runtimeMappings = in.readMap();
270+
} else {
271+
runtimeMappings = Collections.emptyMap();
272+
}
264273
}
265274

266275
/**
@@ -437,6 +446,10 @@ public IndicesOptions getIndicesOptions() {
437446
return indicesOptions;
438447
}
439448

449+
public Map<String, Object> getRuntimeMappings() {
450+
return runtimeMappings;
451+
}
452+
440453
@Override
441454
public void writeTo(StreamOutput out) throws IOException {
442455
out.writeString(id);
@@ -481,6 +494,9 @@ public void writeTo(StreamOutput out) throws IOException {
481494
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
482495
indicesOptions.writeIndicesOptions(out);
483496
}
497+
if (out.getVersion().onOrAfter(Version.V_7_11_0)) {
498+
out.writeMap(runtimeMappings);
499+
}
484500
}
485501

486502
@Override
@@ -539,6 +555,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
539555
if (maxEmptySearches != null) {
540556
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
541557
}
558+
if (runtimeMappings.isEmpty() == false) {
559+
builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings);
560+
}
542561
builder.endObject();
543562
return builder;
544563
}
@@ -591,13 +610,14 @@ public boolean equals(Object other) {
591610
&& Objects.equals(this.headers, that.headers)
592611
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
593612
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches)
594-
&& Objects.equals(this.indicesOptions, that.indicesOptions);
613+
&& Objects.equals(this.indicesOptions, that.indicesOptions)
614+
&& Objects.equals(this.runtimeMappings, that.runtimeMappings);
595615
}
596616

597617
@Override
598618
public int hashCode() {
599619
return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
600-
headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
620+
headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
601621
}
602622

603623
@Override
@@ -668,6 +688,7 @@ public static class Builder {
668688
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
669689
private Integer maxEmptySearches;
670690
private IndicesOptions indicesOptions;
691+
private Map<String, Object> runtimeMappings = Collections.emptyMap();
671692

672693
public Builder() { }
673694

@@ -692,6 +713,7 @@ public Builder(DatafeedConfig config) {
692713
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
693714
this.maxEmptySearches = config.getMaxEmptySearches();
694715
this.indicesOptions = config.indicesOptions;
716+
this.runtimeMappings = new HashMap<>(config.runtimeMappings);
695717
}
696718

697719
public Builder setId(String datafeedId) {
@@ -819,6 +841,11 @@ public IndicesOptions getIndicesOptions() {
819841
return this.indicesOptions;
820842
}
821843

844+
public void setRuntimeMappings(Map<String, Object> runtimeMappings) {
845+
this.runtimeMappings = ExceptionsHelper.requireNonNull(runtimeMappings,
846+
SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName());
847+
}
848+
822849
public DatafeedConfig build() {
823850
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
824851
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@@ -830,14 +857,15 @@ public DatafeedConfig build() {
830857
}
831858

832859
validateScriptFields();
860+
validateRuntimeMappings();
833861
setDefaultChunkingConfig();
834862

835863
setDefaultQueryDelay();
836864
if (indicesOptions == null) {
837865
indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;
838866
}
839867
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
840-
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions);
868+
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches, indicesOptions, runtimeMappings);
841869
}
842870

843871
void validateScriptFields() {
@@ -850,6 +878,28 @@ void validateScriptFields() {
850878
}
851879
}
852880

881+
/**
882+
* Perform a light check that the structure resembles runtime_mappings.
883+
* The full check cannot happen until search
884+
*/
885+
void validateRuntimeMappings() {
886+
for (Map.Entry<String, Object> entry : runtimeMappings.entrySet()) {
887+
// top level objects are fields
888+
String fieldName = entry.getKey();
889+
if (entry.getValue() instanceof Map) {
890+
@SuppressWarnings("unchecked")
891+
Map<String, Object> propNode = new HashMap<>(((Map<String, Object>) entry.getValue()));
892+
Object typeNode = propNode.get("type");
893+
if (typeNode == null) {
894+
throw ExceptionsHelper.badRequestException("No type specified for runtime field [" + fieldName + "]");
895+
}
896+
} else {
897+
throw ExceptionsHelper.badRequestException("Expected map for runtime field [" + fieldName + "] " +
898+
"definition but got a " + fieldName.getClass().getSimpleName());
899+
}
900+
}
901+
}
902+
853903
private static void checkNoMoreHistogramAggregations(Collection<AggregationBuilder> aggregations) {
854904
for (AggregationBuilder agg : aggregations) {
855905
if (ExtractorUtils.isHistogram(agg)) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
1414
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
1515

16+
import java.util.Map;
17+
1618
public final class DatafeedJobValidator {
1719

1820
private DatafeedJobValidator() {}
@@ -38,6 +40,8 @@ public static void validate(DatafeedConfig datafeedConfig, Job job, NamedXConten
3840
if (delayedDataCheckConfig.isEnabled()) {
3941
checkValidDelayedDataCheckConfig(bucketSpan, delayedDataCheckConfig);
4042
}
43+
44+
checkTimeFieldIsNotASearchRuntimeField(datafeedConfig, job.getDataDescription().getTimeField());
4145
}
4246

4347
private static void checkValidDelayedDataCheckConfig(TimeValue bucketSpan, DelayedDataCheckConfig delayedDataCheckConfig) {
@@ -97,4 +101,17 @@ private static void checkFrequencyIsMultipleOfHistogramInterval(DatafeedConfig d
97101
}
98102
}
99103
}
104+
105+
private static void checkTimeFieldIsNotASearchRuntimeField(DatafeedConfig datafeedConfig, String timeField) {
106+
// check the search RT mappings defined in the datafeed
107+
Map<String, Object> runtimeMappings = datafeedConfig.getRuntimeMappings();
108+
for (Map.Entry<String, Object> entry : runtimeMappings.entrySet()) {
109+
// top level objects are fields
110+
String fieldName = entry.getKey();
111+
if (timeField.equals(fieldName)) {
112+
throw ExceptionsHelper.badRequestException(Messages.getMessage(
113+
Messages.JOB_CONFIG_TIME_FIELD_CANNOT_BE_RUNTIME, timeField));
114+
}
115+
}
116+
}
100117
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public final class Messages {
222222
"This job would cause a mapping clash with existing field [{0}] - avoid the clash by assigning a dedicated results index";
223223
public static final String JOB_CONFIG_TIME_FIELD_NOT_ALLOWED_IN_ANALYSIS_CONFIG =
224224
"data_description.time_field may not be used in the analysis_config";
225+
public static final String JOB_CONFIG_TIME_FIELD_CANNOT_BE_RUNTIME =
226+
"data_description.time_field [{0}] cannot be a runtime field";
225227
public static final String JOB_CONFIG_MODEL_SNAPSHOT_RETENTION_SETTINGS_INCONSISTENT =
226228
"The value of '" + Job.DAILY_MODEL_SNAPSHOT_RETENTION_AFTER_DAYS + "' [{0}] cannot be greater than '" +
227229
Job.MODEL_SNAPSHOT_RETENTION_DAYS + "' [{1}]";

x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,10 @@
375375
"results_retention_days" : {
376376
"type" : "long"
377377
},
378+
"runtime_mappings" : {
379+
"type" : "object",
380+
"enabled" : false
381+
},
378382
"script_fields" : {
379383
"type" : "object",
380384
"enabled" : false

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@ private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(Stri
168168
Boolean.toString(randomBoolean()),
169169
Boolean.toString(randomBoolean()),
170170
SearchRequest.DEFAULT_INDICES_OPTIONS));
171+
if (randomBoolean()) {
172+
Map<String, Object> settings = new HashMap<>();
173+
settings.put("type", "keyword");
174+
settings.put("script", "");
175+
Map<String, Object> field = new HashMap<>();
176+
field.put("runtime_field_foo", settings);
177+
builder.setRuntimeMappings(field);
178+
}
171179
return builder;
172180
}
173181

@@ -504,6 +512,29 @@ public void testBuild_GivenScriptFieldsAndAggregations() {
504512
assertThat(e.getMessage(), equalTo("script_fields cannot be used in combination with aggregations"));
505513
}
506514

515+
public void testBuild_GivenRuntimeMappingMissingType() {
516+
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
517+
builder.setIndices(Collections.singletonList("my_index"));
518+
Map<String, Object> properties = new HashMap<>();
519+
properties.put("type_field_is_missing", "");
520+
Map<String, Object> fields = new HashMap<>();
521+
fields.put("runtime_field_foo", properties);
522+
builder.setRuntimeMappings(fields);
523+
524+
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
525+
assertThat(e.getMessage(), equalTo("No type specified for runtime field [runtime_field_foo]"));
526+
}
527+
528+
public void testBuild_GivenInvalidRuntimeMapping() {
529+
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
530+
builder.setIndices(Collections.singletonList("my_index"));
531+
Map<String, Object> fields = new HashMap<>();
532+
fields.put("field_is_not_an_object", "");
533+
builder.setRuntimeMappings(fields);
534+
ElasticsearchException e = expectThrows(ElasticsearchException.class, builder::build);
535+
assertThat(e.getMessage(), equalTo("Expected map for runtime field [field_is_not_an_object] definition but got a String"));
536+
}
537+
507538
public void testHasAggregations_GivenNull() {
508539
DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1");
509540
builder.setIndices(Collections.singletonList("myIndex"));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.util.concurrent.ThreadContext;
2525
import org.elasticsearch.index.get.GetResult;
26+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2627
import org.elasticsearch.test.ESTestCase;
2728
import org.elasticsearch.test.VersionUtils;
2829
import org.elasticsearch.threadpool.ThreadPool;
@@ -67,18 +68,19 @@ public class ElasticsearchMappingsTests extends ESTestCase {
6768

6869
// These are not reserved because they're Elasticsearch keywords, not
6970
// field names
70-
private static List<String> KEYWORDS = Arrays.asList(
71+
private static final List<String> KEYWORDS = Arrays.asList(
7172
ElasticsearchMappings.ANALYZER,
7273
ElasticsearchMappings.COPY_TO,
7374
ElasticsearchMappings.DYNAMIC,
7475
ElasticsearchMappings.ENABLED,
7576
ElasticsearchMappings.NESTED,
7677
ElasticsearchMappings.PROPERTIES,
7778
ElasticsearchMappings.TYPE,
78-
ElasticsearchMappings.WHITESPACE
79+
ElasticsearchMappings.WHITESPACE,
80+
SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName()
7981
);
8082

81-
private static List<String> INTERNAL_FIELDS = Arrays.asList(
83+
private static final List<String> INTERNAL_FIELDS = Arrays.asList(
8284
GetResult._ID,
8385
GetResult._INDEX,
8486
GetResult._TYPE
@@ -195,6 +197,7 @@ public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOExcepti
195197
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
196198
}
197199

200+
@SuppressWarnings({"unchecked", "rawtypes"})
198201
public void testAddDocMappingIfMissing() throws IOException {
199202
ThreadPool threadPool = mock(ThreadPool.class);
200203
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));

0 commit comments

Comments
 (0)