Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix SearchPhaseExecutionException to properly initCause ([#20320](https://github.com/opensearch-project/OpenSearch/pull/20320))
- Fix `cluster.remote.<cluster_alias>.server_name` setting no populating SNI ([#20321](https://github.com/opensearch-project/OpenSearch/pull/20321))
- Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport ([#20371](https://github.com/opensearch-project/OpenSearch/pull/20371))
- Fix indexing regression and bugs in grouping criteria ([#20145](https://github.com/opensearch-project/OpenSearch/pull/20145))

### Dependencies
- Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.opensearch.Version.V_2_7_0;
import static org.opensearch.Version.V_3_0_0;
import static org.opensearch.Version.V_3_2_0;
import static org.opensearch.Version.V_3_3_0;

/**
* Utility class to register server exceptions
Expand Down Expand Up @@ -1242,13 +1241,5 @@ public static void registerExceptions() {
V_3_2_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.index.engine.LookupMapLockAcquisitionException.class,
org.opensearch.index.engine.LookupMapLockAcquisitionException::new,
CUSTOM_ELASTICSEARCH_EXCEPTIONS_BASE_ID + 2,
V_3_3_0
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.get.GetResult;
import org.opensearch.index.mapper.MapperException;
Expand Down Expand Up @@ -728,15 +727,7 @@ && isConflictException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) {
context.resetForExecutionForRetry();
return;
} else if (isFailed
&& context.getPrimary() != null
&& context.getPrimary().indexSettings() != null
&& context.getPrimary().indexSettings().isContextAwareEnabled()
&& isLookupMapLockAcquisitionException(executionResult.getFailure().getCause())
&& context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) {
context.resetForExecutionForRetry();
return;
}
}
final BulkItemResponse response;
if (isUpdate) {
response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult);
Expand Down Expand Up @@ -765,10 +756,6 @@ private static boolean isConflictException(final Exception e) {
return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException;
}

/** Returns {@code true} when the root cause of {@code e} is a lookup-map lock acquisition failure. */
private static boolean isLookupMapLockAcquisitionException(final Exception e) {
    final Throwable rootCause = ExceptionsHelper.unwrapCause(e);
    return rootCause instanceof LookupMapLockAcquisitionException;
}

/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -52,13 +53,13 @@ public interface DocumentIndexWriter extends Closeable, ReferenceManager.Refresh

void deleteUnusedFiles() throws IOException;

long addDocuments(Iterable<ParseContext.Document> docs, Term uid) throws IOException;
long addDocuments(List<ParseContext.Document> docs, Term uid) throws IOException;

long addDocument(ParseContext.Document doc, Term uid) throws IOException;

void softUpdateDocuments(
Term uid,
Iterable<ParseContext.Document> docs,
List<ParseContext.Document> docs,
long version,
long seqNo,
long primaryTerm,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.mapper.ParseContext;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -131,7 +132,7 @@ public void deleteUnusedFiles() throws IOException {
}

@Override
public long addDocuments(Iterable<ParseContext.Document> docs, Term uid) throws IOException {
public long addDocuments(final List<ParseContext.Document> docs, Term uid) throws IOException {
return indexWriter.addDocuments(docs);
}

Expand All @@ -143,7 +144,7 @@ public long addDocument(ParseContext.Document doc, Term uid) throws IOException
@Override
public void softUpdateDocuments(
Term uid,
Iterable<ParseContext.Document> docs,
List<ParseContext.Document> docs,
long version,
long seqNo,
long primaryTerm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ public IndexWriterConfig buildIndexWriterConfig() {
iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
// We are setting the codec here rather than in the CodecService because each CriteriaBasedCodec requires
// associatedCriteria to be attached upon creation during IndexWriter initialisation. This criteria is
// determined on a per-document basis and is only available within the InternalEngine. Therefore, the codec
// for the child writer is created here, where the necessary criteria information is accessible.
if (engineConfig.getIndexSettings().isContextAwareEnabled()) {
iwc.setCodec(new CriteriaBasedCodec(engineConfig.getCodec(), associatedCriteria));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.mapper;

import org.apache.lucene.index.LeafReader;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.script.ContextAwareGroupingScript;
import org.opensearch.script.Script;

Expand Down Expand Up @@ -181,4 +183,18 @@ public ContextAwareGroupingFieldType fieldType() {
protected String contentType() {
return CONTENT_TYPE;
}

/**
 * The context aware grouping field is synthetic — it is never part of an ingested document —
 * so it is intentionally excluded from derived-source validation and this override is a no-op.
 */
@Override
public void canDeriveSource() {}

/**
 * The context aware grouping field is synthetic — it is never part of an ingested document —
 * so it contributes nothing when source is re-derived and this override is intentionally a no-op.
 */
@Override
public void deriveSource(XContentBuilder builder, LeafReader leafReader, int docId) throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
Expand Down Expand Up @@ -690,7 +691,9 @@ public boolean isCompositeIndexPresent() {
}

public Set<CompositeMappedFieldType> getCompositeFieldTypes() {
return compositeMappedFieldTypes;
return compositeMappedFieldTypes.stream()
.filter(compositeMappedFieldType -> compositeMappedFieldType instanceof CompositeDataCubeFieldType)
.collect(Collectors.toSet());
}

private Set<CompositeMappedFieldType> getCompositeFieldTypesFromMapper() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
import org.opensearch.crypto.CryptoRegistryException;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.engine.IngestionEngineException;
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.query.QueryShardException;
import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException;
Expand Down Expand Up @@ -904,7 +903,6 @@ public void testIds() {
ids.put(176, IngestionEngineException.class);
ids.put(177, StreamException.class);
ids.put(10001, IndexCreateBlockException.class);
ids.put(10002, LookupMapLockAcquisitionException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.LookupMapLockAcquisitionException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.Mapping;
Expand Down Expand Up @@ -109,8 +108,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG;
import static org.opensearch.index.IndexSettingsTests.newIndexMeta;
import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -1221,72 +1218,6 @@ public void testRetries() throws Exception {
latch.await();
}

@LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG)
public void testRetriesWithLookupMapLockAcquisitionException() throws Exception {
    // Verifies that a primary bulk update transparently retries when the engine reports a
    // LookupMapLockAcquisitionException and context-aware indexing is enabled, eventually
    // succeeding once the underlying operation stops failing.
    Settings settings = Settings.builder().put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true).build();
    IndexSettings indexSettings = new IndexSettings(newIndexMeta("test", settings), Settings.EMPTY);
    UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
    BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);

    IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

    Exception err = new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null);
    Engine.IndexResult lookupMapExceptionResult = new Engine.IndexResult(err, 0);
    Translog.Location resultLocation = new Translog.Location(42, 42, 42);
    // seqNo 13 is asserted on the final response below to prove the successful attempt won.
    Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation);

    IndexShard shard = mock(IndexShard.class);
    // Randomly interleave lock-acquisition failures with success: the retry loop must keep
    // re-executing the operation until it succeeds, regardless of how many failures precede it.
    when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(ir -> {
        if (randomBoolean()) {
            return lookupMapExceptionResult;
        } else {
            return success;
        }
    });
    when(shard.indexSettings()).thenReturn(indexSettings);
    when(shard.shardId()).thenReturn(shardId);
    when(shard.mapperService()).thenReturn(mock(MapperService.class));

    UpdateHelper updateHelper = mock(UpdateHelper.class);
    when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
        new UpdateHelper.Result(
            updateResponse,
            randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
            Collections.singletonMap("field", "value"),
            Requests.INDEX_CONTENT_TYPE
        )
    );

    BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest };
    BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);

    final CountDownLatch latch = new CountDownLatch(1);
    TransportShardBulkAction.performOnPrimary(
        bulkShardRequest,
        shard,
        updateHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        listener -> listener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            // The bulk item must report success (not the lock-acquisition failure), with the
            // location/seqNo of the successful engine result.
            assertThat(((WritePrimaryResult<BulkShardRequest, BulkShardResponse>) result).location, equalTo(resultLocation));
            BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse();
            assertThat(primaryResponse.getItemId(), equalTo(0));
            assertThat(primaryResponse.getId(), equalTo("id"));
            assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE));
            DocWriteResponse response = primaryResponse.getResponse();
            assertThat(response.status(), equalTo(RestStatus.CREATED));
            assertThat(response.getSeqNo(), equalTo(13L));
        }), latch),
        threadPool,
        Names.WRITE
    );
    latch.await();
}

public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException {
IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY);

Expand Down Expand Up @@ -1363,81 +1294,6 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep
});
}

@LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG)
public void testRetriesWithLookupMapLockAcquisitionExceptionWithMaxRetry() throws IOException, InterruptedException {
    // Verifies that when every attempt fails with LookupMapLockAcquisitionException, the bulk
    // action gives up after the configured max-retry count instead of looping forever, and each
    // item surfaces the lock-acquisition failure to the caller.
    int retryCount = randomIntBetween(6, 10);
    Settings settings = Settings.builder()
        .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true)
        .put(IndexSettings.INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION.getKey(), retryCount)
        .build();
    IndexSettings indexSettings = new IndexSettings(newIndexMeta("test", settings), Settings.EMPTY);

    // Multiple bulk items so the per-item retry accounting is exercised independently.
    int nItems = randomIntBetween(2, 5);
    List<BulkItemRequest> items = new ArrayList<>(nItems);
    for (int i = 0; i < nItems; i++) {
        UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
        items.add(new BulkItemRequest(i, updateRequest));
    }

    IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");

    Exception err = new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null);
    Engine.IndexResult lookupMapExceptionResult = new Engine.IndexResult(err, 0);

    IndexShard shard = mock(IndexShard.class);
    // Unlike the happy-path test, this stub ALWAYS fails, forcing the retry limit to trigger.
    when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(
        ir -> lookupMapExceptionResult
    );
    when(shard.indexSettings()).thenReturn(indexSettings);
    when(shard.shardId()).thenReturn(shardId);
    when(shard.mapperService()).thenReturn(mock(MapperService.class));

    UpdateHelper updateHelper = mock(UpdateHelper.class);
    when(updateHelper.prepare(any(), eq(shard), any())).thenReturn(
        new UpdateHelper.Result(
            updateResponse,
            randomBoolean() ? DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED,
            Collections.singletonMap("field", "value"),
            Requests.INDEX_CONTENT_TYPE
        )
    );

    BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new));

    final CountDownLatch latch = new CountDownLatch(1);
    Runnable runnable = () -> TransportShardBulkAction.performOnPrimary(
        bulkShardRequest,
        shard,
        updateHelper,
        threadPool::absoluteTimeInMillis,
        new NoopMappingUpdatePerformer(),
        listener -> listener.onResponse(null),
        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> {
            assertEquals(nItems, result.replicaRequest().items().length);
            for (BulkItemRequest item : result.replicaRequest().items()) {
                assertEquals(LookupMapLockAcquisitionException.class, item.getPrimaryResponse().getFailure().getCause().getClass());
            }
        }), latch),
        threadPool,
        Names.WRITE
    );

    // execute the runnable on a separate thread so that the infinite loop can be detected
    new Thread(runnable).start();

    // timeout the request in 10 seconds if there is an infinite loop
    assertTrue(latch.await(10, TimeUnit.SECONDS));

    items.forEach(item -> {
        assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), LookupMapLockAcquisitionException.class);

        // this assertion is based on the assumption that all bulk item requests are updates and are hence calling
        // UpdateRequest::prepareRequest
        UpdateRequest updateRequest = (UpdateRequest) item.request();
        // retryCount + 1 = the initial attempt plus one attempt per allowed retry.
        verify(updateHelper, times(retryCount + 1)).prepare(eq(updateRequest), any(IndexShard.class), any(LongSupplier.class));
    });
}

public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception {
TestThreadPool rejectingThreadPool = new TestThreadPool(
"TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate",
Expand Down
Loading
Loading