From 9ff8b5d7f1045320996d97d7eb3dbd4fe3cdd4ae Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 5 Nov 2024 14:47:23 -0600 Subject: [PATCH 1/3] processor_content_modifier: allow 'fluent-bit' as a producer in metadata Signed-off-by: Eduardo Silva --- .../processor_content_modifier/cm_metrics.c | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/plugins/processor_content_modifier/cm_metrics.c b/plugins/processor_content_modifier/cm_metrics.c index d4e97aec533..239e048c2e5 100644 --- a/plugins/processor_content_modifier/cm_metrics.c +++ b/plugins/processor_content_modifier/cm_metrics.c @@ -262,14 +262,6 @@ int cm_metrics_process(struct flb_processor_instance *ins, int ret = -1; struct cfl_variant *var = NULL; - printf("\n\n==== BEFORE =====\n"); - cfl_kvlist_print(stdout, in_cmt->internal_metadata); - printf("\n"); - printf("-----external----\n"); - cfl_kvlist_print(stdout, in_cmt->external_metadata); - fflush(stdout); - - if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR) { /* Internal metadata must be valid */ var = cfl_kvlist_fetch(in_cmt->internal_metadata, "producer"); @@ -282,7 +274,8 @@ int cm_metrics_process(struct flb_processor_instance *ins, } /* validate that the value is 'opentelemetry' */ - if (strcmp(var->data.as_string, "opentelemetry") != 0) { + if (strcmp(var->data.as_string, "opentelemetry") != 0 && + strcmp(var->data.as_string, "fluent-bit") != 0) { return FLB_PROCESSOR_FAILURE; } @@ -335,12 +328,5 @@ int cm_metrics_process(struct flb_processor_instance *ins, return FLB_PROCESSOR_FAILURE; } - printf("\n\n==== AFTER =====\n"); - cfl_kvlist_print(stdout, in_cmt->internal_metadata); - printf("\n"); - printf("-----external----\n"); - cfl_kvlist_print(stdout, in_cmt->external_metadata); - fflush(stdout); - return FLB_PROCESSOR_SUCCESS; } From 7ce2bc0c65a19ac4c7dd9bdee27921c3cf74e354 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 5 Nov 2024 14:54:37 -0600 Subject: [PATCH 2/3] lib: cmetrics: upgrade to v0.9.9 Signed-off-by: Eduardo Silva --- lib/cmetrics/CMakeLists.txt | 2 +- lib/cmetrics/src/cmt_encode_opentelemetry.c | 69 +++++++++------------ 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/lib/cmetrics/CMakeLists.txt b/lib/cmetrics/CMakeLists.txt index 55d02dc8619..18582997d61 100644 --- a/lib/cmetrics/CMakeLists.txt +++ b/lib/cmetrics/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # CMetrics Version set(CMT_VERSION_MAJOR 0) set(CMT_VERSION_MINOR 9) -set(CMT_VERSION_PATCH 8) +set(CMT_VERSION_PATCH 9) set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}") # Include helpers diff --git a/lib/cmetrics/src/cmt_encode_opentelemetry.c b/lib/cmetrics/src/cmt_encode_opentelemetry.c index af7605f7e73..8f961aa7625 100644 --- a/lib/cmetrics/src/cmt_encode_opentelemetry.c +++ b/lib/cmetrics/src/cmt_encode_opentelemetry.c @@ -273,7 +273,7 @@ static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValu { if (kvpair != NULL) { if (kvpair->key != NULL) { - free(kvpair->key); + cfl_sds_destroy(kvpair->key); } if (kvpair->value != NULL) { @@ -323,7 +323,7 @@ static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyV if (value != NULL) { if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { if (value->string_value != NULL) { - free(value->string_value); + cfl_sds_destroy(value->string_value); } } else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE) { @@ -494,21 +494,19 @@ static inline Opentelemetry__Proto__Common__V1__KeyValue *cfl_variant_kvpair_to_ pair = otlp_kvpair_value_initialize(); if (pair != NULL) { - pair->key = strdup(input_pair->key); + pair->key = cfl_sds_create(input_pair->key); if (pair->key != NULL) { pair->value = cfl_variant_to_otlp_any_value(input_pair->val); if (pair->value == NULL) { - free(pair->key); - + cfl_sds_destroy(pair->key); pair->key = NULL; } } if (pair->key == NULL) { free(pair); - pair = NULL; } } @@ -649,11 +647,9 @@ static inline Opentelemetry__Proto__Common__V1__AnyValue *cfl_variant_string_to_ result = otlp_any_value_initialize(CFL_VARIANT_STRING, 0); if (result != NULL) { - result->string_value = strdup(value->data.as_string); - + result->string_value = cfl_sds_create(value->data.as_string); if (result->string_value == NULL) { otlp_any_value_destroy(result); - result = NULL; } } @@ -1061,50 +1057,49 @@ static Opentelemetry__Proto__Common__V1__InstrumentationScope * return NULL; } + /* cmetrics: retrieve attributes and metadata fields */ attributes = fetch_metadata_kvlist_key(scope_root, "attributes"); metadata = fetch_metadata_kvlist_key(scope_root, "metadata"); - if (cfl_kvlist_count(attributes) == 0 && - cfl_kvlist_count(metadata) == 0) { - return NULL; - } - + /* create scope */ scope = calloc(1, sizeof(Opentelemetry__Proto__Common__V1__InstrumentationScope)); if (scope == NULL) { *error_detection_flag = CMT_TRUE; - return NULL; } - opentelemetry__proto__common__v1__instrumentation_scope__init(scope); - scope->attributes = cfl_kvlist_to_otlp_kvpair_list(attributes); + /* attributes */ + if (attributes && cfl_kvlist_count(attributes) > 0) { + scope->attributes = cfl_kvlist_to_otlp_kvpair_list(attributes); - if (scope->attributes == NULL) { - *error_detection_flag = CMT_TRUE; - } - - scope->n_attributes = cfl_kvlist_count(attributes); + if (scope->attributes == NULL) { + *error_detection_flag = CMT_TRUE; + } - if (!(*error_detection_flag)) { - scope->dropped_attributes_count = (uint32_t) fetch_metadata_int64_key( - metadata, - "dropped_attributes_count", - error_detection_flag); + scope->n_attributes = cfl_kvlist_count(attributes); } - if (!(*error_detection_flag)) { - scope->name = fetch_metadata_string_key(metadata, "name", error_detection_flag); - } + /* scope metadata */ + if (metadata) { + if (!(*error_detection_flag)) { + scope->dropped_attributes_count = (uint32_t) fetch_metadata_int64_key( + metadata, + "dropped_attributes_count", + error_detection_flag); + } - if (!(*error_detection_flag)) { - scope->version = fetch_metadata_string_key(metadata, "version", error_detection_flag); + if (!(*error_detection_flag)) { + scope->name = fetch_metadata_string_key(metadata, "name", error_detection_flag); + } + + if (!(*error_detection_flag)) { + scope->version = fetch_metadata_string_key(metadata, "version", error_detection_flag); + } } - if (*error_detection_flag && - scope != NULL) { + if (*error_detection_flag && scope != NULL) { destroy_instrumentation_scope(scope); - scope = NULL; } @@ -1205,13 +1200,11 @@ static void destroy_attribute(Opentelemetry__Proto__Common__V1__KeyValue *attrib { if (attribute != NULL) { if (attribute->value != NULL) { - if (attribute->value->value_case == \ - OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { + if (attribute->value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { if (is_string_releaseable(attribute->value->string_value)) { cfl_sds_destroy(attribute->value->string_value); } } - free(attribute->value); } From d2109f52fe30f11419d00e6619628c01d6b6119b Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Tue, 5 Nov 2024 14:55:28 -0600 Subject: [PATCH 3/3] processor_opentelemetry_envelope: add support for Metrics Signed-off-by: Eduardo Silva --- .../otel_envelope.c | 108 +++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/plugins/processor_opentelemetry_envelope/otel_envelope.c b/plugins/processor_opentelemetry_envelope/otel_envelope.c index c83fbb8f328..82c22084b0c 100644 --- a/plugins/processor_opentelemetry_envelope/otel_envelope.c +++ b/plugins/processor_opentelemetry_envelope/otel_envelope.c @@ -206,6 +206,112 @@ static int cb_process_logs(struct flb_processor_instance *ins, return FLB_PROCESSOR_SUCCESS; } +static int metrics_add_kvlist(struct cfl_kvlist *kvlist, char *kv1, char *sub_kv1, char *sub_kv2) +{ + int ret; + struct cfl_variant *var; + struct cfl_kvlist *tmp_kvlist; + + var = cfl_kvlist_fetch(kvlist, kv1); + if (!var) { + tmp_kvlist = cfl_kvlist_create(); + if (!tmp_kvlist) { + return -1; + } + ret = cfl_kvlist_insert_kvlist(kvlist, kv1, tmp_kvlist); + if (ret != 0) { + return -1; + } + + /* retrieve the last kv inserted */ + var = cfl_kvlist_fetch(kvlist, kv1); + } + else if (var->type != CFL_VARIANT_KVLIST) { + return -1; + } + + if (!var) { + return -1; + } + + if (sub_kv1) { + ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv1, NULL, NULL); + if (ret != 0) { + return -1; + } + } + + if (sub_kv2) { + ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv2, NULL, NULL); + if (ret != 0) { + return -1; + } + } + + return 0; +} + +static int cb_process_metrics(struct flb_processor_instance *processor_instance, + struct cmt *cmt, + struct cmt **out_context, + const char *tag, int tag_len) +{ + (void) out_context; + (void) tag; + (void) tag_len; + int ret; + struct cfl_variant *var = NULL; + + /* Check internal metadata, look for some producer, if no one is set, add it */ + if (!cmt->internal_metadata) { + cmt->internal_metadata = cfl_kvlist_create(); + if (!cmt->internal_metadata) { + return FLB_PROCESSOR_FAILURE; + } + } + else { + var = cfl_kvlist_fetch(cmt->internal_metadata, "producer"); + } + if (!var) { + cfl_kvlist_insert_string(cmt->internal_metadata, "producer", "fluent-bit"); + } + + /* externl metadata */ + if (!cmt->external_metadata) { + cmt->external_metadata = cfl_kvlist_create(); + if (!cmt->external_metadata) { + return FLB_PROCESSOR_FAILURE; + } + } + + /* scope */ + ret = metrics_add_kvlist(cmt->external_metadata, "scope", "metadata", "attributes"); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* scope_metrics */ + ret = metrics_add_kvlist(cmt->external_metadata, "scope_metrics", "metadata", NULL); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* resource */ + ret = metrics_add_kvlist(cmt->external_metadata, "resource", "metadata", "attributes"); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + /* resource_metrics */ + ret = metrics_add_kvlist(cmt->external_metadata, "resource_metrics", "metadata", NULL); + if (ret != 0) { + return FLB_PROCESSOR_FAILURE; + } + + *out_context = NULL; + return FLB_PROCESSOR_SUCCESS; +} + static struct flb_config_map config_map[] = { /* EOF */ {0} @@ -216,7 +322,7 @@ struct flb_processor_plugin processor_opentelemetry_envelope_plugin = { .description = "Package log records inside an OpenTelemetry Logs schema", .cb_init = cb_init, .cb_process_logs = cb_process_logs, - .cb_process_metrics = NULL, + .cb_process_metrics = cb_process_metrics, .cb_process_traces = NULL, .cb_exit = cb_exit, .config_map = config_map,