Merged
@@ -61,6 +61,7 @@ enum ItemProcessingState {
private DocWriteRequest<?> requestToExecute;
private BulkItemResponse executionResult;
private int updateRetryCounter;
private long noopMappingUpdateRetryForMappingVersion;

BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) {
this.request = request;
@@ -89,6 +90,7 @@ private void advance() {
updateRetryCounter = 0;
requestToExecute = null;
executionResult = null;
noopMappingUpdateRetryForMappingVersion = -1;
assert assertInvariants(ItemProcessingState.INITIAL);
}

@@ -191,12 +193,39 @@ public void resetForMappingUpdateRetry() {
resetForExecutionRetry();
}

/**
* Don't bother the master node if the mapping update is a noop.
* This may happen if there was a concurrent mapping update that added the same field.
*
* @param mappingVersion the current mapping version. This is used to guard against infinite loops.
* @throws IllegalStateException if retried multiple times with the same mapping version, to guard against infinite loops.
*/
public void resetForNoopMappingUpdateRetry(long mappingVersion) {
assert assertInvariants(ItemProcessingState.TRANSLATED);
if (noopMappingUpdateRetryForMappingVersion == mappingVersion) {
Contributor
I am not sure I follow why this infinite loop detection is necessary. I'd expect that, since all dynamic fields from the previous parse are now in the mappings, the retry would always succeed in one go?

The only case where I could see this not being true is updates, where a new version of the doc could appear and thus a new dynamic mapping update could potentially be required. That would be at odds with the detection here, I think.

To some extent, having some detection in assertions would be good, but it would have to not trigger for updates, and it would then have to be the same for other mapping update retries. I could accept not adding that now, though.

Member Author
> I'd expect that, since all dynamic fields from the previous parse are now in the mappings, the retry would always succeed in one go?

If all goes right, this is true. However, there's no strict guarantee of it, and the failure mode I have in mind is not related to concurrency. The risk comes from the fact that two distinct places reason about a mapping's size: DocumentParserContext on the one hand, and Mapping::merge/MappingLookup on the other. In DocumentParserContext, we only estimate the size of the mapping when adding dynamic mappers. If there's a bug in that estimation (#102885 tries to make that less likely), there's a chance that we add more dynamic mappers than the mapping can hold.

Normally, we find out that we have accepted too many dynamic mappers in DocumentParserContext when the preflight check throws an exception. However, with #96235, for indices that have enabled the ignore_dynamic_beyond_limit setting, the preflight check won't fail but will silently ignore fields beyond the limit. After detecting that a noop mapping update has been requested, the indexing request gets retried. But the mapping hasn't changed in the meantime, so when the document is parsed again, DocumentParserContext adds the same dynamic mapper, which leads to the same dynamic mapping update that the preflight check will ignore again, and we've entered an infinite loop.

An alternative solution could be for the preflight merge reason, in contrast to the auto-update merge reason, to throw an exception instead of ignoring fields beyond the limit. However, due to a race condition, we might have added a dynamic mapper under the assumption that enough space was still available: after accepting the mapper, but before the preflight check, the mapping might get updated with a version that already puts us at the limit. Throwing a validation error during the preflight check would then lead to data loss instead of ignoring the fields above the limit.

> The only case where I could see this not being true is updates, where a new version of the doc could appear and thus a new dynamic mapping update could potentially be required.

You're referring to the situation when the retry_on_conflict parameter is set, right? I think this case is covered by the fact that resetForExecutionRetry() resets noopMappingUpdateRetryForMappingVersion. This means the infinite loop prevention only kicks in if the BulkPrimaryExecutionContext has been reset via resetForNoopMappingUpdateRetry() twice in a row, and only if the mapping also hasn't changed in the meantime.
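The twice-in-a-row semantics can be condensed into a tiny standalone model of the guard (class name hypothetical; simplified from the actual BulkPrimaryExecutionContext changes in this diff):

```java
// Minimal model of the noop-mapping-update retry guard: a noop retry is
// allowed once per mapping version; a second noop retry at the same version
// fails fast instead of looping forever. Any other retry clears the guard.
public class NoopRetryGuardSketch {
    private long noopMappingUpdateRetryForMappingVersion = -1;

    // called when the preflight merge turned out to be a noop
    public void resetForNoopMappingUpdateRetry(long mappingVersion) {
        if (noopMappingUpdateRetryForMappingVersion == mappingVersion) {
            throw new IllegalStateException(
                "On retry, this indexing request resulted in another noop mapping update. "
                    + "Failing the indexing operation to prevent an infinite retry loop.");
        }
        resetForExecutionRetry();
        noopMappingUpdateRetryForMappingVersion = mappingVersion;
    }

    // every retry resets the guard, so the check only fires for two
    // consecutive noop retries at the same mapping version
    public void resetForExecutionRetry() {
        // the real context also resets per-item execution state here
        noopMappingUpdateRetryForMappingVersion = -1;
    }

    public static void main(String[] args) {
        NoopRetryGuardSketch ctx = new NoopRetryGuardSketch();
        ctx.resetForNoopMappingUpdateRetry(7L); // first noop retry at version 7: allowed
        ctx.resetForNoopMappingUpdateRetry(8L); // version advanced: allowed again
        try {
            ctx.resetForNoopMappingUpdateRetry(8L); // second noop retry at version 8
        } catch (IllegalStateException e) {
            System.out.println("retry loop broken"); // prints "retry loop broken"
        }
    }
}
```

Note how a genuine (non-noop) mapping update in between bumps the mapping version and thereby re-arms the guard, which is why repeated legitimate retries of an update are not affected.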

Member Author
A way to reproduce the infinite loop is to purposefully introduce a bug in how DocumentParserContext counts the size of a Mapper.Builder in #96235:

If you change that to int builderMapperSize = 1; and execute org.elasticsearch.index.mapper.DynamicMappingIT#testIgnoreDynamicBeyondLimitSingleMultiField, it will trigger this.
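The failure mode described above can be simulated in a few lines (all names and the field limit are hypothetical; this models the description, not the real DocumentParserContext): the parser's size estimate under-counts a multi-field, the limit-aware merge silently ignores the overflow, and the per-version guard is what breaks the resulting noop-retry loop.

```java
import java.util.HashSet;
import java.util.Set;

public class EstimationMismatchSketch {
    static final int FIELD_LIMIT = 1;

    // returns the attempt at which the guard breaks the loop, or -1 if it never fires
    static int runUntilDetected(int maxAttempts) {
        Set<String> mapping = new HashSet<>(); // fields currently in the mapping
        long mappingVersion = 0;
        long guardVersion = -1;                // the per-version retry guard

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // the parser's estimate admits the multi-field as one slot (the
            // injected bug), even though it expands to two fields on merge
            Set<String> merged = new HashSet<>(mapping);
            for (String field : Set.of("field", "field.keyword")) {
                if (merged.size() < FIELD_LIMIT) {
                    merged.add(field); // fields beyond the limit are silently ignored
                }
            }
            if (!merged.equals(mapping)) {
                mapping = merged; // genuine update: mapping and version advance
                mappingVersion++;
                continue;
            }
            // noop mapping update: without the guard this branch repeats forever
            if (guardVersion == mappingVersion) {
                return attempt; // infinite loop detected, fail the operation
            }
            guardVersion = mappingVersion;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("loop broken at attempt " + runUntilDetected(10));
    }
}
```

With the guard, the loop is cut after one tolerated noop retry; without the `guardVersion` check, the noop branch would repeat until `maxAttempts`.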

Contributor
> You're referring to the situation when the retry_on_conflict parameter is set, right?

No, I am referring to a case where an update results in a mapping update and thus a retry afterwards. That could potentially be a noop mapping update (due to concurrency). The retried update could then result in new dynamic fields.

Contributor
I think it is safe towards such updates in the current form, since:

  1. Any retry will at least use a mapping corresponding to the last mapping version, thus only adding fields missing there.
  2. A noop result should thus not be possible.

I think #96235 does not change this, but the argument that we'll not run into it becomes more subtle.

A worry is that an update will fail with this error if it first adds a field f (noop detected) and then another field g (that is also a noop, towards the same mapping version). That seems impossible now, but it is hard to verify.

Contributor
About the potential discrepancy between what the mapper results in and what the document parser holds, I suggest adding testing to verify this, as well as some assertions to ensure they are aligned at relevant times. That belongs in the other PR, though (perhaps we can have the size tracking as a separate PR?).

Member Author
@felixbarny Dec 31, 2023

This is what #102885 is about. It significantly simplifies the calculation of how a mapper counts against the field limit and aligns the implementations as much as possible (Mapper.Builder#getTotalFieldsCount, Mapper#getTotalFieldsCount, MappingLookup#getTotalFieldsCount, and MappingLookup#checkFieldLimit).

Member Author
@felixbarny Jan 2, 2024
I've also added an assertion here: befd7f6

// this should never happen, if we end up here, there's probably a bug
// seems like we're in a live lock/infinite loop here
// we've already re-tried and are about to retry again
// as no state has changed in the meantime (the mapping version is still the same),
// we can't expect another retry would yield a different result
// a possible cause:
// maybe we added more dynamic mappers in DocumentParserContext.addDynamicMapper than possible according to the field limit
// the additional fields are then ignored by the mapping merge and the process repeats
String message = "On retry, this indexing request resulted in another noop mapping update. "
+ "Failing the indexing operation to prevent an infinite retry loop.";
assert false : message;
throw new IllegalStateException(message);
}
resetForExecutionRetry();
noopMappingUpdateRetryForMappingVersion = mappingVersion;
}

/** resets the current item state, prepare for a new execution */
private void resetForExecutionRetry() {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED);
currentItemState = ItemProcessingState.INITIAL;
requestToExecute = null;
executionResult = null;
noopMappingUpdateRetryForMappingVersion = -1;
assert assertInvariants(ItemProcessingState.INITIAL);
}

@@ -47,6 +47,7 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
@@ -64,6 +65,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
@@ -370,12 +372,21 @@ static boolean executeBulkItemRequest(
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

try {
primary.mapperService()
.merge(
MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate()),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT
);
Optional<CompressedXContent> mergedSource = Optional.ofNullable(
primary.mapperService()
.merge(
MapperService.SINGLE_MAPPING_NAME,
new CompressedXContent(result.getRequiredMappingUpdate()),
MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT
)
).map(DocumentMapper::mappingSource);
Optional<CompressedXContent> previousSource = Optional.ofNullable(primary.mapperService().documentMapper())
.map(DocumentMapper::mappingSource);

if (mergedSource.equals(previousSource)) {
context.resetForNoopMappingUpdateRetry(primary.mapperService().mappingVersion());
return true;
}
} catch (Exception e) {
logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e);
assert result.getId() != null;
@@ -130,6 +130,7 @@ public enum MergeReason {
private final Supplier<MappingParserContext> mappingParserContextSupplier;

private volatile DocumentMapper mapper;
private volatile long mappingVersion;

public MapperService(
ClusterService clusterService,
@@ -298,6 +299,7 @@ public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexMetadata newIndexMetadata) {
previousMapper = this.mapper;
assert assertRefreshIsNotNeeded(previousMapper, type, incomingMapping);
this.mapper = newDocumentMapper(incomingMapping, MergeReason.MAPPING_RECOVERY, incomingMappingSource);
this.mappingVersion = newIndexMetadata.getMappingVersion();
}
String op = previousMapper != null ? "updated" : "added";
if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) {
@@ -590,6 +592,10 @@ public DocumentMapper documentMapper() {
return mapper;
}

public long mappingVersion() {
return mappingVersion;
}

/**
* Returns {@code true} if the given {@code mappingSource} includes a type
* as a top-level object.
@@ -26,6 +26,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -36,6 +37,7 @@
import org.elasticsearch.index.bulk.stats.ShardBulkStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
@@ -262,7 +264,13 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception {
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn(
mappingUpdate
);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);

// merged mapping source needs to be different from previous one for the master node to be invoked
DocumentMapper mergedDoc = mock(DocumentMapper.class);
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDoc);
when(mergedDoc.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));

randomlySetIgnoredPrimaryResponse(items[0]);

@@ -875,9 +883,14 @@ public void testRetries() throws Exception {
});
when(shard.indexSettings()).thenReturn(indexSettings);
when(shard.shardId()).thenReturn(shardId);
when(shard.mapperService()).thenReturn(mock(MapperService.class));
MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);
when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class));

DocumentMapper mergedDocMapper = mock(DocumentMapper.class);
when(mergedDocMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDocMapper);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
@@ -964,7 +977,13 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
success2
);
when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong(), anyString())).thenCallRealMethod();
when(shard.mapperService()).thenReturn(mock(MapperService.class));
MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);

// merged mapping source needs to be different from previous one for the master node to be invoked
DocumentMapper mergedDoc = mock(DocumentMapper.class);
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDoc);
when(mergedDoc.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));

randomlySetIgnoredPrimaryResponse(items[0]);

@@ -1072,6 +1091,136 @@ public void testPerformOnPrimaryReportsBulkStats() throws Exception {
latch.await();
}

public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception {
Contributor
I wonder if this test tests a non-existing scenario?

The noop prevention would only concern a case where the mapping used during parsing is different from the mapping used during preflight, containing the mapping changes added. And if so, we'd expect any retry to always succeed?

Member Author
> I wonder if this test tests a non-existing scenario?

I hope it does :)

This is meant as a safety net in case there's an unexpected bug that leads to a situation where we accept a field in DocumentParserContext even though it turns out we don't have enough capacity for it when merging the mappings.

felixbarny marked this conversation as resolved.
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
"id"
);

IndexShard shard = mockShard();
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn(
mappingUpdate
);
MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);

DocumentMapper documentMapper = mock(DocumentMapper.class);
when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
// returning the current document mapper as the merge result to simulate a noop mapping update
when(mapperService.documentMapper()).thenReturn(documentMapper);
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"),
randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
Collections.singletonMap("field", "value"),
Requests.INDEX_CONTENT_TYPE
)
);

BulkItemRequest[] items = new BulkItemRequest[] {
new BulkItemRequest(0, new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")) };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

AssertionError error = expectThrows(
AssertionError.class,
() -> TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
shard,
updateHelper,
threadPool::absoluteTimeInMillis,
(update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"),
listener -> listener.onResponse(null),
ActionTestUtils.assertNoFailureListener(result -> {}),
threadPool,
Names.WRITE
)
);
assertThat(
error.getMessage(),
equalTo(
"On retry, this indexing request resulted in another noop mapping update."
+ " Failing the indexing operation to prevent an infinite retry loop."
)
);
}

public void testNoopMappingUpdateSuccessOnRetry() throws Exception {
Engine.IndexResult mappingUpdate = new Engine.IndexResult(
new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()),
"id"
);
Translog.Location resultLocation = new Translog.Location(42, 42, 42);
Engine.IndexResult successfulResult = new FakeIndexResult(1, 1, 10, true, resultLocation, "id");

IndexShard shard = mockShard();
when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn(
// on the first invocation, return a result that attempts a mapping update
// the mapping update will be a noop and the operation is retried without contacting the master
mappingUpdate,
// the second invocation also returns a mapping update result
// this doesn't trigger the infinite loop detection because MapperService#mappingVersion returns a different mapping version
mappingUpdate,
// on the third attempt, return a successful result, indicating that no mapping update needs to be executed
successfulResult
);

MapperService mapperService = mock(MapperService.class);
when(shard.mapperService()).thenReturn(mapperService);

DocumentMapper documentMapper = mock(DocumentMapper.class);
when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}"));
when(mapperService.documentMapper()).thenReturn(documentMapper);
// returning the current document mapper as the merge result to simulate a noop mapping update
when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper);
// on the second invocation, the mapping version is incremented
// so that the second mapping update attempt doesn't trigger the infinite loop prevention
when(mapperService.mappingVersion()).thenReturn(0L, 1L);

UpdateHelper updateHelper = mock(UpdateHelper.class);
when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
new UpdateHelper.Result(
new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"),
randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
Collections.singletonMap("field", "value"),
Requests.INDEX_CONTENT_TYPE
)
);

BulkItemRequest[] items = new BulkItemRequest[] {
new BulkItemRequest(0, new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")) };
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

final CountDownLatch latch = new CountDownLatch(1);
TransportShardBulkAction.performOnPrimary(
bulkShardRequest,
shard,
updateHelper,
threadPool::absoluteTimeInMillis,
(update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"),
listener -> listener.onFailure(new IllegalStateException("no failure expected")),
new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse();
assertFalse(primaryResponse.isFailed());
}), latch),
threadPool,
Names.WRITE
);

latch.await();
verify(mapperService, times(2)).merge(any(), any(CompressedXContent.class), any());
}

private IndexShard mockShard() {
IndexShard shard = mock(IndexShard.class);
when(shard.shardId()).thenReturn(shardId);
when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class));
when(shard.getFailedIndexResult(any(Exception.class), anyLong(), anyString())).thenCallRealMethod();
return shard;
}

private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) {
if (randomBoolean()) {
// add a response to the request and thereby check that it is ignored for the primary.
Expand Down