From 021b92d0929fa39282d261336b697b6fd5f5ac4d Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 11 Feb 2025 17:40:53 +0100 Subject: [PATCH] Allow passing a custom item action (#231) When manipulating profiles data, we need to be able to use the Update action rather than Create all the time. So this change allows passing custom actions rather than hardcoding create. If no action is provided, the default value will be create. --- bulk_indexer.go | 30 ++++++++++++++++++--- bulk_indexer_test.go | 63 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/bulk_indexer.go b/bulk_indexer.go index cf1e34d..f75973d 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -49,6 +49,15 @@ import ( // of concurrent bulk requests. This way we can ensure bulk requests have the // maximum possible size, based on configuration and throughput. +const ( + // Actions are all the actions that can be used when indexing data. + // `create` will be used by default. + ActionCreate = "create" + ActionDelete = "delete" + ActionIndex = "index" + ActionUpdate = "update" +) + // BulkIndexerConfig holds configuration for BulkIndexer. type BulkIndexerConfig struct { // Client holds the Elasticsearch client. @@ -280,13 +289,25 @@ type BulkIndexerItem struct { Index string DocumentID string Pipeline string + Action string Body io.WriterTo DynamicTemplates map[string]string } // Add encodes an item in the buffer. 
func (b *BulkIndexer) Add(item BulkIndexerItem) error { - b.writeMeta(item.Index, item.DocumentID, item.Pipeline, item.DynamicTemplates) + action := item.Action + if action == "" { + action = ActionCreate + } + + switch action { + case ActionCreate, ActionDelete, ActionIndex, ActionUpdate: + default: + return fmt.Errorf("%s is not a valid action", action) + } + + b.writeMeta(item.Index, item.DocumentID, item.Pipeline, action, item.DynamicTemplates) if _, err := item.Body.WriteTo(b.writer); err != nil { return fmt.Errorf("failed to write bulk indexer item: %w", err) } @@ -297,8 +318,11 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { return nil } -func (b *BulkIndexer) writeMeta(index, documentID, pipeline string, dynamicTemplates map[string]string) { - b.jsonw.RawString(`{"create":{`) +func (b *BulkIndexer) writeMeta(index, documentID, pipeline, action string, dynamicTemplates map[string]string) { + b.jsonw.RawString(`{"`) + b.jsonw.RawString(action) + b.jsonw.RawString(`":{`) + first := true if documentID != "" { b.jsonw.RawString(`"_id":`) diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index 7c52483..34dfccd 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/go-docappender/v2" "github.com/elastic/go-docappender/v2/docappendertest" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -203,3 +204,65 @@ func TestPipeline(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(2), stat.Indexed) } + +func TestAction(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result := docappendertest.DecodeBulkRequest(r) + err := json.NewEncoder(w).Encode(result) + require.NoError(t, err) + + actions := []string{} + for _, itemsMap := range result.Items { + for a := range itemsMap { + actions = append(actions, a) + } + } + + require.Equal(t, []string{"create", "update", "delete"}, actions) + 
}) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "update", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "delete", + Pipeline: "test-pipeline2", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Action: "foobar", + Pipeline: "test-pipeline2", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + }) + assert.Error(t, err) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(3), stat.Indexed) +}