diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java index 4291ba5895beb..78df7fdc25542 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkPrimaryExecutionContext.java @@ -61,6 +61,7 @@ enum ItemProcessingState { private DocWriteRequest requestToExecute; private BulkItemResponse executionResult; private int updateRetryCounter; + private long noopMappingUpdateRetryForMappingVersion; BulkPrimaryExecutionContext(BulkShardRequest request, IndexShard primary) { this.request = request; @@ -89,6 +90,7 @@ private void advance() { updateRetryCounter = 0; requestToExecute = null; executionResult = null; + noopMappingUpdateRetryForMappingVersion = -1; assert assertInvariants(ItemProcessingState.INITIAL); } @@ -191,12 +193,39 @@ public void resetForMappingUpdateRetry() { resetForExecutionRetry(); } + /** + * Don't bother the master node if the mapping update is a noop. + * This may happen if there was a concurrent mapping update that added the same field. + * + * @param mappingVersion the current mapping version. This is used to guard against infinite loops. + * @throws IllegalStateException if retried multiple times with the same mapping version, to guard against infinite loops. + */ + public void resetForNoopMappingUpdateRetry(long mappingVersion) { + assert assertInvariants(ItemProcessingState.TRANSLATED); + if (noopMappingUpdateRetryForMappingVersion == mappingVersion) { + // this should never happen, if we end up here, there's probably a bug + // seems like we're in a live lock/infinite loop here + // we've already re-tried and are about to retry again + // as no state has changed in the meantime (the mapping version is still the same), + // we can't expect another retry would yield a different result + // a possible cause: + // maybe we added more dynamic mappers in DocumentParserContext.addDynamicMapper than possible according to the field limit + // the additional fields are then ignored by the mapping merge and the process repeats + String message = "On retry, this indexing request resulted in another noop mapping update. " + + "Failing the indexing operation to prevent an infinite retry loop."; + assert false : message; + throw new IllegalStateException(message); + } + resetForExecutionRetry(); + noopMappingUpdateRetryForMappingVersion = mappingVersion; + } + /** resets the current item state, prepare for a new execution */ private void resetForExecutionRetry() { - assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE, ItemProcessingState.EXECUTED); currentItemState = ItemProcessingState.INITIAL; requestToExecute = null; executionResult = null; + noopMappingUpdateRetryForMappingVersion = -1; assert assertInvariants(ItemProcessingState.INITIAL); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 9266ee3ee0b68..e6d5bdcc46696 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -47,6 +47,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; @@ -64,6 +65,7 @@ import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.LongSupplier; @@ -370,12 +372,21 @@ static boolean executeBulkItemRequest( if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { try { - primary.mapperService() - .merge( - MapperService.SINGLE_MAPPING_NAME, - new CompressedXContent(result.getRequiredMappingUpdate()), - MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT - ); + Optional mergedSource = Optional.ofNullable( + primary.mapperService() + .merge( + MapperService.SINGLE_MAPPING_NAME, + new CompressedXContent(result.getRequiredMappingUpdate()), + MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT + ) + ).map(DocumentMapper::mappingSource); + Optional previousSource = Optional.ofNullable(primary.mapperService().documentMapper()) + .map(DocumentMapper::mappingSource); + + if (mergedSource.equals(previousSource)) { + context.resetForNoopMappingUpdateRetry(primary.mapperService().mappingVersion()); + return true; + } } catch (Exception e) { logger.info(() -> format("%s mapping update rejected by primary", primary.shardId()), e); assert result.getId() != null; diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java index cbf2dd872da2f..b714eabbd2636 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperService.java @@ -130,6 +130,7 @@ public enum MergeReason { private final Supplier mappingParserContextSupplier; private volatile DocumentMapper mapper; + private volatile long mappingVersion; public MapperService( ClusterService clusterService, @@ -298,6 +299,7 @@ public void updateMapping(final IndexMetadata currentIndexMetadata, final IndexM previousMapper = this.mapper; assert assertRefreshIsNotNeeded(previousMapper, type, incomingMapping); this.mapper = newDocumentMapper(incomingMapping, MergeReason.MAPPING_RECOVERY, incomingMappingSource); + this.mappingVersion = newIndexMetadata.getMappingVersion(); } String op = previousMapper != null ? "updated" : "added"; if (logger.isDebugEnabled() && incomingMappingSource.compressed().length < 512) { @@ -590,6 +592,10 @@ public DocumentMapper documentMapper() { return mapper; } + public long mappingVersion() { + return mappingVersion; + } + /** * Returns {@code true} if the given {@code mappingSource} includes a type * as a top-level object. diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 19abcb93fef4b..2ab5df93b2af2 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.internal.Requests; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -36,6 +37,7 @@ import org.elasticsearch.index.bulk.stats.ShardBulkStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; @@ -262,7 +264,13 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn( mappingUpdate ); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); + + // merged mapping source needs to be different from previous one for the master node to be invoked + DocumentMapper mergedDoc = mock(DocumentMapper.class); + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDoc); + when(mergedDoc.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); randomlySetIgnoredPrimaryResponse(items[0]); @@ -875,9 +883,14 @@ public void testRetries() throws Exception { }); when(shard.indexSettings()).thenReturn(indexSettings); when(shard.shardId()).thenReturn(shardId); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class)); + DocumentMapper mergedDocMapper = mock(DocumentMapper.class); + when(mergedDocMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDocMapper); + UpdateHelper updateHelper = mock(UpdateHelper.class); when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( new UpdateHelper.Result( @@ -964,7 +977,13 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { success2 ); when(shard.getFailedIndexResult(any(EsRejectedExecutionException.class), anyLong(), anyString())).thenCallRealMethod(); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); + + // merged mapping source needs to be different from previous one for the master node to be invoked + DocumentMapper mergedDoc = mock(DocumentMapper.class); + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(mergedDoc); + when(mergedDoc.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); randomlySetIgnoredPrimaryResponse(items[0]); @@ -1072,6 +1091,136 @@ public void testPerformOnPrimaryReportsBulkStats() throws Exception { latch.await(); } + public void testNoopMappingUpdateInfiniteLoopPrevention() throws Exception { + Engine.IndexResult mappingUpdate = new Engine.IndexResult( + new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()), + "id" + ); + + IndexShard shard = mockShard(); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn( + mappingUpdate + ); + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); + + DocumentMapper documentMapper = mock(DocumentMapper.class); + when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); + // returning the current document mapper as the merge result to simulate a noop mapping update + when(mapperService.documentMapper()).thenReturn(documentMapper); + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result( + new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"), + randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), + Requests.INDEX_CONTENT_TYPE + ) + ); + + BulkItemRequest[] items = new BulkItemRequest[] { + new BulkItemRequest(0, new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")) }; + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + AssertionError error = expectThrows( + AssertionError.class, + () -> TransportShardBulkAction.performOnPrimary( + bulkShardRequest, + shard, + updateHelper, + threadPool::absoluteTimeInMillis, + (update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"), + listener -> listener.onResponse(null), + ActionTestUtils.assertNoFailureListener(result -> {}), + threadPool, + Names.WRITE + ) + ); + assertThat( + error.getMessage(), + equalTo( + "On retry, this indexing request resulted in another noop mapping update." + + " Failing the indexing operation to prevent an infinite retry loop." + ) + ); + } + + public void testNoopMappingUpdateSuccessOnRetry() throws Exception { + Engine.IndexResult mappingUpdate = new Engine.IndexResult( + new Mapping(mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()), + "id" + ); + Translog.Location resultLocation = new Translog.Location(42, 42, 42); + Engine.IndexResult successfulResult = new FakeIndexResult(1, 1, 10, true, resultLocation, "id"); + + IndexShard shard = mockShard(); + when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenReturn( + // on the first invocation, return a result that attempts a mapping update + // the mapping update will be a noop and the operation is retired without contacting the master + mappingUpdate, + // the second invocation also returns a mapping update result + // this doesn't trigger the infinite loop detection because MapperService#mappingVersion returns a different mapping version + mappingUpdate, + // on the third attempt, return a successful result, indicating that no mapping update needs to be executed + successfulResult + ); + + MapperService mapperService = mock(MapperService.class); + when(shard.mapperService()).thenReturn(mapperService); + + DocumentMapper documentMapper = mock(DocumentMapper.class); + when(documentMapper.mappingSource()).thenReturn(CompressedXContent.fromJSON("{}")); + when(mapperService.documentMapper()).thenReturn(documentMapper); + // returning the current document mapper as the merge result to simulate a noop mapping update + when(mapperService.merge(any(), any(CompressedXContent.class), any())).thenReturn(documentMapper); + // on the second invocation, the mapping version is incremented + // so that the second mapping update attempt doesn't trigger the infinite loop prevention + when(mapperService.mappingVersion()).thenReturn(0L, 1L); + + UpdateHelper updateHelper = mock(UpdateHelper.class); + when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( + new UpdateHelper.Result( + new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"), + randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, + Collections.singletonMap("field", "value"), + Requests.INDEX_CONTENT_TYPE + ) + ); + + BulkItemRequest[] items = new BulkItemRequest[] { + new BulkItemRequest(0, new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")) }; + BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); + + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, + shard, + updateHelper, + threadPool::absoluteTimeInMillis, + (update, shardId, listener) -> fail("the master should not be contacted as the operation yielded a noop mapping update"), + listener -> listener.onFailure(new IllegalStateException("no failure expected")), + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertFalse(primaryResponse.isFailed()); + }), latch), + threadPool, + Names.WRITE + ); + + latch.await(); + verify(mapperService, times(2)).merge(any(), any(CompressedXContent.class), any()); + } + + private IndexShard mockShard() { + IndexShard shard = mock(IndexShard.class); + when(shard.shardId()).thenReturn(shardId); + when(shard.getBulkOperationListener()).thenReturn(mock(ShardBulkStats.class)); + when(shard.getFailedIndexResult(any(Exception.class), anyLong(), anyString())).thenCallRealMethod(); + return shard; + } + private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { if (randomBoolean()) { // add a response to the request and thereby check that it is ignored for the primary.