Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include <cmetrics/cmt_encode_msgpack.h>
#include <cmetrics/cmt_encode_splunk_hec.h>
#include <cmetrics/cmt_encode_cloudwatch_emf.h>
#include <cmetrics/cmt_decode_statsd.h>
#include <cmetrics/cmt_filter.h>

/* Metrics IDs for general purpose (used by core and Plugins */
Expand Down
88 changes: 66 additions & 22 deletions plugins/in_statsd/statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct flb_statsd {
char *buf; /* buffer */
char listen[256]; /* listening address (RFC-2181) */
char port[6]; /* listening port (RFC-793) */
int metrics; /* Import as metrics */
flb_sockfd_t server_fd; /* server socket */
flb_pipefd_t coll_fd; /* server handler */
struct flb_input_instance *ins; /* input instance */
Expand Down Expand Up @@ -206,9 +207,15 @@ static int cb_statsd_receive(struct flb_input_instance *ins,
struct flb_config *config, void *data)
{
int ret;
char *line;
int len;
struct flb_statsd *ctx = data;
struct cfl_list *head = NULL;
struct cfl_list *kvs = NULL;
struct cfl_split_entry *cur = NULL;
#ifdef FLB_HAVE_METRICS
struct cmt *cmt = NULL;
int cmt_flags = 0;
#endif

/* Receive a UDP datagram */
len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0);
Expand All @@ -218,35 +225,67 @@ static int cb_statsd_receive(struct flb_input_instance *ins,
}
ctx->buf[len] = '\0';

ret = FLB_EVENT_ENCODER_SUCCESS;
/* Process all messages in buffer */
line = strtok(ctx->buf, "\n");
while (line != NULL) {
flb_plg_trace(ctx->ins, "received a line: '%s'", line);
#ifdef FLB_HAVE_METRICS
if (ctx->metrics == FLB_TRUE) {
cmt_flags |= CMT_DECODE_STATSD_GAUGE_OBSERVER;
flb_plg_trace(ctx->ins, "received a buf: '%s'", ctx->buf);
ret = cmt_decode_statsd_create(&cmt, ctx->buf, len, cmt_flags);
if (ret != CMT_DECODE_STATSD_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process buf: '%s'", ctx->buf);
return -1;
}

ret = statsd_process_line(ctx, line);
/* Append the updated metrics */
ret = flb_input_metrics_append(ins, NULL, 0, cmt);
if (ret != 0) {
flb_plg_error(ins, "could not append metrics");
}

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process line: '%s'", line);
cmt_destroy(cmt);
}
else {
#endif
ret = FLB_EVENT_ENCODER_SUCCESS;
kvs = cfl_utils_split(ctx->buf, '\n', -1 );
if (kvs == NULL) {
goto split_error;
}

break;
cfl_list_foreach(head, kvs) {
cur = cfl_list_entry(head, struct cfl_split_entry, _head);
flb_plg_trace(ctx->ins, "received a line: '%s'", cur->value);

ret = statsd_process_line(ctx, cur->value);

if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to process line: '%s'", cur->value);

break;
}
}

line = strtok(NULL, "\n");
}
if (kvs != NULL) {
cfl_utils_split_free(kvs);
}

if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
else {
flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
}
if (ctx->log_encoder->output_length > 0) {
flb_input_log_append(ctx->ins, NULL, 0,
ctx->log_encoder->output_buffer,
ctx->log_encoder->output_length);
}
else {
flb_plg_error(ctx->ins, "log event encoding error : %d", ret);
}

flb_log_event_encoder_reset(ctx->log_encoder);
flb_log_event_encoder_reset(ctx->log_encoder);
#ifdef FLB_HAVE_METRICS
}
#endif

return 0;

split_error:
return -1;
}

static int cb_statsd_init(struct flb_input_instance *ins,
Expand Down Expand Up @@ -365,8 +404,13 @@ static int cb_statsd_exit(void *data, struct flb_config *config)
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_BOOL, "metrics", "off",
0, FLB_TRUE, offsetof(struct flb_statsd, metrics),
"Ingest as metrics type of events."
},
/* EOF */
{0}
{0}
};

