Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Dec 26, 2024
1 parent 0ae4d06 commit 500ac7c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add `verbose_pipeline` parameter to output each processor's execution details ([#14745](https://github.com/opensearch-project/OpenSearch/pull/14745)).
- Add `verbose_pipeline` parameter to output each processor's execution details ([#16843](https://github.com/opensearch-project/OpenSearch/pull/16843)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ public SearchSourceBuilder verbosePipeline(boolean verbosePipeline) {
return this;
}

public Boolean verbosePipeline() {
public boolean verbosePipeline() {
return verbosePipeline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
return (in.getVersion().onOrAfter(Version.V_2_18_0)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
return (in.getVersion().onOrAfter(Version.CURRENT)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeCollection(processorResult, (o, detail) -> detail.writeTo(o));
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeList(processorResult);
}
}

Expand Down
18 changes: 11 additions & 7 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,17 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
SearchRequestProcessor processor = searchRequestProcessors.get(i);
currentListener = ActionListener.wrap(r -> {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType());
detail.addInput(r.source().shallowCopy());
if (r.source().verbosePipeline()) {
detail.addInput(r.source().shallowCopy());
}
long start = relativeTimeSupplier.getAsLong();
beforeRequestProcessor(processor);
processor.processRequestAsync(r, requestContext, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
detail.addOutput(rr.source().shallowCopy());
detail.addTook(took);
if (rr.source().verbosePipeline()) {
detail.addOutput(rr.source().shallowCopy());
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
}
nextListener.onResponse(rr);
Expand Down Expand Up @@ -210,7 +212,7 @@ ActionListener<SearchResponse> transformResponseListener(
) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
if (!requestContext.getProcessorExecutionDetails().isEmpty()) {
if (request.source() != null && request.source().verbosePipeline()) {
ActionListener<SearchResponse> finalResponseListener = responseListener;
return ActionListener.wrap(r -> {
List<ProcessorExecutionDetail> details = requestContext.getProcessorExecutionDetails();
Expand Down Expand Up @@ -242,15 +244,17 @@ ActionListener<SearchResponse> transformResponseListener(

responseListener = ActionListener.wrap(r -> {
ProcessorExecutionDetail detail = new ProcessorExecutionDetail(processor.getType());
detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits()));
if (request.source().verbosePipeline()) {
detail.addInput(Arrays.asList(r.getHits().deepCopy().getHits()));
}
beforeResponseProcessor(processor);
final long start = relativeTimeSupplier.getAsLong();
processor.processResponseAsync(request, r, requestContext, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits()));
detail.addTook(took);
if (request.source().verbosePipeline()) {
detail.addOutput(Arrays.asList(rr.getHits().deepCopy().getHits()));
detail.addTook(took);
requestContext.addProcessorExecutionDetail(detail);
rr.getInternalResponse().getProcessorResult().add(detail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ public Object getAttribute(String name) {
*
* @param detail the ProcessorExecutionDetail to add
*/
@SuppressWarnings("unchecked")
public void addProcessorExecutionDetail(ProcessorExecutionDetail detail) {
attributes.computeIfAbsent(PROCESSOR_EXECUTION_DETAILS_KEY, k -> new ArrayList<ProcessorExecutionDetail>());
List<ProcessorExecutionDetail> details = (List<ProcessorExecutionDetail>) attributes.get(PROCESSOR_EXECUTION_DETAILS_KEY);
@SuppressWarnings("unchecked")
List<ProcessorExecutionDetail> details = (List<ProcessorExecutionDetail>) attributes.computeIfAbsent(
PROCESSOR_EXECUTION_DETAILS_KEY,
k -> new ArrayList<>()
);
details.add(detail);
}

Expand Down

0 comments on commit 500ac7c

Please sign in to comment.