Skip to content
Oliver Eilhard edited this page Feb 28, 2015 · 6 revisions

The Bulk API of Elasticsearch enables users to perform many index/update/delete operations in a single request.

Here's an example of how to use the Bulk API with Elastic.

client, err := elastic.NewClient()
if err != nil {
  // ...
}

// Set up 4 bulk requests: 2 index requests, 1 delete request, 1 update request.
index1Req := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("1").Doc(tweet1)
index2Req := elastic.NewBulkIndexRequest().OpType("create").Index("twitter").Type("tweet").Id("2").Doc(tweet2)
delete1Req := elastic.NewBulkDeleteRequest().Index("twitter").Type("tweet").Id("1")
update2Req := elastic.NewBulkUpdateRequest().Index("twitter").Type("tweet").Id("2").
                Doc(struct {
                Retweets int `json:"retweets"`
        }{
                Retweets: 42,
        })

// Create the bulk and add the 4 requests to it
bulkRequest := client.Bulk()
bulkRequest = bulkRequest.Add(index1Req)
bulkRequest = bulkRequest.Add(index2Req)
bulkRequest = bulkRequest.Add(delete1Req)
bulkRequest = bulkRequest.Add(update2Req)

// NumberOfActions contains the number of requests in a bulk
if bulkRequest.NumberOfActions() != 4 {
  // ...
}

// Do sends the bulk requests to Elasticsearch
bulkResponse, err := bulkRequest.Do()
if err != nil {
  // ...
}

// Bulk request actions get cleared
if bulkRequest.NumberOfActions() != 0 {
  // ...
}

// Bulk response contains valuable information about the outcome, 
// e.g. which requests have failed etc.

// Indexed returns information abount indexed documents
indexed := bulkResponse.Indexed()
if len(indexed) != 1 {
  // ...
}
if indexed[0].Id != "1" {
  // ...
}
if indexed[0].Status != 201 {
  // ...
}

// Created returns information about created documents
created := bulkResponse.Created()
if len(created) != 1 {
  // ...
}
if created[0].Id != "2" {
  // ...
}
if created[0].Status != 201 {
  // ...
}

// Deleted returns information about documents that were removed
deleted := bulkResponse.Deleted()
if len(deleted) != 1 {
  // ...
}
if deleted[0].Id != "1" {
  // ...
}
if deleted[0].Status != 200 {
  // ...
}
if !deleted[0].Found {
  // ...
}

// Updated returns information about documents that were updated
updated := bulkResponse.Updated()
if len(updated) != 1 {
  // ...
}
if updated[0].Id != "2" {
  // ...
}
if updated[0].Status != 200 {
  // ...
}
if updated[0].Version != 2 {
  // ...
}

// ById returns information about documents by ID
id1Results := bulkResponse.ById("1")
if len(id1Results) != 2 {
  // Document "1" should have been indexed, then deleted
  // ...
}

// ByAction returns information about a certain action.
// Use with "index", "create", "update", "delete".
deletedResults := bulkResponse.ByAction("delete")
...

// Failed() returns information about failed bulk requests
// (those with a HTTP status code outside [200,299].
failedResults := bulkResponse.Failed()
Clone this wiki locally