Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Use `new SecureRandom()` to avoid blocking ([18729](https://github.com/opensearch-project/OpenSearch/issues/18729))
- Use ScoreDoc instead of FieldDoc when creating TopScoreDocCollectorManager to avoid unnecessary conversion ([#18802](https://github.com/opensearch-project/OpenSearch/pull/18802))
- Fix leafSorter optimization for ReadOnlyEngine and NRTReplicationEngine ([#18639](https://github.com/opensearch-project/OpenSearch/pull/18639))
- Close IndexFieldDataService asynchronously ([#18888](https://github.com/opensearch-project/OpenSearch/pull/18888))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,13 @@ public IndexService(
idFieldDataEnabled,
scriptService
);
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
this.indexFieldData = new IndexFieldDataService(
indexSettings,
indicesFieldDataCache,
circuitBreakerService,
mapperService,
threadPool
);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
// The sort order is validated right after the merge of the mapping later in the process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.search.lookup.SearchLookup;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class IndexFieldDataService extends AbstractIndexComponent implements Clo
Property.IndexScope
);

private final ThreadPool threadPool;

private final CircuitBreakerService circuitBreakerService;

private final IndicesFieldDataCache indicesFieldDataCache;
Expand All @@ -96,12 +99,14 @@ public IndexFieldDataService(
IndexSettings indexSettings,
IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService,
MapperService mapperService
MapperService mapperService,
ThreadPool threadPool
) {
super(indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
this.mapperService = mapperService;
this.threadPool = threadPool;
}

public synchronized void clear() {
Expand Down Expand Up @@ -181,6 +186,7 @@ public void setListener(IndexFieldDataCache.Listener listener) {

@Override
public void close() throws IOException {
clear();
// Clear the field data cache for this index in an async manner
threadPool.executor(ThreadPool.Names.GENERIC).execute(this::clear);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.TextFieldMapper;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.lookup.SearchLookup;
Expand All @@ -67,6 +68,7 @@
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -93,7 +95,8 @@ public void testGetForFieldDefaults() {
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService()
indexService.mapperService(),
indexService.getThreadPool()
);
final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
final MappedFieldType stringMapper = new KeywordFieldMapper.Builder("string").build(ctx).fieldType();
Expand Down Expand Up @@ -127,14 +130,72 @@ public void testGetForFieldDefaults() {
assertTrue(fd instanceof SortedNumericIndexFieldData);
}

public void testIndexFieldDataCacheIsCleredAfterIndexRemoval() throws IOException, InterruptedException {
final IndexService indexService = createIndex("test");
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
// copy the ifdService since we can set the listener only once.
final IndexFieldDataService ifdService = new IndexFieldDataService(
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService(),
indexService.getThreadPool()
);

final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
final MappedFieldType mapper1 = new TextFieldMapper.Builder("field_1", createDefaultIndexAnalyzers()).fielddata(true)
.build(ctx)
.fieldType();
final MappedFieldType mapper2 = new TextFieldMapper.Builder("field_2", createDefaultIndexAnalyzers()).fielddata(true)
.build(ctx)
.fieldType();
final IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), new IndexWriterConfig(new KeywordAnalyzer()));
Document doc = new Document();
doc.add(new StringField("field_1", "thisisastring", Store.NO));
doc.add(new StringField("field_2", "thisisanotherstring", Store.NO));
writer.addDocument(doc);
final IndexReader reader = DirectoryReader.open(writer);
final AtomicInteger onCacheCalled = new AtomicInteger();
final AtomicInteger onRemovalCalled = new AtomicInteger();
ifdService.setListener(new IndexFieldDataCache.Listener() {
@Override
public void onCache(ShardId shardId, String fieldName, Accountable ramUsage) {}

@Override
public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, long sizeInBytes) {}
});
IndexFieldData<?> ifd1 = ifdService.getForField(mapper1, "test", () -> { throw new UnsupportedOperationException(); });
IndexFieldData<?> ifd2 = ifdService.getForField(mapper2, "test", () -> { throw new UnsupportedOperationException(); });
LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0);
LeafFieldData loadField1 = ifd1.load(leafReaderContext);
LeafFieldData loadField2 = ifd2.load(leafReaderContext);

assertEquals(2, indicesService.getIndicesFieldDataCache().getCache().count());

// Remove index
indicesService.removeIndex(
indexService.index(),
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED,
"Please delete!"
);

waitUntil(() -> indicesService.getIndicesFieldDataCache().getCache().count() == 0);

reader.close();
loadField1.close();
loadField2.close();
writer.close();
}

public void testGetForFieldRuntimeField() {
final IndexService indexService = createIndex("test");
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexFieldDataService ifdService = new IndexFieldDataService(
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService()
indexService.mapperService(),
indexService.getThreadPool()
);
final SetOnce<Supplier<SearchLookup>> searchLookupSetOnce = new SetOnce<>();
MappedFieldType ft = mock(MappedFieldType.class);
Expand All @@ -159,7 +220,8 @@ public void testClearField() throws Exception {
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService()
indexService.mapperService(),
indexService.getThreadPool()
);

final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
Expand Down Expand Up @@ -227,7 +289,8 @@ public void testFieldDataCacheListener() throws Exception {
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService()
indexService.mapperService(),
indexService.getThreadPool()
);

final BuilderContext ctx = new BuilderContext(indexService.getIndexSettings().getSettings(), new ContentPath(1));
Expand Down Expand Up @@ -284,7 +347,8 @@ public void testSetCacheListenerTwice() {
indexService.getIndexSettings(),
indicesService.getIndicesFieldDataCache(),
indicesService.getCircuitBreakerService(),
indexService.mapperService()
indexService.mapperService(),
indexService.getThreadPool()
);
// set it the first time...
shardPrivateService.setListener(new IndexFieldDataCache.Listener() {
Expand Down Expand Up @@ -325,7 +389,8 @@ private void doTestRequireDocValues(MappedFieldType ft) {
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
cache,
null,
null
null,
threadPool
);
if (ft.hasDocValues()) {
ifds.getForField(ft, "test", () -> { throw new UnsupportedOperationException(); }); // no exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3359,7 +3359,8 @@ public void testReaderWrapperWorksWithGlobalOrdinals() throws IOException {
shard.indexSettings,
indicesFieldDataCache,
new NoneCircuitBreakerService(),
shard.mapperService()
shard.mapperService(),
shard.getThreadPool()
);
IndexFieldData.Global ifd = indexFieldDataService.getForField(foo, "test", () -> {
throw new UnsupportedOperationException("search lookup not available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@
import org.opensearch.script.ScriptModule;
import org.opensearch.script.ScriptService;
import org.opensearch.search.SearchModule;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -109,6 +111,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -177,6 +180,7 @@ public abstract class AbstractBuilderTestCase extends OpenSearchTestCase {
private static Settings nodeSettings;
private static Index index;
private static long nowInMillis;
private static ThreadPool threadPool;

protected static Index getIndex() {
return index;
Expand All @@ -198,6 +202,7 @@ public static void beforeClass() {

index = new Index(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLength(10));
nowInMillis = randomNonNegativeLong();
threadPool = new TestThreadPool("random_threadpool_name");
}

@Override
Expand Down Expand Up @@ -240,6 +245,7 @@ protected Iterable<MappedFieldType> getMapping() {
public static void afterClass() throws Exception {
IOUtils.close(serviceHolder);
IOUtils.close(serviceHolderWithNoType);
ThreadPool.terminate(threadPool, 1, TimeUnit.MINUTES);
serviceHolder = null;
serviceHolderWithNoType = null;
}
Expand Down Expand Up @@ -432,7 +438,8 @@ private static class ServiceHolder implements Closeable {
idxSettings,
indicesFieldDataCache,
new NoneCircuitBreakerService(),
mapperService
mapperService,
threadPool
);
bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
@Override
Expand Down
Loading