Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand All @@ -49,9 +48,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -478,13 +475,7 @@ private static boolean handleMappingUpdateRequired(
final var mapperService = primary.mapperService();
final long initialMappingVersion = mapperService.mappingVersion();
try {
CompressedXContent mergedSource = mapperService.merge(
MapperService.SINGLE_MAPPING_NAME,
result.getRequiredMappingUpdate(),
MapperService.MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT
).mappingSource();
final DocumentMapper existingDocumentMapper = mapperService.documentMapper();
if (existingDocumentMapper != null && mergedSource.equals(existingDocumentMapper.mappingSource())) {
if (mapperService.isNoOpUpdate(result.getRequiredMappingUpdate())) {
context.resetForNoopMappingUpdateRetry(mapperService.mappingVersion());
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,35 +614,41 @@ public DocumentMapper merge(String type, CompressedXContent mappingSource, Merge
return doMerge(type, reason, mappingSourceAsMap);
}

/**
* Check to see if a mapping update would cause a change to the existing mappings if it were applied.
*
* @param update the update to check
*/
public boolean isNoOpUpdate(CompressedXContent update) {
DocumentMapper existing = documentMapper();
if (existing == null) {
return false;
}
MappingBuilder updateBuilder = mappingParser.parseToBuilder(
SINGLE_MAPPING_NAME,
MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT,
MappingParser.convertToMap(update)
);
Mapping mapping = mergeBuilders(mappingParser, indexSettings, updateBuilder, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, existing);
return mapping.toCompressedXContent().equals(existing.mappingSource());
}

private DocumentMapper doMerge(String type, MergeReason reason, Map<String, Object> mappingSourceAsMap) {
assert reason != MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assert enough? Should it be replaced by a runtime check, throwing an exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An assertion should be fine I think, MergeReason is entirely internal.

MappingBuilder incomingBuilder;
try {
incomingBuilder = mappingParser.parseToBuilder(type, reason, mappingSourceAsMap);
} catch (Exception e) {
throw new MapperParsingException("Failed to parse mapping: {}", e, e.getMessage());
}
if (reason == MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT) {
// only doing a merge without updating the actual #mapper field, no need to synchronize
final DocumentMapper currentMapper = this.mapper;
Mapping mapping = mergeBuilders(
mappingParser,
indexSettings,
incomingBuilder,
MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT,
currentMapper
);
return newDocumentMapper(mapping, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, mapping.toCompressedXContent());
} else {
// synchronized concurrent mapper updates are guaranteed to set merged mappers derived from the mapper value previously read
// TODO: can we even have concurrent updates here?
synchronized (this) {
final DocumentMapper currentMapper = this.mapper;
Mapping mapping = mergeBuilders(mappingParser, indexSettings, incomingBuilder, reason, currentMapper);
DocumentMapper newMapper = newDocumentMapper(mapping, reason, mapping.toCompressedXContent());
this.mapper = newMapper;
assert assertSerialization(newMapper, reason);
return newMapper;
}
// synchronized concurrent mapper updates are guaranteed to set merged mappers derived from the mapper value previously read
// TODO: can we even have concurrent updates here?
synchronized (this) {
Mapping mapping = mergeBuilders(incomingBuilder, reason);
DocumentMapper newMapper = newDocumentMapper(mapping, reason, mapping.toCompressedXContent());
this.mapper = newMapper;
assert assertSerialization(newMapper, reason);
return newMapper;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,11 +1113,7 @@ public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception {
Engine.IndexResult mappingUpdate = new Engine.IndexResult(Mapping.emptyCompressed(), "id");

MapperService mapperService = mock(MapperService.class);
DocumentMapper documentMapper = mock(DocumentMapper.class);
when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
// returning the current document mapper as the merge result to simulate a noop mapping update
when(mapperService.documentMapper()).thenReturn(documentMapper);
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper);
when(mapperService.isNoOpUpdate(any())).thenReturn(true);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That behavior already existed, but I think we should remove the mockito framework.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good idea. There are several tests in here that use Mockito so I think probably it's best done as a follow-up covering all of them, to keep things separate?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be done in a follow-up pr


IndexShard shard = mockShard(null, mapperService);
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn(
Expand Down Expand Up @@ -1166,11 +1162,7 @@ public void testNoopMappingUpdateSuccessOnRetry() throws Exception {
Engine.IndexResult successfulResult = new FakeIndexResult(1, 1, 10, true, resultLocation, "id");

MapperService mapperService = mock(MapperService.class);
DocumentMapper documentMapper = mock(DocumentMapper.class);
when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
when(mapperService.documentMapper()).thenReturn(documentMapper);
// returning the current document mapper as the merge result to simulate a noop mapping update
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper);
when(mapperService.isNoOpUpdate(any())).thenReturn(true);
// on the second invocation, the mapping version is incremented
// so that the second mapping update attempt doesn't trigger the infinite loop prevention
when(mapperService.mappingVersion()).thenReturn(0L, 0L, 1L);
Expand Down Expand Up @@ -1217,7 +1209,7 @@ public void testNoopMappingUpdateSuccessOnRetry() throws Exception {
);

latch.await();
verify(mapperService, times(2)).merge(any(), any(CompressedXContent.class), any());
verify(mapperService, times(2)).isNoOpUpdate(any());
}

private IndexShard mockShard(IndexSettings indexSettings, MapperService mapperService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ public class MapperServiceTests extends MapperServiceTestCase {

public void testPreflightUpdateDoesNotChangeMapping() throws Throwable {
final MapperService mapperService = createMapperService(mapping(b -> {}));
merge(mapperService, MergeReason.MAPPING_AUTO_UPDATE_PREFLIGHT, mapping(b -> createMappingSpecifyingNumberOfFields(b, 1)));
String update = """
{"_doc":{"properties":{"field0":{"type":"keyword"}}}}
""";
mapperService.isNoOpUpdate(new CompressedXContent(update));
assertThat("field was not created by preflight check", mapperService.fieldType("field0"), nullValue());
merge(
mapperService,
Expand Down
Loading