diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index fe24f3937d5..d8a441f6494 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -41,6 +41,7 @@ #include #include #include +#include "fluent-bit/flb_ra_key.h" #include #include @@ -52,6 +53,7 @@ #include "cloudwatch_api.h" + #define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException" #define ERR_CODE_NOT_FOUND "ResourceNotFoundException" @@ -199,12 +201,183 @@ static inline int try_to_write(char *buf, int *off, size_t left, return FLB_TRUE; } +static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream, int *offset) +{ + char ts[KEY_ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"keyAttributes\":{",0)) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"Type\":\"Service\"",0)) { + goto error; + } + if(stream->entity->key_attributes->name != NULL && + strlen(stream->entity->key_attributes->name) != 0) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Name\":\"",stream->entity->key_attributes->name,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->environment != NULL && + strlen(stream->entity->key_attributes->environment) != 0) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"Environment\":\"",stream->entity->key_attributes->environment,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->account_id != NULL && + strlen(stream->entity->key_attributes->account_id) != 0) { + if (snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + return 0; +error: + return -1; +} + +static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, + struct log_stream *stream,int *offset) +{ + char ts[ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"attributes\":{", + 0)) { + goto error; + } + if (stream->entity->attributes->platform_type != NULL && + strlen(stream->entity->attributes->platform_type) != 0) { + if (strcmp(stream->entity->attributes->platform_type, "eks") == 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","AWS::EKS","\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && + strlen(stream->entity->attributes->cluster_name) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EKS.Cluster\":\"",stream->entity->attributes->cluster_name,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } + else if (strcmp(stream->entity->attributes->platform_type, "k8s") == 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","K8s","\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + if(stream->entity->attributes->cluster_name != NULL && + strlen(stream->entity->attributes->cluster_name) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Cluster\":\"",stream->entity->attributes->cluster_name,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } + } + else { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, "%s%s%s", + "\"PlatformType\":\"","Generic","\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->namespace != NULL && + strlen(stream->entity->attributes->namespace) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Namespace\":\"",stream->entity->attributes->namespace,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->node != NULL && + strlen(stream->entity->attributes->node) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Node\":\"",stream->entity->attributes->node,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->workload != NULL && + strlen(stream->entity->attributes->workload) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"K8s.Workload\":\"",stream->entity->attributes->workload,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->instance_id != NULL && + strlen(stream->entity->attributes->instance_id) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"EC2.InstanceId\":\"",stream->entity->attributes->instance_id,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->attributes->name_source != NULL && + strlen(stream->entity->attributes->name_source) != 0) { + if (snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s", + "\"AWS.ServiceNameSource\":\"",stream->entity->attributes->name_source,"\"") < 0) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "}", 1)) { + goto error; + } + return 0; +error: + return -1; +} + /* * Writes the "header" for a put log events payload */ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + int ret; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "{\"logGroupName\":\"", 17)) { goto error; @@ -229,6 +402,41 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, "\",", 2)) { goto error; } + /* + * If we are missing the service name, the entity will get rejected by the frontend + * anyway so do not emit entity unless service name is filled. If we are missing + * account ID, it is considered not having sufficient information for entity + * therefore we should drop the entity. + */ + if(ctx->add_entity && stream->entity != NULL && + stream->entity->key_attributes != NULL && + stream->entity->key_attributes->name != NULL && + stream->entity->key_attributes->account_id != NULL) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"entity\":{", 10)) { + goto error; + } + + if(stream->entity->key_attributes != NULL) { + ret = entity_add_key_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes"); + goto error; + } + } + if(stream->entity->attributes != NULL) { + ret = entity_add_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity Attributes"); + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "},", 2)) { + goto error; + } + + } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, "\"logEvents\":[", 13)) { @@ -249,7 +457,7 @@ static int write_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, { char ts[50]; - if (!snprintf(ts, 50, "%llu", event->timestamp)) { + if (snprintf(ts, 50, "%llu", event->timestamp) < 0) { goto error; } @@ -366,6 +574,80 @@ static int truncate_log(const struct flb_cloudwatch *ctx, const char *log_buffer return FLB_FALSE; } +/* + * Helper function to remove keys prefixed with aws_entity + * from a message pack map + */ +void remove_key_from_nested_map(msgpack_object_map *nested_map, msgpack_packer *pk, + int filtered_fields) +{ + const int remaining_kv_pairs = nested_map->size - filtered_fields; + uint32_t j; + + /* Pack the updated nested map into the packer, skipping keys in the remove list */ + msgpack_pack_map(pk, remaining_kv_pairs); + + for (j = 0; j < nested_map->size; j++) { + msgpack_object_kv nested_kv = nested_map->ptr[j]; + + /* Check if the current key is in the removal list */ + if (nested_kv.key.type == MSGPACK_OBJECT_STR && + nested_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && + strncmp(nested_kv.key.via.str.ptr, + AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + /* Skip the key in the remove list */ + continue; + } + + /* Pack the remaining key-value pairs into the packer */ + msgpack_pack_object(pk, nested_kv.key); + msgpack_pack_object(pk, nested_kv.val); + } +} + +/* + * Main function to remove keys prefixed with aws_entity + * from the root and nested message pack map + */ +void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key, + msgpack_packer *pk,int root_filtered_fields, int filtered_fields) +{ + uint32_t i; + + if (root_map->type == MSGPACK_OBJECT_MAP) { + msgpack_object_map root = root_map->via.map; + + /* Prepare to pack the modified root map (size may be unchanged or reduced) */ + msgpack_pack_map(pk, root.size-root_filtered_fields); + + for (i = 0; i < root.size; i++) { + msgpack_object_kv root_kv = root.ptr[i]; + + /* Check if this key matches the nested map key (e.g., "kubernetes") */ + if (filtered_fields > 0 && + root_kv.key.type == MSGPACK_OBJECT_STR && + strncmp(root_kv.key.via.str.ptr, + nested_map_key, root_kv.key.via.str.size) == 0 && + root_kv.val.type == MSGPACK_OBJECT_MAP) { + + msgpack_pack_object(pk, root_kv.key); + + remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields); + } + else if (root_filtered_fields > 0 && + root_kv.key.type == MSGPACK_OBJECT_STR && + root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && + strncmp(root_kv.key.via.str.ptr, + AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) { + } + else { + msgpack_pack_object(pk, root_kv.key); + msgpack_pack_object(pk, root_kv.val); + } + } + } +} + /* * Processes the msgpack object @@ -786,6 +1068,185 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, return 0; } +static char* find_fallback_environment(struct flb_cloudwatch *ctx, entity *entity) +{ + if(!ctx->add_entity || entity == NULL) { + return NULL; + } + char *fallback_env = NULL; + int ret; + /* + * Possible fallback environments: + * 1. eks:cluster-name/namespace + * 2. k8s:cluster-name/namespace + */ + if (entity->attributes->platform_type != NULL && + entity->attributes->cluster_name != NULL && + entity->attributes->namespace != NULL) { + /* + * Calculate required length + * Add 3 for ':' '/' and null terminator + */ + size_t len = strlen(entity->attributes->platform_type) + + strlen(entity->attributes->cluster_name) + + strlen(entity->attributes->namespace) + 3; + + fallback_env = flb_malloc(len); + if (!fallback_env) { + return NULL; + } + + /* Use snprintf for cross-platform compatibility */ + ret = snprintf(fallback_env, len, "%s:%s/%s", + entity->attributes->platform_type, entity->attributes->cluster_name, + entity->attributes->namespace); + if (ret < 0 || ret >= len) { + flb_free(fallback_env); + return NULL; + } + + return fallback_env; + } + return NULL; +} + +/* + * Entity fields can change during stream lifecycle due to service name + * changes. The found_flag ensures filter_count accurately reflects + * which fields need filtering, preventing aws_entity fields from remaining + * in log messages when fallback values are used. + */ +static void set_entity_field(char **field, struct flb_ra_value *val, + int *filter_count, int *found_flag) +{ + if (!val || val->type != FLB_RA_STRING) { + return; + } + + if (found_flag && !*found_flag) { + if (filter_count) { + (*filter_count)++; + } + (*found_flag)++; + } + else if (!found_flag && *field == NULL && filter_count) { + (*filter_count)++; + } + + if (*field) { + flb_free(*field); + } + + if (val->storage == FLB_RA_REF) { + *field = flb_strndup(val->val.ref.buf, val->val.ref.len); + } + else { + *field = flb_strndup(val->val.string, flb_sds_len(val->val.string)); + } +} + +void parse_entity(struct flb_cloudwatch *ctx, entity *entity, + msgpack_object map, int map_size) +{ + struct flb_record_accessor *ra; + struct flb_ra_value *val; + int i; + + struct { + const char *path; + char **field; + int *filter_count; + int *found_flag; + } field_map[] = { + {"$kubernetes['aws_entity_service_name']", &entity->key_attributes->name, + &entity->filter_count, &entity->service_name_found}, + {"$kubernetes['aws_entity_environment']", &entity->key_attributes->environment, + &entity->filter_count, &entity->environment_found}, + {"$kubernetes['namespace_name']", &entity->attributes->namespace, + NULL, NULL}, + {"$kubernetes['host']", &entity->attributes->node, NULL, NULL}, + {"$kubernetes['aws_entity_cluster']", &entity->attributes->cluster_name, + &entity->filter_count, NULL}, + {"$kubernetes['aws_entity_workload']", &entity->attributes->workload, + &entity->filter_count, NULL}, + {"$kubernetes['aws_entity_name_source']", &entity->attributes->name_source, + &entity->filter_count, &entity->name_source_found}, + {"$kubernetes['aws_entity_platform']", &entity->attributes->platform_type, + &entity->filter_count, NULL}, + {"$aws_entity_ec2_instance_id", &entity->attributes->instance_id, + &entity->root_filter_count, NULL}, + {"$aws_entity_account_id", &entity->key_attributes->account_id, + &entity->root_filter_count, NULL}, + {NULL, NULL, NULL, NULL} + }; + + for (i = 0; field_map[i].path; i++) { + ra = flb_ra_create(field_map[i].path, FLB_FALSE); + if (!ra) { + continue; + } + + val = flb_ra_get_value_object(ra, map); + if (val) { + set_entity_field(field_map[i].field, val, field_map[i].filter_count, + field_map[i].found_flag); + flb_ra_key_value_destroy(val); + } + + flb_ra_destroy(ra); + } + + if (entity->key_attributes->name == NULL && + entity->attributes->name_source == NULL && + entity->attributes->workload != NULL) { + entity->key_attributes->name = flb_strndup(entity->attributes->workload, + strlen(entity->attributes->workload)); + entity->attributes->name_source = flb_strndup("K8sWorkload", 11); + } + + if (entity->key_attributes->environment == NULL) { + entity->key_attributes->environment = find_fallback_environment(ctx, entity); + } +} + +void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stream, + const msgpack_object map) +{ + if(stream->entity == NULL) { + stream->entity = flb_malloc(sizeof(entity)); + if (stream->entity == NULL) { + return; + } + memset(stream->entity, 0, sizeof(entity)); + + stream->entity->key_attributes = flb_malloc(sizeof(entity_key_attributes)); + if (stream->entity->key_attributes == NULL) { + flb_free(stream->entity); + stream->entity = NULL; + return; + } + memset(stream->entity->key_attributes, 0, sizeof(entity_key_attributes)); + + stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); + if (stream->entity->attributes == NULL) { + flb_free(stream->entity->key_attributes); + flb_free(stream->entity); + stream->entity = NULL; + return; + } + memset(stream->entity->attributes, 0, sizeof(entity_attributes)); + stream->entity->filter_count = 0; + stream->entity->root_filter_count = 0; + stream->entity->service_name_found = 0; + stream->entity->environment_found = 0; + stream->entity->name_source_found = 0; + } + parse_entity(ctx,stream->entity,map, map.via.map.size); + if (!stream->entity) { + flb_plg_warn(ctx->ins, "Failed to generate entity"); + } +} + static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf, flb_sds_t tag, const char *data, size_t bytes) @@ -800,6 +1261,12 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug msgpack_object emf_payload; /* msgpack::sbuffer is a simple buffer implementation. */ msgpack_sbuffer mp_sbuf; + /* + * Msgpack objects used to store msgpack after filtering out fields + * with aws entity prefix + */ + msgpack_sbuffer filtered_sbuf; + msgpack_unpacked modified_unpacked; struct log_stream *stream; @@ -848,11 +1315,32 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug map = *log_event.body; map_size = map.via.map.size; + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_init(&filtered_sbuf); + msgpack_unpacked_init(&modified_unpacked); + } stream = get_log_stream(ctx, tag, map); if (!stream) { flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); goto error; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + update_or_create_entity(ctx,stream,map); + if(stream->entity != NULL && + (stream->entity->root_filter_count > 0 || + stream->entity->filter_count > 0)) { + msgpack_packer pk; + msgpack_packer_init(&pk, &filtered_sbuf, msgpack_sbuffer_write); + remove_unneeded_field(&map, "kubernetes",&pk, + stream->entity->root_filter_count, stream->entity->filter_count); + + size_t modified_offset = 0; + if (msgpack_unpack_next(&modified_unpacked, filtered_sbuf.data, + filtered_sbuf.size, &modified_offset)) { + map = modified_unpacked.data; + } + } + } if (ctx->log_key) { key_str = NULL; @@ -974,6 +1462,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug if (ret == 0) { i++; } + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } } flb_log_event_decoder_destroy(&log_decoder); @@ -981,7 +1473,10 @@ static int process_log_events(struct flb_cloudwatch *ctx, const char *input_plug error: flb_log_event_decoder_destroy(&log_decoder); - + if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + msgpack_sbuffer_destroy(&filtered_sbuf); + msgpack_unpacked_destroy(&modified_unpacked); + } return -1; } @@ -1537,6 +2032,8 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, if (c) { flb_plg_debug(ctx->ins, "PutLogEvents http status=%d", c->resp.status); + flb_plg_debug(ctx->ins, "PutLogEvents http data=%s", c->resp.data); + flb_plg_debug(ctx->ins, "PutLogEvents http payload=%s", c->resp.payload); if (c->resp.status == 200) { if (c->resp.data == NULL || c->resp.data_len == 0 || strcasestr(c->resp.data, AMZN_REQUEST_ID_HEADER) == NULL) { diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 05abfff30a1..697b15155cd 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -35,9 +35,21 @@ /* number of characters needed to 'end' a PutLogEvents payload */ #define PUT_LOG_EVENTS_FOOTER_LEN 4 +/* + * https://docs.aws.amazon.com/applicationsignals/latest/APIReference/API_Service.html + * Maximum number of character limits including both the KeyAttributes key and its value + */ +#define KEY_ATTRIBUTES_MAX_LEN 1100 +/* Maximum number of character limits including both the Attributes key and its value */ +#define ATTRIBUTES_MAX_LEN 300 + /* 256KiB minus 26 bytes for the event */ #define MAX_EVENT_LEN 262118 +/* Prefix used for entity fields only */ +#define AWS_ENTITY_PREFIX "aws_entity" +#define AWS_ENTITY_PREFIX_LEN 10 + #include "cloudwatch_logs.h" void cw_flush_destroy(struct cw_flush *buf); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index c5e808ae141..4ab884ba9a0 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -378,6 +378,15 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, flb_output_upstream_set(upstream, ctx->ins); ctx->cw_client->host = ctx->endpoint; + struct mk_list *head; + struct flb_filter_instance *f_ins; + mk_list_foreach(head, &config->filters) { + f_ins = mk_list_entry(head, struct flb_filter_instance, _head); + if (strstr(f_ins->p->name, "kubernetes")) { + ctx->kubernete_metadata_enabled = true; + } + } + /* Export context */ flb_output_set_context(ins, ctx); @@ -530,6 +539,28 @@ static int cb_cloudwatch_exit(void *data, struct flb_config *config) return 0; } +void entity_destroy(entity *entity) +{ + if(entity->attributes) { + flb_free(entity->attributes->cluster_name); + flb_free(entity->attributes->instance_id); + flb_free(entity->attributes->namespace); + flb_free(entity->attributes->node); + flb_free(entity->attributes->platform_type); + flb_free(entity->attributes->workload); + flb_free(entity->attributes->name_source); + flb_free(entity->attributes); + } + if(entity->key_attributes) { + flb_free(entity->key_attributes->environment); + flb_free(entity->key_attributes->name); + flb_free(entity->key_attributes->type); + flb_free(entity->key_attributes->account_id); + flb_free(entity->key_attributes); + } + flb_free(entity); +} + void log_stream_destroy(struct log_stream *stream) { if (stream) { @@ -539,6 +570,9 @@ void log_stream_destroy(struct log_stream *stream) if (stream->group) { flb_sds_destroy(stream->group); } + if (stream->entity) { + entity_destroy(stream->entity); + } flb_free(stream); } } @@ -689,6 +723,12 @@ static struct flb_config_map config_map[] = { "Specify the log storage class. Valid values are STANDARD (default) and INFREQUENT_ACCESS." }, + { + FLB_CONFIG_MAP_BOOL, "add_entity", "false", + 0, FLB_TRUE, offsetof(struct flb_cloudwatch, add_entity), + "add entity to PutLogEvent calls" + }, + /* EOF */ {0} }; diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 3724863426a..9381e1f1376 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -30,6 +30,45 @@ #include #include +/* + * Entity object used for associating the telemetry + * in the PutLogEvent call + */ +typedef struct entity { + struct entity_key_attributes *key_attributes; + struct entity_attributes *attributes; + int filter_count; + int service_name_found; + int environment_found; + int name_source_found; + int root_filter_count; +}entity; + +/* + * KeyAttributes used for CloudWatch Entity object + * in the PutLogEvent call + */ +typedef struct entity_key_attributes { + char *type; + char *name; + char *environment; + char *account_id; +}entity_key_attributes; + +/* + * Attributes used for CloudWatch Entity object + * in the PutLogEvent call + */ +typedef struct entity_attributes { + char *platform_type; + char *cluster_name; + char *namespace; + char *workload; + char *node; + char *instance_id; + char *name_source; +}entity_attributes; + #define LOG_CLASS_STANDARD "STANDARD" #define LOG_CLASS_STANDARD_LEN 8 #define LOG_CLASS_INFREQUENT_ACCESS "INFREQUENT_ACCESS" @@ -94,6 +133,13 @@ struct log_stream { unsigned long long oldest_event; unsigned long long newest_event; + /* + * PutLogEvents entity object + * variable that store service or infrastructure + * information + */ + struct entity *entity; + struct mk_list _head; }; @@ -159,6 +205,15 @@ struct flb_cloudwatch { /* Plugin output instance reference */ struct flb_output_instance *ins; + + /* + * Checks if kubernete filter is enabled + * So the plugin knows when to scrape for Entity + */ + + int kubernete_metadata_enabled; + + int add_entity; }; void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);