@@ -213,8 +213,6 @@ public TransportBulkAction(
213213 /**
214214 * Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are
215215 * modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.opensearch.action.delete.DeleteRequest}'s
216- * Note: with the introduction of system ingest pipelines, we occasionally want to operate on all potential
217- * child index requests of UpdateRequests. In that case, use {@link UpdateRequest#getChildIndexRequests()} instead
218216 *
219217 * @param docWriteRequest The request to find the {@link IndexRequest}
220218 * @return the found {@link IndexRequest} or {@code null} if one can not be found.
@@ -247,64 +245,9 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
247245 final long startTime = relativeTime ();
248246 final AtomicArray <BulkItemResponse > responses = new AtomicArray <>(bulkRequest .requests .size ());
249247
250- boolean hasIndexRequestsWithPipelines = false ;
251248 final Metadata metadata = clusterService .state ().getMetadata ();
252249 final Version minNodeVersion = clusterService .state ().getNodes ().getMinNodeVersion ();
253- for (DocWriteRequest <?> actionRequest : bulkRequest .requests ) {
254- // Each index request needs to be evaluated, because this method also modifies the IndexRequest
255- if (actionRequest instanceof UpdateRequest updateRequest ) {
256- // We execute all pipelines on upsert and docAsUpsert index requests (full update children)
257- // For all other update children, we only resolve system ingest pipelines.
258- // The cases are as follows:
259- // 1. If regular bulk update (no upsert), execute only system pipeline on doc (upsert will be null)
260- // 2. If regular upsert, execute all pipelines on upsert, execute only system pipeline on doc
261- // 3. If upsert with script, execute all pipelines on upsert (doc will be null)
262- // 4. If doc as upsert, execute all pipelines on doc (upsert will be null)
263- // 5. For regular update with script, no pipelines will be executed on doc (there is no doc to execute)
264- // Note that when execute pipelines on update existing docs, the full doc is NOT fetched from shard level.
265- // This means the pipeline is only executed on the partial doc, which may not contain all fields.
266- // System ingest pipelines and processors should handle these cases individually.
267- boolean indexRequestHasPipeline = false ;
268- switch (updateRequest .getType ()) {
269- case NORMAL_UPDATE -> indexRequestHasPipeline |= ingestService .resolveSystemIngestPipeline (
270- actionRequest ,
271- updateRequest .doc (),
272- metadata
273- );
274- case NORMAL_UPSERT -> {
275- indexRequestHasPipeline |= ingestService .resolveSystemIngestPipeline (actionRequest , updateRequest .doc (), metadata );
276- indexRequestHasPipeline |= ingestService .resolvePipelines (actionRequest , updateRequest .upsertRequest (), metadata );
277- }
278- case UPSERT_WITH_SCRIPT -> indexRequestHasPipeline |= ingestService .resolvePipelines (
279- actionRequest ,
280- updateRequest .upsertRequest (),
281- metadata
282- );
283- case DOC_AS_UPSERT -> indexRequestHasPipeline |= ingestService .resolvePipelines (
284- actionRequest ,
285- updateRequest .doc (),
286- metadata
287- );
288- // Pure scripted updates have no child index requests, so nothing is resolved.
289- }
290- hasIndexRequestsWithPipelines |= indexRequestHasPipeline ;
291- } else {
292- IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
293- // Skip resolving pipelines for delete requests
294- if (indexRequest != null ) {
295- boolean indexRequestHasPipeline = ingestService .resolvePipelines (actionRequest , indexRequest , metadata );
296- hasIndexRequestsWithPipelines |= indexRequestHasPipeline ;
297- }
298- }
299-
300- if (actionRequest instanceof IndexRequest ) {
301- IndexRequest ir = (IndexRequest ) actionRequest ;
302- ir .checkAutoIdWithOpTypeCreateSupportedByVersion (minNodeVersion );
303- if (ir .getAutoGeneratedTimestamp () != IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP ) {
304- throw new IllegalArgumentException ("autoGeneratedTimestamp should not be set externally" );
305- }
306- }
307- }
250+ boolean hasIndexRequestsWithPipelines = resolvePipelinesForActionRequests (bulkRequest .requests , metadata , minNodeVersion );
308251
309252 if (hasIndexRequestsWithPipelines ) {
310253 // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
@@ -422,6 +365,50 @@ public void onRejection(Exception rejectedException) {
422365 }
423366 }
424367
368+ private boolean resolvePipelinesForActionRequests (
369+ List <DocWriteRequest <?>> docWriteRequests ,
370+ Metadata metadata ,
371+ Version minNodeVersion
372+ ) {
373+ boolean hasIndexRequestsWithPipelines = false ;
374+ for (DocWriteRequest <?> actionRequest : docWriteRequests ) {
375+ // Each index request needs to be evaluated, because this method also modifies the IndexRequest
376+ if (actionRequest instanceof UpdateRequest updateRequest ) {
377+ // Note that when execute pipelines on update existing docs, the full doc is NOT fetched from shard level.
378+ // This means the pipeline is only executed on the partial doc, which may not contain all fields.
379+ // System ingest pipelines and processors should handle these cases individually.
380+ boolean indexRequestHasPipeline = false ;
381+ if (updateRequest .upsertRequest () != null ) {
382+ indexRequestHasPipeline |= ingestService .resolvePipelines (actionRequest , updateRequest .upsertRequest (), metadata );
383+ }
384+ if (updateRequest .doc () != null ) {
385+ if (updateRequest .docAsUpsert ()) {
386+ indexRequestHasPipeline |= ingestService .resolvePipelines (actionRequest , updateRequest .doc (), metadata );
387+ } else {
388+ indexRequestHasPipeline |= ingestService .resolveSystemIngestPipeline (actionRequest , updateRequest .doc (), metadata );
389+ }
390+ }
391+ hasIndexRequestsWithPipelines |= indexRequestHasPipeline ;
392+ } else {
393+ IndexRequest indexRequest = getIndexWriteRequest (actionRequest );
394+ // Skip resolving pipelines for delete requests
395+ if (indexRequest != null ) {
396+ boolean indexRequestHasPipeline = ingestService .resolvePipelines (actionRequest , indexRequest , metadata );
397+ hasIndexRequestsWithPipelines |= indexRequestHasPipeline ;
398+ }
399+ }
400+
401+ if (actionRequest instanceof IndexRequest ) {
402+ IndexRequest ir = (IndexRequest ) actionRequest ;
403+ ir .checkAutoIdWithOpTypeCreateSupportedByVersion (minNodeVersion );
404+ if (ir .getAutoGeneratedTimestamp () != IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP ) {
405+ throw new IllegalArgumentException ("autoGeneratedTimestamp should not be set externally" );
406+ }
407+ }
408+ }
409+ return hasIndexRequestsWithPipelines ;
410+ }
411+
425412 static void prohibitAppendWritesInBackingIndices (DocWriteRequest <?> writeRequest , Metadata metadata ) {
426413 IndexAbstraction indexAbstraction = metadata .getIndicesLookup ().get (writeRequest .index ());
427414 if (indexAbstraction == null ) {
0 commit comments