diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 514dc6e7ee132..c5c7906d3d028 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -37,7 +37,6 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; @@ -49,9 +48,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MappingLookup; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -478,13 +475,7 @@ private static boolean handleMappingUpdateRequired( final var mapperService = primary.mapperService(); final long initialMappingVersion = mapperService.mappingVersion(); try { - CompressedXContent mergedSource = mapperService.merge( - MapperService.SINGLE_MAPPING_NAME, - result.getRequiredMappingUpdate(), - MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT - ).mappingSource(); - final DocumentMapper existingDocumentMapper = mapperService.documentMapper(); - if (existingDocumentMapper != null && mergedSource.equals(existingDocumentMapper.mappingSource())) { + if (mapperService.isNoOpUpdate(result.getRequiredMappingUpdate())) { context.resetForNoopMappingUpdateRetry(mapperService.mappingVersion()); return true; } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index ac4db986f4c69..0d04968c35d31 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -614,35 +614,41 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge return doMerge(type, reason, mappingSourceAsMap); } + /** + * Check to see if a mapping update would cause a change to the existing mappings if it were applied. + * + * @param update the update to check + */ + public boolean isNoOpUpdate(CompressedXContent update) { + DocumentMapper existing = documentMapper(); + if (existing == null) { + return false; + } + MappingBuilder updateBuilder = mappingParser.parseToBuilder( + SINGLE_MAPPING_NAME, + MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, + MappingParser.convertToMap(update) + ); + Mapping mapping = mergeBuilders(mappingParser, indexSettings, updateBuilder, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, existing); + return mapping.toCompressedXContent().equals(existing.mappingSource()); + } + private DocumentMapper doMerge(String type, MergeReason reason, Map mappingSourceAsMap) { + assert reason != MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT; MappingBuilder incomingBuilder; try { incomingBuilder = mappingParser.parseToBuilder(type, reason, mappingSourceAsMap); } catch (Exception e) { throw new MapperParsingException("Failed to parse mapping: {}", e, e.getMessage()); } - if (reason == MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT) { - // only doing a merge without updating the actual #mapper field, no need to synchronize - final DocumentMapper currentMapper = this.mapper; - Mapping mapping = mergeBuilders( - mappingParser, - indexSettings, - incomingBuilder, - MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, - currentMapper - ); - return newDocumentMapper(mapping, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, mapping.toCompressedXContent()); - } else { - // synchronized concurrent mapper updates are guaranteed to set merged mappers derived from the mapper value previously read - // TODO: can we even have concurrent updates here? - synchronized (this) { - final DocumentMapper currentMapper = this.mapper; - Mapping mapping = mergeBuilders(mappingParser, indexSettings, incomingBuilder, reason, currentMapper); - DocumentMapper newMapper = newDocumentMapper(mapping, reason, mapping.toCompressedXContent()); - this.mapper = newMapper; - assert assertSerialization(newMapper, reason); - return newMapper; - } + // synchronized concurrent mapper updates are guaranteed to set merged mappers derived from the mapper value previously read + // TODO: can we even have concurrent updates here? + synchronized (this) { + Mapping mapping = mergeBuilders(incomingBuilder, reason); + DocumentMapper newMapper = newDocumentMapper(mapping, reason, mapping.toCompressedXContent()); + this.mapper = newMapper; + assert assertSerialization(newMapper, reason); + return newMapper; } } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 8c6d09076fac5..6c9e8346f0b99 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -1113,11 +1113,7 @@ public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception { Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id"); MapperService mapperService = mock(MapperService.class); - DocumentMapper documentMapper = mock(DocumentMapper.class); - when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); - // returning the current document mapper as the merge result to simulate a noop mapping update - when(mapperService.documentMapper()).thenReturn(documentMapper); - when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + when(mapperService.isNoOpUpdate(any())).thenReturn(true); IndexShard shard = mockShard(null, mapperService); when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn( @@ -1166,11 +1162,7 @@ public void testNoopMappingUpdateSuccessOnRetry() throws Exception { Engine.IndexResult successfulResult = new FakeIndexResult(1, 1, 10, true, resultLocation, "id"); MapperService mapperService = mock(MapperService.class); - DocumentMapper documentMapper = mock(DocumentMapper.class); - when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); - when(mapperService.documentMapper()).thenReturn(documentMapper); - // returning the current document mapper as the merge result to simulate a noop mapping update - when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + when(mapperService.isNoOpUpdate(any())).thenReturn(true); // on the second invocation, the mapping version is incremented // so that the second mapping update attempt doesn't trigger the infinite loop prevention when(mapperService.mappingVersion()).thenReturn(0L, 0L, 1L); @@ -1217,7 +1209,7 @@ public void testNoopMappingUpdateSuccessOnRetry() throws Exception { ); latch.await(); - verify(mapperService, times(2)).merge(any(), any(CompressedXContent.class), any()); + verify(mapperService, times(2)).isNoOpUpdate(any()); } private IndexShard mockShard(IndexSettings indexSettings, MapperService mapperService) { diff --git a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java index 073f59bdf9d58..e831fcea3f14d 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/MapperServiceTests.java @@ -47,7 +47,10 @@ public class MapperServiceTests extends MapperServiceTestCase { public void testPreflightUpdateDoesNotChangeMapping() throws Throwable { final MapperService mapperService = createMapperService(mapping(b -> {})); - merge(mapperService, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, mapping(b -> createMappingSpecifyingNumberOfFields(b, 1))); + String update = """ + {"_doc":{"properties":{"field0":{"type":"keyword"}}}} + """; + mapperService.isNoOpUpdate(new CompressedXContent(update)); assertThat("field was not created by preflight check", mapperService.fieldType("field0"), nullValue()); merge( mapperService,