/* Plugin reference */
Expand Down
86 changes: 78 additions & 8 deletions tests/runtime/in_statsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ static int init_udp(char *in_host, int in_port, struct sockaddr_in *addr)
return fd;
}

static int test_normal(char *payload, struct str_list *expected)
static int test_normal(char *payload, struct str_list *expected, int use_metrics)
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
Expand Down Expand Up @@ -202,6 +202,13 @@ static int test_normal(char *payload, struct str_list *expected)
exit(EXIT_FAILURE);
}

if (use_metrics == FLB_TRUE) {
ret = flb_input_set(ctx->flb, ctx->i_ffd,
"metrics", "On",
NULL);
TEST_CHECK(ret == 0);
}

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"format", "json",
NULL);
Expand Down Expand Up @@ -249,7 +256,7 @@ void flb_test_statsd_count()
char *buf = "gorets:1|c";
int ret;

ret = test_normal(buf, &expected);
ret = test_normal(buf, &expected, FLB_FALSE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
Expand All @@ -259,7 +266,7 @@ void flb_test_statsd_count()

void flb_test_statsd_sample()
{
char *expected_strs[] = {"\"bucket\":\"gorets\"", "\"type\":\"counter\"",
char *expected_strs[] = {"\"bucket\":\"gorets\"", "\"type\":\"counter\"",
"\"value\":1", "\"sample_rate\":0.1"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
Expand All @@ -269,7 +276,7 @@ void flb_test_statsd_sample()
char *buf = "gorets:1|c|@0.1";
int ret;

ret = test_normal(buf, &expected);
ret = test_normal(buf, &expected, FLB_FALSE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
Expand All @@ -288,7 +295,7 @@ void flb_test_statsd_gauge()
char *buf = "gaugor:333|g";
int ret;

ret = test_normal(buf, &expected);
ret = test_normal(buf, &expected, FLB_FALSE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
Expand All @@ -297,7 +304,7 @@ void flb_test_statsd_gauge()

void flb_test_statsd_set()
{
char *expected_strs[] = {"\"bucket\":\"uniques\"", "\"type\":\"set\"",
char *expected_strs[] = {"\"bucket\":\"uniques\"", "\"type\":\"set\"",
"\"value\":\"765\""};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
Expand All @@ -307,18 +314,81 @@ void flb_test_statsd_set()
char *buf = "uniques:765|s";
int ret;

ret = test_normal(buf, &expected);
ret = test_normal(buf, &expected, FLB_FALSE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
}
}

#ifdef FLB_HAVE_METRICS
void flb_test_statsd_metrics_gauge()
{
char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":1"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
};

char *buf = "gorets:1|g";
int ret;

ret = test_normal(buf, &expected, FLB_TRUE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
}

}

void flb_test_statsd_metrics_counter()
{
char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":0"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
};

char *buf = "gorets:1|c";
int ret;

ret = test_normal(buf, &expected, FLB_TRUE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
}

}

void flb_test_statsd_metrics_untyped()
{
char *expected_strs[] = {"\"name\":\"gorets\"", "\"desc\":\"-\"", "\"type\":4"};
struct str_list expected = {
.size = sizeof(expected_strs)/sizeof(char*),
.lists = &expected_strs[0],
};

char *buf = "gorets:1|s";
int ret;

ret = test_normal(buf, &expected, FLB_TRUE);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("test failed");
exit(EXIT_FAILURE);
}

}
#endif

TEST_LIST = {
{"count", flb_test_statsd_count},
{"sample", flb_test_statsd_sample},
{"gauge", flb_test_statsd_gauge},
{"set", flb_test_statsd_set},
#ifdef FLB_HAVE_METRICS
{"metrics_gauge", flb_test_statsd_metrics_gauge},
{"metrics_counter", flb_test_statsd_metrics_counter},
{"metrics_untyped", flb_test_statsd_metrics_untyped},
#endif
{NULL, NULL}
};