From c21d67269f6b203c8a34228279db768308e18640 Mon Sep 17 00:00:00 2001 From: RS146BIJAY Date: Wed, 7 Jan 2026 16:32:37 +0530 Subject: [PATCH] Fixing indexing regression and bug fixes for grouping criteria Signed-off-by: RS146BIJAY --- CHANGELOG.md | 1 + .../opensearch/OpenSearchServerException.java | 9 - .../action/bulk/TransportShardBulkAction.java | 15 +- .../index/engine/CompositeIndexWriter.java | 233 ++++++---- .../index/engine/DocumentIndexWriter.java | 5 +- .../LookupMapLockAcquisitionException.java | 36 -- .../index/engine/LuceneIndexWriter.java | 5 +- .../NativeLuceneIndexWriterFactory.java | 4 + .../ContextAwareGroupingFieldMapper.java | 16 + .../index/mapper/MapperService.java | 5 +- .../ExceptionSerializationTests.java | 2 - .../bulk/TransportShardBulkActionTests.java | 144 ------ .../CompositeIndexWriterForAppendTests.java | 424 ++++++++++++++++-- ...teIndexWriterForUpdateAndDeletesTests.java | 54 +++ ...riaBasedCompositeIndexWriterBaseTests.java | 76 ++++ .../ContextAwareGroupingFieldMapperTests.java | 20 + .../aggregations/AggregatorTestCase.java | 1 - 17 files changed, 716 insertions(+), 334 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/index/engine/LookupMapLockAcquisitionException.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f511f199cb00b..8e80af17cb718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix SearchPhaseExecutionException to properly initCause ([#20320](https://github.com/opensearch-project/OpenSearch/pull/20320)) - Fix `cluster.remote..server_name` setting no populating SNI ([#20321](https://github.com/opensearch-project/OpenSearch/pull/20321)) - Fix X-Opaque-Id header propagation (along with other response headers) for streaming Reactor Netty 4 transport ([#20371](https://github.com/opensearch-project/OpenSearch/pull/20371)) +- Fix indexing regression and bug fixes for grouping criteria. 
([20145](https://github.com/opensearch-project/OpenSearch/pull/20145)) ### Dependencies - Bump `com.google.auth:google-auth-library-oauth2-http` from 1.38.0 to 1.41.0 ([#20183](https://github.com/opensearch-project/OpenSearch/pull/20183)) diff --git a/server/src/main/java/org/opensearch/OpenSearchServerException.java b/server/src/main/java/org/opensearch/OpenSearchServerException.java index 65f59d179cbab..7e299abd8d943 100644 --- a/server/src/main/java/org/opensearch/OpenSearchServerException.java +++ b/server/src/main/java/org/opensearch/OpenSearchServerException.java @@ -24,7 +24,6 @@ import static org.opensearch.Version.V_2_7_0; import static org.opensearch.Version.V_3_0_0; import static org.opensearch.Version.V_3_2_0; -import static org.opensearch.Version.V_3_3_0; /** * Utility class to register server exceptions @@ -1242,13 +1241,5 @@ public static void registerExceptions() { V_3_2_0 ) ); - registerExceptionHandle( - new OpenSearchExceptionHandle( - org.opensearch.index.engine.LookupMapLockAcquisitionException.class, - org.opensearch.index.engine.LookupMapLockAcquisitionException::new, - CUSTOM_ELASTICSEARCH_EXCEPTIONS_BASE_ID + 2, - V_3_3_0 - ) - ); } } diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index 37ca3161117d5..e9e9cb7f37532 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -87,7 +87,6 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.engine.Engine; -import org.opensearch.index.engine.LookupMapLockAcquisitionException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.get.GetResult; import org.opensearch.index.mapper.MapperException; @@ -728,15 +727,7 @@ && isConflictException(executionResult.getFailure().getCause()) && context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) { context.resetForExecutionForRetry(); return; - } else if (isFailed - && context.getPrimary() != null - && context.getPrimary().indexSettings() != null - && context.getPrimary().indexSettings().isContextAwareEnabled() - && isLookupMapLockAcquisitionException(executionResult.getFailure().getCause()) - && context.getRetryCounter() < context.getPrimary().indexSettings().getMaxRetryOnLookupMapAcquisitionException()) { - context.resetForExecutionForRetry(); - return; - } + } final BulkItemResponse response; if (isUpdate) { response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult); @@ -765,10 +756,6 @@ private static boolean isConflictException(final Exception e) { return ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException; } - private static boolean isLookupMapLockAcquisitionException(final Exception e) { - return ExceptionsHelper.unwrapCause(e) instanceof LookupMapLockAcquisitionException; - } - /** * Creates a new bulk item result from the given requests and result of performing the update operation on the shard. 
*/ diff --git a/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java b/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java index e35c5dd1145d4..e47a1d2370691 100644 --- a/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java +++ b/server/src/main/java/org/opensearch/index/engine/CompositeIndexWriter.java @@ -39,16 +39,14 @@ import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; import static org.opensearch.index.BucketedCompositeDirectory.CHILD_DIRECTORY_PREFIX; @@ -129,6 +127,20 @@ public class CompositeIndexWriter implements DocumentIndexWriter { private final Store store; private static final String DUMMY_TOMBSTONE_DOC_ID = "-2"; private final IndexWriterFactory nativeIndexWriterFactory; + /** + * pendingNumDocs is used to track pendingNumDocs for child level IndexWriters. Since pendingNumDocs is incremented + * (by one) only in DocumentsWriterPerThread#reserveOneDoc for any index or update operation, we keep incrementing + * pendingNumDocs by one for each of these operations. We increment this value whenever we call the following functions + * on childWriter: + * - softUpdateDocument + * - softUpdateDocuments + * - addDocuments + * - addDocument + * + * This value may temporarily overshoot during refresh due to double counting a few documents in both the old child + * IndexWriters and the parent, which should be OK, as undershooting pendingNumDocs can be problematic. + */ + private final AtomicLong childWriterPendingNumDocs = new AtomicLong(); public CompositeIndexWriter( EngineConfig engineConfig, @@ -336,6 +348,11 @@ public CriteriaBasedIndexWriterLookup tryAcquire() { boolean locked = lock.tryLock(); if (locked) { assert addCurrentThread(); + if (lookup.isClosed()) { + this.close(); + return null; + } + return lookup; } else { return null; @@ -473,9 +490,13 @@ DisposableIndexWriter computeIndexWriterIfAbsentForCriteria( boolean success = false; CriteriaBasedIndexWriterLookup current = null; try { - current = getCurrentMap(); - if (current == null || current.isClosed()) { - throw new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null); + while (current == null || current.isClosed()) { + // This loop acquires a read lock on a map which does not have any write lock present. Current keeps + // getting rotated during refresh, so there will be one current on which a read lock can be obtained. + // Validate that no write lock is applied on the map and that the map is not closed. The idea here is that the write lock was + // never applied on this map, as the write lock is only taken at closing time. We use tryAcquire instead of acquire, + // because acquire can also grant a read lock after a refresh has completed and the map is closed. + current = this.current.mapReadLock.tryAcquire(); } DisposableIndexWriter writer = current.computeIndexWriterIfAbsentForCriteria(criteria, indexWriterSupplier); @@ -489,15 +510,6 @@ } } - // This function acquires a first read lock on a map which does not have any write lock present. 
Current keeps - // on getting rotated during refresh, so there will be one current on which read lock can be obtained. - // Validate that no write lock is applied on the map and the map is not closed. Idea here is write lock was - // never applied on this map as write lock gets only during closing time. We are doing this instead of acquire, - // because acquire can also apply a read lock in case refresh completed and map is closed. - CriteriaBasedIndexWriterLookup getCurrentMap() { - return current.mapReadLock.tryAcquire(); - } - // Used for Test Case. ReleasableLock acquireCurrentWriteLock() { return current.mapWriteLock.acquire(); @@ -532,15 +544,14 @@ public void beforeRefresh() throws IOException { private void refreshDocumentsForParentDirectory(CriteriaBasedIndexWriterLookup oldMap) throws IOException { final Map markForRefreshIndexWritersMap = oldMap.criteriaBasedIndexWriterMap; deletePreviousVersionsForUpdatedDocuments(); - final List directoryToCombine = new ArrayList<>(); + Directory directoryToCombine; for (CompositeIndexWriter.DisposableIndexWriter childDisposableWriter : markForRefreshIndexWritersMap.values()) { - directoryToCombine.add(childDisposableWriter.getIndexWriter().getDirectory()); + directoryToCombine = childDisposableWriter.getIndexWriter().getDirectory(); childDisposableWriter.getIndexWriter().close(); - } - - if (!directoryToCombine.isEmpty()) { - accumulatingIndexWriter.addIndexes(directoryToCombine.toArray(new Directory[0])); + long pendingNumDocsByOldChildWriter = childDisposableWriter.getIndexWriter().getPendingNumDocs(); + accumulatingIndexWriter.addIndexes(directoryToCombine); IOUtils.closeWhileHandlingException(directoryToCombine); + childWriterPendingNumDocs.addAndGet(-pendingNumDocsByOldChildWriter); } deleteDummyTombstoneEntry(); @@ -587,6 +598,11 @@ public ReleasableLock getNewWriteLock() { return liveIndexWriterDeletesMap.current.mapWriteLock; } + // Used for unit tests. + CriteriaBasedIndexWriterLookup acquireNewReadLock() { + return liveIndexWriterDeletesMap.current.mapReadLock.acquire(); + } + @Override public void afterRefresh(boolean didRefresh) throws IOException { liveIndexWriterDeletesMap = liveIndexWriterDeletesMap.invalidateOldMap(); @@ -627,7 +643,7 @@ DisposableIndexWriter getIndexWriterForIdFromLookup(BytesRef uid, CriteriaBasedI boolean isCriteriaNotNull = false; try { indexWriterLookup.mapReadLock.acquire(); - String criteria = getCriteriaForDoc(uid); + String criteria = indexWriterLookup.getCriteriaForDoc(uid); if (criteria != null) { DisposableIndexWriter disposableIndexWriter = indexWriterLookup.getIndexWriterForCriteria(criteria); if (disposableIndexWriter != null) { @@ -683,13 +699,38 @@ public Map getMarkForRefreshIndexWriterMap() { @Override public long getFlushingBytes() { ensureOpen(); + return getFlushingBytesUtil(liveIndexWriterDeletesMap); + } + + /** + * Utility method to calculate flushingBytes. Since we operate on a point-in-time instance of + * LiveIndexWriterDeletesMap, we do not need to worry about double counting due to map rotation from current to old. Also, + * once old has been synced with the parent writer, the old writer is closed, so we do not need to worry about double counting + * between the parent and old IndexWriters. + * + * @param currentLiveIndexWriterDeletesMap current point-in-time instance of LiveIndexWriterDeletesMap. 
+ * @return flushingBytes + */ + public long getFlushingBytesUtil(LiveIndexWriterDeletesMap currentLiveIndexWriterDeletesMap) { long flushingBytes = 0; - Collection currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - for (IndexWriter currentWriter : currentWriterSet) { - flushingBytes += currentWriter.getFlushingBytes(); + for (DisposableIndexWriter disposableIndexWriter : currentLiveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) { + try { + flushingBytes += disposableIndexWriter.getIndexWriter().getFlushingBytes(); + } catch (AlreadyClosedException e) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + throw e; + } + } + } + + for (DisposableIndexWriter disposableIndexWriter : currentLiveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) { + try { + flushingBytes += disposableIndexWriter.getIndexWriter().getFlushingBytes(); + } catch (AlreadyClosedException e) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + throw e; + } + } } return flushingBytes + accumulatingIndexWriter.getFlushingBytes(); @@ -698,18 +739,7 @@ public long getFlushingBytes() { @Override public long getPendingNumDocs() { ensureOpen(); - long pendingNumDocs = 0; - Collection currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - ; - for (IndexWriter currentWriter : currentWriterSet) { - pendingNumDocs += currentWriter.getPendingNumDocs(); - } - - // TODO: Should we add docs for old writer as well? - return pendingNumDocs + accumulatingIndexWriter.getPendingNumDocs(); + return childWriterPendingNumDocs.get() + accumulatingIndexWriter.getPendingNumDocs(); } @Override @@ -733,59 +763,63 @@ public boolean hasUncommittedChanges() { @Override public Throwable getTragicException() { - Collection currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - for (IndexWriter writer : currentWriterSet) { - if (writer.isOpen() == false && writer.getTragicException() != null) { - return writer.getTragicException(); + for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + return disposableIndexWriter.getIndexWriter().getTragicException(); } } - Collection oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - ; - for (IndexWriter writer : oldWriterSet) { - if (writer.isOpen() == false && writer.getTragicException() != null) { - return writer.getTragicException(); + for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + return disposableIndexWriter.getIndexWriter().getTragicException(); } } - if (accumulatingIndexWriter.isOpen() == false) { - return accumulatingIndexWriter.getTragicException(); - } - - return null; + return accumulatingIndexWriter.getTragicException(); } + /** + * RamBytesUsed is summation of deleteBytes, flushBytes and activeBytes. 
+ * + * deleteBytes is increased when any updates are added to a document. flushBytes is increased post indexing in case + * the DWPT on which indexing is happening is already marked for flush, or in case we mark a DWPT for flush. activeBytes + * is increased whenever indexing completes and the DWPT is not marked for flush. + * + * @return total bytes of RAM used by the child and parent IndexWriters + */ @Override public final long ramBytesUsed() { ensureOpen(); + return ramBytesUsedUtil(liveIndexWriterDeletesMap); + } + + /** + * Utility method to calculate ramBytesUsed. Since we operate on a point-in-time instance of + * LiveIndexWriterDeletesMap, we do not need to worry about double counting due to map rotation from current to old. Also, + * once old has been synced with the parent writer, the old writer is closed, so we do not need to worry about double counting + * between the parent and old IndexWriters. + * + * @param currentLiveIndexWriterDeletesMap current point-in-time instance of LiveIndexWriterDeletesMap. + * @return ramBytesUsed + */ + private long ramBytesUsedUtil(LiveIndexWriterDeletesMap currentLiveIndexWriterDeletesMap) { long ramBytesUsed = 0; - Collection currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - - try (ReleasableLock ignore = liveIndexWriterDeletesMap.current.mapWriteLock.acquire()) { - for (IndexWriter indexWriter : currentWriterSet) { - if (indexWriter.isOpen() == true) { - ramBytesUsed += indexWriter.ramBytesUsed(); + for (DisposableIndexWriter disposableIndexWriter : currentLiveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) { + try { + ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed(); + } catch (AlreadyClosedException e) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + throw e; } } } - Collection oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - try (ReleasableLock ignore = liveIndexWriterDeletesMap.old.mapWriteLock.acquire()) { - for (IndexWriter indexWriter : oldWriterSet) { - if (indexWriter.isOpen() == true) { - ramBytesUsed += indexWriter.ramBytesUsed(); + for (DisposableIndexWriter disposableIndexWriter : currentLiveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) { + try { + ramBytesUsed += disposableIndexWriter.getIndexWriter().ramBytesUsed(); + } catch (AlreadyClosedException e) { + if (disposableIndexWriter.getIndexWriter().getTragicException() != null) { + throw e; } } } @@ -813,25 +847,16 @@ public final synchronized Iterable> getLiveCommitData( public void rollback() throws IOException { if (shouldClose()) { - Collection currentWriterSet = liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - - for (IndexWriter indexWriter : currentWriterSet) { - if (indexWriter.isOpen() == true) { - indexWriter.rollback(); - } + // Though calling rollback on the child level writers may seem redundant, it ensures that all the + // child level IndexWriters are closed and there is no case of file leaks. In case rollback throws any + // exception, close the shard, as rollback gets called in the close flow, and rollback throwing an exception means + // files have already leaked. 
+ for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.current.criteriaBasedIndexWriterMap.values()) { + disposableIndexWriter.getIndexWriter().rollback(); } - Collection oldWriterSet = liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values() - .stream() - .map(DisposableIndexWriter::getIndexWriter) - .collect(Collectors.toSet()); - for (IndexWriter indexWriter : oldWriterSet) { - if (indexWriter.isOpen() == true) { - indexWriter.rollback(); - } + for (DisposableIndexWriter disposableIndexWriter : liveIndexWriterDeletesMap.old.criteriaBasedIndexWriterMap.values()) { + disposableIndexWriter.getIndexWriter().rollback(); } accumulatingIndexWriter.rollback(); @@ -884,7 +909,7 @@ public IndexWriter getAccumulatingIndexWriter() { } @Override - public long addDocuments(Iterable docs, Term uid) throws IOException { + public long addDocuments(final List docs, Term uid) throws IOException { // We obtain a read lock on a child level IndexWriter and then return it. Post Indexing completes, we close this // IndexWriter. ensureOpen(); @@ -895,7 +920,9 @@ public long addDocuments(Iterable docs, Term uid) throws Releasable ignore1 = acquireLock(uid.bytes()) ) { putCriteria(uid.bytes(), criteria); - return disposableIndexWriter.getIndexWriter().addDocuments(docs); + long seqNo = disposableIndexWriter.getIndexWriter().addDocuments(docs); + childWriterPendingNumDocs.addAndGet(docs.size()); + return seqNo; } } @@ -909,14 +936,16 @@ public long addDocument(ParseContext.Document doc, Term uid) throws IOException Releasable ignore1 = acquireLock(uid.bytes()) ) { putCriteria(uid.bytes(), criteria); - return disposableIndexWriter.getIndexWriter().addDocument(doc); + long seqNo = disposableIndexWriter.getIndexWriter().addDocument(doc); + childWriterPendingNumDocs.incrementAndGet(); + return seqNo; } } @Override public void softUpdateDocuments( Term uid, - Iterable docs, + List docs, long version, long seqNo, long primaryTerm, @@ -931,6 +960,7 @@ public void softUpdateDocuments( ) { putCriteria(uid.bytes(), criteria); disposableIndexWriter.getIndexWriter().softUpdateDocuments(uid, docs, softDeletesField); + childWriterPendingNumDocs.addAndGet(docs.size()); // TODO: Do we need to add more info in delete entry like id, seqNo, primaryTerm for debugging?? // TODO: Entry can be null for first version or if there is term bum up (validate if this is because we need to keep previous // version). @@ -957,6 +987,7 @@ public void softUpdateDocument( ) { putCriteria(uid.bytes(), criteria); disposableIndexWriter.getIndexWriter().softUpdateDocument(uid, doc, softDeletesField); + childWriterPendingNumDocs.incrementAndGet(); // TODO: Do we need to add more info in delete entry like id, seqNo, primaryTerm for debugging?? // TODO: Entry can be null for first version or if there is term bum up (validate if this is because we need to keep previous // version). @@ -998,6 +1029,8 @@ public void deleteDocument( ) { if (currentDisposableWriter.getLookupMap().isClosed() == false && isStaleOperation == false) { addDeleteEntryToWriter(new DeleteEntry(uid, version, seqNo, primaryTerm), currentDisposableWriter.getIndexWriter()); + // only increment this when addDeleteEntry for child writers are called. 
+ childWriterPendingNumDocs.incrementAndGet(); } } } @@ -1007,6 +1040,8 @@ public void deleteDocument( try (CriteriaBasedIndexWriterLookup.CriteriaBasedWriterLock ignore = oldDisposableWriter.getLookupMap().getMapReadLock()) { if (oldDisposableWriter.getLookupMap().isClosed() == false && isStaleOperation == false) { addDeleteEntryToWriter(new DeleteEntry(uid, version, seqNo, primaryTerm), oldDisposableWriter.getIndexWriter()); + // only increment this when addDeleteEntry for child writers are called. + childWriterPendingNumDocs.incrementAndGet(); } } } @@ -1027,6 +1062,8 @@ private void deleteInLucene( } else { currentWriter.softUpdateDocument(uid, doc, softDeletesField); } + + childWriterPendingNumDocs.incrementAndGet(); } private DisposableIndexWriter getAssociatedIndexWriterForCriteria(final String criteria) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/DocumentIndexWriter.java b/server/src/main/java/org/opensearch/index/engine/DocumentIndexWriter.java index 47bdf7b2b5073..3bfbd057fd720 100644 --- a/server/src/main/java/org/opensearch/index/engine/DocumentIndexWriter.java +++ b/server/src/main/java/org/opensearch/index/engine/DocumentIndexWriter.java @@ -18,6 +18,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -52,13 +53,13 @@ public interface DocumentIndexWriter extends Closeable, ReferenceManager.Refresh void deleteUnusedFiles() throws IOException; - long addDocuments(Iterable docs, Term uid) throws IOException; + long addDocuments(List docs, Term uid) throws IOException; long addDocument(ParseContext.Document doc, Term uid) throws IOException; void softUpdateDocuments( Term uid, - Iterable docs, + List docs, long version, long seqNo, long primaryTerm, diff --git a/server/src/main/java/org/opensearch/index/engine/LookupMapLockAcquisitionException.java b/server/src/main/java/org/opensearch/index/engine/LookupMapLockAcquisitionException.java deleted file mode 100644 index 1571b9893789d..0000000000000 --- a/server/src/main/java/org/opensearch/index/engine/LookupMapLockAcquisitionException.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.engine; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.index.shard.ShardId; - -import java.io.IOException; - -/** - * This exception indicates that CompositeIndexWriter was unable to obtain lock on CriteriaBasedIndexWriterLookup map - * during indexing. - * indexing request contains this Exception in the response, we do not need to add a translog entry for this request. - * - */ -public class LookupMapLockAcquisitionException extends EngineException { - public LookupMapLockAcquisitionException(StreamInput in) throws IOException { - super(in); - } - - @Override - public Throwable fillInStackTrace() { - // This is on the hot path for updates; stack traces are expensive to compute and not very useful for VCEEs, so don't fill it in. - return this; - } - - public LookupMapLockAcquisitionException(ShardId shardId, String msg, Throwable cause, Object... 
params) { - super(shardId, msg, cause, params); - } -} diff --git a/server/src/main/java/org/opensearch/index/engine/LuceneIndexWriter.java b/server/src/main/java/org/opensearch/index/engine/LuceneIndexWriter.java index ef349d8dfdd5f..689b61fd56ee2 100644 --- a/server/src/main/java/org/opensearch/index/engine/LuceneIndexWriter.java +++ b/server/src/main/java/org/opensearch/index/engine/LuceneIndexWriter.java @@ -16,6 +16,7 @@ import org.opensearch.index.mapper.ParseContext; import java.io.IOException; +import java.util.List; import java.util.Map; /** @@ -131,7 +132,7 @@ public void deleteUnusedFiles() throws IOException { } @Override - public long addDocuments(Iterable docs, Term uid) throws IOException { + public long addDocuments(final List docs, Term uid) throws IOException { return indexWriter.addDocuments(docs); } @@ -143,7 +144,7 @@ public long addDocument(ParseContext.Document doc, Term uid) throws IOException @Override public void softUpdateDocuments( Term uid, - Iterable docs, + List docs, long version, long seqNo, long primaryTerm, diff --git a/server/src/main/java/org/opensearch/index/engine/NativeLuceneIndexWriterFactory.java b/server/src/main/java/org/opensearch/index/engine/NativeLuceneIndexWriterFactory.java index d117fb1bdd5b2..6a165419a08ee 100644 --- a/server/src/main/java/org/opensearch/index/engine/NativeLuceneIndexWriterFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/NativeLuceneIndexWriterFactory.java @@ -195,6 +195,10 @@ public IndexWriterConfig buildIndexWriterConfig() { iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); + // We are setting the codec here rather than in the CodecService because each CriteriaBasedCodec requires + // associatedCriteria to be attached upon creation during IndexWriter initialisation. This criteria is + // determined on a per-document basis and is only available within the InternalEngine. Therefore, the codec + // for the child writer is created here where the necessary criteria information is accessible if (engineConfig.getIndexSettings().isContextAwareEnabled()) { iwc.setCodec(new CriteriaBasedCodec(engineConfig.getCodec(), associatedCriteria)); } else { diff --git a/server/src/main/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapper.java index 37267371f8bf7..458fe10acf735 100644 --- a/server/src/main/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapper.java @@ -8,6 +8,8 @@ package org.opensearch.index.mapper; +import org.apache.lucene.index.LeafReader; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.script.ContextAwareGroupingScript; import org.opensearch.script.Script; @@ -181,4 +183,18 @@ public ContextAwareGroupingFieldType fieldType() { protected String contentType() { return CONTENT_TYPE; } + + /** + * Context Aware Segment field is not a part of an ingested document, so omitting it from Context Aware Segment + * validation. + */ + @Override + public void canDeriveSource() {} + + /** + * Context Aware Segment field is not a part of an ingested document, so omitting it from Context Aware Segment + * generation. 
+ */ + @Override + public void deriveSource(XContentBuilder builder, LeafReader leafReader, int docId) throws IOException {} } diff --git a/server/src/main/java/org/opensearch/index/mapper/MapperService.java b/server/src/main/java/org/opensearch/index/mapper/MapperService.java index b0acdceeff9ce..8acd2afb027a8 100644 --- a/server/src/main/java/org/opensearch/index/mapper/MapperService.java +++ b/server/src/main/java/org/opensearch/index/mapper/MapperService.java @@ -84,6 +84,7 @@ import java.util.function.BooleanSupplier; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -690,7 +691,9 @@ public boolean isCompositeIndexPresent() { } public Set getCompositeFieldTypes() { - return compositeMappedFieldTypes; + return compositeMappedFieldTypes.stream() + .filter(compositeMappedFieldType -> compositeMappedFieldType instanceof CompositeDataCubeFieldType) + .collect(Collectors.toSet()); } private Set getCompositeFieldTypesFromMapper() { diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 5ec2da27d0272..d011826e81af4 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -88,7 +88,6 @@ import org.opensearch.crypto.CryptoRegistryException; import org.opensearch.env.ShardLockObtainFailedException; import org.opensearch.index.engine.IngestionEngineException; -import org.opensearch.index.engine.LookupMapLockAcquisitionException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.query.QueryShardException; import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException; @@ -904,7 +903,6 @@ public void testIds() { ids.put(176, IngestionEngineException.class); ids.put(177, StreamException.class); ids.put(10001, IndexCreateBlockException.class); - ids.put(10002, LookupMapLockAcquisitionException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index 6340cfe145272..00bd99dd4b349 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -74,7 +74,6 @@ import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.engine.Engine; -import org.opensearch.index.engine.LookupMapLockAcquisitionException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.Mapping; @@ -109,8 +108,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; -import static org.opensearch.common.util.FeatureFlags.CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG; -import static org.opensearch.index.IndexSettingsTests.newIndexMeta; import static org.opensearch.index.remote.RemoteStoreTestsHelper.createIndexSettings; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -1221,72 +1218,6 @@ public void testRetries() throws Exception { 
latch.await(); } - @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) - public void testRetriesWithLookupMapLockAcquisitionException() throws Exception { - Settings settings = Settings.builder().put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true).build(); - IndexSettings indexSettings = new IndexSettings(newIndexMeta("test", settings), Settings.EMPTY); - UpdateRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); - BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest); - - IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - - Exception err = new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null); - Engine.IndexResult lookupMapExceptionResult = new Engine.IndexResult(err, 0); - Engine.IndexResult mappingUpdate = new Engine.IndexResult( - new Mapping(null, mock(RootObjectMapper.class), new MetadataFieldMapper[0], Collections.emptyMap()) - ); - Translog.Location resultLocation = new Translog.Location(42, 42, 42); - Engine.IndexResult success = new FakeIndexResult(1, 1, 13, true, resultLocation); - - IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer(ir -> { - if (randomBoolean()) { - return lookupMapExceptionResult; - } else { - return success; - } - }); - when(shard.indexSettings()).thenReturn(indexSettings); - when(shard.shardId()).thenReturn(shardId); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); - - UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( - new UpdateHelper.Result( - updateResponse, - randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, - Collections.singletonMap("field", "value"), - Requests.INDEX_CONTENT_TYPE - ) - ); - - BulkItemRequest[] items = new BulkItemRequest[] { primaryRequest }; - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - - final CountDownLatch latch = new CountDownLatch(1); - TransportShardBulkAction.performOnPrimary( - bulkShardRequest, - shard, - updateHelper, - threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), - listener -> listener.onResponse(null), - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { - assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); - assertThat(primaryResponse.getItemId(), equalTo(0)); - assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); - DocWriteResponse response = primaryResponse.getResponse(); - assertThat(response.status(), equalTo(RestStatus.CREATED)); - assertThat(response.getSeqNo(), equalTo(13L)); - }), latch), - threadPool, - Names.WRITE - ); - latch.await(); - } - public void testUpdateWithRetryOnConflict() throws IOException, InterruptedException { IndexSettings indexSettings = new IndexSettings(indexMetadata(), Settings.EMPTY); @@ -1363,81 +1294,6 @@ public void testUpdateWithRetryOnConflict() throws IOException, InterruptedExcep }); } - @LockFeatureFlag(CONTEXT_AWARE_MIGRATION_EXPERIMENTAL_FLAG) - public void testRetriesWithLookupMapLockAcquisitionExceptionWithMaxRetry() throws IOException, InterruptedException { - int retryCount = randomIntBetween(6, 10); - Settings settings = Settings.builder() - .put(IndexSettings.INDEX_CONTEXT_AWARE_ENABLED_SETTING.getKey(), true) - .put(IndexSettings.INDEX_MAX_RETRY_ON_LOOKUP_MAP_LOCK_ACQUISITION_EXCEPTION.getKey(), retryCount) - .build(); - IndexSettings indexSettings = new IndexSettings(newIndexMeta("test", settings), Settings.EMPTY); - - int nItems = randomIntBetween(2, 5); - List items = new ArrayList<>(nItems); - for (int i = 0; i < nItems; i++) { - UpdateRequest updateRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value"); - items.add(new BulkItemRequest(i, updateRequest)); - } - - IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value"); - - Exception err = new LookupMapLockAcquisitionException(shardId, "Unable to obtain lock on the current Lookup map", null); - Engine.IndexResult lookupMapExceptionResult = new Engine.IndexResult(err, 0); - - IndexShard shard = mock(IndexShard.class); - when(shard.applyIndexOperationOnPrimary(anyLong(), any(), any(), anyLong(), anyLong(), anyLong(), anyBoolean())).thenAnswer( - ir -> lookupMapExceptionResult - ); - when(shard.indexSettings()).thenReturn(indexSettings); - when(shard.shardId()).thenReturn(shardId); - when(shard.mapperService()).thenReturn(mock(MapperService.class)); - - UpdateHelper updateHelper = mock(UpdateHelper.class); - when(updateHelper.prepare(any(), eq(shard), any())).thenReturn( - new UpdateHelper.Result( - updateResponse, - randomBoolean() ? 
DocWriteResponse.Result.CREATED : DocWriteResponse.Result.UPDATED, - Collections.singletonMap("field", "value"), - Requests.INDEX_CONTENT_TYPE - ) - ); - - BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items.toArray(BulkItemRequest[]::new)); - - final CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = () -> TransportShardBulkAction.performOnPrimary( - bulkShardRequest, - shard, - updateHelper, - threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), - listener -> listener.onResponse(null), - new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { - assertEquals(nItems, result.replicaRequest().items().length); - for (BulkItemRequest item : result.replicaRequest().items()) { - assertEquals(LookupMapLockAcquisitionException.class, item.getPrimaryResponse().getFailure().getCause().getClass()); - } - }), latch), - threadPool, - Names.WRITE - ); - - // execute the runnable on a separate thread so that the infinite loop can be detected - new Thread(runnable).start(); - - // timeout the request in 10 seconds if there is an infinite loop - assertTrue(latch.await(10, TimeUnit.SECONDS)); - - items.forEach(item -> { - assertEquals(item.getPrimaryResponse().getFailure().getCause().getClass(), LookupMapLockAcquisitionException.class); - - // this assertion is based on the assumption that all bulk item requests are updates and are hence calling - // UpdateRequest::prepareRequest - UpdateRequest updateRequest = (UpdateRequest) item.request(); - verify(updateHelper, times(retryCount + 1)).prepare(eq(updateRequest), any(IndexShard.class), any(LongSupplier.class)); - }); - } - public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { TestThreadPool rejectingThreadPool = new TestThreadPool( "TransportShardBulkActionTests#testForceExecutionOnRejectionAfterMappingUpdate", diff --git a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java index 7625bc5945e5a..b5137829ad775 100644 --- a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java +++ b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForAppendTests.java @@ -34,10 +34,10 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.mockito.Mockito.mock; @@ -163,30 +163,6 @@ public void testConcurrentComputeIndexWriterWithMapRotation() throws Exception { assertTrue("Rotation operations completed: " + rotationCount.get(), rotationCount.get() >= 0); } - public void testUnableToObtainLockOnActiveLookupWhenWriteLockDuringIndexing() throws IOException, InterruptedException { - CompositeIndexWriter.LiveIndexWriterDeletesMap map = new CompositeIndexWriter.LiveIndexWriterDeletesMap(); - CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1); - CountDownLatch releaseWriteLockLatch = new CountDownLatch(1); - Thread writer = new Thread(() -> { - try (ReleasableLock ignore = map.acquireCurrentWriteLock()) { - writeLockAcquiredLatch.countDown(); - releaseWriteLockLatch.await(); - } catch 
(InterruptedException ignored) { - - } - }); - - writer.start(); - writeLockAcquiredLatch.await(1, TimeUnit.SECONDS); - - expectThrows( - LookupMapLockAcquisitionException.class, - () -> map.computeIndexWriterIfAbsentForCriteria("200", this::createChildWriterFactory, new ShardId("foo", "_na_", 1)) - ); - releaseWriteLockLatch.countDown(); - writer.join(); - } - public void testConcurrentIndexingDuringRefresh() throws IOException, InterruptedException { CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( @@ -422,6 +398,31 @@ public void testHasPendingMergesDuringForceMerge() throws IOException, Interrupt } } + public void testGetTragicExceptionWithException() throws IOException { + IndexWriter parentWriter = createWriter(); + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + parentWriter, + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setRAMBufferSizeMB(128); + + try { + // Add a few small documents + for (int i = 0; i < 10; i++) { + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf(i), null, DEFAULT_CRITERIA)); + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } + + assertNull(compositeIndexWriter.getTragicException()); + } finally { + IOUtils.close(compositeIndexWriter); + } + } + public void testGetTragicExceptionWithOutOfMemoryError() throws Exception { AtomicBoolean shouldFail = new AtomicBoolean(false); AtomicReference thrownError = new AtomicReference<>(); @@ -516,6 +517,341 @@ public void testRAMBytesUsedWithOldAndCurrentWriters() throws Exception { } + public void testRAMBytesUsedWithTragicExceptionOnCurrent() throws Exception { + Supplier dirSupplier = () -> new FilterDirectory(newDirectory()) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + IndexOutput out = super.createOutput(name, context); + return new FilterIndexOutput("failing output", "test", out) { + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + throw new OutOfMemoryError("Simulated write failure"); + } + }; + } + }; + + FlushingIndexWriterFactory indexWriterFactory = new FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true)); + + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + assertThrows(AlreadyClosedException.class, compositeIndexWriter::ramBytesUsed); + IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory); + } + + public void testRAMBytesUsedWithTragicExceptionOnOld() throws Exception { + Supplier dirSupplier = () -> new FilterDirectory(newDirectory()) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + IndexOutput out = super.createOutput(name, context); + return new FilterIndexOutput("failing output", "test", out) { + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + throw new OutOfMemoryError("Simulated write failure"); + } + }; + } + }; + + FlushingIndexWriterFactory indexWriterFactory = new 
FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true)); + + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire(); + CountDownLatch latch = new CountDownLatch(1); + Thread refresher = new Thread(() -> { + latch.countDown(); + try { + compositeIndexWriter.beforeRefresh(); + } catch (Exception ignored) {} + }); + + refresher.start(); + try { + latch.await(); + assertThrows(AlreadyClosedException.class, compositeIndexWriter::ramBytesUsed); + } finally { + IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory, lock); + refresher.join(); + } + } + + public void testRAMBytesUsedWithWriterOnOld() throws Exception { + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire(); + CountDownLatch latch = new CountDownLatch(1); + Thread refresher = new Thread(() -> { + latch.countDown(); + try { + compositeIndexWriter.beforeRefresh(); + } catch (Exception ignored) {} + }); + + refresher.start(); + try { + latch.await(); + long ramBytesUsed = compositeIndexWriter.ramBytesUsed(); + assertTrue("RamBytesUsed bytes should be non-negative ", ramBytesUsed > 0); + } finally { + IOUtils.closeWhileHandlingException(compositeIndexWriter, lock); + refresher.join(); + } + } + + public void testFlushingBytesWithTragicExceptionOnCurrent() throws Exception { + Supplier dirSupplier = () -> new FilterDirectory(newDirectory()) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + IndexOutput out = super.createOutput(name, context); + return new FilterIndexOutput("failing output", "test", out) { + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + throw new OutOfMemoryError("Simulated write failure"); + } + }; + } + }; + + FlushingIndexWriterFactory indexWriterFactory = new FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true)); + + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + assertThrows(AlreadyClosedException.class, compositeIndexWriter::getFlushingBytes); + IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory); + } + + public void 
testFlushingBytesUsedWithTragicExceptionOnOld() throws Exception { + Supplier dirSupplier = () -> new FilterDirectory(newDirectory()) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + IndexOutput out = super.createOutput(name, context); + return new FilterIndexOutput("failing output", "test", out) { + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + throw new OutOfMemoryError("Simulated write failure"); + } + }; + } + }; + + FlushingIndexWriterFactory indexWriterFactory = new FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true)); + + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire(); + CountDownLatch latch = new CountDownLatch(1); + Thread refresher = new Thread(() -> { + latch.countDown(); + try { + compositeIndexWriter.beforeRefresh(); + } catch (Exception ignored) {} + }); + + refresher.start(); + try { + latch.await(); + assertThrows(AlreadyClosedException.class, compositeIndexWriter::getFlushingBytes); + } finally { + IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory, lock); + refresher.join(); + } + } + + public void testFlushingBytesUsedWithWriterOnOld() throws Exception { + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + newSoftDeletesPolicy(), + softDeletesField, + indexWriterFactory + ); + + compositeIndexWriter.getConfig().setMaxBufferedDocs(2); + + // Add a document successfully + Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA)); + try { + compositeIndexWriter.addDocuments(operation.docs(), operation.uid()); + } catch (Error ignored) {} + + ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire(); + CountDownLatch latch = new CountDownLatch(1); + Thread refresher = new Thread(() -> { + latch.countDown(); + try { + compositeIndexWriter.beforeRefresh(); + } catch (Exception ignored) {} + }); + + refresher.start(); + try { + latch.await(); + long flushingBytes = compositeIndexWriter.getFlushingBytes(); + assertEquals("Flushing bytes should be non-negative", 0, flushingBytes); + } finally { + IOUtils.closeWhileHandlingException(compositeIndexWriter, lock); + refresher.join(); + } + } + + public void testTragicExceptionGetWithTragicExceptionOnCurrent() throws Exception { + Supplier dirSupplier = () -> new FilterDirectory(newDirectory()) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + IndexOutput out = super.createOutput(name, context); + return new FilterIndexOutput("failing output", "test", out) { + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + throw new OutOfMemoryError("Simulated write failure"); + } + }; + } + }; + + FlushingIndexWriterFactory indexWriterFactory = new FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true)); + + CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter( + config(), + createWriter(), + 
+            newSoftDeletesPolicy(),
+            softDeletesField,
+            indexWriterFactory
+        );
+
+        compositeIndexWriter.getConfig().setMaxBufferedDocs(2);
+
+        // Add a document successfully
+        Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA));
+        try {
+            compositeIndexWriter.addDocuments(operation.docs(), operation.uid());
+        } catch (Error ignored) {}
+
+        assertNotNull(compositeIndexWriter.getTragicException());
+        IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory);
+    }
+
+    public void testTragicExceptionGetWithTragicExceptionOnOld() throws Exception {
+        Supplier<Directory> dirSupplier = () -> new FilterDirectory(newDirectory()) {
+            @Override
+            public IndexOutput createOutput(String name, IOContext context) throws IOException {
+                IndexOutput out = super.createOutput(name, context);
+                return new FilterIndexOutput("failing output", "test", out) {
+                    @Override
+                    public void writeBytes(byte[] b, int offset, int length) throws IOException {
+                        throw new OutOfMemoryError("Simulated write failure");
+                    }
+                };
+            }
+        };
+
+        FlushingIndexWriterFactory indexWriterFactory = new FlushingIndexWriterFactory(dirSupplier, new AtomicBoolean(true));
+
+        CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter(
+            config(),
+            createWriter(),
+            newSoftDeletesPolicy(),
+            softDeletesField,
+            indexWriterFactory
+        );
+
+        compositeIndexWriter.getConfig().setMaxBufferedDocs(2);
+
+        // Add a document successfully
+        Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA));
+        try {
+            compositeIndexWriter.addDocuments(operation.docs(), operation.uid());
+        } catch (Error ignored) {}
+
+        ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire();
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread refresher = new Thread(() -> {
+            latch.countDown();
+            try {
+                compositeIndexWriter.beforeRefresh();
+            } catch (Exception ignored) {}
+        });
+
+        refresher.start();
+        try {
+            latch.await();
+            assertNotNull(compositeIndexWriter.getTragicException());
+        } finally {
+            IOUtils.closeWhileHandlingException(compositeIndexWriter, indexWriterFactory, lock);
+            refresher.join();
+        }
+    }
+
     public void testSetLiveCommitDataWithRollback() throws Exception {
         CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter(
             config(),
@@ -566,6 +902,44 @@ public void testSetLiveCommitDataWithRollback() throws Exception {
         }
     }

+    public void testRollbackWithWriterOnOld() throws Exception {
+        CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter(
+            config(),
+            createWriter(),
+            newSoftDeletesPolicy(),
+            softDeletesField,
+            indexWriterFactory
+        );
+
+        compositeIndexWriter.getConfig().setMaxBufferedDocs(2);
+
+        // Add a document successfully
+        Engine.Index operation = indexForDoc(createParsedDoc(String.valueOf("-1"), null, DEFAULT_CRITERIA));
+        try {
+            compositeIndexWriter.addDocuments(operation.docs(), operation.uid());
+        } catch (Error ignored) {}
+
+        ReleasableLock lock = compositeIndexWriter.getNewWriteLock().acquire();
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread refresher = new Thread(() -> {
+            latch.countDown();
+            try {
+                compositeIndexWriter.beforeRefresh();
+            } catch (Exception ignored) {}
+        });
+
+        refresher.start();
+        try {
+            latch.await();
+            compositeIndexWriter.rollback();
+        } catch (Exception ex) {
+            fail(ex.getMessage());
+        } finally {
+            IOUtils.closeWhileHandlingException(compositeIndexWriter, lock);
+            refresher.join();
+        }
+    }
+
     public void testObtainLock() throws Exception {
         try (
             CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter(
diff --git a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForUpdateAndDeletesTests.java b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForUpdateAndDeletesTests.java
index bb407092af7bd..14966d086ec33 100644
--- a/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForUpdateAndDeletesTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/CompositeIndexWriterForUpdateAndDeletesTests.java
@@ -13,6 +13,8 @@
 import org.opensearch.common.util.io.IOUtils;

 import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;

 public class CompositeIndexWriterForUpdateAndDeletesTests extends CriteriaBasedCompositeIndexWriterBaseTests {

@@ -141,6 +143,58 @@ public void testDeleteWithDocumentInBothChildAndParentWriter() throws IOExceptio
         }
     }

+    public void testDeleteWithDocumentInOldChildWriter() throws IOException, InterruptedException {
+        final String id = "test";
+        CompositeIndexWriter compositeIndexWriter = new CompositeIndexWriter(
+            config(),
+            createWriter(),
+            newSoftDeletesPolicy(),
+            softDeletesField,
+            indexWriterFactory
+        );
+
+        Engine.Index operation = indexForDoc(createParsedDoc(id, null, DEFAULT_CRITERIA));
+        try (Releasable ignore1 = compositeIndexWriter.acquireLock(operation.uid().bytes())) {
+            compositeIndexWriter.addDocuments(operation.docs(), operation.uid());
+        }
+
+        CompositeIndexWriter.CriteriaBasedIndexWriterLookup lock = compositeIndexWriter.acquireNewReadLock();
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean run = new AtomicBoolean(true);
+        Thread refresher = new Thread(() -> {
+            latch.countDown();
+            try {
+                compositeIndexWriter.beforeRefresh();
+            } catch (Exception ignored) {}
+        });
+
+        refresher.start();
+        try {
+            latch.await();
+            compositeIndexWriter.deleteDocument(
+                operation.uid(),
+                false,
+                newDeleteTombstoneDoc(id),
+                1,
+                2,
+                primaryTerm.get(),
+                softDeletesField
+            );
+        } finally {
+            IOUtils.closeWhileHandlingException(lock.getMapReadLock());
+            run.set(false);
+            refresher.join();
+            compositeIndexWriter.afterRefresh(true);
+            compositeIndexWriter.beforeRefresh();
+            compositeIndexWriter.afterRefresh(true);
+            try (DirectoryReader directoryReader = DirectoryReader.open(compositeIndexWriter.getAccumulatingIndexWriter())) {
+                assertEquals(0, directoryReader.numDocs());
+            }
+
+            IOUtils.closeWhileHandlingException(compositeIndexWriter);
+        }
+    }
+
     public void testUpdateWithDocumentInParentIndexWriter() throws IOException {
         final String id = "test";
         CompositeIndexWriter compositeIndexWriter = null;
diff --git a/server/src/test/java/org/opensearch/index/engine/CriteriaBasedCompositeIndexWriterBaseTests.java b/server/src/test/java/org/opensearch/index/engine/CriteriaBasedCompositeIndexWriterBaseTests.java
index 1742f4e481398..5a245c4570fde 100644
--- a/server/src/test/java/org/opensearch/index/engine/CriteriaBasedCompositeIndexWriterBaseTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/CriteriaBasedCompositeIndexWriterBaseTests.java
@@ -14,6 +14,7 @@
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.Term;
@@ -77,6 +78,7 @@
 import org.junit.After;
 import org.junit.Before;

+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -84,6 +86,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -237,6 +240,22 @@ public EngineConfig config() {
         );
     }

+    public EngineConfig config(Store store) {
+        return config(
+            INDEX_SETTINGS,
+            store,
+            primaryTranslogDir,
+            NoMergePolicy.INSTANCE,
+            null,
+            null,
+            null,
+            null,
+            null,
+            new NoneCircuitBreakerService(),
+            null
+        );
+    }
+
     public EngineConfig config(
         final IndexSettings indexSettings,
         final Store store,
@@ -486,4 +505,61 @@
 public SoftDeletesPolicy newSoftDeletesPolicy() {
         return softDeletesPolicy;
     }
+
+    protected static class FlushingIndexWriterFactory extends NativeLuceneIndexWriterFactory implements Closeable {
+
+        private final Supplier<Directory> failingWriteDirectorySupplier;
+        private final List<Directory> directories;
+        private final AtomicBoolean useFailingDirectorySupplier;
+
+        FlushingIndexWriterFactory(Supplier<Directory> failingWriteDirectorySupplier, AtomicBoolean useFailingDirectorySupplier) {
+            this.failingWriteDirectorySupplier = failingWriteDirectorySupplier;
+            this.directories = new ArrayList<>();
+            this.useFailingDirectorySupplier = useFailingDirectorySupplier;
+        }
+
+        @Override
+        public IndexWriter createWriter(Directory directory, IndexWriterConfig config) throws IOException {
+            Directory failingDirectory = useFailingDirectorySupplier.get() ? failingWriteDirectorySupplier.get() : directory;
+            directories.add(failingDirectory);
+            return new IndexWriter(failingDirectory, config) {
+                @Override
+                public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
+                    long seqNo = super.addDocument(doc);
+                    flush();
+                    return seqNo;
+                }
+
+                @Override
+                public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs) throws IOException {
+                    long seqNo = super.addDocuments(docs);
+                    flush();
+                    return seqNo;
+                }
+
+                @Override
+                public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException {
+                    long seqNo = super.softUpdateDocument(term, doc, softDeletes);
+                    flush();
+                    return seqNo;
+                }
+
+                @Override
+                public long softUpdateDocuments(
+                    Term term,
+                    Iterable<? extends Iterable<? extends IndexableField>> docs,
+                    Field... softDeletes
+                ) throws IOException {
+                    long seqNo = super.softUpdateDocuments(term, docs, softDeletes);
+                    flush();
+                    return seqNo;
+                }
+            };
+        }
+
+        @Override
+        public void close() throws IOException {
+            IOUtils.close(directories);
+        }
+    }
 }
diff --git a/server/src/test/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapperTests.java
index e3368726a8bad..1a4a4210fcc9e 100644
--- a/server/src/test/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapperTests.java
+++ b/server/src/test/java/org/opensearch/index/mapper/ContextAwareGroupingFieldMapperTests.java
@@ -8,13 +8,16 @@

 package org.opensearch.index.mapper;

+import org.apache.lucene.index.LeafReader;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.xcontent.XContentFactory;
 import org.opensearch.script.ContextAwareGroupingScript;
 import org.opensearch.script.Script;
 import org.opensearch.script.ScriptService;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.Before;

+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -146,4 +149,21 @@ public void testIngestAttemptThrowsException() {
         MapperParsingException e = expectThrows(MapperParsingException.class, () -> mapper.parseCreateField(mockParseContext));
         assertTrue(e.getMessage().contains("context_aware_grouping cannot be ingested in the document"));
     }
+
+    public void testContextAwareFieldMapperWithDerivedSource() throws IOException {
+        ContextAwareGroupingFieldType fieldType = new ContextAwareGroupingFieldType(Collections.emptyList(), null);
+        ContextAwareGroupingFieldMapper mapper = new ContextAwareGroupingFieldMapper(
+            "context_aware_grouping",
+            fieldType,
+            new ContextAwareGroupingFieldMapper.Builder("context_aware_grouping")
+        );
+        LeafReader leafReader = mock(LeafReader.class);
+
+        try {
+            mapper.canDeriveSource();
+            mapper.deriveSource(XContentFactory.jsonBuilder().startObject(), leafReader, 0);
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
 }
diff --git a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
index 947f786333adf..cb8efa2ad130c 100644
--- a/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/search/aggregations/AggregatorTestCase.java
@@ -181,7 +181,6 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;