diff --git a/CHANGELOG.md b/CHANGELOG.md index fe393e5ddb717..35b50e1643cf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Ignore awareness attributes when a custom preference string is included with a search request ([#18848](https://github.com/opensearch-project/OpenSearch/pull/18848)) - Use ScoreDoc instead of FieldDoc when creating TopScoreDocCollectorManager to avoid unnecessary conversion ([#18802](https://github.com/opensearch-project/OpenSearch/pull/18802)) - Fix leafSorter optimization for ReadOnlyEngine and NRTReplicationEngine ([#18639](https://github.com/opensearch-project/OpenSearch/pull/18639)) +- Close IndexFieldDataService asynchronously ([#18888](https://github.com/opensearch-project/OpenSearch/pull/18888)) - Fix query string regex queries incorrectly swallowing TooComplexToDeterminizeException ([#18883](https://github.com/opensearch-project/OpenSearch/pull/18883)) ### Security diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index d65a9a0323178..321980d7dcd21 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -274,7 +274,13 @@ public IndexService( idFieldDataEnabled, scriptService ); - this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService); + this.indexFieldData = new IndexFieldDataService( + indexSettings, + indicesFieldDataCache, + circuitBreakerService, + mapperService, + threadPool + ); if (indexSettings.getIndexSortConfig().hasIndexSort()) { // we delay the actual creation of the sort order for this index because the mapping has not been merged yet. // The sort order is validated right after the merge of the mapping later in the process. diff --git a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java index 98900482176e5..5cc114b80aa36 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java +++ b/server/src/main/java/org/opensearch/index/fielddata/IndexFieldDataService.java @@ -44,6 +44,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.search.lookup.SearchLookup; +import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; import java.io.IOException; @@ -77,6 +78,8 @@ public class IndexFieldDataService extends AbstractIndexComponent implements Clo Property.IndexScope ); + private final ThreadPool threadPool; + private final CircuitBreakerService circuitBreakerService; private final IndicesFieldDataCache indicesFieldDataCache; @@ -96,12 +99,14 @@ public IndexFieldDataService( IndexSettings indexSettings, IndicesFieldDataCache indicesFieldDataCache, CircuitBreakerService circuitBreakerService, - MapperService mapperService + MapperService mapperService, + ThreadPool threadPool ) { super(indexSettings); this.indicesFieldDataCache = indicesFieldDataCache; this.circuitBreakerService = circuitBreakerService; this.mapperService = mapperService; + this.threadPool = threadPool; } public synchronized void clear() { @@ -181,6 +186,17 @@ public void setListener(IndexFieldDataCache.Listener listener) { @Override public void close() throws IOException { - clear(); + // Clear the field data cache for this index in an async manner + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + this.clear(); + } catch (Exception ex) { + logger.warn( + "Exception occurred while clearing index field data cache for index: {}, exception: {}", + indexSettings.getIndex().getName(), + ex + ); + } + }); } } diff --git a/server/src/test/java/org/opensearch/index/fielddata/IndexFieldDataServiceTests.java b/server/src/test/java/org/opensearch/index/fielddata/IndexFieldDataServiceTests.java index 3fb43b7dbdc4e..640ba135b0bad 100644 --- a/server/src/test/java/org/opensearch/index/fielddata/IndexFieldDataServiceTests.java +++ b/server/src/test/java/org/opensearch/index/fielddata/IndexFieldDataServiceTests.java @@ -58,6 +58,7 @@ import org.opensearch.index.mapper.NumberFieldMapper; import org.opensearch.index.mapper.TextFieldMapper; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.cluster.IndicesClusterStateService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; import org.opensearch.plugins.Plugin; import org.opensearch.search.lookup.SearchLookup; @@ -67,6 +68,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -93,7 +95,8 @@ public void testGetForFieldDefaults() { indexService.getIndexSettings(), indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), - indexService.mapperService() + indexService.mapperService(), + indexService.getThreadPool() ); final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1)); final MappedFieldType stringMapper = new KeywordFieldMapper.Builder("string").build(ctx).fieldType(); @@ -127,6 +130,63 @@ public void testGetForFieldDefaults() { assertTrue(fd instanceof SortedNumericIndexFieldData); } + public void testIndexFieldDataCacheIsCleredAfterIndexRemoval() throws IOException, InterruptedException { + final IndexService indexService = createIndex("test"); + final IndicesService indicesService = getInstanceFromNode(IndicesService.class); + // copy the ifdService since we can set the listener only once. + final IndexFieldDataService ifdService = new IndexFieldDataService( + indexService.getIndexSettings(), + indicesService.getIndicesFieldDataCache(), + indicesService.getCircuitBreakerService(), + indexService.mapperService(), + indexService.getThreadPool() + ); + + final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1)); + final MappedFieldType mapper1 = new TextFieldMapper.Builder("field_1", createDefaultIndexAnalyzers()).fielddata(true) + .build(ctx) + .fieldType(); + final MappedFieldType mapper2 = new TextFieldMapper.Builder("field_2", createDefaultIndexAnalyzers()).fielddata(true) + .build(ctx) + .fieldType(); + final IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), new IndexWriterConfig(new KeywordAnalyzer())); + Document doc = new Document(); + doc.add(new StringField("field_1", "thisisastring", Store.NO)); + doc.add(new StringField("field_2", "thisisanotherstring", Store.NO)); + writer.addDocument(doc); + final IndexReader reader = DirectoryReader.open(writer); + final AtomicInteger onCacheCalled = new AtomicInteger(); + final AtomicInteger onRemovalCalled = new AtomicInteger(); + ifdService.setListener(new IndexFieldDataCache.Listener() { + @Override + public void onCache(ShardId shardId, String fieldName, Accountable ramUsage) {} + + @Override + public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, long sizeInBytes) {} + }); + IndexFieldData ifd1 = ifdService.getForField(mapper1, "test", () -> { throw new UnsupportedOperationException(); }); + IndexFieldData ifd2 = ifdService.getForField(mapper2, "test", () -> { throw new UnsupportedOperationException(); }); + LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0); + LeafFieldData loadField1 = ifd1.load(leafReaderContext); + LeafFieldData loadField2 = ifd2.load(leafReaderContext); + + assertEquals(2, indicesService.getIndicesFieldDataCache().getCache().count()); + + // Remove index + indicesService.removeIndex( + indexService.index(), + IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED, + "Please delete!" + ); + + waitUntil(() -> indicesService.getIndicesFieldDataCache().getCache().count() == 0); + + reader.close(); + loadField1.close(); + loadField2.close(); + writer.close(); + } + public void testGetForFieldRuntimeField() { final IndexService indexService = createIndex("test"); final IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -134,7 +194,8 @@ public void testGetForFieldRuntimeField() { indexService.getIndexSettings(), indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), - indexService.mapperService() + indexService.mapperService(), + indexService.getThreadPool() ); final SetOnce> searchLookupSetOnce = new SetOnce<>(); MappedFieldType ft = mock(MappedFieldType.class); @@ -159,7 +220,8 @@ public void testClearField() throws Exception { indexService.getIndexSettings(), indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), - indexService.mapperService() + indexService.mapperService(), + indexService.getThreadPool() ); final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1)); @@ -227,7 +289,8 @@ public void testFieldDataCacheListener() throws Exception { indexService.getIndexSettings(), indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), - indexService.mapperService() + indexService.mapperService(), + indexService.getThreadPool() ); final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1)); @@ -284,7 +347,8 @@ public void testSetCacheListenerTwice() { indexService.getIndexSettings(), indicesService.getIndicesFieldDataCache(), indicesService.getCircuitBreakerService(), - indexService.mapperService() + indexService.mapperService(), + indexService.getThreadPool() ); // set it the first time... shardPrivateService.setListener(new IndexFieldDataCache.Listener() { @@ -325,7 +389,8 @@ private void doTestRequireDocValues(MappedFieldType ft) { IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), cache, null, - null + null, + threadPool ); if (ft.hasDocValues()) { ifds.getForField(ft, "test", () -> { throw new UnsupportedOperationException(); }); // no exception diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index c93c997215a55..c03a3873aedfe 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3359,7 +3359,8 @@ public void testReaderWrapperWorksWithGlobalOrdinals() throws IOException { shard.indexSettings, indicesFieldDataCache, new NoneCircuitBreakerService(), - shard.mapperService() + shard.mapperService(), + shard.getThreadPool() ); IndexFieldData.Global ifd = indexFieldDataService.getForField(foo, "test", () -> { throw new UnsupportedOperationException("search lookup not available"); diff --git a/test/framework/src/main/java/org/opensearch/test/AbstractBuilderTestCase.java b/test/framework/src/main/java/org/opensearch/test/AbstractBuilderTestCase.java index 80be999b7d868..b90ef40991003 100644 --- a/test/framework/src/main/java/org/opensearch/test/AbstractBuilderTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/AbstractBuilderTestCase.java @@ -90,6 +90,8 @@ import org.opensearch.script.ScriptModule; import org.opensearch.script.ScriptService; import org.opensearch.search.SearchModule; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; import org.junit.After; import org.junit.AfterClass; @@ -109,6 +111,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; @@ -177,6 +180,7 @@ public abstract class AbstractBuilderTestCase extends OpenSearchTestCase { private static Settings nodeSettings; private static Index index; private static long nowInMillis; + private static ThreadPool threadPool; protected static Index getIndex() { return index; @@ -198,6 +202,7 @@ public static void beforeClass() { index = new Index(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLength(10)); nowInMillis = randomNonNegativeLong(); + threadPool = new TestThreadPool("random_threadpool_name"); } @Override @@ -240,6 +245,7 @@ protected Iterable getMapping() { public static void afterClass() throws Exception { IOUtils.close(serviceHolder); IOUtils.close(serviceHolderWithNoType); + ThreadPool.terminate(threadPool, 1, TimeUnit.MINUTES); serviceHolder = null; serviceHolderWithNoType = null; } @@ -432,7 +438,8 @@ private static class ServiceHolder implements Closeable { idxSettings, indicesFieldDataCache, new NoneCircuitBreakerService(), - mapperService + mapperService, + threadPool ); bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() { @Override