From 49e89e5c01e23e3d174ca46e4119013b12ff8c69 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Wed, 10 Jul 2024 21:13:32 -0600 Subject: [PATCH] out_opentelemetry: restore manual parsing for log record fields (fix #9071) When the log records comes as raw, meaning non-otel schema, the user can specify through the configuration how to extract different fields from the main message body such as: - traceId - spanId - traceFlags - severityText - severityNumber This PR fix the manual parsing that got broken after the refactor. This fix issue #9071 Signed-off-by: Eduardo Silva --- .../out_opentelemetry/opentelemetry_logs.c | 263 +++++++++++++++--- 1 file changed, 228 insertions(+), 35 deletions(-) diff --git a/plugins/out_opentelemetry/opentelemetry_logs.c b/plugins/out_opentelemetry/opentelemetry_logs.c index d9dfcfbdd7b..61c5722ba47 100644 --- a/plugins/out_opentelemetry/opentelemetry_logs.c +++ b/plugins/out_opentelemetry/opentelemetry_logs.c @@ -35,6 +35,52 @@ #include "opentelemetry_conf.h" #include "opentelemetry_utils.h" +static int hex_to_int(char ch) +{ + if (ch >= '0' && ch <= '9') { + return ch - '0'; + } + + if (ch >= 'a' && ch <= 'f') { + return ch - 'a' + 10; + } + + if (ch >= 'A' && ch <= 'F') { + return ch - 'A' + 10; + } + + return -1; +} + +/* convert an hex string to the expected id (16 bytes) */ +static int hex_to_id(char *str, int len, unsigned char *out_buf, int out_size) +{ + int i; + int high; + int low; + + if (len % 2 != 0) { + return -1; + } + + for (i = 0; i < len; i += 2) { + if (!isxdigit(str[i]) || !isxdigit(str[i + 1])) { + return -1; + } + + high = hex_to_int(str[i]); + low = hex_to_int(str[i + 1]); + + if (high == -1 || low == -1) { + return -1; + } + + out_buf[i / 2] = (high << 4) | low; + } + + return 0; +} + /* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */ static int is_valid_severity_text(const char *str, size_t str_len) { @@ -281,10 +327,94 @@ static int log_record_set_attributes(struct opentelemetry_context *ctx, return 0; } -static int append_v1_logs_metadata(struct opentelemetry_context *ctx, - struct flb_log_event *event, - Opentelemetry__Proto__Logs__V1__LogRecord *log_record) +static int pack_trace_id(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record, + struct flb_ra_value *ra_val) { + int ret; + + if (ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (!log_record->trace_id.data) { + return -1; + } + memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->trace_id.len = ra_val->o.via.bin.size; + } + else if (ra_val->o.type == MSGPACK_OBJECT_STR) { + if (ra_val->o.via.str.size > 32) { + return -1; + } + + log_record->trace_id.data = flb_calloc(1, 16); + if (!log_record->trace_id.data) { + flb_errno(); + return -1; + } + + ret = hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size, + log_record->trace_id.data, 16); + if (ret == 0) { + log_record->trace_id.len = 16; + return 0; + } + + flb_plg_warn(ctx->ins, "invalid trace_id format"); + flb_free(log_record->trace_id.data); + log_record->trace_id.data = NULL; + log_record->trace_id.len = 0; + } + else { + flb_plg_warn(ctx->ins, "invalid trace_id type"); + } + + return -1; +} + +static int pack_span_id(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record, + struct flb_ra_value *ra_val) +{ + if (ra_val->o.type == MSGPACK_OBJECT_BIN) { + log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); + if (!log_record->span_id.data) { + return -1; + } + memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); + log_record->span_id.len = ra_val->o.via.bin.size; + } + else if (ra_val->o.type == MSGPACK_OBJECT_STR) { + if (ra_val->o.via.str.size > 16) { + return -1; + } + + log_record->span_id.data = flb_calloc(1, 8); + if (!log_record->span_id.data) { + flb_errno(); + return -1; + } + + hex_to_id((char *) ra_val->o.via.str.ptr, ra_val->o.via.str.size, + log_record->span_id.data, 8); + log_record->span_id.len = 8; + } + else { + flb_plg_warn(ctx->ins, "invalid span_id type"); + } + + return 0; +} + +static int append_v1_logs_metadata_and_fields(struct opentelemetry_context *ctx, + struct flb_log_event *event, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record) +{ + int ret; + int span_id_set = FLB_FALSE; + int trace_id_set = FLB_FALSE; + int severity_text_set = FLB_FALSE; + int severity_number_set = FLB_FALSE; + int trace_flags_set = FLB_FALSE; struct flb_ra_value *ra_val; if (ctx == NULL || event == NULL || log_record == NULL) { @@ -335,14 +465,28 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) { log_record->severity_number = ra_val->o.via.u64; + severity_number_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } - else if (ctx->ra_severity_number_metadata) { + + if (!severity_number_set && ctx->ra_severity_number_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->metadata); if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) { log_record->severity_number = ra_val->o.via.u64; + severity_number_set = FLB_TRUE; + } + flb_ra_key_value_destroy(ra_val); + } + } + + if (!severity_number_set && ctx->ra_severity_number_message) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_number_metadata, *event->body); + if (ra_val != NULL) { + if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER && is_valid_severity_number(ra_val->o.via.u64)) { + log_record->severity_number = ra_val->o.via.u64; + severity_number_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } @@ -357,11 +501,13 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1); if (log_record->severity_text) { strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + severity_text_set = FLB_TRUE; } } flb_ra_key_value_destroy(ra_val); } - else if (ctx->ra_severity_text_metadata) { + + if (!severity_text_set && ctx->ra_severity_text_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_severity_text_metadata, *event->metadata); if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_STR && @@ -369,12 +515,29 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1); if (log_record->severity_text) { strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + severity_text_set = FLB_TRUE; } } flb_ra_key_value_destroy(ra_val); } } - else { + + if (!severity_text_set && ctx->ra_severity_text_message) { + ra_val = flb_ra_get_value_object(ctx->ra_severity_text_message, *event->body); + if (ra_val != NULL) { + if (ra_val->o.type == MSGPACK_OBJECT_STR && + is_valid_severity_text(ra_val->o.via.str.ptr, ra_val->o.via.str.size)) { + log_record->severity_text = flb_calloc(1, ra_val->o.via.str.size + 1); + if (log_record->severity_text) { + strncpy(log_record->severity_text, ra_val->o.via.str.ptr, ra_val->o.via.str.size); + severity_text_set = FLB_TRUE; + } + } + flb_ra_key_value_destroy(ra_val); + } + } + + if (!severity_text_set) { /* To prevent invalid free */ log_record->severity_text = NULL; } @@ -406,47 +569,62 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, /* TraceId */ ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_trace_id, *event->metadata); if (ra_val != NULL) { - if (ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->trace_id.data) { - memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->trace_id.len = ra_val->o.via.bin.size; - } + ret = pack_trace_id(ctx, log_record, ra_val); + if (ret == 0) { + trace_id_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } - else if (ctx->ra_trace_id_metadata) { + + if (!trace_id_set && ctx->ra_trace_id_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_trace_id_metadata, *event->metadata); - if (ra_val != NULL && ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->trace_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->trace_id.data) { - memcpy(log_record->trace_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->trace_id.len = ra_val->o.via.bin.size; + if (ra_val != NULL) { + ret = pack_trace_id(ctx, log_record, ra_val); + if (ret == 0) { + trace_id_set = FLB_TRUE; + } + flb_ra_key_value_destroy(ra_val); + } + } + + if (!trace_id_set && ctx->ra_trace_id_message) { + ra_val = flb_ra_get_value_object(ctx->ra_trace_id_message, *event->body); + if (ra_val != NULL) { + ret = pack_trace_id(ctx, log_record, ra_val); + if (ret == 0) { + trace_id_set = FLB_TRUE; } + flb_ra_key_value_destroy(ra_val); } } /* SpanId */ ra_val = flb_ra_get_value_object(ctx->ra_log_meta_otlp_span_id, *event->metadata); if (ra_val != NULL) { - if (ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->span_id.data) { - memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->span_id.len = ra_val->o.via.bin.size; - } + ret = pack_span_id(ctx, log_record, ra_val); + if (ret == 0) { + span_id_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } - else if (ctx->ra_span_id_metadata) { + + if (!span_id_set && ctx->ra_span_id_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_span_id_metadata, *event->metadata); if (ra_val != NULL) { - if (ra_val->o.type == MSGPACK_OBJECT_BIN) { - log_record->span_id.data = flb_calloc(1, ra_val->o.via.bin.size); - if (log_record->span_id.data) { - memcpy(log_record->span_id.data, ra_val->o.via.bin.ptr, ra_val->o.via.bin.size); - log_record->span_id.len = ra_val->o.via.bin.size; - } + ret = pack_span_id(ctx, log_record, ra_val); + if (ret == 0) { + span_id_set = FLB_TRUE; + } + flb_ra_key_value_destroy(ra_val); + } + } + + if (!span_id_set && ctx->ra_span_id_message) { + ra_val = flb_ra_get_value_object(ctx->ra_span_id_message, *event->body); + if (ra_val != NULL) { + ret = pack_span_id(ctx, log_record, ra_val); + if (ret == 0) { + span_id_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } @@ -457,14 +635,17 @@ static int append_v1_logs_metadata(struct opentelemetry_context *ctx, if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { log_record->flags = (uint32_t) ra_val->o.via.u64; + trace_flags_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } - else if (ctx->ra_trace_flags_metadata) { + + if (!trace_flags_set && ctx->ra_trace_flags_metadata) { ra_val = flb_ra_get_value_object(ctx->ra_trace_flags_metadata, *event->metadata); if (ra_val != NULL) { if (ra_val->o.type == MSGPACK_OBJECT_POSITIVE_INTEGER) { log_record->flags = (uint32_t) ra_val->o.via.u64; + trace_flags_set = FLB_TRUE; } flb_ra_key_value_destroy(ra_val); } @@ -740,6 +921,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, int log_record_count; int max_scopes; int max_resources; + int native_otel = FLB_FALSE; int64_t prev_group_resource_id = -1; int64_t prev_group_scope_id = -1; int64_t resource_id = -1; @@ -807,6 +989,9 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, continue; } + /* flag this as a native otel schema */ + native_otel = FLB_TRUE; + if (resource_id == -1 && prev_group_resource_id >= 0 && prev_group_resource_id == tmp_resource_id) { /* continue with the previous resource */ resource_id = prev_group_resource_id; @@ -920,8 +1105,14 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, ret = FLB_OK; - /* since we are starting a new group, just continue with the next record */ - continue; + /* + * if we started a new group through a valid OTLP schema, just continue since the active record + * is a group start. If native_otel is off it means the packaging was done for a record which is + * not OTLP schema compatible so it needs to be processed (do not skip it). + */ + if (native_otel) { + continue; + } } else if (record_type == FLB_LOG_EVENT_GROUP_END) { /* do nothing */ @@ -930,6 +1121,8 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, prev_group_scope_id = scope_id; resource_id = -1; scope_id = -1; + native_otel = FLB_FALSE; + continue; } @@ -981,7 +1174,7 @@ int otel_process_logs(struct flb_event_chunk *event_chunk, break; } - append_v1_logs_metadata(ctx, &event, log_record); + append_v1_logs_metadata_and_fields(ctx, &event, log_record); ret = FLB_OK; log_record_count++;