Using script upsert with _bulk API won't triggered #2607


Closed
anasalkouz opened this issue Mar 25, 2022 · 9 comments
Labels: enhancement (Enhancement or improvement to existing feature or request), hacktoberfest (Global event that encourages people to contribute to open-source), Indexing (Indexing, Bulk Indexing and anything related to indexing)

Comments

@anasalkouz
Member

anasalkouz commented Mar 25, 2022

Is your feature request related to a problem? Please describe.
Using a script upsert with the _bulk API will not trigger the ingest pipeline, but using the _doc (_update) API will.

Steps to reproduce

  1. Create ingest pipeline.
PUT _ingest/pipeline/ingest-test
{
  "processors" : [
    {
      "script" : {
        "lang" : "painless",
        "source" : """
          def list = ctx.mylist;
          if (list != null && list.size() > 0) {
            for (int i = 0; i < list.size(); i++) {
              def e = list.get(i);
              if ("1".equals(e.get("flag"))) {
                ctx.myobject = e;
                break;
              }
            }
          }
        """,
        "ignore_failure" : true
      }
    }
  ]
}
  2. Apply the ingest pipeline to an index with the mappings below:
PUT update-test
{
    "mappings" : {
      "properties" : {
        "mylist" : {
          "type" : "nested",
          "properties" : {
            "data" : {
              "type" : "keyword"
            },
            "flag" : {
              "type" : "keyword"
            }
          }
        },
        "myobject" : {
          "properties" : {
            "data" : {
              "type" : "keyword"
            },
            "flag" : {
              "type" : "keyword"
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "default_pipeline" : "ingest-test"
      }
    }
}
  3. Run _bulk:
POST _bulk
{"update":{"_index":"update-test","_id":"1"}}
{"script":{"source":"ctx._source.mylist = new ArrayList();    ctx._source.mylist.add(params.changes);","lang":"painless","params":{"changes":{"data":99,"flag":"1"}}},"upsert":{"mylist":[{"data":9,"flag":"1"}]}}

As the document does not exist yet, this will upsert "data":9,"flag":"1" and trigger the ingest pipeline.

  4. Run the same _bulk again. As the document already exists, the script upsert will update "data":99,"flag":"1", but the ingest pipeline is not triggered (the copy does not happen):
{
  "_index" : "update-test",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 26,
  "_seq_no" : 54,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "mylist" : [
      {
        "data" : 99,
        "flag" : "1"
      }
    ],
    "myobject" : {
      "flag" : "1",
      "data" : 9
    }
  }
}
  5. Use the _doc (_update) API instead to upsert "data":999,"flag":"1", and the ingest pipeline is triggered this time (the copy happens).
POST update-test/_doc/1/_update
{
  "script" : {"source":"ctx._source.mylist = new ArrayList();    ctx._source.mylist.add(params.changes);","lang":"painless","params":{"changes":{"data":999,"flag":"1"}}
  }
}
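For clarity, the intent of the script processor used in this reproduction can be modeled as a small Python sketch (my own hypothetical model, not OpenSearch code; the function name is mine):

```python
# Python model of the painless script processor from the repro:
# copy the first element of mylist whose flag is "1" into myobject.
# (Hypothetical sketch; this is not how OpenSearch executes the processor.)

def run_processor(ctx):
    mylist = ctx.get("mylist")
    if mylist:
        for e in mylist:
            if e.get("flag") == "1":
                ctx["myobject"] = e
                break
    return ctx
```

With the upsert document from step 3, run_processor({"mylist": [{"data": 9, "flag": "1"}]}) sets myobject to {"data": 9, "flag": "1"}, which matches what the pipeline produces whenever it actually fires.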
@anasalkouz anasalkouz added enhancement Enhancement or improvement to existing feature or request Indexing & Search labels Mar 25, 2022
@dblock
Member

dblock commented Mar 28, 2022

This would be a good problem to turn into a unit test, adding good first issue.

@dblock dblock added the good first issue Good for newcomers label Mar 28, 2022
@qracle

qracle commented Mar 29, 2022

Hope to fix the problem soon.

@rockybean
Contributor

@anasalkouz default_pipeline only works for index requests, not for update requests.

You can use final_pipeline for your case.

PUT update-test
{
  "mappings": {
    "properties": {
      "mylist": {
        "type": "nested",
        "properties": {
          "data": {
            "type": "keyword"
          },
          "flag": {
            "type": "keyword"
          }
        }
      },
      "myobject": {
        "properties": {
          "data": {
            "type": "keyword"
          },
          "flag": {
            "type": "keyword"
          }
        }
      }
    }
  },
  "settings": {
    "index": {
      "final_pipeline": "ingest-test"
    }
  }
}

So I think this is not an issue that needs to be resolved. @dblock

@dblock
Member

dblock commented May 3, 2022

Is this expected? A documentation issue? A feature request? @qracle @rockybean WDYT?

@rockybean
Contributor

Sorry for the reply above; I misunderstood the script. Neither default_pipeline nor final_pipeline works when a bulk update modifies a doc using a script.

Will dive deeper.

@Poojita-Raj Poojita-Raj added the hacktoberfest Global event that encourages people to contribute to open-source. label Sep 29, 2022
@dmgerdes-sch

Did you ever figure out a resolution for this?

@sandeshkr419 sandeshkr419 added Indexing Indexing, Bulk Indexing and anything related to indexing and removed Indexing & Search labels Jan 8, 2025
@sandeshkr419
Contributor

[Search Triage] @rockybean Were you able to further investigate?

@msfroh
Collaborator

msfroh commented Jan 8, 2025

I think this has to do with the order of executing the script versus the pipeline.

Note that single-doc indexing operations (i.e. a single-doc index request, single-doc delete, or single-doc update) all eventually get converted to _bulk requests of size 1. While index and delete operations are trivially wrapped up in bulk batches (see TransportSingleItemBulkWriteAction, which wraps the request and unwraps the response), updates are more complicated.

An update request flows as follows:

  1. TransportUpdateAction figures out which shard the request should go to (in the shards(ClusterState, UpdateRequest) method) and dispatches to the node with that particular shard.
  2. On the shard node, the shardOperation(UpdateRequest, ActionListener, int) method gets called, which "prepares" the update request with UpdateHelper.prepare(UpdateRequest, IndexShard, LongSupplier) (which delegates to prepare(ShardId, UpdateRequest, GetResult, LongSupplier)).
  3. That calls prepareUpdateScriptRequest, which calls executeScript, which importantly computes the new source and removes the script, wrapping the result into an IndexRequest (with an opType of INDEX).
  4. Back in TransportUpdateAction, we use the transport client to execute that IndexRequest, wrapping it up using toSingleItemBulkRequest.
  5. Now we get into the TransportBulkAction flow, which includes executing the pipelines (in the doInternalExecute method, there's a call to processBulkIndexIngestRequest).
  6. TransportBulkAction groups items per shard (in this case, there's one item) and sends the resulting shard-level requests to the primary nodes for each shard.
  7. TransportShardBulkAction executes the action. Since the opType is index, we do not call UpdateHelper.prepare.

So, for the document-level _update operation, the script executes first (in step 3), and then the ingest pipeline runs (in step 5). When you send a _bulk operation with an update item, it jumps straight to step 5, which runs the ingest pipeline, and then runs the script in step 7.
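The ordering difference can be sketched as a toy Python model (my own simplification, not OpenSearch code): the same two operations, composed in opposite orders, reproduce the two observed outcomes.

```python
# Toy model of the two code paths (hypothetical sketch, not OpenSearch code).

def ingest_pipeline(doc):
    # Models the script processor: copy the first flag == "1" element to myobject.
    for e in doc.get("mylist", []):
        if e.get("flag") == "1":
            doc["myobject"] = e
            break
    return doc

def update_script(doc, changes):
    # Models the painless update script: replace mylist with [changes].
    doc["mylist"] = [dict(changes)]
    return doc

def update_api_flow(existing, changes):
    # _update API: script first (step 3), pipeline after (step 5).
    return ingest_pipeline(update_script(dict(existing), changes))

def bulk_update_flow(existing, changes):
    # _bulk update item: pipeline first (step 5), script after (step 7).
    return update_script(ingest_pipeline(dict(existing)), changes)
```

With existing = {"mylist": [{"data": 9, "flag": "1"}]} and changes = {"data": 99, "flag": "1"}, bulk_update_flow leaves myobject at the old {"data": 9, "flag": "1"} (the state shown in the issue's GET output), while update_api_flow copies the new {"data": 99, "flag": "1"}.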

As is, it looks like @gaobinlong decided to resolve this by going the other way, deprecating the _update API when there's an ingest pipeline: #16663.

We could make things more consistent (and delete some code) by making TransportUpdateAction extend TransportSingleItemBulkWriteAction, just like TransportIndexAction and TransportDeleteAction. That would make things consistent across all three document APIs (since they would just be convenience APIs for the "real" _bulk operations).

@msfroh
Collaborator

msfroh commented Jan 21, 2025

I'm going to close this as "won't fix", based on my comment above.

We will potentially reconcile the behavior in OpenSearch 3.0 as described in #16980, but that would make things work according to the _bulk semantics, where the ingest pipeline executes before the upsert.
