Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/cmetrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# CMetrics Version
set(CMT_VERSION_MAJOR 0)
set(CMT_VERSION_MINOR 9)
set(CMT_VERSION_PATCH 8)
set(CMT_VERSION_PATCH 9)
set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}")

# Include helpers
Expand Down
69 changes: 31 additions & 38 deletions lib/cmetrics/src/cmt_encode_opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValu
{
if (kvpair != NULL) {
if (kvpair->key != NULL) {
free(kvpair->key);
cfl_sds_destroy(kvpair->key);
}

if (kvpair->value != NULL) {
Expand Down Expand Up @@ -323,7 +323,7 @@ static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyV
if (value != NULL) {
if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) {
if (value->string_value != NULL) {
free(value->string_value);
cfl_sds_destroy(value->string_value);
}
}
else if (value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_ARRAY_VALUE) {
Expand Down Expand Up @@ -494,21 +494,19 @@ static inline Opentelemetry__Proto__Common__V1__KeyValue *cfl_variant_kvpair_to_
pair = otlp_kvpair_value_initialize();

if (pair != NULL) {
pair->key = strdup(input_pair->key);
pair->key = cfl_sds_create(input_pair->key);

if (pair->key != NULL) {
pair->value = cfl_variant_to_otlp_any_value(input_pair->val);

if (pair->value == NULL) {
free(pair->key);

cfl_sds_destroy(pair->key);
pair->key = NULL;
}
}

if (pair->key == NULL) {
free(pair);

pair = NULL;
}
}
Expand Down Expand Up @@ -649,11 +647,9 @@ static inline Opentelemetry__Proto__Common__V1__AnyValue *cfl_variant_string_to_
result = otlp_any_value_initialize(CFL_VARIANT_STRING, 0);

if (result != NULL) {
result->string_value = strdup(value->data.as_string);

result->string_value = cfl_sds_create(value->data.as_string);
if (result->string_value == NULL) {
otlp_any_value_destroy(result);

result = NULL;
}
}
Expand Down Expand Up @@ -1061,50 +1057,49 @@ static Opentelemetry__Proto__Common__V1__InstrumentationScope *
return NULL;
}

/* cmetrics: retrieve attributes and metadata fields */
attributes = fetch_metadata_kvlist_key(scope_root, "attributes");
metadata = fetch_metadata_kvlist_key(scope_root, "metadata");

if (cfl_kvlist_count(attributes) == 0 &&
cfl_kvlist_count(metadata) == 0) {
return NULL;
}

/* create scope */
scope = calloc(1, sizeof(Opentelemetry__Proto__Common__V1__InstrumentationScope));
if (scope == NULL) {
*error_detection_flag = CMT_TRUE;

return NULL;
}

opentelemetry__proto__common__v1__instrumentation_scope__init(scope);

scope->attributes = cfl_kvlist_to_otlp_kvpair_list(attributes);
/* attributes */
if (attributes && cfl_kvlist_count(attributes) > 0) {
scope->attributes = cfl_kvlist_to_otlp_kvpair_list(attributes);

if (scope->attributes == NULL) {
*error_detection_flag = CMT_TRUE;
}

scope->n_attributes = cfl_kvlist_count(attributes);
if (scope->attributes == NULL) {
*error_detection_flag = CMT_TRUE;
}

if (!(*error_detection_flag)) {
scope->dropped_attributes_count = (uint32_t) fetch_metadata_int64_key(
metadata,
"dropped_attributes_count",
error_detection_flag);
scope->n_attributes = cfl_kvlist_count(attributes);
}

if (!(*error_detection_flag)) {
scope->name = fetch_metadata_string_key(metadata, "name", error_detection_flag);
}
/* scope metadata */
if (metadata) {
if (!(*error_detection_flag)) {
scope->dropped_attributes_count = (uint32_t) fetch_metadata_int64_key(
metadata,
"dropped_attributes_count",
error_detection_flag);
}

if (!(*error_detection_flag)) {
scope->version = fetch_metadata_string_key(metadata, "version", error_detection_flag);
if (!(*error_detection_flag)) {
scope->name = fetch_metadata_string_key(metadata, "name", error_detection_flag);
}

if (!(*error_detection_flag)) {
scope->version = fetch_metadata_string_key(metadata, "version", error_detection_flag);
}
}

if (*error_detection_flag &&
scope != NULL) {
if (*error_detection_flag && scope != NULL) {
destroy_instrumentation_scope(scope);

scope = NULL;
}

Expand Down Expand Up @@ -1205,13 +1200,11 @@ static void destroy_attribute(Opentelemetry__Proto__Common__V1__KeyValue *attrib
{
if (attribute != NULL) {
if (attribute->value != NULL) {
if (attribute->value->value_case == \
OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) {
if (attribute->value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) {
if (is_string_releaseable(attribute->value->string_value)) {
cfl_sds_destroy(attribute->value->string_value);
}
}

free(attribute->value);
}

Expand Down
18 changes: 2 additions & 16 deletions plugins/processor_content_modifier/cm_metrics.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,6 @@ int cm_metrics_process(struct flb_processor_instance *ins,
int ret = -1;
struct cfl_variant *var = NULL;

printf("\n\n==== BEFORE =====\n");
cfl_kvlist_print(stdout, in_cmt->internal_metadata);
printf("\n");
printf("-----external----\n");
cfl_kvlist_print(stdout, in_cmt->external_metadata);
fflush(stdout);


if (ctx->context_type == CM_CONTEXT_OTEL_RESOURCE_ATTR) {
/* Internal metadata must be valid */
var = cfl_kvlist_fetch(in_cmt->internal_metadata, "producer");
Expand All @@ -282,7 +274,8 @@ int cm_metrics_process(struct flb_processor_instance *ins,
}

/* validate that the value is 'opentelemetry' */
if (strcmp(var->data.as_string, "opentelemetry") != 0) {
if (strcmp(var->data.as_string, "opentelemetry") != 0 &&
strcmp(var->data.as_string, "fluent-bit") != 0) {
return FLB_PROCESSOR_FAILURE;
}

Expand Down Expand Up @@ -335,12 +328,5 @@ int cm_metrics_process(struct flb_processor_instance *ins,
return FLB_PROCESSOR_FAILURE;
}

printf("\n\n==== AFTER =====\n");
cfl_kvlist_print(stdout, in_cmt->internal_metadata);
printf("\n");
printf("-----external----\n");
cfl_kvlist_print(stdout, in_cmt->external_metadata);
fflush(stdout);

return FLB_PROCESSOR_SUCCESS;
}
108 changes: 107 additions & 1 deletion plugins/processor_opentelemetry_envelope/otel_envelope.c
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,112 @@ static int cb_process_logs(struct flb_processor_instance *ins,
return FLB_PROCESSOR_SUCCESS;
}

static int metrics_add_kvlist(struct cfl_kvlist *kvlist, char *kv1, char *sub_kv1, char *sub_kv2)
{
int ret;
struct cfl_variant *var;
struct cfl_kvlist *tmp_kvlist;

var = cfl_kvlist_fetch(kvlist, kv1);
if (!var) {
tmp_kvlist = cfl_kvlist_create();
if (!tmp_kvlist) {
return -1;
}
ret = cfl_kvlist_insert_kvlist(kvlist, kv1, tmp_kvlist);
if (ret != 0) {
return -1;
}

/* retrieve the last kv inserted */
var = cfl_kvlist_fetch(kvlist, kv1);
}
else if (var->type != CFL_VARIANT_KVLIST) {
return -1;
}

if (!var) {
return -1;
}

if (sub_kv1) {
ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv1, NULL, NULL);
if (ret != 0) {
return -1;
}
}

if (sub_kv2) {
ret = metrics_add_kvlist(var->data.as_kvlist, sub_kv2, NULL, NULL);
if (ret != 0) {
return -1;
}
}

return 0;
}

static int cb_process_metrics(struct flb_processor_instance *processor_instance,
struct cmt *cmt,
struct cmt **out_context,
const char *tag, int tag_len)
{
(void) out_context;
(void) tag;
(void) tag_len;
int ret;
struct cfl_variant *var = NULL;

/* Check internal metadata, look for some producer, if no one is set, add it */
if (!cmt->internal_metadata) {
cmt->internal_metadata = cfl_kvlist_create();
if (!cmt->internal_metadata) {
return FLB_PROCESSOR_FAILURE;
}
}
else {
var = cfl_kvlist_fetch(cmt->internal_metadata, "producer");
}
if (!var) {
cfl_kvlist_insert_string(cmt->internal_metadata, "producer", "fluent-bit");
}

/* externl metadata */
if (!cmt->external_metadata) {
cmt->external_metadata = cfl_kvlist_create();
if (!cmt->external_metadata) {
return FLB_PROCESSOR_FAILURE;
}
}

/* scope */
ret = metrics_add_kvlist(cmt->external_metadata, "scope", "metadata", "attributes");
if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}

/* scope_metrics */
ret = metrics_add_kvlist(cmt->external_metadata, "scope_metrics", "metadata", NULL);
if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}

/* resource */
ret = metrics_add_kvlist(cmt->external_metadata, "resource", "metadata", "attributes");
if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}

/* resource_metrics */
ret = metrics_add_kvlist(cmt->external_metadata, "resource_metrics", "metadata", NULL);
if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}

*out_context = NULL;
return FLB_PROCESSOR_SUCCESS;
}

static struct flb_config_map config_map[] = {
/* EOF */
{0}
Expand All @@ -216,7 +322,7 @@ struct flb_processor_plugin processor_opentelemetry_envelope_plugin = {
.description = "Package log records inside an OpenTelemetry Logs schema",
.cb_init = cb_init,
.cb_process_logs = cb_process_logs,
.cb_process_metrics = NULL,
.cb_process_metrics = cb_process_metrics,
.cb_process_traces = NULL,
.cb_exit = cb_exit,
.config_map = config_map,
Expand Down