1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
- Increase the floor segment size to 16MB ([#17699](https://github.com/opensearch-project/OpenSearch/pull/17699))
- Introduce 512 byte limit to search and ingest pipeline IDs ([#17786](https://github.com/opensearch-project/OpenSearch/pull/17786))

### Dependencies
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
17 changes: 17 additions & 0 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
@@ -35,6 +35,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.UnicodeUtil;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException;
@@ -86,6 +87,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -107,6 +109,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
public static final String NOOP_PIPELINE_NAME = "_none";

public static final String INGEST_ORIGIN = "ingest";
private static final int MAX_PIPELINE_ID_BYTES = 512;

/**
* Defines the limit for the number of processors which can run on a given document during ingestion.
@@ -512,6 +515,20 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineReq
throw new IllegalStateException("Ingest info is empty");
}

int pipelineIdLength = UnicodeUtil.calcUTF16toUTF8Length(request.getId(), 0, request.getId().length());

if (pipelineIdLength > MAX_PIPELINE_ID_BYTES) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Pipeline id [%s] exceeds maximum length of %d UTF-8 bytes (actual: %d bytes)",
request.getId(),
MAX_PIPELINE_ID_BYTES,
pipelineIdLength
)
);
}

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);

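The limit is enforced in UTF-8 bytes rather than Java characters, so an id made of multi-byte characters can be rejected well before it reaches 512 characters. A minimal sketch of the difference, assuming a standalone demo class (PipelineIdLengthDemo is not part of this change); UnicodeUtil.calcUTF16toUTF8Length computes the encoded length without allocating a byte array:

import java.nio.charset.StandardCharsets;

import org.apache.lucene.util.UnicodeUtil;

public class PipelineIdLengthDemo {
    public static void main(String[] args) {
        // 171 copies of a three-byte UTF-8 character: 171 chars but 513 bytes, so over the 512-byte limit.
        String id = "あ".repeat(171);
        int utf8Bytes = UnicodeUtil.calcUTF16toUTF8Length(id, 0, id.length());
        // Same value as encoding the whole string and counting bytes.
        assert utf8Bytes == id.getBytes(StandardCharsets.UTF_8).length;
        System.out.println(id.length() + " chars, " + utf8Bytes + " UTF-8 bytes"); // 171 chars, 513 UTF-8 bytes
    }
}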
server/src/main/java/org/opensearch/search/pipeline/SearchPipelineService.java
@@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.UnicodeUtil;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException;
@@ -54,6 +55,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -73,6 +75,7 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ
public static final String SEARCH_PIPELINE_ORIGIN = "search_pipeline";
public static final String AD_HOC_PIPELINE_ID = "_ad_hoc_pipeline";
public static final String NOOP_PIPELINE_ID = "_none";
private static final int MAX_PIPELINE_ID_BYTES = 512;
private static final Logger logger = LogManager.getLogger(SearchPipelineService.class);
private final ClusterService clusterService;
private final ScriptService scriptService;
@@ -278,6 +281,21 @@ void validatePipeline(Map<DiscoveryNode, SearchPipelineInfo> searchPipelineInfos
if (searchPipelineInfos.isEmpty()) {
throw new IllegalStateException("Search pipeline info is empty");
}

int pipelineIdLength = UnicodeUtil.calcUTF16toUTF8Length(request.getId(), 0, request.getId().length());

if (pipelineIdLength > MAX_PIPELINE_ID_BYTES) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"Search Pipeline id [%s] exceeds maximum length of %d UTF-8 bytes (actual: %d bytes)",
request.getId(),
MAX_PIPELINE_ID_BYTES,
pipelineIdLength
)
);
}

Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getMediaType()).v2();
Pipeline pipeline = PipelineWithMetrics.create(
request.getId(),
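The byte-length check and its error message in SearchPipelineService mirror the ones in IngestService, differing only in the "Search Pipeline" label. A minimal sketch of how the duplication could be factored out, assuming a hypothetical PipelineIdValidator helper that is not part of this change:

import java.util.Locale;

import org.apache.lucene.util.UnicodeUtil;

// Hypothetical shared helper; both services would call validateIdLength(...) at the top of validatePipeline.
final class PipelineIdValidator {
    private static final int MAX_PIPELINE_ID_BYTES = 512;

    private PipelineIdValidator() {}

    static void validateIdLength(String label, String id) {
        int utf8Length = UnicodeUtil.calcUTF16toUTF8Length(id, 0, id.length());
        if (utf8Length > MAX_PIPELINE_ID_BYTES) {
            throw new IllegalArgumentException(
                String.format(
                    Locale.ROOT,
                    "%s id [%s] exceeds maximum length of %d UTF-8 bytes (actual: %d bytes)",
                    label,
                    id,
                    MAX_PIPELINE_ID_BYTES,
                    utf8Length
                )
            );
        }
    }
}

Under that assumption, IngestService would pass "Pipeline" and SearchPipelineService "Search Pipeline" as the label, keeping the error messages identical to the ones introduced here.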
server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
@@ -89,6 +89,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -395,6 +396,39 @@ public void testValidateNoIngestInfo() throws Exception {
ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest);
}

public void testValidatePipelineId_WithNotValidLength_ShouldThrowException() throws Exception {
IngestService ingestService = createWithProcessors();

String longId = "a".repeat(512) + "a";
PutPipelineRequest putRequest = new PutPipelineRequest(
longId,
new BytesArray(
"{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}},"
+ "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"
),
MediaTypeRegistry.JSON
);
DiscoveryNode discoveryNode = new DiscoveryNode(
"_node_id",
buildNewFakeTransportAddress(),
emptyMap(),
emptySet(),
Version.CURRENT
);
IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set")));

Exception e = expectThrows(
IllegalArgumentException.class,
() -> ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest)
);
String errorMessage = String.format(
Locale.ROOT,
"Pipeline id [%s] exceeds maximum length of 512 UTF-8 bytes (actual: 513 bytes)",
longId
);
assertEquals(errorMessage, e.getMessage());
}

public void testGetProcessorsInPipeline() throws Exception {
IngestService ingestService = createWithProcessors();
String id = "_id";
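Both new tests use ASCII-only ids, where character count and byte count coincide. A sketch of a complementary case with multi-byte characters (hypothetical, not part of this PR, and reusing the discoveryNode and ingestInfo setup from the test above) would hit the limit with far fewer than 512 characters:

// Hypothetical additional case: 171 three-byte characters encode to 513 UTF-8 bytes.
String multiByteId = "あ".repeat(171);
PutPipelineRequest multiBytePutRequest = new PutPipelineRequest(
    multiByteId,
    new BytesArray("{\"processors\": []}"),
    MediaTypeRegistry.JSON
);
expectThrows(
    IllegalArgumentException.class,
    () -> ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), multiBytePutRequest)
);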
server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
@@ -70,6 +70,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@@ -878,7 +879,6 @@ public void testValidatePipeline() throws Exception {

ProcessorInfo reqProcessor = new ProcessorInfo("scale_request_size");
ProcessorInfo rspProcessor = new ProcessorInfo("fixed_score");
ProcessorInfo injProcessor = new ProcessorInfo("max_score");
DiscoveryNode n1 = new DiscoveryNode("n1", buildNewFakeTransportAddress(), Version.CURRENT);
DiscoveryNode n2 = new DiscoveryNode("n2", buildNewFakeTransportAddress(), Version.CURRENT);
PutSearchPipelineRequest putRequest = new PutSearchPipelineRequest(
@@ -893,6 +893,13 @@
MediaTypeRegistry.JSON
);

String longId = "a".repeat(512) + "a";
PutSearchPipelineRequest maxLengthIdPutRequest = new PutSearchPipelineRequest(
longId,
new BytesArray("{\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : \"foo\" } } ] }"),
MediaTypeRegistry.JSON
);

SearchPipelineInfo completePipelineInfo = new SearchPipelineInfo(
Map.of(Pipeline.REQUEST_PROCESSORS_KEY, List.of(reqProcessor), Pipeline.RESPONSE_PROCESSORS_KEY, List.of(rspProcessor))
);
Expand All @@ -906,6 +913,18 @@ public void testValidatePipeline() throws Exception {
// Discovery failed, no infos passed.
expectThrows(IllegalStateException.class, () -> searchPipelineService.validatePipeline(Collections.emptyMap(), putRequest));

// Pipeline id exceeding the maximum length
Exception e = expectThrows(
IllegalArgumentException.class,
() -> searchPipelineService.validatePipeline(Map.of(n1, completePipelineInfo), maxLengthIdPutRequest)
);
String errorMessage = String.format(
Locale.ROOT,
"Search Pipeline id [%s] exceeds maximum length of 512 UTF-8 bytes (actual: 513 bytes)",
longId
);
assertEquals(errorMessage, e.getMessage());

// Invalid configuration in request
PutSearchPipelineRequest badPutRequest = new PutSearchPipelineRequest(
"p1",
Expand Down