diff --git a/bulk_indexer.go b/bulk_indexer.go index cf1e34d..f75973d 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -49,6 +49,15 @@ import ( // of concurrent bulk requests. This way we can ensure bulk requests have the // maximum possible size, based on configuration and throughput. +const ( + // Actions are all the actions that can be used when indexing data. + // `create` will be used by default. + ActionCreate = "create" + ActionDelete = "delete" + ActionIndex = "index" + ActionUpdate = "update" +) + // BulkIndexerConfig holds configuration for BulkIndexer. type BulkIndexerConfig struct { // Client holds the Elasticsearch client. @@ -280,13 +289,25 @@ type BulkIndexerItem struct { Index string DocumentID string Pipeline string + Action string Body io.WriterTo DynamicTemplates map[string]string } // Add encodes an item in the buffer. func (b *BulkIndexer) Add(item BulkIndexerItem) error { - b.writeMeta(item.Index, item.DocumentID, item.Pipeline, item.DynamicTemplates) + action := item.Action + if action == "" { + action = ActionCreate + } + + switch action { + case ActionCreate, ActionDelete, ActionIndex, ActionUpdate: + default: + return fmt.Errorf("%s is not a valid action", action) + } + + b.writeMeta(item.Index, item.DocumentID, item.Pipeline, action, item.DynamicTemplates) if _, err := item.Body.WriteTo(b.writer); err != nil { return fmt.Errorf("failed to write bulk indexer item: %w", err) } @@ -297,8 +318,11 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { return nil } -func (b *BulkIndexer) writeMeta(index, documentID, pipeline string, dynamicTemplates map[string]string) { - b.jsonw.RawString(`{"create":{`) +func (b *BulkIndexer) writeMeta(index, documentID, pipeline, action string, dynamicTemplates map[string]string) { + b.jsonw.RawString(`{"`) + b.jsonw.RawString(action) + b.jsonw.RawString(`":{`) + first := true if documentID != "" { b.jsonw.RawString(`"_id":`) diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index 7c52483..34dfccd 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/go-docappender/v2" "github.com/elastic/go-docappender/v2/docappendertest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -203,3 +204,65 @@ func TestPipeline(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), stat.Indexed) } + +func TestAction(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + err := json.NewEncoder(w).Encode(result) + require.NoError(t, err) + + actions := []string{} + for _, itemsMap := range result.Items { + for a := range itemsMap { + actions = append(actions, a) + } + } + + require.Equal(t, []string{"create", "update", "delete"}, actions) + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "update", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "delete", + Pipeline: "test-pipeline2", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "foobar", + Pipeline: "test-pipeline2", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + assert.Error(t, err) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(3), stat.Indexed) +}