Skip to content

Commit e1e1974

Browse files
authored
Expose timestamp field type on coordinator node (#65873)
Today a coordinating node does not have (easy) access to the mappings for the indices for the searches it wishes to coordinate. This means it can't properly interpret a timestamp range filter in a query and must involve a copy of every shard in at least the `can_match` phase. It therefore cannot cope with cases when shards are temporarily not started even if those shards are irrelevant to the search. This commit captures the mapping of the `@timestamp` field for indices which expose a timestamp range in their index metadata.
1 parent ef6fb59 commit e1e1974

File tree

3 files changed

+269
-0
lines changed

3 files changed

+269
-0
lines changed

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
100100
import org.elasticsearch.index.flush.FlushStats;
101101
import org.elasticsearch.index.get.GetStats;
102+
import org.elasticsearch.index.mapper.DateFieldMapper;
102103
import org.elasticsearch.index.mapper.IdFieldMapper;
103104
import org.elasticsearch.index.mapper.MapperService;
104105
import org.elasticsearch.index.merge.MergeStats;
@@ -232,11 +233,15 @@ public class IndicesService extends AbstractLifecycleComponent
232233
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
233234
private final boolean nodeWriteDanglingIndicesInfo;
234235
private final ValuesSourceRegistry valuesSourceRegistry;
236+
private final TimestampFieldMapperService timestampFieldMapperService;
235237

236238
@Override
237239
protected void doStart() {
238240
// Start thread that will manage cleaning the field data cache periodically
239241
threadPool.schedule(this.cacheCleaner, this.cleanInterval, ThreadPool.Names.SAME);
242+
243+
// Start watching for timestamp fields
244+
clusterService.addStateApplier(timestampFieldMapperService);
240245
}
241246

242247
public IndicesService(Settings settings, PluginsService pluginsService, NodeEnvironment nodeEnv, NamedXContentRegistry xContentRegistry,
@@ -328,6 +333,8 @@ protected void closeInternal() {
328333

329334
this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings());
330335
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
336+
337+
this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
331338
}
332339

333340
private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
@@ -338,6 +345,9 @@ public ClusterService clusterService() {
338345

339346
@Override
340347
protected void doStop() {
348+
clusterService.removeApplier(timestampFieldMapperService);
349+
timestampFieldMapperService.doStop();
350+
341351
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);
342352

343353
ExecutorService indicesStopExecutor =
@@ -1603,4 +1613,16 @@ public boolean allPendingDanglingIndicesWritten() {
16031613
return nodeWriteDanglingIndicesInfo == false ||
16041614
(danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0);
16051615
}
1616+
1617+
/**
1618+
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
1619+
* - the index is not found,
1620+
* - the field is not found, or
1621+
* - the field is not a timestamp field.
1622+
*/
1623+
@Nullable
1624+
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
1625+
return timestampFieldMapperService.getTimestampFieldType(index);
1626+
}
1627+
16061628
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.indices;
21+
22+
import com.carrotsearch.hppc.cursors.ObjectCursor;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
import org.apache.logging.log4j.message.ParameterizedMessage;
26+
import org.elasticsearch.action.support.PlainActionFuture;
27+
import org.elasticsearch.cluster.ClusterChangedEvent;
28+
import org.elasticsearch.cluster.ClusterStateApplier;
29+
import org.elasticsearch.cluster.metadata.DataStream;
30+
import org.elasticsearch.cluster.metadata.IndexMetadata;
31+
import org.elasticsearch.cluster.metadata.Metadata;
32+
import org.elasticsearch.common.Nullable;
33+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
34+
import org.elasticsearch.common.settings.Settings;
35+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
36+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
37+
import org.elasticsearch.common.util.concurrent.EsExecutors;
38+
import org.elasticsearch.index.Index;
39+
import org.elasticsearch.index.IndexService;
40+
import org.elasticsearch.index.mapper.DateFieldMapper;
41+
import org.elasticsearch.index.mapper.MappedFieldType;
42+
import org.elasticsearch.index.mapper.MapperService;
43+
import org.elasticsearch.index.shard.IndexLongFieldRange;
44+
import org.elasticsearch.node.Node;
45+
import org.elasticsearch.threadpool.ThreadPool;
46+
47+
import java.util.Map;
48+
import java.util.Objects;
49+
import java.util.concurrent.ExecutorService;
50+
import java.util.concurrent.TimeUnit;
51+
52+
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
53+
54+
/**
55+
* Tracks the mapping of the {@code @timestamp} field of immutable indices that expose their timestamp range in their index metadata.
56+
* Coordinating nodes do not have (easy) access to mappings for all indices, so we extract the type of this one field from the mapping here.
57+
*/
58+
public class TimestampFieldMapperService extends AbstractLifecycleComponent implements ClusterStateApplier {
59+
60+
private static final Logger logger = LogManager.getLogger(TimestampFieldMapperService.class);
61+
62+
private final IndicesService indicesService;
63+
private final ExecutorService executor; // single thread to construct mapper services async as needed
64+
65+
/**
66+
* The type of the {@code @timestamp} field keyed by index. Futures may be completed with {@code null} to indicate that there is
67+
* no usable {@code @timestamp} field.
68+
*/
69+
private final Map<Index, PlainActionFuture<DateFieldMapper.DateFieldType>> fieldTypesByIndex = ConcurrentCollections.newConcurrentMap();
70+
71+
public TimestampFieldMapperService(Settings settings, ThreadPool threadPool, IndicesService indicesService) {
72+
this.indicesService = indicesService;
73+
74+
final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
75+
final String threadName = "TimestampFieldMapperService#updateTask";
76+
executor = EsExecutors.newScaling(nodeName + "/" + threadName, 0, 1, 0, TimeUnit.MILLISECONDS,
77+
daemonThreadFactory(nodeName, threadName), threadPool.getThreadContext());
78+
}
79+
80+
@Override
81+
protected void doStart() {
82+
}
83+
84+
@Override
85+
protected void doStop() {
86+
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
87+
}
88+
89+
@Override
90+
protected void doClose() {
91+
}
92+
93+
@Override
94+
public void applyClusterState(ClusterChangedEvent event) {
95+
final Metadata metadata = event.state().metadata();
96+
97+
// clear out mappers for indices that no longer exist or whose timestamp range is no longer known
98+
fieldTypesByIndex.keySet().removeIf(index -> hasUsefulTimestampField(metadata.index(index)) == false);
99+
100+
// capture mappers for indices that do exist
101+
for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
102+
final IndexMetadata indexMetadata = cursor.value;
103+
final Index index = indexMetadata.getIndex();
104+
105+
if (hasUsefulTimestampField(indexMetadata) && fieldTypesByIndex.containsKey(index) == false) {
106+
logger.trace("computing timestamp mapping for {}", index);
107+
final PlainActionFuture<DateFieldMapper.DateFieldType> future = new PlainActionFuture<>();
108+
fieldTypesByIndex.put(index, future);
109+
110+
final IndexService indexService = indicesService.indexService(index);
111+
if (indexService == null) {
112+
logger.trace("computing timestamp mapping for {} async", index);
113+
executor.execute(new AbstractRunnable() {
114+
@Override
115+
public void onFailure(Exception e) {
116+
logger.debug(new ParameterizedMessage("failed to compute mapping for {}", index), e);
117+
future.onResponse(null); // no need to propagate a failure to create the mapper service to searches
118+
}
119+
120+
@Override
121+
protected void doRun() throws Exception {
122+
try (MapperService mapperService = indicesService.createIndexMapperService(indexMetadata)) {
123+
mapperService.merge(indexMetadata, MapperService.MergeReason.MAPPING_RECOVERY);
124+
future.onResponse(fromMapperService(mapperService));
125+
}
126+
}
127+
});
128+
} else {
129+
logger.trace("computing timestamp mapping for {} using existing index service", index);
130+
try {
131+
future.onResponse(fromMapperService(indexService.mapperService()));
132+
} catch (Exception e) {
133+
assert false : e;
134+
future.onResponse(null);
135+
}
136+
}
137+
}
138+
}
139+
}
140+
141+
private static boolean hasUsefulTimestampField(IndexMetadata indexMetadata) {
142+
if (indexMetadata == null) {
143+
return false;
144+
}
145+
final IndexLongFieldRange timestampMillisRange = indexMetadata.getTimestampMillisRange();
146+
return timestampMillisRange.isComplete() && timestampMillisRange != IndexLongFieldRange.UNKNOWN;
147+
}
148+
149+
private static DateFieldMapper.DateFieldType fromMapperService(MapperService mapperService) {
150+
final MappedFieldType mappedFieldType = mapperService.fieldType(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD);
151+
if (mappedFieldType instanceof DateFieldMapper.DateFieldType) {
152+
return (DateFieldMapper.DateFieldType) mappedFieldType;
153+
} else {
154+
return null;
155+
}
156+
}
157+
158+
/**
159+
* @return the field type of the {@code @timestamp} field of the given index, or {@code null} if:
160+
* - the index is not found,
161+
* - the field is not found,
162+
* - the mapping is not known yet, or
163+
* - the field is not a timestamp field.
164+
*/
165+
@Nullable
166+
public DateFieldMapper.DateFieldType getTimestampFieldType(Index index) {
167+
final PlainActionFuture<DateFieldMapper.DateFieldType> future = fieldTypesByIndex.get(index);
168+
if (future == null || future.isDone() == false) {
169+
return null;
170+
}
171+
return future.actionGet();
172+
}
173+
174+
}

