Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <cmetrics/cmt_encode_prometheus_remote_write.h>
#include <cmetrics/cmt_encode_msgpack.h>
#include <cmetrics/cmt_encode_splunk_hec.h>
#include <cmetrics/cmt_encode_cloudwatch_emf.h>

/* Metrics IDs for general purpose (used by core and Plugins */
#define FLB_METRIC_N_RECORDS 0
Expand Down
140 changes: 113 additions & 27 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <fluent-bit/flb_http_client.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_intermediate_metric.h>
#include <fluent-bit/flb_metrics.h>

#include <monkey/mk_core.h>
#include <msgpack.h>
Expand Down Expand Up @@ -783,13 +784,9 @@ int pack_emf_payload(struct flb_cloudwatch *ctx,
return 0;
}

/*
* Main routine- processes msgpack and sends in batches which ignore the empty ones
* return value is the number of events processed and send.
*/
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
{
int i = 0;
size_t map_size;
Expand Down Expand Up @@ -834,7 +831,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
if (strncmp(input_plugin, "cpu", 3) == 0) {
intermediate_metric_type = GAUGE;
intermediate_metric_unit = PERCENT;
}
}
else if (strncmp(input_plugin, "mem", 3) == 0) {
intermediate_metric_type = GAUGE;
intermediate_metric_unit = BYTES;
Expand Down Expand Up @@ -901,16 +898,16 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
continue;
}

if (strncmp(input_plugin, "cpu", 3) == 0
if (strncmp(input_plugin, "cpu", 3) == 0
|| strncmp(input_plugin, "mem", 3) == 0) {
/* Added for EMF support: Construct a list */
struct mk_list flb_intermediate_metrics;
mk_list_init(&flb_intermediate_metrics);

kv = map.via.map.ptr;

/*
* Iterate through the record map, extract intermediate metric data,
/*
* Iterate through the record map, extract intermediate metric data,
* and add to the list.
*/
for (i = 0; i < map_size; i++) {
Expand All @@ -926,25 +923,25 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
metric->timestamp = log_event.timestamp;

mk_list_add(&metric->_head, &flb_intermediate_metrics);
}

}

/* The msgpack object is only valid during the lifetime of the
* sbuffer & the unpacked result.
*/
*/
msgpack_sbuffer_init(&mp_sbuf);
msgpack_unpacked_init(&mp_emf_result);

ret = pack_emf_payload(ctx,
&flb_intermediate_metrics,
input_plugin,
log_event.timestamp,
&mp_sbuf,
&mp_emf_result,
&emf_payload);
&flb_intermediate_metrics,
input_plugin,
log_event.timestamp,
&mp_sbuf,
&mp_emf_result,
&emf_payload);

/* free the intermediate metric list */

mk_list_foreach_safe(head, tmp, &flb_intermediate_metrics) {
an_item = mk_list_entry(head, struct flb_intermediate_metric, _head);
mk_list_del(&an_item->_head);
Expand Down Expand Up @@ -978,6 +975,100 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
}
flb_log_event_decoder_destroy(&log_decoder);

return i;

error:
flb_log_event_decoder_destroy(&log_decoder);

return -1;
}


static int process_metric_events(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes)
{
int i = 0;
int ret;
msgpack_object map;
msgpack_unpacked mp_emf_result;

struct log_stream *stream;

size_t off = 0;
struct cmt *cmt;
char *mp_buf = NULL;
size_t mp_size = 0;
size_t mp_off = 0;
struct flb_time tm;

while ((ret = cmt_decode_msgpack_create(&cmt,
data,
bytes, &off)) == CMT_DECODE_MSGPACK_SUCCESS) {
ret = cmt_encode_cloudwatch_emf_create(cmt, &mp_buf, &mp_size, CMT_FALSE);
if (ret < 0) {
goto cmt_error;
}

msgpack_unpacked_init(&mp_emf_result);
while (msgpack_unpack_next(&mp_emf_result, mp_buf, mp_size, &mp_off) == MSGPACK_UNPACK_SUCCESS) {
map = mp_emf_result.data;
if (map.type != MSGPACK_OBJECT_MAP) {
continue;
}

stream = get_log_stream(ctx, tag, map);
if (!stream) {
flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag);
goto cmt_error;
}

flb_time_get(&tm);
ret = add_event(ctx, buf, stream, &map,
&tm);

if (ret < 0 ) {
goto cmt_error;
}

if (ret == 0) {
i++;
}
}
cmt_encode_cloudwatch_emf_destroy(mp_buf);
msgpack_unpacked_destroy(&mp_emf_result);
cmt_destroy(cmt);
}

return i;

cmt_error:
cmt_destroy(cmt);

return -1;
}

