Skip to content

Commit e37b56b

Browse files
cosmo0920edsiper
authored andcommitted
out_elasticsearch: Process error information properly
The current out_elasticsearch implementation is just giving up to process responses when encountering the error information. In this patch, continue to process until the end of the elements of msgpack converted JSON response. If there is a succeeded infrmation in the converted response, Fluent Bit will assume the requesting payloads to be succeeded to send ES clusters. Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 36c46b7 commit e37b56b

File tree

2 files changed

+38
-22
lines changed

2 files changed

+38
-22
lines changed

plugins/out_es/es.c

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
647647
{
648648
int i, j, k;
649649
int ret;
650-
int check = FLB_FALSE;
650+
int check = 0;
651651
int root_type;
652652
char *out_buf;
653653
size_t off = 0;
@@ -671,17 +671,20 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
671671
if (ret == -1) {
672672
/* Is this an incomplete HTTP Request ? */
673673
if (c->resp.payload_size <= 0) {
674-
return FLB_TRUE;
674+
check |= FLB_ES_STATUS_IMCOMPLETE;
675+
return check;
675676
}
676677

677678
/* Lookup error field */
678679
if (strstr(c->resp.payload, "\"errors\":false,\"items\":[")) {
679-
return FLB_FALSE;
680+
check |= FLB_ES_STATUS_SUCCESS;
681+
return check;
680682
}
681683

682684
flb_plg_error(ctx->ins, "could not pack/validate JSON response\n%s",
683685
c->resp.payload);
684-
return FLB_TRUE;
686+
check |= FLB_ES_STATUS_BAD_RESPONSE;
687+
return check;
685688
}
686689

687690
/* Lookup error field */
@@ -690,14 +693,15 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
690693
if (ret != MSGPACK_UNPACK_SUCCESS) {
691694
flb_plg_error(ctx->ins, "Cannot unpack response to find error\n%s",
692695
c->resp.payload);
693-
return FLB_TRUE;
696+
check |= FLB_ES_STATUS_ERROR_UNPACK;
697+
return check;
694698
}
695699

696700
root = result.data;
697701
if (root.type != MSGPACK_OBJECT_MAP) {
698702
flb_plg_error(ctx->ins, "unexpected payload type=%i",
699703
root.type);
700-
check = FLB_TRUE;
704+
check |= FLB_ES_STATUS_BAD_TYPE;
701705
goto done;
702706
}
703707

@@ -706,7 +710,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
706710
if (key.type != MSGPACK_OBJECT_STR) {
707711
flb_plg_error(ctx->ins, "unexpected key type=%i",
708712
key.type);
709-
check = FLB_TRUE;
713+
check |= FLB_ES_STATUS_INVAILD_ARGUMENT;
710714
goto done;
711715
}
712716

@@ -715,14 +719,14 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
715719
if (val.type != MSGPACK_OBJECT_BOOLEAN) {
716720
flb_plg_error(ctx->ins, "unexpected 'error' value type=%i",
717721
val.type);
718-
check = FLB_TRUE;
722+
check |= FLB_ES_STATUS_BAD_TYPE;
719723
goto done;
720724
}
721725

722726
/* If error == false, we are OK (no errors = FLB_FALSE) */
723727
if (!val.via.boolean) {
724728
/* no errors */
725-
check = FLB_FALSE;
729+
check |= FLB_ES_STATUS_SUCCESS;
726730
goto done;
727731
}
728732
}
@@ -731,7 +735,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
731735
if (val.type != MSGPACK_OBJECT_ARRAY) {
732736
flb_plg_error(ctx->ins, "unexpected 'items' value type=%i",
733737
val.type);
734-
check = FLB_TRUE;
738+
check |= FLB_ES_STATUS_BAD_TYPE;
735739
goto done;
736740
}
737741

@@ -740,22 +744,22 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
740744
if (item.type != MSGPACK_OBJECT_MAP) {
741745
flb_plg_error(ctx->ins, "unexpected 'item' outer value type=%i",
742746
item.type);
743-
check = FLB_TRUE;
747+
check |= FLB_ES_STATUS_BAD_TYPE;
744748
goto done;
745749
}
746750

