diff --git a/docs/changelog/96233.yaml b/docs/changelog/96233.yaml new file mode 100644 index 0000000000000..98e5016fbb15b --- /dev/null +++ b/docs/changelog/96233.yaml @@ -0,0 +1,5 @@ +pr: 96233 +summary: Add new dynamic `until_limit` option +area: Mapping +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java index 09606ba3bbe43..ae686cef5772f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/mapper/DynamicMappingIT.java @@ -27,9 +27,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder; +import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.xcontent.XContentBuilder; @@ -39,10 +41,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -89,18 +94,36 @@ public void testConflictingDynamicMappingsBulk() { assertTrue(bulkResponse.hasFailures()); } - private static void assertMappingsHaveField(GetMappingsResponse mappings, String index, String field) throws IOException { - MappingMetadata indexMappings = mappings.getMappings().get("index"); - assertNotNull(indexMappings); - Map typeMappingsMap = indexMappings.getSourceAsMap(); - @SuppressWarnings("unchecked") - Map properties = (Map) typeMappingsMap.get("properties"); - assertTrue("Could not find [" + field + "] in " + typeMappingsMap.toString(), properties.containsKey(field)); + public void testConcurrentDynamicUpdates() throws Throwable { + int numberOfFieldsToCreate = 32; + Map properties = indexConcurrently(numberOfFieldsToCreate, "true", Settings.builder()); + assertThat(properties.size(), equalTo(numberOfFieldsToCreate)); + for (int i = 0; i < numberOfFieldsToCreate; i++) { + assertTrue("Could not find [field" + i + "] in " + properties, properties.containsKey("field" + i)); + } } - public void testConcurrentDynamicUpdates() throws Throwable { - createIndex("index"); - final Thread[] indexThreads = new Thread[32]; + public void testConcurrentDynamicUntilLimitUpdates() throws Throwable { + int numberOfFieldsToCreate = 32; + Map properties = indexConcurrently( + numberOfFieldsToCreate, + "until_limit", + Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), numberOfFieldsToCreate) + ); + assertThat(properties.size(), equalTo(numberOfFieldsToCreate / 2)); + SearchResponse response = client().prepareSearch("index") + .setQuery(new MatchAllQueryBuilder()) + .setSize(numberOfFieldsToCreate) + .addFetchField("*") + .get(); + long ignoredFields = Arrays.stream(response.getHits().getHits()).filter(hit -> hit.field("_ignored") != null).count(); + assertEquals(16, ignoredFields); + } + + private Map indexConcurrently(int numberOfFieldsToCreate, String dynamic, Settings.Builder settings) throws Throwable { + client().admin().indices().prepareCreate("index").setSettings(settings).setMapping(Map.of("dynamic", dynamic)).get(); + ensureGreen("index"); + final Thread[] indexThreads = new Thread[numberOfFieldsToCreate]; final CountDownLatch startLatch = new CountDownLatch(1); final AtomicReference error = new AtomicReference<>(); for (int i = 0; i < indexThreads.length; ++i) { @@ -128,14 +151,17 @@ public void run() { if (error.get() != null) { throw error.get(); } - Thread.sleep(2000); - GetMappingsResponse mappings = indicesAdmin().prepareGetMappings("index").get(); - for (int i = 0; i < indexThreads.length; ++i) { - assertMappingsHaveField(mappings, "index", "field" + i); - } - for (int i = 0; i < indexThreads.length; ++i) { + client().admin().indices().prepareRefresh("index").get(); + for (int i = 0; i < numberOfFieldsToCreate; ++i) { assertTrue(client().prepareGet("index", Integer.toString(i)).get().isExists()); } + GetMappingsResponse mappings = indicesAdmin().prepareGetMappings("index").get(); + MappingMetadata indexMappings = mappings.getMappings().get("index"); + assertNotNull(indexMappings); + Map typeMappingsMap = indexMappings.getSourceAsMap(); + @SuppressWarnings("unchecked") + Map properties = (Map) typeMappingsMap.get("properties"); + return properties; } public void testPreflightCheckAvoidsMaster() throws InterruptedException, IOException { @@ -226,15 +252,66 @@ public void onFailure(Exception e) { Exception e = expectThrows(DocumentParsingException.class, () -> indexRequestBuilder.get(TimeValue.timeValueSeconds(10))); assertThat(e.getMessage(), Matchers.containsString("failed to parse")); assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); - assertThat( - e.getCause().getMessage(), - Matchers.containsString("Limit of total fields [2] has been exceeded while adding new fields [1]") - ); + assertThat(e.getCause().getMessage(), Matchers.containsString("Limit of total fields [2] has been exceeded")); } finally { indexingCompletedLatch.countDown(); } } + public void testDynamicUntilLimitMultiField() throws Exception { + var fields = indexUntilLimit(2, orderedMap("field1", 1, "field2", "text")).getFields(); + assertThat(fields.keySet(), equalTo(Set.of("field1", "_ignored"))); + assertThat(fields.get("field1").getValues(), equalTo(List.of(1L))); + assertThat(fields.get("_ignored").getValues(), equalTo(List.of("field2"))); + } + + public void testDynamicUntilLimitObjectField() throws Exception { + var fields = indexUntilLimit(3, orderedMap("a.b", 1, "a.c", 2, "a.d", 3)).getFields(); + assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.c", "_ignored"))); + assertThat(fields.get("a.b").getValues(), equalTo(List.of(1L))); + assertThat(fields.get("a.c").getValues(), equalTo(List.of(2L))); + assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d"))); + } + + public void testDynamicUntilLimitDottedObjectMultiField() throws Exception { + var fields = indexUntilLimit(4, orderedMap("a.b", "foo", "a.c", 2, "a.d", 3)).getFields(); + assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.b.keyword", "a.c", "_ignored"))); + assertThat(fields.get("a.b").getValues(), equalTo(List.of("foo"))); + assertThat(fields.get("a.b.keyword").getValues(), equalTo(List.of("foo"))); + assertThat(fields.get("a.c").getValues(), equalTo(List.of(2L))); + assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d"))); + } + + public void testDynamicUntilLimitObjectMultiField() throws Exception { + var fields = indexUntilLimit(5, orderedMap("a", orderedMap("b", "foo", "c", "bar", "d", 3))).getFields(); + assertThat(fields.keySet(), equalTo(Set.of("a.b", "a.b.keyword", "a.c", "a.c.keyword", "_ignored"))); + assertThat(fields.get("a.b").getValues(), equalTo(List.of("foo"))); + assertThat(fields.get("a.b.keyword").getValues(), equalTo(List.of("foo"))); + assertThat(fields.get("a.c").getValues(), equalTo(List.of("bar"))); + assertThat(fields.get("a.c.keyword").getValues(), equalTo(List.of("bar"))); + assertThat(fields.get("_ignored").getValues(), equalTo(List.of("a.d"))); + } + + private LinkedHashMap orderedMap(Object... entries) { + var map = new LinkedHashMap(); + for (int i = 0; i < entries.length; i += 2) { + map.put((String) entries[i], entries[i + 1]); + } + return map; + } + + private SearchHit indexUntilLimit(int fieldLimit, Map source) throws Exception { + client().admin() + .indices() + .prepareCreate("index") + .setSettings(Settings.builder().put(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), fieldLimit).build()) + .setMapping(Map.of("dynamic", "until_limit")) + .get(); + ensureGreen("index"); + client().prepareIndex("index").setId("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).setSource(source).get(); + return client().prepareSearch("index").setQuery(new MatchAllQueryBuilder()).addFetchField("*").get().getHits().getHits()[0]; + } + public void testTotalFieldsLimitWithRuntimeFields() { Settings indexSettings = Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java index 48dc5f976da5d..d4b1ca77c7c00 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -170,6 +170,15 @@ public > T getRequestToExecute() { return (T) requestToExecute; } + /** indicates that the optional mapping updated did not succeed and prepares for a new execution */ + public void onOptionalMappingUpdateFailed() { + assert assertInvariants(ItemProcessingState.TRANSLATED); + currentItemState = ItemProcessingState.INITIAL; + requestToExecute = null; + executionResult = null; + assert assertInvariants(ItemProcessingState.INITIAL); + } + /** indicates that the current operation can not be completed and needs to wait for a new mapping from the master */ public void markAsRequiringMappingUpdate() { assert assertInvariants(ItemProcessingState.TRANSLATED); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 550dd393f94f5..410948dedce28 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -361,23 +361,27 @@ static boolean executeBulkItemRequest( request.isRetry() ); } - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { try { primary.mapperService() .merge( MapperService.SINGLE_MAPPING_NAME, - new CompressedXContent(result.getRequiredMappingUpdate()), + new CompressedXContent(result.getMappingUpdate()), MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT ); } catch (Exception e) { - logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e); - assert result.getId() != null; - onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult); + if (result.isMappingUpdateOptional()) { + context.onOptionalMappingUpdateFailed(); + } else { + logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e); + assert result.getId() != null; + onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult); + } return true; } - mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), new ActionListener<>() { + mappingUpdater.updateMappings(result.getMappingUpdate(), primary.shardId(), new ActionListener<>() { @Override public void onResponse(Void v) { context.markAsRequiringMappingUpdate(); @@ -397,9 +401,13 @@ public void onFailure(Exception e) { @Override public void onFailure(Exception e) { - onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult); // Requesting mapping update failed, so we don't have to wait for a cluster state update - assert context.isInitial(); + if (result.isMappingUpdateOptional()) { + context.onOptionalMappingUpdateFailed(); + } else { + onComplete(exceptionToResult(e, primary, isDelete, version, result.getId()), context, updateResult); + assert context.isInitial(); + } itemDoneListener.onResponse(null); } }); @@ -642,7 +650,7 @@ private static Engine.Result performOpOnReplica( throw new IllegalStateException("Unexpected request operation type on replica: " + docWriteRequest.opType().getLowercase()); } } - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { // Even though the primary waits on all nodes to ack the mapping changes to the master // (see MappingUpdatedAction.updateMappingOnMaster) we still need to protect against missing mappings // and wait for them. The reason is concurrent requests. Request r1 which has new field f triggers a @@ -653,7 +661,7 @@ private static Engine.Result performOpOnReplica( // applied the new mapping, so there is no other option than to wait. throw new TransportReplicationAction.RetryOnReplicaException( replica.shardId(), - "Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate() + "Mappings are not available on the replica yet, triggered update: " + result.getMappingUpdate() ); } return result; diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 31cbf03f80463..7bfb00409aae3 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -168,10 +168,10 @@ public static Translog.Location performOnReplica(ResyncReplicationRequest reques replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary()); for (Translog.Operation operation : request.getOperations()) { final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA); - if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { throw new TransportReplicationAction.RetryOnReplicaException( replica.shardId(), - "Mappings are not available on the replica yet, triggered update: " + operationResult.getRequiredMappingUpdate() + "Mappings are not available on the replica yet, triggered update: " + operationResult.getMappingUpdate() ); } location = syncOperationResultOrThrow(operationResult, location); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 8901b1ded7d38..62bca4a6bda0c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -386,7 +386,8 @@ public abstract static class Result { private final long seqNo; private final Exception failure; private final SetOnce freeze = new SetOnce<>(); - private final Mapping requiredMappingUpdate; + private final Mapping mappingUpdate; + private final boolean mappingUpdateOptional; private final String id; private Translog.Location translogLocation; private long took; @@ -397,7 +398,8 @@ protected Result(Operation.TYPE operationType, Exception failure, long version, this.version = version; this.term = term; this.seqNo = seqNo; - this.requiredMappingUpdate = null; + this.mappingUpdate = null; + this.mappingUpdateOptional = false; this.resultType = Type.FAILURE; this.id = id; } @@ -408,19 +410,21 @@ protected Result(Operation.TYPE operationType, long version, long term, long seq this.seqNo = seqNo; this.term = term; this.failure = null; - this.requiredMappingUpdate = null; + this.mappingUpdate = null; + this.mappingUpdateOptional = false; this.resultType = Type.SUCCESS; this.id = id; } - protected Result(Operation.TYPE operationType, Mapping requiredMappingUpdate, String id) { + protected Result(Operation.TYPE operationType, Mapping mappingUpdate, boolean mappingUpdateOptional, String id) { this.operationType = operationType; this.version = Versions.NOT_FOUND; this.seqNo = UNASSIGNED_SEQ_NO; this.term = UNASSIGNED_PRIMARY_TERM; this.failure = null; - this.requiredMappingUpdate = requiredMappingUpdate; - this.resultType = Type.MAPPING_UPDATE_REQUIRED; + this.mappingUpdate = mappingUpdate; + this.mappingUpdateOptional = mappingUpdateOptional; + this.resultType = Type.MAPPING_UPDATE; this.id = id; } @@ -451,8 +455,12 @@ public long getTerm() { * If the operation was aborted due to missing mappings, this method will return the mappings * that are required to complete the operation. */ - public Mapping getRequiredMappingUpdate() { - return requiredMappingUpdate; + public Mapping getMappingUpdate() { + return mappingUpdate; + } + + public boolean isMappingUpdateOptional() { + return mappingUpdateOptional; } /** get the translog location after executing the operation */ @@ -501,7 +509,7 @@ void freeze() { public enum Type { SUCCESS, FAILURE, - MAPPING_UPDATE_REQUIRED + MAPPING_UPDATE } } @@ -526,8 +534,8 @@ public IndexResult(Exception failure, long version, long term, long seqNo, Strin this.created = false; } - public IndexResult(Mapping requiredMappingUpdate, String id) { - super(Operation.TYPE.INDEX, requiredMappingUpdate, id); + public IndexResult(Mapping mappingUpdate, boolean mappingUpdateOptional, String id) { + super(Operation.TYPE.INDEX, mappingUpdate, mappingUpdateOptional, id); this.created = false; } @@ -557,11 +565,6 @@ public DeleteResult(Exception failure, long version, long term, long seqNo, bool this.found = found; } - public DeleteResult(Mapping requiredMappingUpdate, String id) { - super(Operation.TYPE.DELETE, requiredMappingUpdate, id); - this.found = false; - } - public boolean isFound() { return found; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 12d64e66ad126..8d6f75fa7161b 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -600,14 +600,31 @@ private static void parseNullValue(DocumentParserContext context, String lastFie } private static void parseDynamicValue(final DocumentParserContext context, String currentFieldName) throws IOException { - if (context.dynamic() == ObjectMapper.Dynamic.STRICT) { - throw new StrictDynamicMappingException(context.parser().getTokenLocation(), context.parent().fullPath(), currentFieldName); - } - if (context.dynamic() == ObjectMapper.Dynamic.FALSE) { - failIfMatchesRoutingPath(context, currentFieldName); - return; + switch (context.dynamic()) { + case STRICT -> throw new StrictDynamicMappingException( + context.parser().getTokenLocation(), + context.parent().fullPath(), + currentFieldName + ); + case FALSE -> failIfMatchesRoutingPath(context, currentFieldName); + case UNTIL_LIMIT -> { + // fast path if we're exactly at the limit + boolean limitReached = context.mappingLookup().exceedsLimit(context.indexSettings().getMappingTotalFieldsLimit(), 1); + if (limitReached == false) { + try { + context.dynamic().getDynamicFieldsBuilder().createDynamicFieldFromValue(context, currentFieldName); + } catch (IllegalArgumentException e) { + // we're not exactly at the limit but the added field is a multi-field + limitReached = true; + } + } + if (limitReached) { + context.addIgnoredField(context.path().pathAsText(currentFieldName)); + } + } + case TRUE, RUNTIME -> context.dynamic().getDynamicFieldsBuilder().createDynamicFieldFromValue(context, currentFieldName); + default -> throw new IllegalArgumentException("Unknown dynamic [" + context.dynamic() + "]"); } - context.dynamic().getDynamicFieldsBuilder().createDynamicFieldFromValue(context, currentFieldName); } private static void failIfMatchesRoutingPath(DocumentParserContext context, String currentFieldName) { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java index df73170c82bdc..b2d9d1dd2bca1 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParserContext.java @@ -288,7 +288,7 @@ public final void addDynamicMapper(Mapper mapper) { if (mappingLookup.getMapper(mapper.name()) == null && mappingLookup.objectMappers().containsKey(mapper.name()) == false && newFieldsSeen.add(mapper.name())) { - mappingLookup.checkFieldLimit(indexSettings().getMappingTotalFieldsLimit(), newFieldsSeen.size()); + mappingLookup.checkFieldLimit(indexSettings().getMappingTotalFieldsLimit(), getDynamicMappersSize() + mapper.mapperSize()); } if (mapper instanceof ObjectMapper objectMapper) { dynamicObjectMappers.put(objectMapper.name(), objectMapper); @@ -310,6 +310,10 @@ public final void addDynamicMapper(Mapper mapper) { dynamicMappers.add(mapper); } + private int getDynamicMappersSize() { + return dynamicMappers.stream().mapToInt(Mapper::mapperSize).sum(); + } + /** * Get dynamic mappers created as a result of parsing an incoming document. Responsible for exposing all the newly created * fields that need to be merged into the existing mappings. Used to create the required mapping update at the end of document parsing. diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java index 6b5e526e5430a..4f52d4a27c276 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapper.java @@ -105,4 +105,18 @@ public String toString() { public static String internFieldName(String fieldName) { return fieldNameStringDeduplicator.deduplicate(fieldName); } + + /** + * Returns the number of mappers, including children + */ + int mapperSize() { + return recursiveCountMappers(0); + } + + private int recursiveCountMappers(int size) { + for (Mapper mapper : this) { + size += mapper.recursiveCountMappers(size); + } + return size + 1; + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java index a67d761522912..1d8375e5fdd2f 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/Mapping.java @@ -128,6 +128,10 @@ public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { return root.syntheticFieldLoader(Arrays.stream(metadataMappers)); } + public ObjectMapper.Dynamic dynamic() { + return root.dynamic(); + } + /** * Merges a new mapping into the existing one. * diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MappingLookup.java b/server/src/main/java/org/elasticsearch/index/mapper/MappingLookup.java index a7b18e1e62139..26ddf6046d1a2 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MappingLookup.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MappingLookup.java @@ -271,7 +271,7 @@ private void checkFieldLimit(long limit) { } void checkFieldLimit(long limit, int additionalFieldsToAdd) { - if (getTotalFieldsCount() + additionalFieldsToAdd - mapping.getSortedMetadataMappers().length > limit) { + if (exceedsLimit(limit, additionalFieldsToAdd)) { throw new IllegalArgumentException( "Limit of total fields [" + limit @@ -281,6 +281,10 @@ void checkFieldLimit(long limit, int additionalFieldsToAdd) { } } + boolean exceedsLimit(long limit, int additionalFieldsToAdd) { + return getTotalFieldsCount() + additionalFieldsToAdd - mapping.getSortedMetadataMappers().length > limit; + } + private void checkDimensionFieldLimit(long limit) { long dimensionFieldCount = fieldMappers.values() .stream() diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java index 23fc86c213f75..93226a7532eb7 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java @@ -48,6 +48,12 @@ DynamicFieldsBuilder getDynamicFieldsBuilder() { return DynamicFieldsBuilder.DYNAMIC_TRUE; } }, + UNTIL_LIMIT { + @Override + DynamicFieldsBuilder getDynamicFieldsBuilder() { + return DynamicFieldsBuilder.DYNAMIC_TRUE; + } + }, FALSE, STRICT, RUNTIME { @@ -217,6 +223,8 @@ protected static boolean parseObjectOrDocumentTypeProperties( builder.dynamic(Dynamic.STRICT); } else if (value.equalsIgnoreCase("runtime")) { builder.dynamic(Dynamic.RUNTIME); + } else if (value.equalsIgnoreCase("until_limit")) { + builder.dynamic(Dynamic.UNTIL_LIMIT); } else { boolean dynamic = XContentMapValues.nodeBooleanValue(fieldNode, fieldName + ".dynamic"); builder.dynamic(dynamic ? Dynamic.TRUE : Dynamic.FALSE); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d08fe8367fbca..d6f5ff20d0aa6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -102,6 +102,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MappingLookup; +import org.elasticsearch.index.mapper.ObjectMapper; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; @@ -972,7 +973,7 @@ private Engine.IndexResult applyIndexOperation( ); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { - return new Engine.IndexResult(update, operation.parsedDoc().id()); + return new Engine.IndexResult(update, update.dynamic() == ObjectMapper.Dynamic.UNTIL_LIMIT, operation.parsedDoc().id()); } } catch (Exception e) { // We treat any exception during parsing and or mapping update as a document level failure @@ -1884,8 +1885,8 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat switch (result.getResultType()) { case FAILURE: throw result.getFailure(); - case MAPPING_UPDATE_REQUIRED: - throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate()); + case MAPPING_UPDATE: + throw new IllegalArgumentException("unexpected mapping update: " + result.getMappingUpdate()); case SUCCESS: break; default: diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 19f0a42deb1f1..a40ec325f6b71 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -435,7 +435,7 @@ public void indexTranslogOperations( indexShard().updateRetentionLeasesOnReplica(retentionLeases); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { throw new MapperException("mapping updates are not allowed [" + operation + "]"); } if (result.getFailure() != null) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 6e8bb4798a198..354f94ca841b6 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -252,6 +252,7 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { Engine.IndexResult mappingUpdate = new Engine.IndexResult( new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()), + false, "id" ); Translog.Location resultLocation = new Translog.Location(42, 42, 42); @@ -849,6 +850,7 @@ public void testRetries() throws Exception { Engine.IndexResult conflictedResult = new Engine.IndexResult(err, 0, "id"); Engine.IndexResult mappingUpdate = new Engine.IndexResult( new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()), + false, "id" ); Translog.Location resultLocation = new Translog.Location(42, 42, 42); @@ -941,6 +943,7 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { Engine.IndexResult mappingUpdate = new Engine.IndexResult( new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()), + false, "id" ); Translog.Location resultLocation1 = new Translog.Location(42, 36, 36); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index bbb09c0c545dc..532946755cf80 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -926,12 +926,10 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source autoGeneratedTimestamp, false ); - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { updateMappings( shard, - IndexMetadata.builder(shard.indexSettings().getIndexMetadata()) - .putMapping(result.getRequiredMappingUpdate().toString()) - .build() + IndexMetadata.builder(shard.indexSettings().getIndexMetadata()).putMapping(result.getMappingUpdate().toString()).build() ); result = shard.applyIndexOperationOnPrimary( Versions.MATCH_ANY, @@ -957,10 +955,10 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source sourceToParse ); shard.sync(); // advance local checkpoint - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE) { throw new TransportReplicationAction.RetryOnReplicaException( shard.shardId, - "Mappings are not available on the replica yet, triggered update: " + result.getRequiredMappingUpdate() + "Mappings are not available on the replica yet, triggered update: " + result.getMappingUpdate() ); } }