diff --git a/include/fluent-bit/flb_metrics.h b/include/fluent-bit/flb_metrics.h index 41bcc444b50..fa7541ac047 100644 --- a/include/fluent-bit/flb_metrics.h +++ b/include/fluent-bit/flb_metrics.h @@ -39,6 +39,7 @@ #include #include #include +#include #include /* Metrics IDs for general purpose (used by core and Plugins */ diff --git a/plugins/in_statsd/statsd.c b/plugins/in_statsd/statsd.c index 74022d9c036..b9edb37ca7c 100644 --- a/plugins/in_statsd/statsd.c +++ b/plugins/in_statsd/statsd.c @@ -36,6 +36,7 @@ struct flb_statsd { char *buf; /* buffer */ char listen[256]; /* listening address (RFC-2181) */ char port[6]; /* listening port (RFC-793) */ + int metrics; /* Import as metrics */ flb_sockfd_t server_fd; /* server socket */ flb_pipefd_t coll_fd; /* server handler */ struct flb_input_instance *ins; /* input instance */ @@ -206,9 +207,15 @@ static int cb_statsd_receive(struct flb_input_instance *ins, struct flb_config *config, void *data) { int ret; - char *line; int len; struct flb_statsd *ctx = data; + struct cfl_list *head = NULL; + struct cfl_list *kvs = NULL; + struct cfl_split_entry *cur = NULL; +#ifdef FLB_HAVE_METRICS + struct cmt *cmt = NULL; + int cmt_flags = 0; +#endif /* Receive a UDP datagram */ len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0); @@ -218,35 +225,67 @@ static int cb_statsd_receive(struct flb_input_instance *ins, } ctx->buf[len] = '\0'; - ret = FLB_EVENT_ENCODER_SUCCESS; - /* Process all messages in buffer */ - line = strtok(ctx->buf, "\n"); - while (line != NULL) { - flb_plg_trace(ctx->ins, "received a line: '%s'", line); +#ifdef FLB_HAVE_METRICS + if (ctx->metrics == FLB_TRUE) { + cmt_flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER; + flb_plg_trace(ctx->ins, "received a buf: '%s'", ctx->buf); + ret = cmt_decode_statsd_create(&cmt, ctx->buf, len, cmt_flags); + if (ret != CMT_DECODE_STATSD_SUCCESS) { + flb_plg_error(ctx->ins, "failed to process buf: '%s'", ctx->buf); + return -1; + } - ret = statsd_process_line(ctx, line); + /* Append the updated metrics */ + ret = flb_input_metrics_append(ins, NULL, 0, cmt); + if (ret != 0) { + flb_plg_error(ins, "could not append metrics"); + } - if (ret != FLB_EVENT_ENCODER_SUCCESS) { - flb_plg_error(ctx->ins, "failed to process line: '%s'", line); + cmt_destroy(cmt); + } + else { +#endif + ret = FLB_EVENT_ENCODER_SUCCESS; + kvs = cfl_utils_split(ctx->buf, '\n', -1 ); + if (kvs == NULL) { + goto split_error; + } - break; + cfl_list_foreach(head, kvs) { + cur = cfl_list_entry(head, struct cfl_split_entry, _head); + flb_plg_trace(ctx->ins, "received a line: '%s'", cur->value); + + ret = statsd_process_line(ctx, cur->value); + + if (ret != FLB_EVENT_ENCODER_SUCCESS) { + flb_plg_error(ctx->ins, "failed to process line: '%s'", cur->value); + + break; + } } - line = strtok(NULL, "\n"); - } + if (kvs != NULL) { + cfl_utils_split_free(kvs); + } - if (ctx->log_encoder->output_length > 0) { - flb_input_log_append(ctx->ins, NULL, 0, - ctx->log_encoder->output_buffer, - ctx->log_encoder->output_length); - } - else { - flb_plg_error(ctx->ins, "log event encoding error : %d", ret); - } + if (ctx->log_encoder->output_length > 0) { + flb_input_log_append(ctx->ins, NULL, 0, + ctx->log_encoder->output_buffer, + ctx->log_encoder->output_length); + } + else { + flb_plg_error(ctx->ins, "log event encoding error : %d", ret); + } - flb_log_event_encoder_reset(ctx->log_encoder); + flb_log_event_encoder_reset(ctx->log_encoder); +#ifdef FLB_HAVE_METRICS + } +#endif return 0; + +split_error: + return -1; } static int cb_statsd_init(struct flb_input_instance *ins, @@ -365,8 +404,13 @@ static int cb_statsd_exit(void *data, struct flb_config *config) } static struct flb_config_map config_map[] = { + { + FLB_CONFIG_MAP_BOOL, "metrics", "off", + 0, FLB_TRUE, offsetof(struct flb_statsd, metrics), + "Ingest as metrics type of events." + }, /* EOF */ - {0} + {0} }; /* Plugin reference */ diff --git a/tests/runtime/in_statsd.c b/tests/runtime/in_statsd.c index 1ae9d6cf3d7..0f5fe4d3667 100644 --- a/tests/runtime/in_statsd.c +++ b/tests/runtime/in_statsd.c @@ -173,7 +173,7 @@ static int init_udp(char *in_host, int in_port, struct sockaddr_in *addr) return fd; } -static int test_normal(char *payload, struct str_list *expected) +static int test_normal(char *payload, struct str_list *expected, int use_metrics) { struct flb_lib_out_cb cb_data; struct test_ctx *ctx; @@ -202,6 +202,13 @@ static int test_normal(char *payload, struct str_list *expected) exit(EXIT_FAILURE); } + if (use_metrics == FLB_TRUE) { + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "metrics", "On", + NULL); + TEST_CHECK(ret == 0); + } + ret = flb_output_set(ctx->flb, ctx->o_ffd, "format", "json", NULL); @@ -249,7 +256,7 @@ void flb_test_statsd_count() char *buf = "gorets:1|c"; int ret; - ret = test_normal(buf, &expected); + ret = test_normal(buf, &expected, FLB_FALSE); if (!TEST_CHECK(ret == 0)) { TEST_MSG("test failed"); exit(EXIT_FAILURE); @@ -259,7 +266,7 @@ void flb_test_statsd_count() void flb_test_statsd_sample() { - char *expected_strs[] = {"\"bucket\":\"gorets\"", "\"type\":\"counter\"", + char *expected_strs[] = {"\"bucket\":\"gorets\"", "\"type\":\"counter\"", "\"value\":1", "\"sample_rate\":0.1"}; struct str_list expected = { .size = sizeof(expected_strs)/sizeof(char*), @@ -269,7 +276,7 @@ void flb_test_statsd_sample() char *buf = "gorets:1|c|@0.1"; int ret; - ret = test_normal(buf, &expected); + ret = test_normal(buf, &expected, FLB_FALSE); if (!TEST_CHECK(ret == 0)) { TEST_MSG("test failed"); exit(EXIT_FAILURE); @@ -288,7 +295,7 @@ void flb_test_statsd_gauge() char *buf = "gaugor:333|g"; int ret; - ret = test_normal(buf, &expected); + ret = test_normal(buf, &expected, FLB_FALSE); if (!TEST_CHECK(ret == 0)) { TEST_MSG("test failed"); exit(EXIT_FAILURE); @@ -297,7 +304,7 @@ void flb_test_statsd_gauge() void flb_test_statsd_set() { - char *expected_strs[] = {"\"bucket\":\"uniques\"", "\"type\":\"set\"", + char *expected_strs[] = {"\"bucket\":\"uniques\"", "\"type\":\"set\"", "\"value\":\"765\""}; struct str_list expected = { .size = sizeof(expected_strs)/sizeof(char*), @@ -307,18 +314,81 @@ void flb_test_statsd_set() char *buf = "uniques:765|s"; int ret; - ret = test_normal(buf, &expected); + ret = test_normal(buf, &expected, FLB_FALSE); if (!TEST_CHECK(ret == 0)) { TEST_MSG("test failed"); exit(EXIT_FAILURE); } } +#ifdef FLB_HAVE_METRICS +void flb_test_statsd_metrics_gauge() +{ + char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":1"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + char *buf = "gorets:1|g"; + int ret; + + ret = test_normal(buf, &expected, FLB_TRUE); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("test failed"); + exit(EXIT_FAILURE); + } + +} + +void flb_test_statsd_metrics_counter() +{ + char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":0"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + char *buf = "gorets:1|c"; + int ret; + + ret = test_normal(buf, &expected, FLB_TRUE); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("test failed"); + exit(EXIT_FAILURE); + } + +} + +void flb_test_statsd_metrics_untyped() +{ + char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":4"}; + struct str_list expected = { + .size = sizeof(expected_strs)/sizeof(char*), + .lists = &expected_strs[0], + }; + + char *buf = "gorets:1|s"; + int ret; + + ret = test_normal(buf, &expected, FLB_TRUE); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("test failed"); + exit(EXIT_FAILURE); + } + +} +#endif + TEST_LIST = { {"count", flb_test_statsd_count}, {"sample", flb_test_statsd_sample}, {"gauge", flb_test_statsd_gauge}, {"set", flb_test_statsd_set}, +#ifdef FLB_HAVE_METRICS + {"metrics_gauge", flb_test_statsd_metrics_gauge}, + {"metrics_counter", flb_test_statsd_metrics_counter}, + {"metrics_untyped", flb_test_statsd_metrics_untyped}, +#endif {NULL, NULL} }; -