2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,8 +5,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Expand fetch phase profiling to support inner hits and top hits aggregation phases ([#18936](https://github.com/opensearch-project/OpenSearch/pull/18936))
- Add temporal routing processors for time-based document routing ([#18920](https://github.com/opensearch-project/OpenSearch/issues/18920))


### Changed

### Fixed
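To illustrate the first changelog entry, here is a minimal sketch of a profiled inner-hits search of the kind the REST test below exercises. It is not part of this diff; the index name "idx", the nested mapping for "object_field", and the use of client() from an OpenSearchIntegTestCase are illustrative assumptions.

    // Illustrative only -- assumes an OpenSearchIntegTestCase context where client() exists,
    // an index "idx", and a nested mapping for "object_field".
    // import org.apache.lucene.search.join.ScoreMode;
    // import org.opensearch.action.search.SearchResponse;
    // import org.opensearch.index.query.InnerHitBuilder;
    // import org.opensearch.index.query.QueryBuilders;
    SearchResponse response = client().prepareSearch("idx")
        .setProfile(true)
        .setQuery(
            QueryBuilders.nestedQuery("object_field", QueryBuilders.matchAllQuery(), ScoreMode.None)
                .innerHit(new InnerHitBuilder()) // inner hit name defaults to the nested path
        )
        .get();
    // The response profile's fetch section is then expected to contain a main "fetch"
    // entry plus a "fetch_inner_hits[object_field]" entry, as the REST test below asserts.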
@@ -42,8 +42,8 @@ setup:
---
"Combined fetch sub-phases profiling":
- skip:
version: " - 3.1.99"
reason: "Fetch phase profiling was introduced in 3.2.0"
version: " - 3.2.99"
reason: "Inner hits fetch phase profiling was introduced in 3.3.0"
features: "contains"

- do:
@@ -80,12 +80,19 @@ setup:
fields:
"object_field.nested_field": {}

# 1. Verify basic fetch profile structure
- is_true: profile.shards.0.fetch.0
# 1. Verify fetch profile structure - should have main fetch + inner hits fetch
- length: { profile.shards.0.fetch: 2 }

# Main fetch profile
- match: { profile.shards.0.fetch.0.type: "fetch" }
- match: { profile.shards.0.fetch.0.description: "fetch" }
- is_true: profile.shards.0.fetch.0.time_in_nanos

# Inner hits fetch profile
- match: { profile.shards.0.fetch.1.type: "fetch_inner_hits[object_field]" }
- match: { profile.shards.0.fetch.1.description: "fetch_inner_hits[object_field]" }
- is_true: profile.shards.0.fetch.1.time_in_nanos

# 2. Verify detailed breakdown of the main fetch operation
- is_true: profile.shards.0.fetch.0.breakdown
- is_true: profile.shards.0.fetch.0.breakdown.load_stored_fields
@@ -99,7 +106,20 @@ setup:
- is_true: profile.shards.0.fetch.0.breakdown.create_stored_fields_visitor
- match: { profile.shards.0.fetch.0.breakdown.create_stored_fields_visitor_count: 1}

# 3. Verify all expected fetch sub-phases are present as children
# 3. Verify inner hits fetch breakdown has all required fields (some may be 0)
- is_true: profile.shards.0.fetch.1.breakdown
- gte: { profile.shards.0.fetch.1.breakdown.load_stored_fields: 0 }
- gte: { profile.shards.0.fetch.1.breakdown.load_stored_fields_count: 0 }
- gte: { profile.shards.0.fetch.1.breakdown.load_source: 0 }
- gte: { profile.shards.0.fetch.1.breakdown.load_source_count: 0 }
- gte: { profile.shards.0.fetch.1.breakdown.get_next_reader: 0 }
- match: { profile.shards.0.fetch.1.breakdown.get_next_reader_count: 1 }
- gte: { profile.shards.0.fetch.1.breakdown.build_sub_phase_processors: 0 }
- match: { profile.shards.0.fetch.1.breakdown.build_sub_phase_processors_count: 1 }
- gte: { profile.shards.0.fetch.1.breakdown.create_stored_fields_visitor: 0 }
- match: { profile.shards.0.fetch.1.breakdown.create_stored_fields_visitor_count: 1 }

# 4. Verify all expected fetch sub-phases are present as children in main fetch
- length: { profile.shards.0.fetch.0.children: 9 }
- contains:
profile.shards.0.fetch.0.children:
@@ -129,6 +149,16 @@ setup:
profile.shards.0.fetch.0.children:
type: "FetchScorePhase"

# 5. Verify inner hits fetch has exactly 1 sub-phase (FetchSourcePhase)
- length: { profile.shards.0.fetch.1.children: 1 }
- match: { profile.shards.0.fetch.1.children.0.type: "FetchSourcePhase" }
- is_true: profile.shards.0.fetch.1.children.0.time_in_nanos
- is_true: profile.shards.0.fetch.1.children.0.breakdown
- is_true: profile.shards.0.fetch.1.children.0.breakdown.process
- gte: { profile.shards.0.fetch.1.children.0.breakdown.process_count: 1 }
- is_true: profile.shards.0.fetch.1.children.0.breakdown.set_next_reader
- match: { profile.shards.0.fetch.1.children.0.breakdown.set_next_reader_count: 1 }

---
"No source or empty fetch profiling":
- skip:
@@ -169,8 +199,9 @@ setup:
---
"Top-hits aggregation profiling":
- skip:
version: " - 3.1.99"
reason: "Fetch phase profiling was introduced in 3.2.0"
version: " - 3.2.99"
reason: "Top-hits aggregation profiling was introduced in 3.3.0"
features: "contains"

- do:
search:
@@ -181,13 +212,42 @@
match:
text_field: "document"
aggs:
top_hits_agg:
top_hits_agg1:
top_hits:
size: 1
top_hits_agg2:
top_hits:
size: 1
sort:
- numeric_field: { order: desc }

- length: { profile.shards.0.fetch: 3 }

- contains:
profile.shards.0.fetch:
type: "fetch"
description: "fetch"

- contains:
profile.shards.0.fetch:
type: "fetch_top_hits_aggregation[top_hits_agg1]"
description: "fetch_top_hits_aggregation[top_hits_agg1]"

- contains:
profile.shards.0.fetch:
type: "fetch_top_hits_aggregation[top_hits_agg2]"
description: "fetch_top_hits_aggregation[top_hits_agg2]"

- is_true: profile.shards.0.fetch.0.time_in_nanos
- is_true: profile.shards.0.fetch.0.breakdown
- is_true: profile.shards.0.fetch.1.time_in_nanos
- is_true: profile.shards.0.fetch.1.breakdown
- is_true: profile.shards.0.fetch.2.time_in_nanos
- is_true: profile.shards.0.fetch.2.breakdown

# Verify that the profile contains a single fetch operation for the query
- length: { profile.shards.0.fetch: 1 }
- match: { profile.shards.0.fetch.0.type: "fetch" }
- match: { profile.shards.0.fetch.0.description: "fetch" }
- length: { profile.shards.0.fetch.0.children: 1 }
- match: { profile.shards.0.fetch.0.children.0.type: "FetchSourcePhase" }
- length: { profile.shards.0.fetch.1.children: 1 }
- match: { profile.shards.0.fetch.1.children.0.type: "FetchSourcePhase" }
- length: { profile.shards.0.fetch.2.children: 1 }
- match: { profile.shards.0.fetch.2.children.0.type: "FetchSourcePhase" }
@@ -37,6 +37,7 @@
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.InternalAggregation;
@@ -46,8 +47,11 @@
import org.opensearch.search.aggregations.metrics.Stats;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.fetch.FetchProfileShardResult;
import org.opensearch.search.profile.query.CollectorResult;
import org.opensearch.search.profile.query.QueryProfileShardResult;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.hamcrest.core.IsNull;
@@ -69,6 +73,7 @@
import static org.opensearch.search.aggregations.AggregationBuilders.max;
import static org.opensearch.search.aggregations.AggregationBuilders.stats;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.search.aggregations.AggregationBuilders.topHits;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.containsString;
@@ -1000,4 +1005,93 @@ private void assertCollectorResultWithConcurrentSearchEnabled(QueryProfileShardR
assertThat(collectorResult.getCollectorResult().getProfiledChildren().get(1).getReason(), equalTo(REASON_AGGREGATION));
}
}

public void testTopHitsAggregationFetchProfiling() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.setProfile(true)
.setQuery(QueryBuilders.matchAllQuery())
.addAggregation(topHits("top_hits_agg1").size(1))
.addAggregation(topHits("top_hits_agg2").size(1).sort(SortBuilders.fieldSort(NUMBER_FIELD).order(SortOrder.DESC)))
.get();

assertSearchResponse(response);
Map<String, ProfileShardResult> profileResults = response.getProfileResults();
assertNotNull("Profile results should not be null", profileResults);
assertFalse("Profile results should not be empty", profileResults.isEmpty());

int shardsWithDocuments = 0;
int shardsWithCorrectProfile = 0;

for (ProfileShardResult shardResult : profileResults.values()) {
FetchProfileShardResult fetchProfileResult = shardResult.getFetchProfileResult();
if (fetchProfileResult != null && !fetchProfileResult.getFetchProfileResults().isEmpty()) {
shardsWithDocuments++;
List<ProfileResult> fetchProfileResults = fetchProfileResult.getFetchProfileResults();

// Count different types of fetch operations dynamically
int mainFetchCount = 0;
int topHitsAgg1Count = 0;
int topHitsAgg2Count = 0;
ProfileResult topHitsFetch1 = null;
ProfileResult topHitsFetch2 = null;

for (ProfileResult result : fetchProfileResults) {
if ("fetch".equals(result.getQueryName())) {
mainFetchCount++;
} else if (result.getQueryName().contains("top_hits_agg1")) {
if (topHitsFetch1 == null) {
topHitsFetch1 = result; // Keep first instance for validation
}
topHitsAgg1Count++;
} else if (result.getQueryName().contains("top_hits_agg2")) {
if (topHitsFetch2 == null) {
topHitsFetch2 = result; // Keep first instance for validation
}
topHitsAgg2Count++;
}
}

// Verify we have the expected aggregations (concurrent search may create multiple instances)
assertTrue("Should have at least 1 top_hits_agg1 fetch operation", topHitsAgg1Count >= 1);
assertTrue("Should have at least 1 top_hits_agg2 fetch operation", topHitsAgg2Count >= 1);
assertTrue("Should have at least one main fetch operation", mainFetchCount >= 1);
assertTrue("Should have at least 3 total fetch operations", fetchProfileResults.size() >= 3);

assertNotNull("Should have top_hits_agg1 fetch operation", topHitsFetch1);
assertTrue("Should be top_hits aggregation fetch", topHitsFetch1.getQueryName().startsWith("fetch_top_hits_aggregation"));
assertTrue("Should contain aggregation name", topHitsFetch1.getQueryName().contains("top_hits_agg1"));
assertNotNull(topHitsFetch1.getTimeBreakdown());
assertEquals("Top hits fetch should have 1 child (FetchSourcePhase)", 1, topHitsFetch1.getProfiledChildren().size());
assertEquals("FetchSourcePhase", topHitsFetch1.getProfiledChildren().get(0).getQueryName());

assertNotNull("Should have top_hits_agg2 fetch operation", topHitsFetch2);
assertTrue("Should be top_hits aggregation fetch", topHitsFetch2.getQueryName().startsWith("fetch_top_hits_aggregation"));
assertTrue("Should contain aggregation name", topHitsFetch2.getQueryName().contains("top_hits_agg2"));
assertNotNull(topHitsFetch2.getTimeBreakdown());
assertEquals("Top hits fetch should have 1 child (FetchSourcePhase)", 1, topHitsFetch2.getProfiledChildren().size());
assertEquals("FetchSourcePhase", topHitsFetch2.getProfiledChildren().get(0).getQueryName());

for (ProfileResult fetchResult : fetchProfileResults) {
Map<String, Long> breakdown = fetchResult.getTimeBreakdown();
assertTrue(
"CREATE_STORED_FIELDS_VISITOR timing should be present",
breakdown.containsKey("create_stored_fields_visitor")
);
assertTrue("BUILD_SUB_PHASE_PROCESSORS timing should be present", breakdown.containsKey("build_sub_phase_processors"));
assertTrue("GET_NEXT_READER timing should be present", breakdown.containsKey("get_next_reader"));
assertTrue("LOAD_STORED_FIELDS timing should be present", breakdown.containsKey("load_stored_fields"));
assertTrue("LOAD_SOURCE timing should be present", breakdown.containsKey("load_source"));
}

shardsWithCorrectProfile++;
}
}

assertTrue("Should have at least one shard with documents", shardsWithDocuments > 0);
assertEquals(
"All shards with documents should have correct fetch profile structure",
shardsWithDocuments,
shardsWithCorrectProfile
);
}
}
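As a usage sketch, the new per-shard fetch entries can be read back through the accessors exercised in the test above. This assumes a profiled SearchResponse named response and the same imports as the test (plus java.util.Map); the entry names and breakdown keys are those shown in this change.

    // Sketch: walk the per-shard fetch profile and print a few breakdown timings.
    // Entry names observed in this change: "fetch", "fetch_inner_hits[...]",
    // and "fetch_top_hits_aggregation[...]".
    for (ProfileShardResult shard : response.getProfileResults().values()) {
        FetchProfileShardResult fetchProfile = shard.getFetchProfileResult();
        if (fetchProfile == null || fetchProfile.getFetchProfileResults().isEmpty()) {
            continue; // this shard held no documents for the query
        }
        for (ProfileResult fetchResult : fetchProfile.getFetchProfileResults()) {
            Map<String, Long> breakdown = fetchResult.getTimeBreakdown();
            System.out.printf(
                "%s: load_source=%dns, load_stored_fields=%dns%n",
                fetchResult.getQueryName(),
                breakdown.getOrDefault("load_source", 0L),
                breakdown.getOrDefault("load_stored_fields", 0L)
            );
        }
    }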