x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexIT.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@
88

99
import org.elasticsearch.action.index.IndexResponse;
1010
import org.elasticsearch.action.support.ActiveShardCount;
11+
import org.elasticsearch.action.support.PlainActionFuture;
1112
import org.elasticsearch.cluster.metadata.DataStream;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
1415
import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
1516
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.index.Index;
18+
import org.elasticsearch.index.mapper.DateFieldMapper;
1619
import org.elasticsearch.index.shard.IndexLongFieldRange;
20+
import org.elasticsearch.indices.IndicesService;
1721
import org.elasticsearch.plugins.Plugin;
1822
import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
1923
import org.elasticsearch.rest.RestStatus;
@@ -29,6 +33,7 @@
2933
import java.util.List;
3034

3135
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
36+
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3237
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3338
import static org.hamcrest.Matchers.equalTo;
3439
import static org.hamcrest.Matchers.not;
@@ -97,4 +102,72 @@ public void testTimestampRangeRecalculatedOnStalePrimaryAllocation() throws IOEx
97102
assertThat(timestampFieldRange.getMax(), equalTo(Instant.parse("2010-01-06T02:03:04.567Z").getMillis()));
98103
}
99104

105+
public void testTimestampFieldTypeExposedByAllIndicesServices() throws Exception {
106+
internalCluster().startNodes(between(2, 4));
107+
108+
final String locale;
109+
final String date;
110+
111+
switch (between(1, 3)) {
112+
case 1:
113+
locale = "";
114+
date = "04 Feb 2020 12:01:23Z";
115+
break;
116+
case 2:
117+
locale = "en_GB";
118+
date = "04 Feb 2020 12:01:23Z";
119+
break;
120+
case 3:
121+
locale = "fr_FR";
122+
date = "04 févr. 2020 12:01:23Z";
123+
break;
124+
default:
125+
throw new AssertionError("impossible");
126+
}
127+
128+
assertAcked(prepareCreate("index")
129+
.setSettings(Settings.builder()
130+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
131+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
132+
.setMapping(jsonBuilder().startObject().startObject("_doc").startObject("properties")
133+
.startObject(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD)
134+
.field("type", "date")
135+
.field("format", "dd LLL yyyy HH:mm:ssX")
136+
.field("locale", locale)
137+
.endObject()
138+
.endObject().endObject().endObject()));
139+
140+
final Index index = client().admin().cluster().prepareState().clear().setIndices("index").setMetadata(true)
141+
.get().getState().metadata().index("index").getIndex();
142+
143+
ensureGreen("index");
144+
if (randomBoolean()) {
145+
client().prepareIndex("index").setSource(DataStream.TimestampField.FIXED_TIMESTAMP_FIELD, date).get();
146+
}
147+
148+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
149+
assertNull(indicesService.getTimestampFieldType(index));
150+
}
151+
152+
assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet());
153+
ensureGreen("index");
154+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
155+
final PlainActionFuture<DateFieldMapper.DateFieldType> timestampFieldTypeFuture = new PlainActionFuture<>();
156+
assertBusy(() -> {
157+
final DateFieldMapper.DateFieldType timestampFieldType = indicesService.getTimestampFieldType(index);
158+
assertNotNull(timestampFieldType);
159+
timestampFieldTypeFuture.onResponse(timestampFieldType);
160+
});
161+
assertTrue(timestampFieldTypeFuture.isDone());
162+
assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().locale().toString(), equalTo(locale));
163+
assertThat(timestampFieldTypeFuture.get().dateTimeFormatter().parseMillis(date), equalTo(1580817683000L));
164+
}
165+
166+
assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index").setFreeze(false)).actionGet());
167+
ensureGreen("index");
168+
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
169+
assertNull(indicesService.getTimestampFieldType(index));
170+
}
171+
}
172+
100173
}

0 commit comments

Comments
 (0)