Skip to content

Commit

Permalink
1.add todo to change version.current
Browse files Browse the repository at this point in the history
2.use exist xcontentUtil to read
3.move processor excution key to ProcessorExecutionDetail

Signed-off-by: Junwei Dai <[email protected]>
  • Loading branch information
Junwei Dai committed Dec 31, 2024
1 parent 500ac7c commit 7526ef4
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_18_0)) {
searchPipeline = in.readOptionalString();
}
// Todo: change version to 2_19_0
if (in.getVersion().onOrAfter(Version.CURRENT)) {
verbosePipeline = in.readBoolean();
}
Expand Down Expand Up @@ -391,6 +392,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_18_0)) {
out.writeOptionalString(searchPipeline);
}
// Todo: change version to 2_19_0
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalBoolean(verbosePipeline);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,12 @@ private static void writeSearchExtBuildersOnOrAfter(StreamOutput out, List<Searc
}

private static List<ProcessorExecutionDetail> readProcessorResultOnOrAfter(StreamInput in) throws IOException {
// Todo: change to 2.19
return (in.getVersion().onOrAfter(Version.CURRENT)) ? in.readList(ProcessorExecutionDetail::new) : Collections.emptyList();
}

private static void writeProcessorResultOnOrAfter(StreamOutput out, List<ProcessorExecutionDetail> processorResult) throws IOException {
// Todo: change to 2.19
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeList(processorResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
import java.util.List;
import java.util.Map;

import static org.opensearch.search.pipeline.ProcessorExecutionDetail.PROCESSOR_EXECUTION_DETAILS_KEY;

/**
* A holder for state that is passed through each processor in the pipeline.
*/
public class PipelineProcessingContext {
private final Map<String, Object> attributes = new HashMap<>();

// Key for processor execution details
private static final String PROCESSOR_EXECUTION_DETAILS_KEY = "processorExecutionDetails";

/**
* Set a generic attribute in the state for this request. Overwrites any existing value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.search.pipeline;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.xcontent.XContentUtils;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -18,7 +19,6 @@
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -29,7 +29,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "1.0.0")
@PublicApi(since = "2.19.0")
public class ProcessorExecutionDetail implements Writeable, ToXContentObject {

private final String processorName;
Expand All @@ -40,6 +40,8 @@ public class ProcessorExecutionDetail implements Writeable, ToXContentObject {
public static final ParseField DURATION_MILLIS_FIELD = new ParseField("duration_millis");
public static final ParseField INPUT_DATA_FIELD = new ParseField("input_data");
public static final ParseField OUTPUT_DATA_FIELD = new ParseField("output_data");
// Key for processor execution details
public static final String PROCESSOR_EXECUTION_DETAILS_KEY = "processorExecutionDetails";

/**
* Constructor for ProcessorExecutionDetail
Expand Down Expand Up @@ -182,9 +184,9 @@ public static ProcessorExecutionDetail fromXContent(XContentParser parser) throw
} else if (DURATION_MILLIS_FIELD.match(fieldName, parser.getDeprecationHandler())) {
durationMillis = parser.longValue();
} else if (INPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) {
inputData = parseFieldFromXContent(parser);
inputData = XContentUtils.readValue(parser, parser.currentToken());
} else if (OUTPUT_DATA_FIELD.match(fieldName, parser.getDeprecationHandler())) {
outputData = parseFieldFromXContent(parser);
outputData = XContentUtils.readValue(parser, parser.currentToken());
} else {
parser.skipChildren();
}
Expand All @@ -197,33 +199,6 @@ public static ProcessorExecutionDetail fromXContent(XContentParser parser) throw
return new ProcessorExecutionDetail(processorName, durationMillis, inputData, outputData);
}

private static Object parseFieldFromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == XContentParser.Token.VALUE_NULL) {
return null;
} else if (token == XContentParser.Token.START_ARRAY) {
return parseArrayFromXContent(parser);
} else if (token == XContentParser.Token.START_OBJECT) {
return parser.map();
} else {
return parser.textOrNull();
}
}

private static List<Object> parseArrayFromXContent(XContentParser parser) throws IOException {
List<Object> list = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
list.add(parser.map());
} else if (parser.currentToken() == XContentParser.Token.START_ARRAY) {
list.add(parseArrayFromXContent(parser));
} else {
list.add(parser.textOrNull());
}
}
return list;
}

@Override
public int hashCode() {
return Objects.hash(processorName, durationMillis, inputData, outputData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void testFromXContent() throws IOException {
assertEquals("testProcessor", detail.getProcessorName());
assertEquals(123L, detail.getDurationMillis());
assertEquals(Map.of("key1", "value1"), detail.getInputData());
assertEquals(List.of(1, 2, 3), detail.getOutputData());
}
}
}

0 comments on commit 7526ef4

Please sign in to comment.