747751
if (item.via.map.size != 1) {
748752
flb_plg_error(ctx->ins, "unexpected 'item' size=%i",
749753
item.via.map.size);
750-
check = FLB_TRUE;
754+
check |= FLB_ES_STATUS_INVAILD_ARGUMENT;
751755
goto done;
752756
}
753757

754758
item = item.via.map.ptr[0].val;
755759
if (item.type != MSGPACK_OBJECT_MAP) {
756760
flb_plg_error(ctx->ins, "unexpected 'item' inner value type=%i",
757761
item.type);
758-
check = FLB_TRUE;
762+
check |= FLB_ES_STATUS_BAD_TYPE;
759763
goto done;
760764
}
761765

@@ -764,7 +768,7 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
764768
if (item_key.type != MSGPACK_OBJECT_STR) {
765769
flb_plg_error(ctx->ins, "unexpected key type=%i",
766770
item_key.type);
767-
check = FLB_TRUE;
771+
check |= FLB_ES_STATUS_BAD_TYPE;
768772
goto done;
769773
}
770774

@@ -774,13 +778,16 @@ static int elasticsearch_error_check(struct flb_elasticsearch *ctx,
774778
if (item_val.type != MSGPACK_OBJECT_POSITIVE_INTEGER) {
775779
flb_plg_error(ctx->ins, "unexpected 'status' value type=%i",
776780
item_val.type);
777-
check = FLB_TRUE;
781+
check |= FLB_ES_STATUS_BAD_TYPE;
778782
goto done;
779783
}
784+
/* Check for success responses */
785+
if (item_val.via.i64 == 200 || item_val.via.i64 == 201) {
786+
check |= FLB_ES_STATUS_SUCCESS;
787+
}
780788
/* Check for errors other than version conflict (document already exists) */
781789
if (item_val.via.i64 != 409) {
782-
check = FLB_TRUE;
783-
goto done;
790+
check |= FLB_ES_STATUS_ERROR;
784791
}
785792
}
786793
}
@@ -920,7 +927,11 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
920927
* and lookup the 'error' field.
921928
*/
922929
ret = elasticsearch_error_check(ctx, c);
923-
if (ret == FLB_TRUE) {
930+
if (ret & FLB_ES_STATUS_SUCCESS) {
931+
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
932+
c->resp.payload);
933+
}
934+
else {
924935
/* we got an error */
925936
if (ctx->trace_error) {
926937
/*
@@ -946,10 +957,6 @@ static void cb_es_flush(struct flb_event_chunk *event_chunk,
946957
}
947958
goto retry;
948959
}
949-
else {
950-
flb_plg_debug(ctx->ins, "Elasticsearch response\n%s",
951-
c->resp.payload);
952-
}
953960
}
954961
else {
955962
goto retry;

plugins/out_es/es.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@
3636
#define FLB_ES_WRITE_OP_UPDATE "update"
3737
#define FLB_ES_WRITE_OP_UPSERT "upsert"
3838

39+
#define FLB_ES_STATUS_SUCCESS (1 << 0)
40+
#define FLB_ES_STATUS_IMCOMPLETE (1 << 1)
41+
#define FLB_ES_STATUS_ERROR_UNPACK (1 << 2)
42+
#define FLB_ES_STATUS_BAD_TYPE (1 << 3)
43+
#define FLB_ES_STATUS_INVAILD_ARGUMENT (1 << 4)
44+
#define FLB_ES_STATUS_BAD_RESPONSE (1 << 5)
45+
#define FLB_ES_STATUS_DUPLICATES (1 << 6)
46+
#define FLB_ES_STATUS_ERROR (1 << 7)
47+
3948
struct flb_elasticsearch {
4049
/* Elasticsearch index (database) and type (table) */
4150
char *index;

0 commit comments

Comments
 (0)