diff --git a/CHANGELOG.md b/CHANGELOG.md index 78deb220f4408..573b9834b957c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.x] ### Added +- Expand fetch phase profiling to support inner hits and top hits aggregation phases ([##18936](https://github.com/opensearch-project/OpenSearch/pull/18936)) - Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920)) + ### Changed ### Fixed diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.profile/10_fetch_phase.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.profile/10_fetch_phase.yml index 1de34ee7e3984..05f53cfc7e94b 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.profile/10_fetch_phase.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.profile/10_fetch_phase.yml @@ -42,8 +42,8 @@ setup: --- "Combined fetch sub-phases profiling": - skip: - version: " - 3.1.99" - reason: "Fetch phase profiling was introduced in 3.2.0" + version: " - 3.2.99" + reason: "Inner hits fetch phase profiling was introduced in 3.3.0" features: "contains" - do: @@ -80,12 +80,19 @@ setup: fields: "object_field.nested_field": {} - # 1. Verify basic fetch profile structure - - is_true: profile.shards.0.fetch.0 + # 1. Verify fetch profile structure - should have main fetch + inner hits fetch + - length: { profile.shards.0.fetch: 2 } + + # Main fetch profile - match: { profile.shards.0.fetch.0.type: "fetch" } - match: { profile.shards.0.fetch.0.description: "fetch" } - is_true: profile.shards.0.fetch.0.time_in_nanos + # Inner hits fetch profile + - match: { profile.shards.0.fetch.1.type: "fetch_inner_hits[object_field]" } + - match: { profile.shards.0.fetch.1.description: "fetch_inner_hits[object_field]" } + - is_true: profile.shards.0.fetch.1.time_in_nanos + # 2. Verify detailed breakdown of the main fetch operation - is_true: profile.shards.0.fetch.0.breakdown - is_true: profile.shards.0.fetch.0.breakdown.load_stored_fields @@ -99,7 +106,20 @@ setup: - is_true: profile.shards.0.fetch.0.breakdown.create_stored_fields_visitor - match: { profile.shards.0.fetch.0.breakdown.create_stored_fields_visitor_count: 1} - # 3. Verify all expected fetch sub-phases are present as children + # 3. Verify inner hits fetch breakdown has all required fields (some may be 0) + - is_true: profile.shards.0.fetch.1.breakdown + - gte: { profile.shards.0.fetch.1.breakdown.load_stored_fields: 0 } + - gte: { profile.shards.0.fetch.1.breakdown.load_stored_fields_count: 0 } + - gte: { profile.shards.0.fetch.1.breakdown.load_source: 0 } + - gte: { profile.shards.0.fetch.1.breakdown.load_source_count: 0 } + - gte: { profile.shards.0.fetch.1.breakdown.get_next_reader: 0 } + - match: { profile.shards.0.fetch.1.breakdown.get_next_reader_count: 1 } + - gte: { profile.shards.0.fetch.1.breakdown.build_sub_phase_processors: 0 } + - match: { profile.shards.0.fetch.1.breakdown.build_sub_phase_processors_count: 1 } + - gte: { profile.shards.0.fetch.1.breakdown.create_stored_fields_visitor: 0 } + - match: { profile.shards.0.fetch.1.breakdown.create_stored_fields_visitor_count: 1 } + + # 4. Verify all expected fetch sub-phases are present as children in main fetch - length: { profile.shards.0.fetch.0.children: 9 } - contains: profile.shards.0.fetch.0.children: @@ -129,6 +149,16 @@ setup: profile.shards.0.fetch.0.children: type: "FetchScorePhase" + # 5. 
Verify inner hits fetch has exactly 1 sub-phase (FetchSourcePhase) + - length: { profile.shards.0.fetch.1.children: 1 } + - match: { profile.shards.0.fetch.1.children.0.type: "FetchSourcePhase" } + - is_true: profile.shards.0.fetch.1.children.0.time_in_nanos + - is_true: profile.shards.0.fetch.1.children.0.breakdown + - is_true: profile.shards.0.fetch.1.children.0.breakdown.process + - gte: { profile.shards.0.fetch.1.children.0.breakdown.process_count: 1 } + - is_true: profile.shards.0.fetch.1.children.0.breakdown.set_next_reader + - match: { profile.shards.0.fetch.1.children.0.breakdown.set_next_reader_count: 1 } + --- "No source or empty fetch profiling": - skip: @@ -169,8 +199,9 @@ setup: --- "Top-hits aggregation profiling": - skip: - version: " - 3.1.99" - reason: "Fetch phase profiling was introduced in 3.2.0" + version: " - 3.2.99" + reason: "Top-hits aggregation profiling was introduced in 3.3.0" + features: "contains" - do: search: @@ -181,13 +212,42 @@ setup: match: text_field: "document" aggs: - top_hits_agg: + top_hits_agg1: + top_hits: + size: 1 + top_hits_agg2: top_hits: size: 1 + sort: + - numeric_field: { order: desc } + + - length: { profile.shards.0.fetch: 3 } + + - contains: + profile.shards.0.fetch: + type: "fetch" + description: "fetch" + + - contains: + profile.shards.0.fetch: + type: "fetch_top_hits_aggregation[top_hits_agg1]" + description: "fetch_top_hits_aggregation[top_hits_agg1]" + + - contains: + profile.shards.0.fetch: + type: "fetch_top_hits_aggregation[top_hits_agg2]" + description: "fetch_top_hits_aggregation[top_hits_agg2]" + + - is_true: profile.shards.0.fetch.0.time_in_nanos + - is_true: profile.shards.0.fetch.0.breakdown + - is_true: profile.shards.0.fetch.1.time_in_nanos + - is_true: profile.shards.0.fetch.1.breakdown + - is_true: profile.shards.0.fetch.2.time_in_nanos + - is_true: profile.shards.0.fetch.2.breakdown - # Verify that the profile contains a single fetch operation for the query - - length: { profile.shards.0.fetch: 1 } - - match: { profile.shards.0.fetch.0.type: "fetch" } - - match: { profile.shards.0.fetch.0.description: "fetch" } - length: { profile.shards.0.fetch.0.children: 1 } - match: { profile.shards.0.fetch.0.children.0.type: "FetchSourcePhase" } + - length: { profile.shards.0.fetch.1.children: 1 } + - match: { profile.shards.0.fetch.1.children.0.type: "FetchSourcePhase" } + - length: { profile.shards.0.fetch.2.children: 1 } + - match: { profile.shards.0.fetch.2.children.0.type: "FetchSourcePhase" } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java index 2f608a0cbe06f..d8bd576ecee04 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/aggregation/AggregationProfilerIT.java @@ -37,6 +37,7 @@ import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.search.SearchResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.opensearch.search.aggregations.BucketOrder; import org.opensearch.search.aggregations.InternalAggregation; @@ -46,8 +47,11 @@ import org.opensearch.search.aggregations.metrics.Stats; import org.opensearch.search.profile.ProfileResult; import 
org.opensearch.search.profile.ProfileShardResult; +import org.opensearch.search.profile.fetch.FetchProfileShardResult; import org.opensearch.search.profile.query.CollectorResult; import org.opensearch.search.profile.query.QueryProfileShardResult; +import org.opensearch.search.sort.SortBuilders; +import org.opensearch.search.sort.SortOrder; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.hamcrest.core.IsNull; @@ -69,6 +73,7 @@ import static org.opensearch.search.aggregations.AggregationBuilders.max; import static org.opensearch.search.aggregations.AggregationBuilders.stats; import static org.opensearch.search.aggregations.AggregationBuilders.terms; +import static org.opensearch.search.aggregations.AggregationBuilders.topHits; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.containsString; @@ -1000,4 +1005,93 @@ private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardR assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION)); } } + + public void testTopHitsAggregationFetchProfiling() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .setProfile(true) + .setQuery(QueryBuilders.matchAllQuery()) + .addAggregation(topHits("top_hits_agg1").size(1)) + .addAggregation(topHits("top_hits_agg2").size(1).sort(SortBuilders.fieldSort(NUMBER_FIELD).order(SortOrder.DESC))) + .get(); + + assertSearchResponse(response); + Map profileResults = response.getProfileResults(); + assertNotNull("Profile results should not be null", profileResults); + assertFalse("Profile results should not be empty", profileResults.isEmpty()); + + int shardsWithDocuments = 0; + int shardsWithCorrectProfile = 0; + + for (ProfileShardResult shardResult : profileResults.values()) { + FetchProfileShardResult fetchProfileResult = shardResult.getFetchProfileResult(); + if (fetchProfileResult != null && !fetchProfileResult.getFetchProfileResults().isEmpty()) { + shardsWithDocuments++; + List fetchProfileResults = fetchProfileResult.getFetchProfileResults(); + + // Count different types of fetch operations dynamically + int mainFetchCount = 0; + int topHitsAgg1Count = 0; + int topHitsAgg2Count = 0; + ProfileResult topHitsFetch1 = null; + ProfileResult topHitsFetch2 = null; + + for (ProfileResult result : fetchProfileResults) { + if ("fetch".equals(result.getQueryName())) { + mainFetchCount++; + } else if (result.getQueryName().contains("top_hits_agg1")) { + if (topHitsFetch1 == null) { + topHitsFetch1 = result; // Keep first instance for validation + } + topHitsAgg1Count++; + } else if (result.getQueryName().contains("top_hits_agg2")) { + if (topHitsFetch2 == null) { + topHitsFetch2 = result; // Keep first instance for validation + } + topHitsAgg2Count++; + } + } + + // Verify we have the expected aggregations (concurrent search may create multiple instances) + assertTrue("Should have at least 1 top_hits_agg1 fetch operation", topHitsAgg1Count >= 1); + assertTrue("Should have at least 1 top_hits_agg2 fetch operation", topHitsAgg2Count >= 1); + assertTrue("Should have at least one main fetch operation", mainFetchCount >= 1); + assertTrue("Should have at least 3 total fetch operations", fetchProfileResults.size() >= 3); + + assertNotNull("Should have top_hits_agg1 fetch operation", 
topHitsFetch1); + assertTrue("Should be top_hits aggregation fetch", topHitsFetch1.getQueryName().startsWith("fetch_top_hits_aggregation")); + assertTrue("Should contain aggregation name", topHitsFetch1.getQueryName().contains("top_hits_agg1")); + assertNotNull(topHitsFetch1.getTimeBreakdown()); + assertEquals("Top hits fetch should have 1 child (FetchSourcePhase)", 1, topHitsFetch1.getProfiledChildren().size()); + assertEquals("FetchSourcePhase", topHitsFetch1.getProfiledChildren().get(0).getQueryName()); + + assertNotNull("Should have top_hits_agg2 fetch operation", topHitsFetch2); + assertTrue("Should be top_hits aggregation fetch", topHitsFetch2.getQueryName().startsWith("fetch_top_hits_aggregation")); + assertTrue("Should contain aggregation name", topHitsFetch2.getQueryName().contains("top_hits_agg2")); + assertNotNull(topHitsFetch2.getTimeBreakdown()); + assertEquals("Top hits fetch should have 1 child (FetchSourcePhase)", 1, topHitsFetch2.getProfiledChildren().size()); + assertEquals("FetchSourcePhase", topHitsFetch2.getProfiledChildren().get(0).getQueryName()); + + for (ProfileResult fetchResult : fetchProfileResults) { + Map breakdown = fetchResult.getTimeBreakdown(); + assertTrue( + "CREATE_STORED_FIELDS_VISITOR timing should be present", + breakdown.containsKey("create_stored_fields_visitor") + ); + assertTrue("BUILD_SUB_PHASE_PROCESSORS timing should be present", breakdown.containsKey("build_sub_phase_processors")); + assertTrue("GET_NEXT_READER timing should be present", breakdown.containsKey("get_next_reader")); + assertTrue("LOAD_STORED_FIELDS timing should be present", breakdown.containsKey("load_stored_fields")); + assertTrue("LOAD_SOURCE timing should be present", breakdown.containsKey("load_source")); + } + + shardsWithCorrectProfile++; + } + } + + assertTrue("Should have at least one shard with documents", shardsWithDocuments > 0); + assertEquals( + "All shards with documents should have correct fetch profile structure", + shardsWithDocuments, + shardsWithCorrectProfile + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java index 069a222b899bf..46504ad667235 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/fetch/FetchProfilerIT.java @@ -281,64 +281,98 @@ public void testInnerHitsPhaseProfile() throws Exception { .get(); ensureGreen("test"); - // Index a document with nested fields - client().prepareIndex("test") - .setId("1") - .setSource( - XContentFactory.jsonBuilder() - .startObject() - .startArray("nested_field") - .startObject() - .field("nested_text", "first nested value") - .endObject() - .startObject() - .field("nested_text", "second nested value") - .endObject() - .endArray() - .endObject() - ) - .get(); - refresh(); + // Index many documents to ensure all shards have data + int numDocs = randomIntBetween(100, 150); + IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + docs[i] = client().prepareIndex("test") + .setId(String.valueOf(i)) + .setSource( + XContentFactory.jsonBuilder() + .startObject() + .startArray("nested_field") + .startObject() + .field("nested_text", "nested value " + i) + .endObject() + .endArray() + .endObject() + ); + } + indexRandom(true, docs); SearchResponse resp = client().prepareSearch("test") .setQuery( 
QueryBuilders.nestedQuery("nested_field", QueryBuilders.matchAllQuery(), org.apache.lucene.search.join.ScoreMode.None) - .innerHit(new InnerHitBuilder()) + .innerHit(new InnerHitBuilder().setName("inner_hits_1")) ) .setProfile(true) .get(); - assertFalse(resp.getHits().getAt(0).getInnerHits().isEmpty()); - - assertFetchPhase(resp, "FetchSourcePhase", 1); - - // InnerHitsPhase should no longer be profiled + assertTrue("Should have at least one hit", resp.getHits().getHits().length > 0); + assertFalse("Should have inner hits", resp.getHits().getAt(0).getInnerHits().isEmpty()); + assertEquals("Should have 1 inner hit", 1, resp.getHits().getAt(0).getInnerHits().size()); Map profileResults = resp.getProfileResults(); + assertNotNull("Profile results should not be null", profileResults); + assertFalse("Profile results should not be empty", profileResults.isEmpty()); - boolean foundInnerHitsPhase = false; - boolean foundFetchInnerHits = false; + int shardsWithDocuments = 0; + int shardsWithCorrectProfile = 0; for (ProfileShardResult shardResult : profileResults.values()) { FetchProfileShardResult fetchProfileResult = shardResult.getFetchProfileResult(); + if (fetchProfileResult != null && !fetchProfileResult.getFetchProfileResults().isEmpty()) { + shardsWithDocuments++; + List fetchProfileResults = fetchProfileResult.getFetchProfileResults(); + + assertEquals( + "Every shard with documents should have 2 fetch operations (1 main + 1 inner hit)", + 2, + fetchProfileResults.size() + ); - for (ProfileResult fetchResult : fetchProfileResult.getFetchProfileResults()) { - for (ProfileResult phase : fetchResult.getProfiledChildren()) { - if ("InnerHitsPhase".equals(phase.getQueryName())) { - - foundInnerHitsPhase = true; - Map breakdown = phase.getTimeBreakdown(); - assertTrue(breakdown.containsKey(FetchTimingType.PROCESS.toString())); - assertTrue(breakdown.containsKey(FetchTimingType.SET_NEXT_READER.toString())); - } - } - if ("fetch_inner_hits".equals(fetchResult.getQueryName())) { - foundFetchInnerHits = true; + ProfileResult mainFetch = fetchProfileResults.getFirst(); + assertEquals("fetch", mainFetch.getQueryName()); + assertNotNull(mainFetch.getTimeBreakdown()); + assertTrue("Main fetch should have children", !mainFetch.getProfiledChildren().isEmpty()); + + ProfileResult innerHitsFetch = fetchProfileResults.get(1); + assertTrue("Should be inner hits fetch", innerHitsFetch.getQueryName().startsWith("fetch_inner_hits")); + assertNotNull(innerHitsFetch.getTimeBreakdown()); + assertEquals("Inner hits fetch should have 1 child (FetchSourcePhase)", 1, innerHitsFetch.getProfiledChildren().size()); + assertEquals("FetchSourcePhase", innerHitsFetch.getProfiledChildren().getFirst().getQueryName()); + + for (ProfileResult fetchResult : fetchProfileResults) { + Map breakdown = fetchResult.getTimeBreakdown(); + assertTrue( + "CREATE_STORED_FIELDS_VISITOR timing should be present", + breakdown.containsKey(FetchTimingType.CREATE_STORED_FIELDS_VISITOR.toString()) + ); + assertTrue( + "BUILD_SUB_PHASE_PROCESSORS timing should be present", + breakdown.containsKey(FetchTimingType.BUILD_SUB_PHASE_PROCESSORS.toString()) + ); + assertTrue( + "GET_NEXT_READER timing should be present", + breakdown.containsKey(FetchTimingType.GET_NEXT_READER.toString()) + ); + assertTrue( + "LOAD_STORED_FIELDS timing should be present", + breakdown.containsKey(FetchTimingType.LOAD_STORED_FIELDS.toString()) + ); + assertTrue("LOAD_SOURCE timing should be present", breakdown.containsKey(FetchTimingType.LOAD_SOURCE.toString())); } + + 
shardsWithCorrectProfile++; } } - assertFalse("InnerHitsPhase should be absent", foundInnerHitsPhase); - assertFalse("fetch_inner_hits profile should be absent", foundFetchInnerHits); + + assertTrue("Should have at least one shard with documents", shardsWithDocuments > 0); + assertEquals( + "All shards with documents should have correct fetch profile structure", + shardsWithDocuments, + shardsWithCorrectProfile + ); } private void assertFetchPhase(SearchResponse resp, String phaseName, int expectedChildren) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregator.java index e30cc9e1b3a4a..e87bcc4e14a3d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/TopHitsAggregator.java @@ -215,7 +215,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE docIdsToLoad[i] = topDocs.scoreDocs[i].doc; } subSearchContext.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); - fetchPhase.execute(subSearchContext, "fetch_top_hits_aggregation"); + fetchPhase.execute(subSearchContext, "fetch_top_hits_aggregation[" + name + "]"); FetchSearchResult fetchResult = subSearchContext.fetchResult(); SearchHit[] internalHits = fetchResult.fetchResult().hits().getHits(); for (int i = 0; i < internalHits.length; i++) { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 244479fbd10e1..88b8113721a91 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -113,8 +113,7 @@ public void execute(SearchContext context) { public void execute(SearchContext context, String profileDescription) { FetchProfileBreakdown breakdown = null; FetchProfiler fetchProfiler = null; - if (context.getProfilers() != null && "fetch".equals(profileDescription)) { // second condition makes sure only standard fetch phase - // is profiled for now + if (context.getProfilers() != null) { fetchProfiler = context.getProfilers().getFetchProfiler(); if (context.docIdsToLoadSize() > 0) { breakdown = fetchProfiler.startFetchPhase(profileDescription); diff --git a/server/src/main/java/org/opensearch/search/fetch/subphase/InnerHitsPhase.java b/server/src/main/java/org/opensearch/search/fetch/subphase/InnerHitsPhase.java index af31be5f4ae93..0521cf936588b 100644 --- a/server/src/main/java/org/opensearch/search/fetch/subphase/InnerHitsPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/subphase/InnerHitsPhase.java @@ -102,7 +102,7 @@ private void hitExecute(Map innerHi innerHitsContext.setId(hit.getId()); innerHitsContext.setRootLookup(rootLookup); - fetchPhase.execute(innerHitsContext, "fetch_inner_hits"); + fetchPhase.execute(innerHitsContext, "fetch_inner_hits[" + entry.getKey() + "]"); FetchSearchResult fetchResult = innerHitsContext.fetchResult(); SearchHit[] internalHits = fetchResult.fetchResult().hits().getHits(); for (int j = 0; j < internalHits.length; j++) { diff --git a/server/src/main/java/org/opensearch/search/profile/fetch/FlatFetchProfileTree.java b/server/src/main/java/org/opensearch/search/profile/fetch/FlatFetchProfileTree.java index fa5f34526c356..9c9bef2a23e53 100644 --- a/server/src/main/java/org/opensearch/search/profile/fetch/FlatFetchProfileTree.java +++ 
b/server/src/main/java/org/opensearch/search/profile/fetch/FlatFetchProfileTree.java @@ -49,32 +49,56 @@ private static class Node { final String element; final FetchProfileBreakdown breakdown; final List children = new ArrayList<>(); + int references; Node(String element) { this.element = element; this.breakdown = new FetchProfileBreakdown(); + this.references = 0; } } private final List roots = new ArrayList<>(); + private final Map rootsMap = new HashMap<>(); private final Map phaseMap = new HashMap<>(); /** Start profiling a new fetch phase and return its breakdown. */ FetchProfileBreakdown startFetchPhase(String element) { - Node node = new Node(element); - roots.add(node); - phaseMap.put(element, node); + // Make phase name unique for concurrent slices by including thread info + String uniqueElement = element + "_" + Thread.currentThread().getId(); + + Node node = rootsMap.get(uniqueElement); + if (node == null) { + node = new Node(element); // Keep original element name for display + roots.add(node); + rootsMap.put(uniqueElement, node); + } + node.references++; + phaseMap.put(uniqueElement, node); return node.breakdown; } /** Start profiling a fetch sub-phase under the specified parent phase. */ FetchProfileBreakdown startSubPhase(String element, String parentElement) { - Node parent = phaseMap.get(parentElement); + // Make phase names unique for concurrent slices + String uniqueParentElement = parentElement + "_" + Thread.currentThread().getId(); + String uniqueElement = element + "_" + Thread.currentThread().getId(); + + Node parent = phaseMap.get(uniqueParentElement); if (parent == null) { throw new IllegalStateException("Parent phase '" + parentElement + "' does not exist for sub-phase '" + element + "'"); } - Node child = new Node(element); - parent.children.add(child); + Node child = null; + for (Node existing : parent.children) { + if (existing.element.equals(element)) { + child = existing; + break; + } + } + if (child == null) { + child = new Node(element); + parent.children.add(child); + } return child.breakdown; } @@ -82,7 +106,17 @@ FetchProfileBreakdown startSubPhase(String element, String parentElement) { * Finish profiling of the specified fetch phase. 
*/ void endFetchPhase(String element) { - phaseMap.remove(element); + // Make phase name unique for concurrent slices + String uniqueElement = element + "_" + Thread.currentThread().getId(); + + Node node = phaseMap.get(uniqueElement); + if (node == null) { + throw new IllegalStateException("Fetch phase '" + element + "' does not exist"); + } + node.references--; + if (node.references == 0) { + phaseMap.remove(uniqueElement); + } } /** diff --git a/server/src/test/java/org/opensearch/search/fetch/FetchProfilePhaseTests.java b/server/src/test/java/org/opensearch/search/fetch/FetchProfilePhaseTests.java index cf4161e9d0562..7576f49c008a0 100644 --- a/server/src/test/java/org/opensearch/search/fetch/FetchProfilePhaseTests.java +++ b/server/src/test/java/org/opensearch/search/fetch/FetchProfilePhaseTests.java @@ -59,6 +59,7 @@ import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext; import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.internal.SubSearchContext; import org.opensearch.search.lookup.SearchLookup; import org.opensearch.search.lookup.SourceLookup; import org.opensearch.search.profile.ProfileResult; @@ -74,8 +75,10 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -815,9 +818,15 @@ public void testInnerHitsPhaseProfiling() throws Exception { .withParsedQuery(pq) .build(); - InnerHitsContext.InnerHitSubContext innerContext = new DummyInnerHitSubContext("inner", context); + // Create multiple inner hit contexts + InnerHitsContext.InnerHitSubContext innerContext1 = new DummyInnerHitSubContext("inner1", context); + InnerHitsContext.InnerHitSubContext innerContext2 = new DummyInnerHitSubContext("inner2", context); + InnerHitsContext.InnerHitSubContext innerContext3 = new DummyInnerHitSubContext("inner3", context); + InnerHitsContext innerHits = new InnerHitsContext(); - innerHits.addInnerHitDefinition(innerContext); + innerHits.addInnerHitDefinition(innerContext1); + innerHits.addInnerHitDefinition(innerContext2); + innerHits.addInnerHitDefinition(innerContext3); when(context.innerHits()).thenReturn(innerHits); List subPhases = Collections.singletonList(new FetchSourcePhase()); @@ -827,12 +836,15 @@ public void testInnerHitsPhaseProfiling() throws Exception { FetchProfiler fetchProfiler = context.getProfilers().getFetchProfiler(); List profileResults = fetchProfiler.getTree(); - assertThat(profileResults, hasSize(1)); - ProfileResult profile = profileResults.get(0); + // Should have 1 standard fetch profile + 3 inner hits fetch profiles = 4 total + assertThat(profileResults, hasSize(4)); + + // Verify the standard fetch profile (first result) + ProfileResult standardFetchProfile = profileResults.get(0); Map children = new HashMap<>(); - for (ProfileResult child : profile.getProfiledChildren()) { + for (ProfileResult child : standardFetchProfile.getProfiledChildren()) { children.put(child.getQueryName(), child); } @@ -842,6 +854,129 @@ public void testInnerHitsPhaseProfiling() throws Exception { new TimingAssertions(children.get("FetchSourcePhase").getTimeBreakdown()).assertTimingPresent(FetchTimingType.PROCESS) .assertTimingPresent(FetchTimingType.SET_NEXT_READER); + + Set expectedInnerHitNames = Set.of("inner1", "inner2", "inner3"); + Set 
actualInnerHitNames = new HashSet<>(); + List innerHitsProfiles = new ArrayList<>(); + + for (int i = 1; i < profileResults.size(); i++) { + ProfileResult profile = profileResults.get(i); + String profileName = profile.getQueryName(); + + assertTrue("Profile name should start with 'fetch_inner_hits['", profileName.startsWith("fetch_inner_hits[")); + assertTrue("Profile name should end with ']'", profileName.endsWith("]")); + + String innerHitName = profileName.substring("fetch_inner_hits[".length(), profileName.length() - 1); + actualInnerHitNames.add(innerHitName); + innerHitsProfiles.add(profile); + } + + assertEquals("Should have all expected inner hit names", expectedInnerHitNames, actualInnerHitNames); + assertEquals("Should have 3 inner hits profiles", 3, innerHitsProfiles.size()); + + for (ProfileResult innerHitsFetchProfile : innerHitsProfiles) { + new TimingAssertions(innerHitsFetchProfile.getTimeBreakdown()).assertBreakdownNotEmpty() + .assertTimingPresent(FetchTimingType.CREATE_STORED_FIELDS_VISITOR) + .assertTimingPresent(FetchTimingType.LOAD_SOURCE) + .assertTimingPresent(FetchTimingType.LOAD_STORED_FIELDS) + .assertTimingPresent(FetchTimingType.BUILD_SUB_PHASE_PROCESSORS) + .assertTimingPresent(FetchTimingType.GET_NEXT_READER); + + children = new HashMap<>(); + for (ProfileResult child : innerHitsFetchProfile.getProfiledChildren()) { + children.put(child.getQueryName(), child); + } + + String innerHitName = innerHitsFetchProfile.getQueryName(); + assertEquals("Inner hit " + innerHitName + " should have 1 sub-phase", 1, children.size()); + assertTrue("Inner hit " + innerHitName + " should contain FetchSourcePhase", children.containsKey("FetchSourcePhase")); + + new TimingAssertions(children.get("FetchSourcePhase").getTimeBreakdown()).assertTimingPresent(FetchTimingType.PROCESS) + .assertTimingPresent(FetchTimingType.SET_NEXT_READER); + } + } + } + } + + public void testTopHitsAggregationFetchProfiling() throws Exception { + try (Directory dir = newDirectory()) { + List docs = new TestDocumentBuilder().addDocuments(3, true).build(); + int[] docIds = indexDocumentsAndGetIds(dir, docs, 3); + + try (IndexReader reader = DirectoryReader.open(dir)) { + QueryShardContext qsc = mock(QueryShardContext.class); + ParsedQuery pq = new ParsedQuery(new MatchAllDocsQuery()); + + // Create the main search context + SearchContext mainContext = new SearchContextBuilder(reader, docIds, indexShard).withSourceLoading() + .withStoredFields("_source") + .withQueryShardContext(qsc) + .withParsedQuery(pq) + .build(); + + // Create multiple SubSearchContext instances to simulate top hits aggregations + SubSearchContext topHitsContext1 = new SubSearchContext(mainContext); + topHitsContext1.docIdsToLoad(new int[] { docIds[0] }, 0, 1); + topHitsContext1.size(1); + + SubSearchContext topHitsContext2 = new SubSearchContext(mainContext); + topHitsContext2.docIdsToLoad(new int[] { docIds[1] }, 0, 1); + topHitsContext2.size(1); + + SubSearchContext topHitsContext3 = new SubSearchContext(mainContext); + topHitsContext3.docIdsToLoad(new int[] { docIds[2] }, 0, 1); + topHitsContext3.size(1); + + List subPhases = Collections.singletonList(new FetchSourcePhase()); + FetchPhase fetchPhase = new FetchPhase(subPhases); + + fetchPhase.execute(topHitsContext1, "fetch_top_hits_aggregation[top_hits_agg1]"); + fetchPhase.execute(topHitsContext2, "fetch_top_hits_aggregation[top_hits_agg2]"); + fetchPhase.execute(topHitsContext3, "fetch_top_hits_aggregation[top_hits_agg3]"); + + FetchProfiler fetchProfiler = 
mainContext.getProfilers().getFetchProfiler(); + List profileResults = fetchProfiler.getTree(); + + assertThat(profileResults, hasSize(3)); + + Set expectedTopHitsNames = Set.of("top_hits_agg1", "top_hits_agg2", "top_hits_agg3"); + Set actualTopHitsNames = new HashSet<>(); + + for (ProfileResult profile : profileResults) { + String profileName = profile.getQueryName(); + + assertTrue( + "Profile name should start with 'fetch_top_hits_aggregation['", + profileName.startsWith("fetch_top_hits_aggregation[") + ); + assertTrue("Profile name should end with ']'", profileName.endsWith("]")); + + String topHitsName = profileName.substring("fetch_top_hits_aggregation[".length(), profileName.length() - 1); + actualTopHitsNames.add(topHitsName); + } + + assertEquals("Should have all expected top hits names", expectedTopHitsNames, actualTopHitsNames); + + for (ProfileResult topHitsFetchProfile : profileResults) { + new TimingAssertions(topHitsFetchProfile.getTimeBreakdown()).assertBreakdownNotEmpty() + .assertTimingPresent(FetchTimingType.CREATE_STORED_FIELDS_VISITOR) + .assertTimingPresent(FetchTimingType.LOAD_SOURCE) + .assertTimingPresent(FetchTimingType.LOAD_STORED_FIELDS) + .assertTimingPresent(FetchTimingType.BUILD_SUB_PHASE_PROCESSORS) + .assertTimingPresent(FetchTimingType.GET_NEXT_READER); + + Map children = new HashMap<>(); + for (ProfileResult child : topHitsFetchProfile.getProfiledChildren()) { + children.put(child.getQueryName(), child); + } + + String topHitsName = topHitsFetchProfile.getQueryName(); + assertEquals("Top hits " + topHitsName + " should have 1 sub-phase", 1, children.size()); + assertTrue("Top hits " + topHitsName + " should contain FetchSourcePhase", children.containsKey("FetchSourcePhase")); + + new TimingAssertions(children.get("FetchSourcePhase").getTimeBreakdown()).assertTimingPresent(FetchTimingType.PROCESS) + .assertTimingPresent(FetchTimingType.SET_NEXT_READER); + } } } } diff --git a/server/src/test/java/org/opensearch/search/profile/fetch/FetchProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/fetch/FetchProfilerTests.java index 8caea1730b542..faa5e24ad4ade 100644 --- a/server/src/test/java/org/opensearch/search/profile/fetch/FetchProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/fetch/FetchProfilerTests.java @@ -96,4 +96,32 @@ public void testTimerAggregation() { assertThat(map.get(FetchTimingType.PROCESS + TIMING_TYPE_COUNT_SUFFIX), equalTo(2L)); assertThat(breakdown.toNodeTime(), equalTo(map.get(FetchTimingType.PROCESS.toString()))); } + + public void testSubPhaseConsolidation() { + FetchProfiler profiler = new FetchProfiler(); + + profiler.startFetchPhase("fetch"); + FetchProfileBreakdown child1 = profiler.startSubPhase("phase", "fetch"); + Timer timer1 = child1.getTimer(FetchTimingType.PROCESS); + timer1.start(); + timer1.stop(); + profiler.endFetchPhase("fetch"); + + profiler.startFetchPhase("fetch"); + FetchProfileBreakdown child2 = profiler.startSubPhase("phase", "fetch"); + Timer timer2 = child2.getTimer(FetchTimingType.PROCESS); + timer2.start(); + timer2.stop(); + profiler.endFetchPhase("fetch"); + + List results = profiler.getTree(); + assertEquals(1, results.size()); + ProfileResult profileResult = results.get(0); + assertEquals("fetch", profileResult.getQueryName()); + assertEquals(1, profileResult.getProfiledChildren().size()); + ProfileResult sub = profileResult.getProfiledChildren().get(0); + assertEquals("phase", sub.getQueryName()); + Map breakdown = sub.getTimeBreakdown(); + 
assertThat(breakdown.get(FetchTimingType.PROCESS.toString() + TIMING_TYPE_COUNT_SUFFIX), equalTo(2L)); + } }
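Illustration (not part of the diff above): a minimal sketch of how the new per-name fetch profile entries could be read from a SearchResponse. It only uses APIs exercised by the tests in this change (getProfileResults, getFetchProfileResult, getFetchProfileResults, getQueryName, getTimeBreakdown); the index name "idx", the aggregation name, and the surrounding OpenSearchIntegTestCase-style client() are assumptions for the sketch, not part of this PR.

import java.util.Map;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.fetch.FetchProfileShardResult;

import static org.opensearch.search.aggregations.AggregationBuilders.topHits;

public void sketchReadTopHitsFetchProfile() {
    // Profiled search whose top_hits aggregation triggers an extra fetch pass per aggregation name.
    SearchResponse response = client().prepareSearch("idx")   // assumed index name
        .setProfile(true)
        .setQuery(QueryBuilders.matchAllQuery())
        .addAggregation(topHits("top_hits_agg1").size(1))
        .get();

    // With this change, each shard reports one ProfileResult per fetch pass: "fetch" for the main
    // pass, plus "fetch_inner_hits[<name>]" / "fetch_top_hits_aggregation[<name>]" entries.
    for (Map.Entry<String, ProfileShardResult> shard : response.getProfileResults().entrySet()) {
        FetchProfileShardResult fetch = shard.getValue().getFetchProfileResult();
        if (fetch == null || fetch.getFetchProfileResults().isEmpty()) {
            continue; // shard held no documents, so no fetch pass was profiled
        }
        for (ProfileResult result : fetch.getFetchProfileResults()) {
            if (result.getQueryName().startsWith("fetch_top_hits_aggregation[")) {
                Long loadSourceNanos = result.getTimeBreakdown().get("load_source");
                System.out.println(shard.getKey() + " " + result.getQueryName()
                    + " load_source=" + loadSourceNanos + "ns");
            }
        }
    }
}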