/*
* Main routine- processes msgpack and sends in batches which ignore the empty ones
* return value is the number of events processed and send.
*/
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes, int event_type)
{
int ret;
int i = 0;

if (event_type == FLB_EVENT_TYPE_LOGS) {
i = process_log_events(ctx, input_plugin,
buf, tag,
data, bytes);
}
else if (event_type == FLB_EVENT_TYPE_METRICS) {
i = process_metric_events(ctx, input_plugin,
buf, tag,
data, bytes);
}
/* send any remaining events */
ret = send_log_events(ctx, buf);
reset_flush_buf(ctx, buf);
Expand All @@ -987,11 +1078,6 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,

/* return number of events */
return i;

error:
flb_log_event_decoder_destroy(&log_decoder);

return -1;
}

struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx,
Expand Down
4 changes: 2 additions & 2 deletions plugins/out_cloudwatch_logs/cloudwatch_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@

void cw_flush_destroy(struct cw_flush *buf);

int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
struct cw_flush *buf, flb_sds_t tag,
const char *data, size_t bytes);
const char *data, size_t bytes, int event_type);
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry);
struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag,
const msgpack_object map);
Expand Down
8 changes: 7 additions & 1 deletion plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(FLB_RETRY);
}

event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size);
event_count = process_and_send(ctx, i_ins->p->name, buf, event_chunk->tag, event_chunk->data, event_chunk->size,
event_chunk->type);
if (event_count < 0) {
flb_plg_error(ctx->ins, "Failed to send events");
cw_flush_destroy(buf);
Expand Down Expand Up @@ -477,6 +478,10 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx)
flb_sds_destroy(ctx->stream_name);
}

if (ctx->metric_namespace) {
flb_sds_destroy(ctx->metric_namespace);
}

mk_list_foreach_safe(head, tmp, &ctx->streams) {
stream = mk_list_entry(head, struct log_stream, _head);
mk_list_del(&stream->_head);
Expand Down Expand Up @@ -660,6 +665,7 @@ struct flb_output_plugin out_cloudwatch_logs_plugin = {
.cb_exit = cb_cloudwatch_exit,
.flags = 0,
.workers = 1,
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,

/* Configuration */
.config_map = config_map,
Expand Down
47 changes: 47 additions & 0 deletions tests/runtime/out_cloudwatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,52 @@ void flb_test_cloudwatch_success(void)
flb_destroy(ctx);
}

/* It writes a json/emf formatted metrics */
void flb_test_cloudwatch_success_with_metrics(void)
{
int ret;
flb_ctx_t *ctx;
int in_ffd;
int out_ffd;

/* mocks calls- signals that we are in test mode */
setenv("FLB_CLOUDWATCH_PLUGIN_UNDER_TEST", "true", 1);

ctx = flb_create();
flb_service_set(ctx,
"Flush", "0.200000000",
"Grace", "1",
NULL);

/* Input */
in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL);
TEST_CHECK(in_ffd >= 0);
ret = flb_input_set(ctx, in_ffd, "tag", "test", NULL);
TEST_CHECK(ret == 0);
ret = flb_input_set(ctx, in_ffd, "scrape_on_start", "true", NULL);
TEST_CHECK(ret == 0);
ret = flb_input_set(ctx, in_ffd, "scrape_interval", "1", NULL);
TEST_CHECK(ret == 0);

out_ffd = flb_output(ctx, (char *) "cloudwatch_logs", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd,"match", "test", NULL);
flb_output_set(ctx, out_ffd,"region", "us-west-2", NULL);
flb_output_set(ctx, out_ffd,"log_format", "json_emf", NULL);
flb_output_set(ctx, out_ffd,"log_group_name", "fluent-health", NULL);
flb_output_set(ctx, out_ffd,"log_stream_prefix", "from-cmetrics-", NULL);
flb_output_set(ctx, out_ffd,"auto_create_group", "On", NULL);
flb_output_set(ctx, out_ffd,"net.keepalive", "Off", NULL);
flb_output_set(ctx, out_ffd,"Retry_Limit", "1", NULL);

ret = flb_start(ctx);
TEST_CHECK(ret == 0);

sleep(2);
flb_stop(ctx);
flb_destroy(ctx);
}

void flb_test_cloudwatch_already_exists_create_group(void)
{
int ret;
Expand Down Expand Up @@ -350,6 +396,7 @@ void flb_test_cloudwatch_error_put_retention_policy(void)
/* Test list */
TEST_LIST = {
{"success", flb_test_cloudwatch_success },
{"success_with_metrics", flb_test_cloudwatch_success_with_metrics},
{"group_already_exists", flb_test_cloudwatch_already_exists_create_group },
{"stream_already_exists", flb_test_cloudwatch_already_exists_create_stream },
{"create_group_error", flb_test_cloudwatch_error_create_group },
Expand Down